Salut,
patch looks good but next time wait for the review and then commit :-)
Great work! See minor comment inline
-- Jeanfrancois
gustav trede wrote:
> Hello,
>
> Please feedback on patch for moving socket close and attachment
> triggered IO from selector thread to workers at postSelect.
> The SelectionKey.cancel() is of course still done in the selecting thread.
> Currently no Runnable wrappers are needed per task due to the affected
> attachment objects was made Runnable.
>
> --
> regards
> gustav trede
>
>
>
> Property changes on: .
> ___________________________________________________________________
> Added: svn:ignore
> + target
> ..merge.diff
>
>
>
> Property changes on: contribs/bundles
> ___________________________________________________________________
> Added: svn:ignore
> + target
>
>
>
> Property changes on: contribs/bundles/grizzly-httpservice-bundle
> ___________________________________________________________________
> Added: svn:ignore
> + target
>
>
> Index: contribs/grizzly-config/src/main/java/com/sun/grizzly/config/GrizzlyConfig.java
> ===================================================================
> --- contribs/grizzly-config/src/main/java/com/sun/grizzly/config/GrizzlyConfig.java (revision 2838)
> +++ contribs/grizzly-config/src/main/java/com/sun/grizzly/config/GrizzlyConfig.java (working copy)
> @@ -40,6 +40,7 @@
> import com.sun.grizzly.config.dom.NetworkConfig;
> import com.sun.grizzly.config.dom.NetworkListener;
> import com.sun.grizzly.util.LoggerUtils;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import org.jvnet.hk2.component.Habitat;
>
> import java.io.IOException;
> @@ -79,7 +80,7 @@
> grizzlyListener.configure(listener, true, habitat);
>
> listeners.add(grizzlyListener);
> - final Thread thread = new Thread(new ListenerRunnable(grizzlyListener));
> + final Thread thread = new WorkerThreadImpl(new ListenerRunnable(grizzlyListener));
> thread.setDaemon(true);
> thread.start();
> }
> Index: modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
> ===================================================================
> --- modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java (revision 2838)
> +++ modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java (working copy)
> @@ -75,6 +75,7 @@
>
> import com.sun.grizzly.util.LinkedTransferQueue;
> import com.sun.grizzly.util.LoggerUtils;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import com.sun.grizzly.util.res.StringManager;
> import java.io.File;
> import javax.management.ObjectName;
> @@ -996,7 +997,7 @@
> * Declare this method to save backwards compatibility
> */
> public void start() {
> - new Thread(this, "SelectorThread-" + port).start();
> + new WorkerThreadImpl("SelectorThread-" + port,this).start();
> }
>
> /**
> Index: modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java
> ===================================================================
> --- modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java (revision 2838)
> +++ modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java (working copy)
> @@ -83,18 +83,12 @@
>
> @Override
> public void doRegisterKey(SelectionKey key, int ops, long currentTime) {
> - Object attachment = key.attachment();
> - if (attachment instanceof KeepAliveThreadAttachment) {
> - if (!key.isValid()) {
> - selectorThread.cancelKey(key);
> - return;
> - }
> - KeepAliveThreadAttachment k = (KeepAliveThreadAttachment) attachment;
> - k.setTimeout(currentTime);
> - } else {
> - addExpirationStamp(key);
> + if (!key.isValid()){
> + selectorHandler.addPendingKeyCancel(key);
> + }else{
> + key.interestOps(key.interestOps() | ops);
> + addExpirationStamp(key,currentTime);
> }
> - key.interestOps(key.interestOps() | ops);
> }
>
> /**
> @@ -123,31 +117,14 @@
> if (idleLimit != -1 && currentTime - expire >= idleLimit &&
> (!(attachment instanceof SelectionKeyAttachment) ||
> ((SelectionKeyAttachment)attachment).timedOut(key))){
> -
> - cancel(key);
> - }
> + selectorHandler.addPendingKeyCancel(key);
> + }
> }
> }
> }
> }
>
> /**
> - * Gets expiration timeout stamp from the {_at_link SelectionKey}
> - * depending on its attachment
> - *
> - * @param {_at_link SelectionKey}
> - */
> - protected long getExpirationStamp(Object attachment) {
> - if (attachment instanceof Long) {
> - return (Long) attachment;
> - }
> - if (attachment instanceof SelectionKeyAttachment) {
> - return ((SelectionKeyAttachment) attachment).getTimeout();
> - }
> - return SelectionKeyAttachment.UNLIMITED_TIMEOUT;
> - }
> -
> - /**
> * returns idle limit
> * @param attachment
> * @return
> Index: modules/rcm/src/test/java/com/sun/grizzly/rcm/RCMTest.java
> ===================================================================
> --- modules/rcm/src/test/java/com/sun/grizzly/rcm/RCMTest.java (revision 2838)
> +++ modules/rcm/src/test/java/com/sun/grizzly/rcm/RCMTest.java (working copy)
> @@ -49,6 +49,7 @@
> import com.sun.grizzly.util.ByteBufferInputStream;
> import com.sun.grizzly.util.OutputWriter;
> import com.sun.grizzly.util.WorkerThread;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.BufferedReader;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> @@ -200,7 +201,7 @@
> });
>
> controller.addStateListener(this);
> - new Thread(controller).start();
> + new WorkerThreadImpl(controller).start();
>
> try {
> startLatch.await();
> Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java
> ===================================================================
> --- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java (revision 2838)
> +++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java (working copy)
> @@ -38,6 +38,7 @@
>
> package com.sun.enterprise.web.connector.grizzly.comet;
>
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.IOException;
> import java.nio.channels.SelectionKey;
> import java.util.Iterator;
> @@ -205,9 +206,8 @@
> }
> task.callInterrupt = true;
> task.interruptFlushAPT = flushAPT;
> - //((WorkerThreadImpl)Thread.currentThread()).
> - // getPendingIOhandler().addPendingIO(task);
> - task.run();
> + ((WorkerThreadImpl)Thread.currentThread()).
> + getPendingIOhandler().addPendingIO(task);
>
> }else{
> interrupt0(task, notifyInterrupt, flushAPT, cancelkey);
> Index: modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java
> ===================================================================
> --- modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java (revision 2838)
> +++ modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java (working copy)
> @@ -943,9 +943,8 @@
> @Override
> public boolean timedOut(SelectionKey Key) {
> Key.attach(null);
> - run();
> - //((WorkerThreadImpl)Thread.currentThread()).
> - // getPendingIOhandler().addPendingIO(this);
> + ((WorkerThreadImpl)Thread.currentThread()).
> + getPendingIOhandler().addPendingIO(this);
> return false;
> }
>
> Index: modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java
> ===================================================================
> --- modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java (revision 2838)
> +++ modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java (working copy)
> @@ -1,9 +1,9 @@
> /*
> - *
> + *
> * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
> - *
> + *
> * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
> - *
> + *
> * The contents of this file are subject to the terms of either the GNU
> * General Public License Version 2 only ("GPL") or the Common Development
> * and Distribution License("CDDL") (collectively, the "License"). You
> @@ -11,7 +11,7 @@
> * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
> * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
> * language governing permissions and limitations under the License.
> - *
> + *
> * When distributing the software, include this License Header Notice in each
> * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
> * Sun designates this particular file as subject to the "Classpath" exception
> @@ -20,9 +20,9 @@
> * Header, with the fields enclosed by brackets [] replaced by your own
> * identifying information: "Portions Copyrighted [year]
> * [name of copyright owner]"
> - *
> + *
> * Contributor(s):
> - *
> + *
> * If you wish your version of this file to be governed by only the CDDL or
> * only the GPL Version 2, indicate your decision by adding "[Contributor]
> * elects to include this software in this distribution under the [CDDL or GPL
> @@ -51,7 +51,7 @@
> * @author Jeanfrancois Arcand
> * @author Alexey Stashok
> */
> -public class ThreadAttachment extends SelectionKeyActionAttachment
> +public class ThreadAttachment extends SelectionKeyActionAttachment
> implements AttributeHolder {
>
> /**
> @@ -69,18 +69,18 @@
> public static int SSL_ARTIFACTS = 28;
> public static int STORE_ALL = 31;
> };
> -
> +
> private ReentrantLock threadLock = new ReentrantLock();
> -
> +
> private String threadName;
> -
> -
> - private Map<String, Object> attributes;
> -
> -
> +
> +
> + private Map<String, Object> attributes;
> +
> +
> private ByteBuffer byteBuffer;
> -
> -
> +
> +
> /**
> * The encrypted ByteBuffer used for handshaking and reading request bytes.
> */
> @@ -97,19 +97,19 @@
> * The{_at_link SSLEngine} used to manage the SSL over NIO request.
> */
> private SSLEngine sslEngine;
> -
> +
> /**
> * ThreadAttachment store mode
> */
> private int mode;
>
> -
> +
> /**
> * The current {_at_link ThreadFactory} used to execute this instance.
> */
> private Thread activeThread = null;
> -
> -
> +
> +
> public ThreadAttachment(){
> attributes = new HashMap<String,Object>();
> }
> @@ -122,21 +122,21 @@
> this.mode = mode;
> }
>
> -
> +
> public void setAttribute(String key, Object value){
> attributes.put(key,value);
> }
>
> -
> +
> public Object getAttribute(String key){
> return attributes.get(key);
> }
> -
> -
> +
> +
> public Object removeAttribute(String key){
> return attributes.remove(key);
> }
> -
> +
> public void setAttributes(Map<String, Object> attributes) {
> this.attributes = attributes;
> }
> @@ -144,23 +144,23 @@
> public Map<String, Object> getAttributes() {
> return attributes;
> }
> -
> +
> /**
> * Set the {_at_link ByteBuffer} shared this thread
> */
> public void setByteBuffer(ByteBuffer byteBuffer){
> this.byteBuffer = byteBuffer;
> }
> -
> -
> +
> +
> /**
> * Return the {_at_link ByteBuffer} shared this thread
> */
> public ByteBuffer getByteBuffer(){
> return byteBuffer;
> }
> -
> -
> +
> +
> /**
> * Return the encrypted {_at_link ByteBuffer} used to handle request.
> * @return {_at_link ByteBuffer}
> @@ -168,35 +168,35 @@
> public ByteBuffer getInputBB(){
> return inputBB;
> }
> -
> -
> +
> +
> /**
> * Set the encrypted {_at_link ByteBuffer} used to handle request.
> * @param inputBB {_at_link ByteBuffer}
> - */
> + */
> public void setInputBB(ByteBuffer inputBB){
> this.inputBB = inputBB;
> }
> -
> -
> +
> +
> /**
> * Return the encrypted {_at_link ByteBuffer} used to handle response.
> * @return {_at_link ByteBuffer}
> - */
> + */
> public ByteBuffer getOutputBB(){
> return outputBB;
> }
> -
> -
> +
> +
> /**
> * Set the encrypted {_at_link ByteBuffer} used to handle response.
> * @param outputBB {_at_link ByteBuffer}
> - */
> + */
> public void setOutputBB(ByteBuffer outputBB){
> this.outputBB = outputBB;
> }
> -
> -
> +
> +
> /**
> * Set the{_at_link SSLEngine}.
> * @return{_at_link SSLEngine}
> @@ -205,16 +205,16 @@
> return sslEngine;
> }
>
> -
> +
> /**
> * Get the{_at_link SSLEngine}.
> * @param sslEngine{_at_link SSLEngine}
> */
> public void setSSLEngine(SSLEngine sslEngine) {
> this.sslEngine = sslEngine;
> - }
> + }
>
> -
> +
> /**
> * Return the name of the Thread on which this instance is binded.
> */
> @@ -222,7 +222,7 @@
> return threadName;
> }
>
> -
> +
> /**
> * Set the Thread's name on which this instance is binded.
> */
> @@ -235,7 +235,7 @@
> */
> public void associate() {
> if (!threadLock.isHeldByCurrentThread()) {
> - threadLock.lock();
> + threadLock.lock();
> }
> }
>
> @@ -270,15 +270,15 @@
> byteBuffer = null;
> sslEngine = null;
> inputBB = null;
> - outputBB = null;
> + outputBB = null;
> activeThreadTimeout = Long.MIN_VALUE;
> }
> -
> +
> @Override
> public void release(SelectionKey selectionKey) {
> attributes.clear();
> reset();
> -
> +
> deassociate();
> super.release(selectionKey);
> }
> @@ -297,7 +297,7 @@
> sb.append(']');
> return sb.toString();
> }
> -
> +
> /**
> * Return the current {_at_link Thread} which is executing this object.
> * @return the current {_at_link Thread} which is executing this object.
> @@ -305,7 +305,7 @@
> public Thread activeThread(){
> return activeThread;
> }
> -
> +
> /**
> * Set the time, in milliseconds, this object can be attached to a {_at_link Thread}
> * @param the time, in milliseconds, this object can be attached to a {_at_link Thread}
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java (revision 2838)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java (working copy)
> @@ -46,7 +46,6 @@
> import java.util.Iterator;
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
> -import java.util.logging.Level;
> import java.util.logging.Logger;
>
> /**
> Index: modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java (working copy)
> @@ -49,6 +49,7 @@
> import com.sun.grizzly.util.DefaultThreadPool;
> import com.sun.grizzly.util.InputReader;
> import com.sun.grizzly.util.OutputWriter;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.IOException;
> import java.net.Socket;
> import java.net.SocketAddress;
> @@ -351,7 +352,7 @@
>
> callbackHandler = new DefaultCallbackHandler(this,false);
>
> - new Thread(controller, "GrizzlyTCPConnectorHandler-Controller").start();
> + new WorkerThreadImpl("GrizzlyTCPConnectorHandler-Controller",controller).start();
>
> try {
> latch.await();
> Index: modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandlerRunner.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandlerRunner.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandlerRunner.java (working copy)
> @@ -38,9 +38,12 @@
>
> package com.sun.grizzly;
>
> +import com.sun.grizzly.tcp.PendingIOhandler;
> import com.sun.grizzly.util.State;
> import com.sun.grizzly.util.StateHolder;
> import com.sun.grizzly.util.StateHolder.ConditionListener;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> +import java.nio.channels.SelectionKey;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.TimeUnit;
>
> @@ -65,6 +68,9 @@
> }
>
> public void run() {
> +
> + ((WorkerThreadImpl)Thread.currentThread()).setPendingIOhandler(selectorHandler);
> +
> StateHolder<State> controllerStateHolder = controller.getStateHolder();
> StateHolder<State> selectorHandlerStateHolder = selectorHandler.getStateHolder();
>
> Index: modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java (working copy)
> @@ -114,20 +114,7 @@
> stoppedSelectorHandlerCounter = new AtomicInteger(selectorHandlerCount);
>
> for (SelectorHandler selectorHandler : selectorHandlers) {
> - Runnable selectorRunner = new SelectorHandlerRunner(this, selectorHandler);
> - // check if there is java.nio.Selector already open,
> - // if so, just notify the controller onReady() listeners
> - if (selectorHandler.getSelector() != null) {
> - notifyReady();
> - }
> -
> - if (selectorHandlerCount > 1) {
> - // if there are more than 1 selector handler - run it in separate thread
> - new Thread(selectorRunner, "GrizzlySelectorRunner-read-" + selectorHandler.protocol()).start();
> - } else {
> - // else run it in current thread
> - selectorRunner.run();
> - }
> + startSelectorHandlerRunner(selectorHandler, selectorHandlerCount>0);
> }
>
> waitUntilSeletorHandlersStop();
> Index: modules/grizzly/src/main/java/com/sun/grizzly/suspendable/SuspendableMonitor.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/suspendable/SuspendableMonitor.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/suspendable/SuspendableMonitor.java (working copy)
> @@ -49,6 +49,7 @@
>
> import com.sun.grizzly.suspendable.SuspendableFilter.KeyHandler;
> import com.sun.grizzly.util.LinkedTransferQueue;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.nio.channels.ClosedChannelException;
> import java.nio.channels.SelectableChannel;
>
> @@ -83,7 +84,7 @@
> }
>
> public void start() {
> - new Thread("GrizzlySuspendableMonitor") {
> + new WorkerThreadImpl(null,"GrizzlySuspendableMonitor") {
>
> {
> setDaemon(true);
> Index: modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java (working copy)
> @@ -49,6 +49,7 @@
> import com.sun.grizzly.async.ByteBufferCloner;
> import com.sun.grizzly.util.DefaultThreadPool;
> import com.sun.grizzly.util.InputReader;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.IOException;
> import java.net.SocketAddress;
> import java.nio.ByteBuffer;
> @@ -287,7 +288,7 @@
> }
> });
> callbackHandler = new DefaultCallbackHandler(this, false);
> - new Thread(controller, "GrizzlyUDPConnectorHandler-Controller").start();
> + new WorkerThreadImpl("GrizzlyUDPConnectorHandler-Controller",controller).start();
>
> try {
> latch.await();
> Index: modules/grizzly/src/main/java/com/sun/grizzly/DefaultSelectionKeyHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/DefaultSelectionKeyHandler.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/DefaultSelectionKeyHandler.java (working copy)
> @@ -45,7 +45,6 @@
> import java.nio.channels.SelectionKey;
> import java.nio.channels.Selector;
> import java.util.Iterator;
> -import java.util.logging.Level;
>
>
> /**
> @@ -139,13 +138,11 @@
> */
> protected void doRegisterKey(SelectionKey key, int selectionKeyOps,
> long currentTime) {
> - if (!key.isValid()) {
> - return;
> - }
> -
> + if (key.isValid()) {
> key.interestOps(key.interestOps() | selectionKeyOps);
> addExpirationStamp(key);
> }
> + }
>
> /**
> * {_at_inheritDoc}
> @@ -257,8 +254,7 @@
> *
> * @param {_at_link SelectionKey}
> */
> - protected void addExpirationStamp(SelectionKey key) {
> - long currentTime = System.currentTimeMillis();
> + protected void addExpirationStamp(SelectionKey key, long currentTime) {
> Object attachment = key.attachment();
> if (attachment == null) {
> key.attach(currentTime);
> @@ -266,35 +262,24 @@
> ((SelectionKeyAttachment) attachment).setTimeout(currentTime);
> }
> }
> -
> +
> + protected void addExpirationStamp(SelectionKey key) {
> + addExpirationStamp(key,System.currentTimeMillis());
> + }
> +
> /**
> - * Gets expiration timeout stamp from the {_at_link SelectionKey}
> + * Gets expiration timeout stamp from the {_at_link SelectionKey}
> * depending on its attachment
> - *
> + *
> * @param {_at_link SelectionKey}
> */
> - private long getExpirationStamp(SelectionKey key) {
> - Object attachment = key.attachment();
> - if (attachment != null) {
> - try {
> -
> - // This is extremely bad to invoke instanceof here but
> - // since the framework expose the SelectionKey, an application
> - // can always attach an object on the SelectionKey and we
> - // can't predict the type of the attached object.
> - if (attachment instanceof Long) {
> - return (Long) attachment;
> - } else if (attachment instanceof SelectionKeyAttachment) {
> - return ((SelectionKeyAttachment) attachment).getTimeout();
> - }
> - } catch (ClassCastException ex) {
> - if (logger.isLoggable(Level.FINEST)) {
> - logger.log(Level.FINEST,
> - "Invalid SelectionKey attachment", ex);
> - }
> - }
> + protected long getExpirationStamp(Object attachment) {
> + if (attachment instanceof Long) {
> + return (Long) attachment;
> }
> -
> + if (attachment instanceof SelectionKeyAttachment) {
> + return ((SelectionKeyAttachment) attachment).getTimeout();
> + }
> return SelectionKeyAttachment.UNLIMITED_TIMEOUT;
> }
> }
> Index: modules/grizzly/src/main/java/com/sun/grizzly/SSLConnectorHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/SSLConnectorHandler.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/SSLConnectorHandler.java (working copy)
> @@ -50,6 +50,7 @@
> import com.sun.grizzly.util.OutputWriter;
> import com.sun.grizzly.util.SSLOutputWriter;
> import com.sun.grizzly.util.SSLUtils;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.EOFException;
> import java.io.IOException;
> import java.net.SocketAddress;
> @@ -433,7 +434,7 @@
> }
> });
> callbackHandler = new DefaultCallbackHandler(this,false);
> - new Thread(controller, "GrizzlySSLConnectorHandler-Controller").start();
> + new WorkerThreadImpl("GrizzlySSLConnectorHandler-Controller",controller).start();
>
> try {
> latch.await();
> Index: modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java (working copy)
> @@ -60,21 +60,21 @@
> * @author Jeanfrancois Arcand
> */
> public interface SelectorHandler extends Handler, Copyable,
> - AttributeHolder, SupportStateHolder<State> {
> + AttributeHolder, SupportStateHolder<State>, PendingIOhandler {
>
> /**
> * enqueues runnable for later execution in postSelect <br>
> * this is not to be a threadsafe method, must be called from within the same SelectorHandler thread.<br>
> * @param runnable
> -
> + */
> public void addPendingIO(Runnable runnable);
>
> -
> + /**
> * enqueues SlectionKey for later cancel and close .<br>
> * this is not to be a threadsafe method, must be called from within the same SelectorHandler thread.<br>
> - * @param runnable
> -
> - public void addPendingKeyCancel(SelectionKey key);*/
> + * @param key
> + */
> + public void addPendingKeyCancel(SelectionKey key);
>
> /**
> * A token decribing the protocol supported by an implementation of this
> Index: modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (working copy)
> @@ -380,7 +380,7 @@
> }
>
> if (!key.isValid()){
> - selectorHandler.getSelectionKeyHandler().close(key);
> + selectorHandler.addPendingKeyCancel(key);
> continue;
> }
> final int readyOps = key.readyOps();
> @@ -1099,8 +1099,7 @@
> }
>
> for (int i=0; i < readThreadControllers.length; i++) {
> - // TODO Get a Thread from a Pool instead.
> - new Thread(readThreadControllers[i], "GrizzlyReadController-" + i).start();
> + new WorkerThreadImpl("GrizzlyReadController-" + i,readThreadControllers[i]).start();
> }
> }
>
> @@ -1136,21 +1135,20 @@
> * @param isRunAsync if true - <code>SelectorHandlerRunner</code> will be run
> * in separate <code>Thread</code>, if false - in current <code>Thread</code>
> */
> - private void startSelectorHandlerRunner(SelectorHandler selectorHandler,
> - boolean isRunAsync) {
> -
> + protected void startSelectorHandlerRunner(SelectorHandler selectorHandler, boolean async) {
> + if (selectorHandler.getThreadPool() == null){
> + selectorHandler.setThreadPool(threadPool);
> + }
> + Runnable selectorRunner = new SelectorHandlerRunner(this, selectorHandler);
> // check if there is java.nio.Selector already open,
> // if so, just notify the controller onReady() listeners
> if (selectorHandler.getSelector() != null) {
> notifyReady();
> }
> - Runnable selectorRunner = new SelectorHandlerRunner(this, selectorHandler);
> - if (isRunAsync) {
> - // if there are more than 1 selector handler - run it in separate thread
> - //_at_TODO Take Thread from ThreadPool?
> - new Thread(selectorRunner, "GrizzlySelectorRunner-" + selectorHandler.protocol()).start();
> - } else {
> - // else run it in current thread
> + if (async){
> + new WorkerThreadImpl("GrizzlySelectorRunner-" + selectorHandler.protocol(),
> + selectorRunner).start();
> + }else{
> selectorRunner.run();
> }
> }
> Index: modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java (working copy)
> @@ -42,9 +42,10 @@
> import com.sun.grizzly.async.AsyncQueueReader;
> import com.sun.grizzly.async.AsyncQueueReaderContextTask;
> import com.sun.grizzly.async.AsyncQueueWriter;
> +import com.sun.grizzly.async.AsyncQueueWriterContextTask;
> import com.sun.grizzly.async.TCPAsyncQueueWriter;
> -import com.sun.grizzly.async.AsyncQueueWriterContextTask;
> import com.sun.grizzly.async.TCPAsyncQueueReader;
> +import com.sun.grizzly.util.AttributeHolder;
> import com.sun.grizzly.util.Cloner;
> import com.sun.grizzly.util.Copyable;
> import com.sun.grizzly.util.LinkedTransferQueue;
> @@ -53,6 +54,7 @@
> import com.sun.grizzly.util.StateHolder;
> import java.io.IOException;
> import java.net.BindException;
> +import java.net.DatagramSocket;
> import java.net.InetAddress;
> import java.net.ServerSocket;
> import java.net.Socket;
> @@ -66,8 +68,10 @@
> import java.nio.channels.Selector;
> import java.nio.channels.ServerSocketChannel;
> import java.nio.channels.SocketChannel;
> +import java.util.ArrayList;
> import java.util.ConcurrentModificationException;
> import java.util.HashMap;
> +import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.concurrent.Callable;
> @@ -123,7 +127,19 @@
> */
> private final LinkedTransferQueue<SelectionKey> readWriteOpToRegister
> = new LinkedTransferQueue<SelectionKey>();
> +
> /**
> + * enqueued events from selectionkey attachment logic.
> + */
> + private List pendingIO = new ArrayList();
> +
> +
> + /**
> + * max number of pendingIO tasks that will be executed per worker thread.
> + */
Uppercase M for Max
> + private int pendingIOlimitPerThread = 100;
> +
> + /**
> * The socket tcpDelay.
> *
> * Default value for tcpNoDelay is disabled (set to true).
> @@ -398,33 +414,28 @@
>
> SelectionKeyOP operation;
> while((operation = opToRegister.poll()) != null) {
> - if ((operation.getOp() & SelectionKey.OP_CONNECT) != 0) {
> + int op = operation.getOp();
> + if ((op & SelectionKey.OP_CONNECT) != 0) {
> onConnectOp(ctx, (SelectionKeyOP.ConnectSelectionKeyOP) operation);
> } else{
> - if (operation.getChannel().isOpen()){
> - selectionKeyHandler.register(operation.getChannel(),operation.getOp());
> + SelectableChannel channel = operation.getChannel();
> + if (channel.isOpen()){
> + selectionKeyHandler.register(channel,op);
> }
> }
> }
>
> SelectionKey key;
> while((key=readWriteOpToRegister.poll()) != null){
> - if (key.isValid()){
> - selectionKeyHandler.
> - register(key, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
> - }
> + selectionKeyHandler.register(key, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
> }
> -
> +
> while((key=writeOpToRegister.poll()) != null){
> - if (key.isValid()){
> - selectionKeyHandler.register(key, SelectionKey.OP_WRITE);
> - }
> + selectionKeyHandler.register(key, SelectionKey.OP_WRITE);
> }
>
> while((key=readOpToRegister.poll()) != null){
> - if (key.isValid()){
> - selectionKeyHandler.register(key, SelectionKey.OP_READ);
> - }
> + selectionKeyHandler.register(key, SelectionKey.OP_READ);
> }
> }
>
> @@ -481,12 +492,68 @@
> * @param ctx {_at_link Context}
> */
> public void postSelect(Context ctx) {
> - //Selector.keys() performs isOpen() so we dont need to do it a second time.
> selectionKeyHandler.expire(keys().iterator());
> + executePendingIO();
> }
>
> + /**
> + * executes the pending IOs
> + */
Uppercase E
> + private void executePendingIO(){
> + if (pendingIO.size() > 0 ){
> + final List tasks = pendingIO;
> + // tests with upto 10K selectionkeys was faster with ArrayList then linkedlist
> + // (did only test up to 10k)
> + pendingIO = new ArrayList();
> + int size = tasks.size();
> + for (int x=0;x<size;){
> + doExecutePendiongIO(tasks,x,Math.min(x=+pendingIOlimitPerThread, size));
> + }
> + }
> + }
>
Need javadoc :-)
> + private void doExecutePendiongIO(final List tasks, final int start, final int end){
> + Runnable r = new Runnable(){
> + public void run() {
> + for (int i=start;i<end;i++){
> + Object obj = tasks.get(i);
> + try{
> + if (obj instanceof SelectionKey){
> + selectionKeyHandler.close(((SelectionKey)obj));
> + }else{
> + ((Runnable)obj).run();
> + }
> + }catch(Throwable t){
> + logger.log(Level.FINEST, "doExecutePendiongIO failed.", t);
> + }
> + }
> + }
> + };
> + threadPool.execute(r);
> + }
> +
> /**
> + * {_at_inheritDoc}
> + */
> + @Override
> + public void addPendingIO(Runnable runnable){
> + pendingIO.add(runnable);
> + }
> +
> + /**
> + * {_at_inheritDoc}
> + */
> + @Override
> + public void addPendingKeyCancel(SelectionKey key){
> + if (key.isValid()){
> + //we want to do this in the selector.select thread hence we do it now
> + // saving us the extra parsing for SelectionKey later
> + key.cancel();
> + }
> + pendingIO.add(key);
> + }
> +
> + /**
> * Register a SelectionKey to this Selector.
> */
> public void register(SelectionKey key, int ops) {
> @@ -586,7 +653,7 @@
> for(SelectionKey selectionKey : selector.keys()) {
> selectionKeyHandler.close(selectionKey);
> }
> -
> +
> isContinue = false;
> } catch (ConcurrentModificationException e) {
> // ignore
> @@ -641,11 +708,8 @@
> /**
> * {_at_inheritDoc}
> */
> - public SelectableChannel acceptWithoutRegistration(SelectionKey key)
> - throws IOException {
> - ServerSocketChannel server = (ServerSocketChannel) key.channel();
> - SocketChannel channel = server.accept();
> - return channel;
> + public SelectableChannel acceptWithoutRegistration(SelectionKey key) throws IOException {
> + return ((ServerSocketChannel) key.channel()).accept();
> }
>
> /**
> @@ -845,57 +909,36 @@
> */
> public void configureChannel(SelectableChannel channel) throws IOException{
> Socket socket = ((SocketChannel) channel).socket();
> -
> channel.configureBlocking(false);
> -
> - if (!channel.isOpen()){
> - return;
> + if(socketTimeout > 0 ) {
> + socket.setSoTimeout(socketTimeout);
> }
> -
> - try{
> - if(socketTimeout >= 0 ) {
> - socket.setSoTimeout(socketTimeout);
> - }
> - } catch (SocketException ex){
> - if (logger.isLoggable(Level.FINE)){
> - logger.log(Level.FINE,
> - "setSoTimeout exception ",ex);
> - }
> + if(linger > 0 ) {
> + socket.setSoLinger( true, linger);
> }
> + socket.setTcpNoDelay(tcpNoDelay);
> + socket.setReuseAddress(reuseAddress);
> + }
>
> - try{
> - if(linger >= 0 ) {
> - socket.setSoLinger( true, linger);
> - }
> - } catch (SocketException ex){
> - if (logger.isLoggable(Level.FINE)){
> - logger.log(Level.FINE,
> - "setSoLinger exception ",ex);
> - }
> - }
>
> - try{
> - socket.setTcpNoDelay(tcpNoDelay);
> - } catch (SocketException ex){
> - if (logger.isLoggable(Level.FINE)){
> - logger.log(Level.FINE,
> - "setTcpNoDelay exception ",ex);
> - }
> - }
> + // ------------------------------------------------------ Properties -----//
>
> - try{
> - socket.setReuseAddress(reuseAddress);
> - } catch (SocketException ex){
> - if (logger.isLoggable(Level.FINE)){
> - logger.log(Level.FINE,
> - "setReuseAddress exception ",ex);
> - }
> - }
> + /**
> + * max number of pendingIO tasks that will be executed per worker thread.
Uppercase M
> + * @return
Return ...
> + */
> + public int getPendingIOlimitPerThread() {
> + return pendingIOlimitPerThread;
> }
>
> + /**
> + * max number of pendingIO tasks that will be executed per worker thread.
> + * @param pendingIOlimitPerThread
> + */
Uppercase M
> + public void setPendingIOlimitPerThread(int pendingIOlimitPerThread) {
> + this.pendingIOlimitPerThread = pendingIOlimitPerThread;
> + }
>
> - // ------------------------------------------------------ Properties -----//
> -
> public final Selector getSelector() {
> return selector;
> }
> Index: modules/grizzly/src/main/java/com/sun/grizzly/BaseSelectionKeyHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/BaseSelectionKeyHandler.java (revision 2838)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/BaseSelectionKeyHandler.java (working copy)
> @@ -163,10 +163,9 @@
> * Registers {_at_link SelectionKey} to handle certain operations
> */
> protected void doRegisterKey(SelectionKey key, int selectionKeyOps) {
> - if (!key.isValid()) {
> - return;
> - }
> - key.interestOps(key.interestOps() | selectionKeyOps);
> + if (key.isValid()) {
> + key.interestOps(key.interestOps() | selectionKeyOps);
> + }
> }
>
>
Thanks!!!
-- Jeanfrancois
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net