Index: com/sun/grizzly/ComplexSelectorHandler.java =================================================================== --- com/sun/grizzly/ComplexSelectorHandler.java (revision 3397) +++ com/sun/grizzly/ComplexSelectorHandler.java (working copy) @@ -39,6 +39,8 @@ import com.sun.grizzly.Controller.Protocol; +import java.nio.channels.SelectionKey; + /** * A ComplexSelectorHandler handles all java.nio.channels.Selector operations * similar way {@link SelectorHandler} does. @@ -54,5 +56,14 @@ * @param protocol Network protocol name * @return true if protocol is supported, false otherwise */ - public boolean supportsProtocol(Protocol protocol); + public boolean supportsProtocol(Protocol protocol); + + /** + * Checks if the selected key with protocol is supported by RoundRobinSelectorHandler + * + * @param protocol Network protocol name + * @param key {@link SelectionKey}. the key can be null. + * @return true if protocol is supported, false otherwise + */ + public boolean supportsProtocol(Protocol protocol, SelectionKey key); } Index: com/sun/grizzly/SelectorHandlerRunner.java =================================================================== --- com/sun/grizzly/SelectorHandlerRunner.java (revision 3397) +++ com/sun/grizzly/SelectorHandlerRunner.java (working copy) @@ -311,14 +311,22 @@ // might be processed just after or during the next // Selector.select() invocation. if ((readyOps & SelectionKey.OP_READ) != 0) { - if (isLogLevelFine) { - dolog("OP_READ on ", key); + if (controller.getReadThreadsCount() > 0 && + controller.multiReadThreadSelectorHandler.supportsProtocol(selectorHandler.protocol(), key)) { + if (isLogLevelFine) { + dolog("OP_READ passed to multi readthread handler on ", key); + } + delegateToWorker = controller.multiReadThreadSelectorHandler.onReadInterest(key, serverCtx); + } else { + if (isLogLevelFine) { + dolog("OP_READ on ", key); + } + delegateToWorker = selectorHandler.onReadInterest(key, serverCtx); } - delegateToWorker = selectorHandler.onReadInterest(key, serverCtx); - if (delegateToWorker) { + if( delegateToWorker ) { opType = OpType.OP_READ; } - if (!controller.isHandleReadWriteConcurrently()) { + if( !controller.isHandleReadWriteConcurrently() ) { skipOpWrite = true; } } Index: com/sun/grizzly/RoundRobinSelectorHandler.java =================================================================== --- com/sun/grizzly/RoundRobinSelectorHandler.java (revision 3397) +++ com/sun/grizzly/RoundRobinSelectorHandler.java (working copy) @@ -40,9 +40,11 @@ import com.sun.grizzly.Controller.Protocol; import com.sun.grizzly.util.Copyable; +import com.sun.grizzly.util.SelectionKeyAttachment; import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.Set; /** @@ -61,8 +63,8 @@ implements ComplexSelectorHandler { private ReadController[] rrControllers; private int roundRobinCounter; - private Set customProtocols; - + private Set customProtocols = new CopyOnWriteArraySet(); + public RoundRobinSelectorHandler() {} public RoundRobinSelectorHandler(ReadController[] rrControllers) { @@ -82,48 +84,87 @@ ReadController auxController = nextController(); SelectorHandler protocolSelectorHandler = context.getSelectorHandler(); SelectableChannel channel = protocolSelectorHandler.acceptWithoutRegistration(key); - if (channel != null) { protocolSelectorHandler.configureChannel(channel); - SelectorHandler relativeSelectorHandler = - auxController.getSelectorHandlerClone(protocolSelectorHandler); - - if (relativeSelectorHandler == null) { - // Clone was not found - take correspondent protocol SelectorHandler - relativeSelectorHandler = - auxController.getSelectorHandler(protocolSelectorHandler.protocol()); - - if (relativeSelectorHandler == null) { - throw new IOException("Can not get correct SelectorHandler"); - } - } - - auxController.addChannel(channel, relativeSelectorHandler); + addChannel(auxController, protocolSelectorHandler, channel); } return false; } - + + @Override + public boolean onReadInterest( final SelectionKey key, final Context ctx ) throws IOException { + // disable OP_READ on key before doing anything else + key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) ); + ReadController auxController = nextController(); + SelectorHandler protocolSelectorHandler = ctx.getSelectorHandler(); + SelectableChannel channel = key.channel(); + if( channel != null ) + addChannel( auxController, protocolSelectorHandler, channel ); + return false; + } + /** + * Add a {@link SelectableChannel} to be processed by {@link ReadController}'s {@link SelectorHandler} + * + * @param auxController next aux. ReadController to process an accepted connection + * @param selectorHandler the current {@link SelectorHandler} + * @param channel the current {@link SelectableChannel} + * @throws IOException if a relative {@link SelectorHandler} could not be found + */ + private void addChannel( ReadController auxController, SelectorHandler selectorHandler, SelectableChannel channel ) throws IOException { + if( auxController == null || selectorHandler == null || channel == null ) + return; + SelectorHandler relativeSelectorHandler = auxController.getSelectorHandlerClone( selectorHandler ); + if( relativeSelectorHandler == null ) { + // Clone was not found - take correspondent protocol SelectorHandler + relativeSelectorHandler = auxController.getSelectorHandler( selectorHandler.protocol() ); + if( relativeSelectorHandler == null ) + throw new IOException( "Can not get correct SelectorHandler" ); + } + auxController.addChannel( channel, relativeSelectorHandler ); + } + + /** * Add custom protocol support * @param customProtocol custom {@link Controller.Protocol} */ public void addProtocolSupport(Protocol customProtocol) { customProtocols.add(customProtocol); } - + /** * {@inheritDoc} */ public boolean supportsProtocol(Protocol protocol) { - return protocol == Protocol.TCP || protocol == Protocol.TLS || - customProtocols.contains(protocol); + return supportsProtocol( protocol, null ); } - + /** + * {@inheritDoc} + */ + public boolean supportsProtocol( Protocol protocol, SelectionKey key ) { + if( key != null && key.isReadable() ) { + return protocol == Protocol.UDP && + // only server-side supports round robin + !( SelectionKeyAttachment.getAttachment( key ) instanceof CallbackHandler ); + } else { + return protocol == Protocol.TCP || + protocol == Protocol.TLS || + customProtocols.contains( protocol ); + } + } + + /** * Return next aux. ReadController to process an accepted connection * @return{@link ReadController} */ private ReadController nextController() { return rrControllers[((roundRobinCounter++) & 0x7fffffff) % rrControllers.length]; } + + @Override + public void shutdown() { + super.shutdown(); + customProtocols.clear(); + } }