diff --git a/src/org/jruby/RubyIO.java b/src/org/jruby/RubyIO.java index a737f4a..9f25c08 100644 --- a/src/org/jruby/RubyIO.java +++ b/src/org/jruby/RubyIO.java @@ -49,9 +49,11 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -1036,38 +1038,43 @@ public class RubyIO extends RubyObject { if (channel == null || !(channel instanceof SelectableChannel)) { return false; } - + + SelectableChannel selectable = (SelectableChannel)channel; Selector selector = null; - try { - selector = Selector.open(); + synchronized (selectable.blockingLock()) { + boolean oldBlocking = selectable.isBlocking(); + try { + selector = Selector.open(); - ((SelectableChannel) channel).configureBlocking(false); - int real_ops = ((SelectableChannel) channel).validOps() & SelectionKey.OP_WRITE; - SelectionKey key = ((SelectableChannel) channel).keyFor(selector); + selectable.configureBlocking(false); + int real_ops = selectable.validOps() & SelectionKey.OP_WRITE; + SelectionKey key = selectable.keyFor(selector); - if (key == null) { - ((SelectableChannel) channel).register(selector, real_ops, descriptor); - } else { - key.interestOps(key.interestOps()|real_ops); - } + if (key == null) { + selectable.register(selector, real_ops, descriptor); + } else { + key.interestOps(key.interestOps()|real_ops); + } - while(selector.select() == 0); + while(selector.select() == 0); - for (Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) { - SelectionKey skey = (SelectionKey) i.next(); - if ((skey.interestOps() & skey.readyOps() & (SelectionKey.OP_WRITE)) != 0) { - if(skey.attachment() == descriptor) { - return true; + for (Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) { + SelectionKey skey = (SelectionKey) i.next(); + if ((skey.interestOps() & skey.readyOps() & (SelectionKey.OP_WRITE)) != 0) { + if(skey.attachment() == descriptor) { + return true; + } } } - } - return false; - } finally { - if (selector != null) { - try { - selector.close(); - } catch (Exception e) { + return false; + } finally { + if (selector != null) { + try { + selector.close(); + } catch (Exception e) { + } } + selectable.configureBlocking(oldBlocking); } } } @@ -1077,38 +1084,43 @@ public class RubyIO extends RubyObject { if (channel == null || !(channel instanceof SelectableChannel)) { return false; } - + + SelectableChannel selectable = (SelectableChannel)channel; Selector selector = null; - try { - selector = Selector.open(); + synchronized (selectable.blockingLock()) { + boolean oldBlocking = selectable.isBlocking(); + try { + selector = Selector.open(); - ((SelectableChannel) channel).configureBlocking(false); - int real_ops = ((SelectableChannel) channel).validOps() & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT); - SelectionKey key = ((SelectableChannel) channel).keyFor(selector); + selectable.configureBlocking(false); + int real_ops = selectable.validOps() & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT); + SelectionKey key = selectable.keyFor(selector); - if (key == null) { - ((SelectableChannel) channel).register(selector, real_ops, descriptor); - } else { - key.interestOps(key.interestOps()|real_ops); - } + if (key == null) { + selectable.register(selector, real_ops, descriptor); + } else { + key.interestOps(key.interestOps()|real_ops); + } - while(selector.select() == 0); + while(selector.select() == 0); - for (Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) { - SelectionKey skey = (SelectionKey) i.next(); - if ((skey.interestOps() & skey.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) { - if(skey.attachment() == descriptor) { - return true; + for (Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) { + SelectionKey skey = (SelectionKey) i.next(); + if ((skey.interestOps() & skey.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) { + if(skey.attachment() == descriptor) { + return true; + } } } - } - return false; - } finally { - if (selector != null) { - try { - selector.close(); - } catch (Exception e) { + return false; + } finally { + if (selector != null) { + try { + selector.close(); + } catch (Exception e) { + } } + selectable.configureBlocking(oldBlocking); } } } @@ -2223,7 +2235,7 @@ public class RubyIO extends RubyObject { } RubyString str = null; - + return readNotAll(context, myOpenFile, length, str); } @@ -2683,6 +2695,8 @@ public class RubyIO extends RubyObject { Set pending = new HashSet(); Set unselectable_reads = new HashSet(); Set unselectable_writes = new HashSet(); + Map blocking = new HashMap(); + selector = Selector.open(); if (!args[0].isNil()) { // read @@ -2690,6 +2704,10 @@ public class RubyIO extends RubyObject { for (Iterator i = ((RubyArray)args[0]).getList().iterator(); i.hasNext();) { IRubyObject obj = (IRubyObject)i.next(); RubyIO ioObj = convertToIO(context, obj); + + // save blocking state + if (ioObj.getChannel() instanceof SelectableChannel) blocking.put(ioObj, ((SelectableChannel)ioObj.getChannel()).isBlocking()); + if (registerSelect(context, selector, obj, ioObj, SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) { if (ioObj.writeDataBuffered()) { pending.add(obj); @@ -2708,6 +2726,10 @@ public class RubyIO extends RubyObject { for (Iterator i = ((RubyArray)args[1]).getList().iterator(); i.hasNext();) { IRubyObject obj = (IRubyObject)i.next(); RubyIO ioObj = convertToIO(context, obj); + + // save blocking state + if (!blocking.containsKey(ioObj) && ioObj.getChannel() instanceof SelectableChannel) blocking.put(ioObj, ((SelectableChannel)ioObj.getChannel()).isBlocking()); + if (!registerSelect(context, selector, obj, ioObj, SelectionKey.OP_WRITE)) { if ((ioObj.openFile.getMode() & OpenFile.WRITABLE) != 0) { unselectable_writes.add(obj); @@ -2777,16 +2799,11 @@ public class RubyIO extends RubyObject { w.addAll(unselectable_writes); // make all sockets blocking as configured again - Set keys = selector.keys(); // get keys before close selector.close(); // close unregisters all channels, so we can safely reset blocking modes - for (SelectionKey key : keys) { - SelectableChannel channel = key.channel(); + for (Map.Entry blockingEntry : blocking.entrySet()) { + SelectableChannel channel = (SelectableChannel)((RubyIO)blockingEntry.getKey()).getChannel(); synchronized (channel.blockingLock()) { - RubyIO originalIO = (RubyIO)TypeConverter.convertToType( - (IRubyObject)key.attachment(), runtime.getIO(), "to_io"); - boolean blocking = originalIO.getBlocking(); - key.cancel(); - channel.configureBlocking(blocking); + channel.configureBlocking((Boolean)blockingEntry.getValue()); } } diff --git a/src/org/jruby/ext/NetProtocolBufferedIO.java b/src/org/jruby/ext/NetProtocolBufferedIO.java index e041619..05d6423 100644 --- a/src/org/jruby/ext/NetProtocolBufferedIO.java +++ b/src/org/jruby/ext/NetProtocolBufferedIO.java @@ -100,27 +100,32 @@ public class NetProtocolBufferedIO { NativeImpl nim = (NativeImpl)recv.dataGetStruct(); Selector selector = null; - try { - selector = Selector.open(); - nim.channel.configureBlocking(false); - SelectionKey key = nim.channel.register(selector, SelectionKey.OP_READ); - int n = selector.select(timeout); - - if(n > 0) { - IRubyObject readItems = io.read(new IRubyObject[]{recv.getRuntime().newFixnum(1024*16)}); - return buf.concat(readItems); - } else { - RubyClass exc = (RubyClass)(recv.getRuntime().getModule("Timeout").getConstant("Error")); - throw new RaiseException(RubyException.newException(recv.getRuntime(), exc, "execution expired"),false); - } - } catch(IOException exception) { - throw recv.getRuntime().newIOErrorFromException(exception); - } finally { - if (selector != null) { - try { - selector.close(); - } catch (Exception e) { + synchronized (nim.channel.blockingLock()) { + boolean oldBlocking = nim.channel.isBlocking(); + + try { + selector = Selector.open(); + nim.channel.configureBlocking(false); + SelectionKey key = nim.channel.register(selector, SelectionKey.OP_READ); + int n = selector.select(timeout); + + if(n > 0) { + IRubyObject readItems = io.read(new IRubyObject[]{recv.getRuntime().newFixnum(1024*16)}); + return buf.concat(readItems); + } else { + RubyClass exc = (RubyClass)(recv.getRuntime().getModule("Timeout").getConstant("Error")); + throw new RaiseException(RubyException.newException(recv.getRuntime(), exc, "execution expired"),false); + } + } catch(IOException exception) { + throw recv.getRuntime().newIOErrorFromException(exception); + } finally { + if (selector != null) { + try { + selector.close(); + } catch (Exception e) { + } } + try {nim.channel.configureBlocking(oldBlocking);} catch (IOException ioe) {} } } } diff --git a/src/org/jruby/ext/socket/RubyTCPServer.java b/src/org/jruby/ext/socket/RubyTCPServer.java index 8d08196..597b2c5 100644 --- a/src/org/jruby/ext/socket/RubyTCPServer.java +++ b/src/org/jruby/ext/socket/RubyTCPServer.java @@ -179,30 +179,35 @@ public class RubyTCPServer extends RubyTCPSocket { public IRubyObject accept_nonblock(ThreadContext context) { RubyTCPSocket socket = new RubyTCPSocket(context.getRuntime(), context.getRuntime().fastGetClass("TCPSocket")); Selector selector = null; - try { - ssc.configureBlocking(false); - selector = Selector.open(); - SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT); - - int selected = selector.selectNow(); - if (selected == 0) { - // no connection immediately accepted, let them try again - throw context.getRuntime().newErrnoEAGAINError("Resource temporarily unavailable"); - } else { + synchronized (ssc.blockingLock()) { + boolean oldBlocking = ssc.isBlocking(); + + try { + ssc.configureBlocking(false); + selector = Selector.open(); + SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT); + + int selected = selector.selectNow(); + if (selected == 0) { + // no connection immediately accepted, let them try again + throw context.getRuntime().newErrnoEAGAINError("Resource temporarily unavailable"); + } else { + try { + // otherwise one key has been selected (ours) so we get the channel and hand it off + socket.initSocket(context.getRuntime(), new ChannelDescriptor(ssc.accept(), RubyIO.getNewFileno(), new ModeFlags(ModeFlags.RDWR), new FileDescriptor())); + } catch (InvalidValueException ex) { + throw context.getRuntime().newErrnoEINVALError(); + } + return socket; + } + } catch(IOException e) { + throw sockerr(context.getRuntime(), "problem when accepting"); + } finally { try { - // otherwise one key has been selected (ours) so we get the channel and hand it off - socket.initSocket(context.getRuntime(), new ChannelDescriptor(ssc.accept(), RubyIO.getNewFileno(), new ModeFlags(ModeFlags.RDWR), new FileDescriptor())); - } catch (InvalidValueException ex) { - throw context.getRuntime().newErrnoEINVALError(); + if (selector != null) selector.close(); + } catch (Exception e) { } - return socket; - } - } catch(IOException e) { - throw sockerr(context.getRuntime(), "problem when accepting"); - } finally { - try { - if (selector != null) selector.close(); - } catch (Exception e) { + try {ssc.configureBlocking(oldBlocking);} catch (IOException ioe) {} } } } diff --git a/src/org/jruby/util/io/ChannelStream.java b/src/org/jruby/util/io/ChannelStream.java index 6b385de..812dab3 100644 --- a/src/org/jruby/util/io/ChannelStream.java +++ b/src/org/jruby/util/io/ChannelStream.java @@ -862,23 +862,23 @@ public class ChannelStream implements Stream, Finalizable { int ready_stat = 0; java.nio.channels.Selector sel = java.nio.channels.Selector.open(); SelectableChannel selchan = (SelectableChannel)descriptor.getChannel(); - boolean is_block = selchan.isBlocking(); - try { - synchronized (selchan.blockingLock()) { + synchronized (selchan.blockingLock()) { + boolean is_block = selchan.isBlocking(); + try { selchan.configureBlocking(false); selchan.register(sel, java.nio.channels.SelectionKey.OP_READ); ready_stat = sel.selectNow(); sel.close(); - selchan.configureBlocking(is_block); - } - } catch (Throwable ex) { - ex.printStackTrace(); - } finally { - if (sel != null) { - try { - sel.close(); - } catch (Exception e) { + } catch (Throwable ex) { + ex.printStackTrace(); + } finally { + if (sel != null) { + try { + sel.close(); + } catch (Exception e) { + } } + selchan.configureBlocking(is_block); } } return ready_stat;