Coverage Report - com.sun.grizzly.Controller
 
Classes in this File Line Coverage Branch Coverage Complexity
Controller
75 %
269/357
64 %
119/186
0
Controller$1
100 %
2/2
N/A
0
Controller$2
57 %
4/7
N/A
0
Controller$Protocol
100 %
1/1
N/A
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 
 39  
 package com.sun.grizzly;
 40  
 
 41  
 import com.sun.grizzly.util.AttributeHolder;
 42  
 import com.sun.grizzly.util.Cloner;
 43  
 import com.sun.grizzly.util.ConcurrentLinkedQueuePool;
 44  
 import com.sun.grizzly.util.Copyable;
 45  
 import com.sun.grizzly.util.State;
 46  
 import com.sun.grizzly.util.StateHolder;
 47  
 import com.sun.grizzly.util.SupportStateHolder;
 48  
 import java.io.IOException;
 49  
 import java.nio.channels.ClosedChannelException;
 50  
 import java.nio.channels.ClosedSelectorException;
 51  
 import java.nio.channels.SelectionKey;
 52  
 import java.nio.channels.Selector;
 53  
 import java.util.Collection;
 54  
 import java.util.HashMap;
 55  
 import java.util.Iterator;
 56  
 import java.util.Map;
 57  
 import java.util.Set;
 58  
 import java.util.concurrent.Callable;
 59  
 import java.util.concurrent.ConcurrentLinkedQueue;
 60  
 import java.util.concurrent.CountDownLatch;
 61  
 import java.util.concurrent.atomic.AtomicInteger;
 62  
 import java.util.logging.Level;
 63  
 import java.util.logging.Logger;
 64  
 
 65  
 import static com.sun.grizzly.Context.OpType;
 66  
 
 67  
 /**
 68  
  * <p>
 69  
  * Main entry point when using the Grizzly Framework. A Controller is composed
 70  
  * of Handlers, ProtocolChain and Pipeline. All of those components are
 71  
  * configurable by client using the Grizzly Framework.
 72  
  * </p>
 73  
  *
 74  
  * <p>
 75  
  * A Pipeline is a wrapper around a Thread pool.
 76  
  * </p>
 77  
  * <p>
 78  
  * A ProtocolChain implement the "Chain of Responsibility" pattern (for more info,
 79  
  * take a look at the classic "Gang of Four" design patterns book). Towards
 80  
  * that end, the Chain API models a computation as a series of "protocol filter"
 81  
  * that can be combined into a "protocol chain".
 82  
  * </p>
 83  
  * <p>
 84  
  * An Handler is a interface that can be implemented
 85  
  * by implemented by client of the Grizzly Framework to used to help handling
 86  
  * NIO operations. The Grizzly Framework define three Handlers:
 87  
  * </p>
 88  
  * <p><pre><code>
 89  
  * (1) SelectorHandler: A SelectorHandler handles all java.nio.channels.Selector
 90  
  *                     operations. One or more instance of a Selector are
 91  
  *                     handled by SelectorHandler. The logic for processing of
 92  
  *                     SelectionKey interest (OP_ACCEPT,OP_READ, etc.) is usually
 93  
  *                     defined using an instance of SelectorHandler.
 94  
  * (2) SelectionKeyHandler: A SelectionKeyHandler is used to handle the life
 95  
  *                          life cycle of a SelectionKey. Operations like canceling,
 96  
  *                          registering or closing are handled by SelectionKeyHandler.
 97  
  * (3) ProtocolChainInstanceHandler: An ProtocolChainInstanceHandler is where one or several ProtocolChain
 98  
  *                      are created and cached. An ProtocolChainInstanceHandler decide if
 99  
  *                      a stateless or statefull ProtocolChain needs to be created.
 100  
  * </code></pre></p>
 101  
  * <p>
 102  
  * By default, the Grizzly Framework bundles implementation for TCP
 103  
  * and UPD transport. The TCPSelectorHandler is instanciated by default. As an
 104  
  * example, supporting the HTTP protocol should only consist of adding the
 105  
  * appropriate ProtocolFilter like:
 106  
  * </p>
 107  
  * <p><pre><code>
 108  
  *       Controller sel = new Controller();
 109  
  *       sel.setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler(){
 110  
  *           public ProtocolChain poll() {
 111  
  *               ProtocolChain protocolChain = protocolChains.poll();
 112  
  *               if (protocolChain == null){
 113  
  *                   protocolChain = new DefaultProtocolChain();
 114  
  *                   protocolChain.addFilter(new ReadFilter());
 115  
  *                   protocolChain.addFilter(new HTTPParserFilter());
 116  
  *               }
 117  
  *               return protocolChain;
 118  
  *           }
 119  
  *       });
 120  
  *
 121  
  * </code></pre></p>
 122  
  * <p>
 123  
  * In the example above, a pool of ProtocolChain will be created, and all instance
 124  
  * of ProtocolChain will have their instance of ProtocolFilter. Hence the above
 125  
  * implementation can be called statefull. A stateless implementation would
 126  
  * instead consist of sharing the ProtocolFilter among ProtocolChain:
 127  
  * </p>
 128  
  * <p><pre><code>
 129  
  *       final Controller sel = new Controller();
 130  
  *       final ReadFilter readFilter = new ReadFilter();
 131  
  *       final LogFilter logFilter = new LogFilter();
 132  
  *
 133  
  *       sel.setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler(){
 134  
  *           public ProtocolChain poll() {
 135  
  *               ProtocolChain protocolChain = protocolChains.poll();
 136  
  *               if (protocolChain == null){
 137  
  *                   protocolChain = new DefaultProtocolChain();
 138  
  *                   protocolChain.addFilter(readFilter);
 139  
  *                   protocolChain.addFilter(logFilter);
 140  
  *               }
 141  
  *               return protocolChain;
 142  
  *           }
 143  
  *       });
 144  
  * </code></pre></p>
 145  
  * @author Jeanfrancois Arcand
 146  
  */
 147  
 public class Controller implements Runnable, Lifecycle, Copyable, 
 148  
         ConnectorHandlerPool, AttributeHolder, SupportStateHolder<State> {
 149  
 
 150  5
     public enum Protocol { UDP, TCP , TLS, CUSTOM }
 151  
     
 152  
     
 153  
     /**
 154  
      * A cached list of Context. Context are by default stateless.
 155  
      */
 156  
     private ConcurrentLinkedQueuePool<Context> contexts;
 157  
     
 158  
     
 159  
     /**
 160  
      * The ProtocolChainInstanceHandler used by this instance. If not set, and instance
 161  
      * of the DefaultInstanceHandler will be created.
 162  
      */
 163  
     protected ProtocolChainInstanceHandler instanceHandler;
 164  
     
 165  
     
 166  
     /**
 167  
      * The SelectionKey Handler used by this instance. If not set, and instance
 168  
      * of the DefaultSelectionKeyHandler will be created.
 169  
      */
 170  
     protected SelectionKeyHandler selectionKeyHandler;
 171  
     
 172  
     
 173  
     /**
 174  
      * The SelectorHandler, which will manage connection accept,
 175  
      * if readThreadsCount > 0 and spread connection processing between
 176  
      * different read threads
 177  
      */
 178  85
     protected ComplexSelectorHandler multiReadThreadSelectorHandler = null;
 179  
     
 180  
     
 181  
     /**
 182  
      * The ConnectorHandlerPool, which is responsible for creating/caching
 183  
      * ConnectorHandler instances.
 184  
      */
 185  85
     protected ConnectorHandlerPool connectorHandlerPool = null;
 186  
     
 187  
     
 188  
     /**
 189  
      * The set of {@link SelectorHandler}s used by this instance. If not set, the instance
 190  
      * of the TCPSelectorHandler will be added by default.
 191  
      */
 192  
     protected ConcurrentLinkedQueue<SelectorHandler> selectorHandlers;
 193  
     
 194  
     
 195  
     /**
 196  
      * Current {@link Controller} state
 197  
      */
 198  
     protected StateHolder<State> stateHolder;
 199  
     
 200  
     
 201  
     /**
 202  
      * The number of read threads
 203  
      */
 204  85
     protected int readThreadsCount = 0;
 205  
     
 206  
     
 207  
     /**
 208  
      * The array of {@link Controller}s to be used for reading
 209  
      */
 210  
     protected ReadController[] readThreadControllers;
 211  
     
 212  
     
 213  
     /**
 214  
      * Default Logger.
 215  
      */
 216  1
     private static Logger logger = Logger.getLogger("grizzly");
 217  
     
 218  
     
 219  
     /**
 220  
      * Default Thread Pool (called Pipeline).If not set, and instance
 221  
      * of the DefaultPipeline will be created.
 222  
      */
 223  
     private Pipeline<Callable> pipeline;
 224  
     
 225  
     
 226  
     /**
 227  
      * Collection of {@link Controller} state listeners, which
 228  
      * will are notified on {@link Controller} state change.
 229  
      */
 230  85
     protected Collection<ControllerStateListener> stateListeners = 
 231  
             new ConcurrentLinkedQueue<ControllerStateListener>();
 232  
     
 233  
     
 234  
     /**
 235  
      * Internal countdown counter of {@link SelectorHandler}s, which 
 236  
      * are ready to process
 237  
      */
 238  
     protected AtomicInteger readySelectorHandlerCounter;
 239  
 
 240  
     /**
 241  
      * Internal countdown counter of {@link SelectorHandler}s, which stopped
 242  
      */
 243  
     protected AtomicInteger stoppedSelectorHandlerCounter;
 244  
     
 245  
     
 246  
     /**
 247  
      * <tt>true</tt> if OP_READ and OP_WRITE can be handled concurrently.
 248  
      */
 249  85
     private boolean handleReadWriteConcurrently = true;
 250  
     
 251  
     
 252  
     /**
 253  
      * Attributes, associated with the {@link Controller} instance
 254  
      */
 255  
     protected Map<String, Object> attributes;
 256  
     
 257  
     
 258  
     /** 
 259  
      * The current Controller instance.
 260  
      * 
 261  
      */
 262  1
     private final static ConcurrentLinkedQueue<Controller> controllers = 
 263  
             new ConcurrentLinkedQueue<Controller>();
 264  
     
 265  
     
 266  
     // -------------------------------------------------------------------- //
 267  
     
 268  
     /**
 269  
      * Controller constructor
 270  
      */
 271  85
     public Controller() {
 272  85
         contexts = new ConcurrentLinkedQueuePool<Context>() {
 273  
             @Override
 274  
             public Context newInstance() {
 275  261
                 return new Context();
 276  
             }
 277  
         };
 278  
         
 279  85
         stateHolder = new StateHolder<State>(true);
 280  85
         initializeDefaults();
 281  85
     }
 282  
     
 283  
     
 284  
     /**
 285  
      * This method initializes this Controller's default Pipeline,
 286  
      * default ProtocolChainInstanceHandler, default SelectorHandler(s)
 287  
      * and default ConnectorHandlerPool.  These defaults can be overridden
 288  
      * after this Controller constructor is called and before calling 
 289  
      * Controller.start() using this Controller's mutator methods to
 290  
      * set a different Pipeline, ProtocolChainInstanceHandler, 
 291  
      * SelectorHandler(s) or ConnectorHandlerPool.
 292  
      */
 293  
     private void initializeDefaults() {
 294  85
         if (pipeline == null) {
 295  85
             pipeline = new DefaultPipeline();
 296  
         }
 297  85
         if (instanceHandler == null) {
 298  85
             instanceHandler = new DefaultProtocolChainInstanceHandler();
 299  
         }
 300  85
         if (selectorHandlers == null){
 301  85
             selectorHandlers = new ConcurrentLinkedQueue<SelectorHandler>();
 302  
         }
 303  85
         if (connectorHandlerPool == null) {
 304  85
             connectorHandlerPool = new DefaultConnectorHandlerPool(this);
 305  
         }
 306  85
         controllers.add(this);
 307  85
     }
 308  
 
 309  
     
 310  
     /**
 311  
      * This method handle the processing of all Selector's interest op
 312  
      * (OP_ACCEPT,OP_READ,OP_WRITE,OP_CONNECT) by delegating to its Handler.
 313  
      * By default, all java.nio.channels.Selector operations are implemented
 314  
      * using SelectorHandler. All SelectionKey operations are implemented by
 315  
      * SelectionKeyHandler. Finally, ProtocolChain creation/re-use are implemented
 316  
      * by InstanceHandler.
 317  
      * @param selectorHandler - the {@link SelectorHandler}
 318  
      */
 319  
     protected void doSelect(SelectorHandler selectorHandler){
 320  249800
         SelectionKey key = null;
 321  
         Set<SelectionKey> readyKeys;
 322  
         Iterator<SelectionKey> iterator;
 323  
         int selectorState;
 324  249800
         boolean delegateToWorkerThread = false;
 325  249800
         Context serverCtx = contexts.poll();
 326  249800
         serverCtx.setController(this);
 327  
         
 328  249800
         serverCtx.setSelectorHandler(selectorHandler);
 329  
         
 330  
         try {
 331  249800
             selectorState = 0;
 332  
             
 333  
             // Set the SelectionKeyHandler only if the SelectorHandler doesn't
 334  
             // define one.
 335  249800
             if (selectorHandler.getSelectionKeyHandler() == null){
 336  123
                 if (logger.isLoggable(Level.FINE)) {
 337  0
                     logger.log(Level.FINE, "Set DefaultSelectionKeyHandler to SelectorHandler: " + selectorHandler);
 338  
                 }
 339  
                 
 340  
                 
 341  123
                 SelectionKeyHandler assgnSelectionKeyHandler = null;
 342  
                 
 343  123
                 if (selectorHandler.getPreferredSelectionKeyHandler() != null) {
 344  123
                     Class<? extends SelectionKeyHandler> keyHandlerClass =
 345  
                                 selectorHandler.getPreferredSelectionKeyHandler();
 346  
                     try {
 347  123
                         assgnSelectionKeyHandler = keyHandlerClass.newInstance();
 348  123
                         assgnSelectionKeyHandler.setSelectorHandler(selectorHandler);
 349  0
                     } catch (Exception e) {
 350  0
                         if (logger.isLoggable(Level.WARNING)) {
 351  0
                             logger.log(Level.WARNING, 
 352  
                                     "Exception initializing preffered SelectionKeyHandler '" + 
 353  
                                     keyHandlerClass + "' for the SelectorHandler '" + 
 354  
                                     selectorHandler + "'");
 355  
                         }
 356  123
                     }
 357  
                 }
 358  
                 
 359  123
                 if (assgnSelectionKeyHandler == null) {
 360  0
                     assgnSelectionKeyHandler = 
 361  
                             new DefaultSelectionKeyHandler(selectorHandler);
 362  
                 }
 363  
 
 364  123
                 selectorHandler.setSelectionKeyHandler(assgnSelectionKeyHandler);
 365  
             }
 366  
             
 367  249800
             selectorHandler.preSelect(serverCtx);
 368  
             
 369  249800
             readyKeys = selectorHandler.select(serverCtx);
 370  249800
             selectorState = readyKeys.size();
 371  249800
             if (stateHolder.getState(false) == State.STARTED && 
 372  
                     selectorHandler.getStateHolder().getState(false) == State.STARTED &&
 373  
                     selectorState != 0) {
 374  134163
                 iterator = readyKeys.iterator();
 375  290307
                 while (iterator.hasNext()) {
 376  156144
                     key = iterator.next();
 377  156144
                     iterator.remove();
 378  156144
                     OpType opType = null;
 379  156144
                     boolean skipOpWrite = false;
 380  156144
                     delegateToWorkerThread = false;
 381  156144
                     if (key.isValid()) {
 382  156144
                         if ((key.readyOps() & SelectionKey.OP_ACCEPT)
 383  
                                 == SelectionKey.OP_ACCEPT){
 384  191
                             if (readThreadsCount > 0 &&
 385  
                                     multiReadThreadSelectorHandler.supportsProtocol(selectorHandler.protocol())) {
 386  28
                                 if (logger.isLoggable(Level.FINE)) {
 387  0
                                     logger.log(Level.FINE, "OP_ACCEPT on " + key + " passed to multi readthread handler");
 388  
                                 }
 389  28
                                 delegateToWorkerThread = multiReadThreadSelectorHandler.
 390  
                                         onAcceptInterest(key, serverCtx);
 391  
                             } else {
 392  163
                                 if (logger.isLoggable(Level.FINE)) {
 393  0
                                     logger.log(Level.FINE, "OP_ACCEPT on " + key);
 394  
                                 }
 395  
                            
 396  163
                                 delegateToWorkerThread = selectorHandler.
 397  
                                         onAcceptInterest(key, serverCtx);
 398  
                             }
 399  163
                             continue;
 400  
                         } 
 401  
                                                 
 402  155953
                         if ((key.readyOps() & SelectionKey.OP_CONNECT)
 403  
                                 == SelectionKey.OP_CONNECT) {
 404  166
                             if (logger.isLoggable(Level.FINE)) {
 405  0
                                 logger.log(Level.FINE, "OP_CONNECT on " + key);
 406  
                             }
 407  
 
 408  166
                             delegateToWorkerThread = selectorHandler.
 409  
                                     onConnectInterest(key, serverCtx);
 410  166
                             continue;
 411  
                         }
 412  
                         
 413  
                         // OP_READ will always be processed first, then 
 414  
                         // based on the handleReadWriteConcurrently, the OP_WRITE
 415  
                         // might be processed just after or during the next
 416  
                         // Selector.select() invocation.
 417  155787
                         if ((key.readyOps() & SelectionKey.OP_READ)
 418  
                                 == SelectionKey.OP_READ) {
 419  153611
                             if (logger.isLoggable(Level.FINE)) {
 420  0
                                 logger.log(Level.FINE, "OP_READ on " + key);
 421  
                             }
 422  153611
                             delegateToWorkerThread = selectorHandler.
 423  
                                     onReadInterest(key,serverCtx);
 424  
                                                                                 
 425  153611
                             if (delegateToWorkerThread) {
 426  82614
                                 opType = OpType.OP_READ;
 427  
                             }
 428  
                             
 429  153611
                             if (!handleReadWriteConcurrently){
 430  245
                                 skipOpWrite = true;
 431  
                             }                        
 432  
                         } 
 433  
 
 434  
                         // The OP_READ processing might have closed the 
 435  
                         // Selection, hence we must make sure the
 436  
                         // SelectionKey is still valid.
 437  155787
                         if (!skipOpWrite && key.isValid() 
 438  
                                 && (key.readyOps() & SelectionKey.OP_WRITE)
 439  
                                 == SelectionKey.OP_WRITE) {
 440  2401
                             if (logger.isLoggable(Level.FINE)) {
 441  0
                                 logger.log(Level.FINE, "OP_WRITE on " + key);
 442  
                             }
 443  2401
                             boolean opWriteDelegateToWorkerThread = selectorHandler.
 444  
                                     onWriteInterest(key,serverCtx);
 445  2401
                             delegateToWorkerThread |= opWriteDelegateToWorkerThread;
 446  2401
                             if (opWriteDelegateToWorkerThread) {
 447  28
                                 if (opType == OpType.OP_READ) {
 448  23
                                     opType = OpType.OP_READ_WRITE;
 449  
                                 } else {
 450  5
                                     opType = OpType.OP_WRITE;
 451  
                                 }
 452  
                             }
 453  
                         } 
 454  
 
 455  155787
                         if (delegateToWorkerThread){
 456  82619
                             Context ctx = pollContext(key, opType);
 457  82619
                             configureContext(ctx,selectorHandler);
 458  82619
                             ctx.execute(ProtocolChainContextTask.poll());
 459  82619
                         }
 460  
                     } else {
 461  0
                         selectorHandler.getSelectionKeyHandler().cancel(key);
 462  
                     }
 463  155787
                 }
 464  
             }
 465  
             
 466  249800
             delegateToWorkerThread = false;
 467  249800
             selectorHandler.postSelect(serverCtx);
 468  249800
             contexts.offer(serverCtx);
 469  0
         } catch (ClosedSelectorException e) {
 470  
             // TODO: This could indicate that the Controller is
 471  
             //       shutting down. Hence, we need to handle this Exception
 472  
             //       appropriately. Perhaps check the state before logging
 473  
             //       what's happening ?
 474  0
             if (stateHolder.getState() == State.STARTED &&
 475  
                     selectorHandler.getStateHolder().getState() == State.STARTED) {
 476  0
                 logger.log(Level.SEVERE, "Selector was unexpectedly closed.");
 477  0
                 notifyException(e);
 478  
             } else {
 479  0
                 logger.log(Level.FINE, "doSelect Selector closed");
 480  
             }
 481  0
         } catch (ClosedChannelException e) {
 482  
             // Don't use stateLock. This case is not strict
 483  0
             if (stateHolder.getState() == State.STARTED &&
 484  
                     selectorHandler.getStateHolder().getState() == State.STARTED) {
 485  0
                 logger.log(Level.WARNING, "Channel was unexpectedly closed");
 486  0
                 if (key != null){
 487  0
                     selectorHandler.getSelectionKeyHandler().cancel(key);
 488  
                 }
 489  
                 
 490  0
                 notifyException(e);
 491  
             }
 492  0
         } catch (Throwable t) {
 493  
             try{
 494  0
                 if (key != null){
 495  0
                     selectorHandler.getSelectionKeyHandler().cancel(key);
 496  
                 }
 497  
 
 498  0
                 notifyException(t);
 499  0
                 logger.log(Level.SEVERE,"doSelect exception",t);
 500  0
             } catch (Throwable t2){
 501  
                 // An unexpected exception occured, most probably caused by 
 502  
                 // a bad logger. Since logger can be externally configurable,
 503  
                 // just output the exception on the screen and continue the
 504  
                 // normal execution.
 505  0
                 t2.printStackTrace();
 506  0
             }
 507  249800
         }
 508  249800
     }
 509  
 
 510  
     
 511  
     /**
 512  
      * Register a SelectionKey.
 513  
      * @param key <tt>SelectionKey</tt> to register
 514  
      */
 515  
     public void registerKey(SelectionKey key){
 516  0
         registerKey(key,SelectionKey.OP_READ);
 517  0
     }
 518  
     
 519  
     
 520  
     /**
 521  
      * Register a SelectionKey on the first SelectorHandler that was added
 522  
      * using the addSelectorHandler().
 523  
      * @param key <tt>SelectionKey</tt> to register
 524  
      * @param ops - the interest op to register
 525  
      */
 526  
     public void registerKey(SelectionKey key, int ops){
 527  0
         registerKey(key, ops, selectorHandlers.peek().protocol());
 528  0
     }
 529  
     
 530  
     
 531  
     /**
 532  
      * Register a SelectionKey.
 533  
      * @param key <tt>SelectionKey</tt> to register
 534  
      * @param ops - the interest op to register
 535  
      * @param protocol specified protocol SelectorHandler key should be registered on
 536  
      */
 537  
     public void registerKey(SelectionKey key, int ops, Protocol protocol){
 538  21
         if (stateHolder.getState() == State.STOPPED) {
 539  0
             return;
 540  
         }
 541  
         
 542  21
         getSelectorHandler(protocol).register(key,ops);
 543  21
     }
 544  
     
 545  
     
 546  
     /**
 547  
      * Cancel a SelectionKey
 548  
      * @param key <tt>SelectionKey</tt> to cancel
 549  
      * @deprecated
 550  
      */
 551  
     public void cancelKey(SelectionKey key){
 552  0
         if (stateHolder.getState() == State.STOPPED) {
 553  0
             return;
 554  
         }
 555  
         
 556  0
         SelectorHandler selectorHandler = getSelectorHandler(key.selector());
 557  0
         if (selectorHandler != null) {
 558  0
             selectorHandler.getSelectionKeyHandler().cancel(key);
 559  
         } else {
 560  0
             throw new IllegalStateException("SelectionKey is not associated " +
 561  
                     "with known SelectorHandler");
 562  
         }
 563  0
     }
 564  
 
 565  
     /**
 566  
      * Get an instance of a {@link Context}
 567  
      * @param key {@link SelectionKey}
 568  
      * @return {@link Context}
 569  
      */    
 570  
     /* package */ public Context pollContext(SelectionKey key) {
 571  12
         return pollContext(key,null);
 572  
     }
 573  
     
 574  
     
 575  
     /**
 576  
      * Get an instance of a {@link Context}
 577  
      * @param key {@link SelectionKey}
 578  
      * @param opType the current SelectionKey op.
 579  
      * @return {@link Context}
 580  
      */    
 581  
     /* package */ public Context pollContext(SelectionKey key, OpType opType) {
 582  156198
         Context ctx = contexts.poll();
 583  156198
         ctx.setController(this);
 584  156198
         ctx.setSelectionKey(key);
 585  156198
         if (opType != null) {
 586  156186
             ctx.setCurrentOpType(opType);
 587  
         } else {
 588  12
             ctx.configureOpType(key);
 589  
         }
 590  
             
 591  156198
         return ctx;
 592  
     }
 593  
     
 594  
     /* package */ public void configureContext(Context ctx,SelectorHandler selectorHandler){
 595  82622
         ctx.setSelectorHandler(selectorHandler);
 596  82622
         ctx.setPipeline(selectorHandler.pipeline());
 597  82622
         ctx.setAsyncQueueReader(selectorHandler.getAsyncQueueReader());
 598  82622
         ctx.setAsyncQueueWriter(selectorHandler.getAsyncQueueWriter());
 599  82622
     }
 600  
     
 601  
     /**
 602  
      * Return a Context to the pool
 603  
      * @param ctx - the {@link Context}
 604  
      */
 605  
     public void returnContext(Context ctx){
 606  156188
         ctx.recycle();
 607  156189
         contexts.offer(ctx);
 608  156189
     }
 609  
     
 610  
     
 611  
     /**
 612  
      * Return the current <code>Logger</code> used by this Controller.
 613  
      */
 614  
     public static Logger logger() {
 615  1077115
         return logger;
 616  
     }
 617  
     
 618  
     
 619  
     /**
 620  
      * Set the Logger single instance to use.
 621  
      */
 622  
     public static void setLogger(Logger l){
 623  0
         logger = l;
 624  0
     }
 625  
     
 626  
     // ------------------------------------------------------ Handlers ------//
 627  
     
 628  
     
 629  
     /**
 630  
      * Set the {@link ProtocolChainInstanceHandler} to use for 
 631  
      * creating instance of {@link ProtocolChain}.
 632  
      */
 633  
     public void setProtocolChainInstanceHandler(ProtocolChainInstanceHandler 
 634  
             instanceHandler){
 635  39
         this.instanceHandler = instanceHandler;
 636  39
     }
 637  
     
 638  
     /**
 639  
      * Return the {@link ProtocolChainInstanceHandler}
 640  
      */
 641  
     public ProtocolChainInstanceHandler getProtocolChainInstanceHandler(){
 642  312378
         return instanceHandler;
 643  
     }
 644  
     
 645  
     
 646  
     /**
 647  
      * @deprecated
 648  
      * Set the {@link SelectionKeyHandler} to use for managing the life
 649  
      * cycle of SelectionKey.
 650  
      * Method is deprecated. Use SelectorHandler.setSelectionKeyHandler() instead
 651  
      */
 652  
     public void setSelectionKeyHandler(SelectionKeyHandler selectionKeyHandler){
 653  0
         this.selectionKeyHandler = selectionKeyHandler;
 654  0
     }
 655  
     
 656  
     
 657  
     /**
 658  
      * @deprecated
 659  
      * Return the {@link SelectionKeyHandler}
 660  
      * Method is deprecated. Use SelectorHandler.getSelectionKeyHandler() instead
 661  
      */
 662  
     public SelectionKeyHandler getSelectionKeyHandler(){
 663  0
         return selectionKeyHandler;
 664  
     }
 665  
     
 666  
     
 667  
     /**
 668  
      * Add a {@link SelectorHandler}
 669  
      * @param selectorHandler - the {@link SelectorHandler}
 670  
      */
 671  
     public void addSelectorHandler(SelectorHandler selectorHandler) {
 672  134
         selectorHandlers.add(selectorHandler);
 673  134
         if (stateHolder.getState(false) != null && 
 674  
                 !State.STOPPED.equals(stateHolder.getState())) {
 675  40
             addSelectorHandlerOnReadControllers(selectorHandler);
 676  40
             if (readySelectorHandlerCounter != null) {
 677  40
                 readySelectorHandlerCounter.incrementAndGet();
 678  
             }
 679  40
             if (stoppedSelectorHandlerCounter != null) {
 680  40
                 stoppedSelectorHandlerCounter.incrementAndGet();
 681  
             }
 682  40
             startSelectorHandlerRunner(selectorHandler, true);
 683  
         }
 684  134
     }
 685  
     
 686  
     
 687  
     /**
 688  
      * Set the first {@link SelectorHandler}
 689  
      * @param selectorHandler - the {@link SelectorHandler}
 690  
      */
 691  
     public void setSelectorHandler(SelectorHandler selectorHandler){
 692  68
         addSelectorHandler(selectorHandler);
 693  68
     }
 694  
     
 695  
     
 696  
     /**
 697  
      * Return the {@link SelectorHandler} associated with the protocol.
 698  
      * @param protocol - the {@link Controller.Protocol}
 699  
      * @return {@link SelectorHandler}
 700  
      */
 701  
     public SelectorHandler getSelectorHandler(Protocol protocol){
 702  559
         for (SelectorHandler selectorHandler: selectorHandlers){
 703  559
             if (selectorHandler.protocol() == protocol){
 704  559
                 return selectorHandler;
 705  
             }
 706  
         }
 707  0
         return null;
 708  
     }
 709  
     
 710  
     /**
 711  
      * Return the {@link SelectorHandler} associated 
 712  
      * with the {@link Selector}.
 713  
      * @param selector - the {@link Selector}
 714  
      * @return {@link SelectorHandler}
 715  
      */
 716  
     public SelectorHandler getSelectorHandler(Selector selector){
 717  0
         for (SelectorHandler selectorHandler: selectorHandlers){
 718  0
             if (selectorHandler.getSelector() == selector){
 719  0
                 return selectorHandler;
 720  
             }
 721  
         }
 722  0
         return null;
 723  
     }
 724  
     
 725  
     /**
 726  
      * Return the list {@link SelectorHandler}
 727  
      * @return {@link ConcurrentLinkedQueue}
 728  
      */
 729  
     public ConcurrentLinkedQueue getSelectorHandlers(){
 730  7297
         return selectorHandlers;
 731  
     }
 732  
     
 733  
     
 734  
     /**
 735  
      * Shuts down {@link SelectorHandler} and removes it from this 
 736  
      * {@link Controller} list
 737  
      * @param {@link SelectorHandler} to remove
 738  
      */
 739  
     public void removeSelectorHandler(SelectorHandler selectorHandler) {
 740  1
         if (selectorHandlers.remove(selectorHandler)) {
 741  1
             removeSelectorHandlerOnReadControllers(selectorHandler);
 742  1
             selectorHandler.shutdown();
 743  
         }
 744  1
     }
 745  
     
 746  
     
 747  
     /**
 748  
      * Return the {@link Pipeline} (Thread Pool) used by this Controller.
 749  
      */
 750  
     public Pipeline getPipeline() {
 751  82704
         return pipeline;
 752  
     }
 753  
     
 754  
     
 755  
     /**
 756  
      * Set the {@link Pipeline} (Thread Pool).
 757  
      */
 758  
     public void setPipeline(Pipeline<Callable> pipeline) {
 759  34
         this.pipeline = pipeline;
 760  34
     }
 761  
     
 762  
     
 763  
     /**
 764  
      * Return the number of Reader threads count.
 765  
      */
 766  
     public int getReadThreadsCount() {
 767  0
         return readThreadsCount;
 768  
     }
 769  
     
 770  
     
 771  
     /**
 772  
      * Set the number of Reader threads count.
 773  
      */
 774  
     public void setReadThreadsCount(int readThreadsCount) {
 775  21
         this.readThreadsCount = readThreadsCount;
 776  21
     }
 777  
     
 778  
     
 779  
     /**
 780  
      * Return the <code>ConnectorHandlerPool</code> used.
 781  
      */
 782  
     public ConnectorHandlerPool getConnectorHandlerPool() {
 783  0
         return connectorHandlerPool;
 784  
     }
 785  
      
 786  
     
 787  
     /**
 788  
      * Set the <code>ConnectorHandlerPool</code> used.
 789  
      */
 790  
     public void setConnectorHandlerPool(ConnectorHandlerPool connectorHandlerPool) {
 791  1
         this.connectorHandlerPool = connectorHandlerPool;
 792  1
     }
 793  
     
 794  
     // ------------------------------------------------------ Runnable -------//
 795  
     
 796  
     
 797  
     /**
 798  
      * Execute this Controller.
 799  
      */
 800  
     public void run() {
 801  
         try{
 802  84
             start();
 803  0
         } catch(IOException e){
 804  0
             notifyException(e);
 805  0
             throw new RuntimeException(e.getCause());
 806  84
         }
 807  84
     }
 808  
     
 809  
     // -------------------------------------------------------- Copyable ----//
 810  
     
 811  
     
 812  
     /**
 813  
      * Copy this Controller state to another instance of a Controller.
 814  
      */
 815  
     public void copyTo(Copyable copy) {
 816  17
         Controller copyController = (Controller) copy;
 817  17
         copyController.contexts = contexts;
 818  17
         copyController.attributes = attributes;
 819  17
         copyController.instanceHandler = instanceHandler;
 820  17
         copyController.pipeline = pipeline;
 821  17
         copyController.readThreadControllers = readThreadControllers;
 822  17
         copyController.readThreadsCount = readThreadsCount;
 823  17
         copyController.selectionKeyHandler = selectionKeyHandler;
 824  17
         copyController.stateHolder = stateHolder;
 825  17
     }
 826  
     
 827  
     // -------------------------------------------------------- Lifecycle ----//
 828  
     
 829  
     /**
 830  
      * Add controller state listener
 831  
      */
 832  
     public void addStateListener(ControllerStateListener stateListener) {
 833  139
         stateListeners.add(stateListener);
 834  139
     }
 835  
     
 836  
     /**
 837  
      * Remove controller state listener
 838  
      */
 839  
     public void removeStateListener(ControllerStateListener stateListener) {
 840  69
         stateListeners.remove(stateListener);
 841  69
     }
 842  
     
 843  
     /**
 844  
      * Notify controller started
 845  
      */
 846  
     void notifyStarted() {
 847  86
         for(ControllerStateListener stateListener : stateListeners) {
 848  70
             stateListener.onStarted();
 849  
         }
 850  86
     }
 851  
     
 852  
     
 853  
     /**
 854  
      * Notify controller is ready
 855  
      */
 856  
     void notifyReady() {
 857  136
         if (readySelectorHandlerCounter.decrementAndGet() == 0) {
 858  87
             for (ControllerStateListener stateListener : stateListeners) {
 859  72
                 stateListener.onReady();
 860  
             }
 861  
         }
 862  136
     }
 863  
 
 864  
     
 865  
     /**
 866  
      * Notify controller stopped
 867  
      */
 868  
     void notifyStopped() {
 869  136
         if (stoppedSelectorHandlerCounter.decrementAndGet() == 0) {
 870  
             // Notify internal listeners
 871  87
             synchronized(stoppedSelectorHandlerCounter) {
 872  87
                 stoppedSelectorHandlerCounter.notifyAll();
 873  87
             }            
 874  
         }
 875  136
     }
 876  
     
 877  
     
 878  
     /**
 879  
      * Notify exception occured
 880  
      */
 881  
     void notifyException(Throwable e) {
 882  0
         for(ControllerStateListener stateListener : stateListeners) {
 883  0
             stateListener.onException(e);
 884  
         }
 885  0
     }
 886  
     
 887  
     
 888  
     /**
 889  
      * Start the Controller. If the Pipeline and/or Handler has not been
 890  
      * defined, the default will be used.
 891  
      */
 892  
     public void start() throws IOException {
 893  117
         stateHolder.getStateLocker().writeLock().lock();
 894  117
         boolean isUnlocked = false;
 895  
         try {
 896  117
             if (stateHolder.getState(false) == null ||
 897  
                     stateHolder.getState(false) == State.STOPPED) {
 898  
                 // if selectorHandlers were not set by user explicitly,
 899  
                 // add TCPSelectorHandler by default
 900  69
                 if (selectorHandlers.isEmpty()) {
 901  2
                     SelectorHandler selectorHandler = new TCPSelectorHandler();
 902  2
                     selectorHandlers.add(selectorHandler);
 903  
                 }
 904  
 
 905  69
                 pipeline.initPipeline();
 906  69
                 pipeline.startPipeline();
 907  
 
 908  69
                 if (readThreadsCount > 0) {
 909  6
                     initReadThreads();
 910  6
                     multiReadThreadSelectorHandler =
 911  
                             new RoundRobinSelectorHandler(readThreadControllers);
 912  
                 }
 913  
 
 914  69
                 stateHolder.setState(State.STARTED, false);
 915  69
                 notifyStarted();
 916  
 
 917  69
                 int selectorHandlerCount = selectorHandlers.size();
 918  69
                 readySelectorHandlerCounter = new AtomicInteger(selectorHandlerCount);
 919  69
                 stoppedSelectorHandlerCounter = new AtomicInteger(selectorHandlerCount);
 920  
 
 921  
                 
 922  69
                 Iterator<SelectorHandler> it = selectorHandlers.iterator();
 923  69
                 if (selectorHandlerCount > 1) {
 924  12
                     for (; it.hasNext() && selectorHandlerCount-- > 0;) {
 925  11
                         SelectorHandler selectorHandler = it.next();
 926  11
                         startSelectorHandlerRunner(selectorHandler, true);
 927  11
                     }
 928  68
                 } else if (it.hasNext()) {
 929  68
                     SelectorHandler selectorHandler = it.next();
 930  68
                     stateHolder.getStateLocker().writeLock().unlock();
 931  68
                     isUnlocked = true;
 932  68
                     startSelectorHandlerRunner(selectorHandler, false);
 933  
                 }
 934  
             }
 935  
         } finally {
 936  117
             if (!isUnlocked) {
 937  49
                 stateHolder.getStateLocker().writeLock().unlock();
 938  
             }            
 939  
         }
 940  
         
 941  117
         waitUntilSeletorHandlersStop();
 942  
 
 943  117
         if (readThreadsCount > 0) {
 944  6
             multiReadThreadSelectorHandler.shutdown();
 945  6
             multiReadThreadSelectorHandler = null;
 946  
 
 947  21
             for (Controller readController : readThreadControllers) {
 948  
                 try {
 949  15
                     readController.stop();
 950  0
                 } catch (IOException e) {
 951  0
                     logger.log(Level.WARNING, "Exception occured when stopping read Controller!", e);
 952  15
                 }
 953  
             }
 954  
 
 955  6
             readThreadControllers = null;
 956  
         }
 957  
 
 958  117
         selectorHandlers.clear();
 959  117
         pipeline.stopPipeline();
 960  117
         attributes = null;
 961  
 
 962  
         // Notify Controller listeners
 963  117
         for (ControllerStateListener stateListener : stateListeners) {
 964  138
             stateListener.onStopped();
 965  
         }
 966  117
     }
 967  
     
 968  
     
 969  
     /**
 970  
      * Stop the Controller by canceling all the registered keys.
 971  
      */
 972  
     public void stop() throws IOException {
 973  70
         stop(false);
 974  70
     }
 975  
     
 976  
     
 977  
     /**
 978  
      * Stop the Controller by canceling all the registered keys.
 979  
      * @param isAsync, true if controller should be stopped asynchrounously and control
 980  
      *                 returned immediately. If false - control will be returned
 981  
      *                 after Controller will be completely stoped.
 982  
      */
 983  
     public void stop(boolean isAsync) throws IOException {
 984  70
         final CountDownLatch latch = new CountDownLatch(1);
 985  70
         stateHolder.getStateLocker().writeLock().lock();
 986  
         try {
 987  70
             if (stateHolder.getState(false) == State.STOPPED) {
 988  1
                 logger.log(Level.FINE, "Controller is already in stopped state");
 989  
                 return;
 990  
             }
 991  
             
 992  69
             if (!isAsync) {
 993  69
                 addStateListener(new ControllerStateListenerAdapter() {
 994  
                     @Override
 995  
                     public void onException(Throwable e) {
 996  0
                         removeStateListener(this);
 997  0
                         latch.countDown();
 998  0
                     }
 999  
 
 1000  
                     @Override
 1001  
                     public void onStopped() {
 1002  69
                         removeStateListener(this);
 1003  69
                         latch.countDown();
 1004  69
                     }
 1005  
                     
 1006  
                 });
 1007  
             }
 1008  69
             stateHolder.setState(State.STOPPED, false);
 1009  
         } finally {
 1010  70
             stateHolder.getStateLocker().writeLock().unlock();
 1011  69
         }
 1012  
         
 1013  69
         if (!isAsync) {
 1014  
             try {
 1015  69
                 latch.await();
 1016  69
             } catch(InterruptedException e) {}
 1017  
         }
 1018  69
     }
 1019  
     
 1020  
     
 1021  
     /**
 1022  
      * Pause this {@link Controller} and associated {@link SelectorHandler}s
 1023  
      */
 1024  
     public void pause() throws IOException {
 1025  0
         stateHolder.setState(State.PAUSED);
 1026  0
     }
 1027  
     
 1028  
     
 1029  
     /**
 1030  
      * Resume this {@link Controller} and associated {@link SelectorHandler}s
 1031  
      */
 1032  
     public void resume() throws IOException {
 1033  0
         if (!State.PAUSED.equals(stateHolder.getState(false))) {
 1034  0
             throw new IllegalStateException("Controller is not in PAUSED state, but: " +
 1035  
                     stateHolder.getState(false));
 1036  
         }
 1037  
         
 1038  0
         stateHolder.setState(State.STARTED);
 1039  0
     }
 1040  
     
 1041  
     
 1042  
     /**
 1043  
      * Gets this {@link Controller}'s {@link StateHolder}
 1044  
      * @return {@link StateHolder}
 1045  
      */
 1046  
     public StateHolder<State> getStateHolder() {
 1047  136
         return stateHolder;
 1048  
     }
 1049  
 
 1050  
     
 1051  
     /**
 1052  
      * Initialize the number of ReadThreadController.
 1053  
      */
 1054  
     private void initReadThreads() throws IOException {
 1055  
         // Attributes need to be shared among Controller and its ReadControllers
 1056  6
         if (attributes == null) {
 1057  6
             attributes = new HashMap<String, Object>(2);
 1058  
         }
 1059  
         
 1060  6
         readThreadControllers = new ReadController[readThreadsCount];
 1061  21
         for(int i=0; i<readThreadsCount; i++) {
 1062  15
             ReadController controller = new ReadController();
 1063  15
             copyTo(controller);
 1064  15
             controller.setReadThreadsCount(0);
 1065  15
             readThreadControllers[i] = controller;
 1066  
         }
 1067  
         
 1068  6
         for (SelectorHandler selectorHandler : selectorHandlers) {
 1069  6
             addSelectorHandlerOnReadControllers(selectorHandler);
 1070  
         }
 1071  
         
 1072  21
         for (int i=0; i < readThreadControllers.length; i++) {
 1073  
             // TODO Get a Thread from a Pool instead.
 1074  15
             new Thread(readThreadControllers[i], "GrizzlyReadController-" + i).start();
 1075  
         }
 1076  6
     }
 1077  
     
 1078  
     
 1079  
     /**
 1080  
      * Register {@link SelectorHandler} on all read controllers
 1081  
      * @param selectorHandler
 1082  
      */
 1083  
     private void addSelectorHandlerOnReadControllers(SelectorHandler selectorHandler) {
 1084  46
         if (readThreadControllers == null || readThreadsCount == 0) return;
 1085  
         
 1086  
         // Attributes need to be shared among SelectorHandler and its read-copies
 1087  6
         if (selectorHandler.getAttributes() == null) {
 1088  6
             selectorHandler.setAttributes(new HashMap<String, Object>(2));
 1089  
         }
 1090  
 
 1091  21
         for (Controller readController : readThreadControllers) {
 1092  15
             SelectorHandler copySelectorHandler = Cloner.clone(selectorHandler);
 1093  
             try {
 1094  15
                 copySelectorHandler.setSelector(Selector.open());
 1095  0
             } catch(IOException e) {
 1096  0
                 logger.log(Level.SEVERE, "Error opening selector!", e);
 1097  15
             }
 1098  
             
 1099  15
             readController.addSelectorHandler(copySelectorHandler);
 1100  
         }
 1101  6
     }
 1102  
     
 1103  
     
 1104  
     /**
 1105  
      * Starts <code>SelectorHandlerRunner</code>
 1106  
      * @param selectorHandler
 1107  
      * @param isRunAsync if true - <code>SelectorHandlerRunner</code> will be run
 1108  
      *          in separate <code>Thread</code>, if false - in current <code>Thread</code>
 1109  
      */
 1110  
     private void startSelectorHandlerRunner(SelectorHandler selectorHandler, 
 1111  
             boolean isRunAsync) {
 1112  
 
 1113  
         // check if there is java.nio.Selector already open, 
 1114  
         // if so, just notify the controller onReady() listeners 
 1115  119
         if (selectorHandler.getSelector() != null) {
 1116  0
             notifyReady();
 1117  
         }
 1118  119
         Runnable selectorRunner = new SelectorHandlerRunner(this, selectorHandler);
 1119  119
         if (isRunAsync) {
 1120  
             // if there are more than 1 selector handler - run it in separate thread
 1121  
             //@TODO Take Thread from ThreadPool?
 1122  51
             new Thread(selectorRunner, "GrizzlySelectorRunner-" + selectorHandler.protocol()).start();
 1123  
         } else {
 1124  
             // else run it in current thread
 1125  68
             selectorRunner.run();
 1126  
         }
 1127  119
     }
 1128  
     
 1129  
 
 1130  
     /**
 1131  
      * Register {@link SelectorHandler} on all read controllers
 1132  
      * @param selectorHandler
 1133  
      */
 1134  
     private void removeSelectorHandlerOnReadControllers(SelectorHandler selectorHandler) {
 1135  1
         if (readThreadControllers == null) return;
 1136  
         
 1137  0
         for (ReadController readController : readThreadControllers) {
 1138  0
             readController.removeSelectorHandlerClone(selectorHandler);
 1139  
         }
 1140  0
     }
 1141  
 
 1142  
     
 1143  
     /**
 1144  
      * Is this Controller started?
 1145  
      * @return <code>boolean</code> true / false
 1146  
      */
 1147  
     public boolean isStarted() {
 1148  39
         return stateHolder.getState() == State.STARTED;
 1149  
     }
 1150  
     
 1151  
     
 1152  
     // ----------- ConnectorHandlerPool interface implementation ----------- //  
 1153  
     
 1154  
     
 1155  
     /**
 1156  
      * Return an instance of a {@link ConnectorHandler} based on the
 1157  
      * Protocol requested. 
 1158  
      */
 1159  
     public ConnectorHandler acquireConnectorHandler(Protocol protocol){
 1160  176
         return connectorHandlerPool.acquireConnectorHandler(protocol);
 1161  
     }
 1162  
     
 1163  
     
 1164  
     /**
 1165  
      * Return a {@link ConnectorHandler} to the pool of ConnectorHandler.
 1166  
      * Any reference to the returned must not be re-used as that instance
 1167  
      * can always be acquired again, causing unexpected results.
 1168  
      */
 1169  
     public void releaseConnectorHandler(ConnectorHandler connectorHandler){
 1170  176
         connectorHandlerPool.releaseConnectorHandler(connectorHandler);
 1171  176
     }
 1172  
         
 1173  
 
 1174  
     /**
 1175  
      * <tt>true</tt> if OP_ERAD and OP_WRITE can be handled concurrently. 
 1176  
      * If <tt>false</tt>, the Controller will first invoke the OP_READ handler and
 1177  
      * then invoke the OP_WRITE during the next Selector.select() invocation.
 1178  
      */
 1179  
     public boolean isHandleReadWriteConcurrently() {
 1180  0
         return handleReadWriteConcurrently;
 1181  
     }
 1182  
 
 1183  
     
 1184  
     /**
 1185  
      * <tt>true</tt> if OP_ERAD and OP_WRITE can be handled concurrently. 
 1186  
      * If <tt>false</tt>, the Controller will first invoke the OP_READ handler and
 1187  
      * then invoke the OP_WRITE during the next Selector.select() invocation.
 1188  
      */
 1189  
     public void setHandleReadWriteConcurrently(boolean handleReadWriteConcurrently) {
 1190  7
         this.handleReadWriteConcurrently = handleReadWriteConcurrently;
 1191  7
     }
 1192  
     
 1193  
     /**
 1194  
      * Method waits until all initialized {@link SelectorHandler}s will
 1195  
      * not get stopped
 1196  
      */
 1197  
     protected void waitUntilSeletorHandlersStop() {
 1198  151
         synchronized(stoppedSelectorHandlerCounter) {
 1199  730
             while(stoppedSelectorHandlerCounter.get() > 0 || 
 1200  
                     !State.STOPPED.equals(stateHolder.getState())) {
 1201  
                 try {
 1202  579
                     stoppedSelectorHandlerCounter.wait(1000);
 1203  0
                 } catch (InterruptedException ex) {
 1204  579
                 }
 1205  
             }
 1206  151
         }
 1207  151
     }
 1208  
 
 1209  
     // ----------- AttributeHolder interface implementation ----------- //  
 1210  
 
 1211  
     /**
 1212  
      * Remove a key/value object.
 1213  
      * Method is not thread safe
 1214  
      * 
 1215  
      * @param key - name of an attribute
 1216  
      * @return  attribute which has been removed
 1217  
      */
 1218  
     public Object removeAttribute(String key) {
 1219  0
         if (attributes == null) return null;
 1220  
         
 1221  0
         return attributes.remove(key);
 1222  
     }
 1223  
 
 1224  
     /**
 1225  
      * Set a key/value object.
 1226  
      * Method is not thread safe
 1227  
      * 
 1228  
      * @param key - name of an attribute
 1229  
      * @param value - value of named attribute
 1230  
      */
 1231  
     public void setAttribute(String key, Object value) {
 1232  0
         if (attributes == null) {
 1233  0
             attributes = new HashMap<String, Object>();
 1234  
         }
 1235  
         
 1236  0
         attributes.put(key, value);
 1237  0
     }
 1238  
 
 1239  
     /**
 1240  
      * Return an object based on a key.
 1241  
      * Method is not thread safe
 1242  
      * 
 1243  
      * @param key - name of an attribute
 1244  
      * @return - attribute value for the <tt>key</tt>, null if <tt>key</tt>
 1245  
      *           does not exist in <tt>attributes</tt>
 1246  
      */
 1247  
     public Object getAttribute(String key) {
 1248  0
         if (attributes == null) return null;
 1249  
 
 1250  0
         return attributes.get(key);
 1251  
     }
 1252  
     
 1253  
     /**
 1254  
      * Set a {@link Map} of attribute name/value pairs.
 1255  
      * Old {@link AttributeHolder} values will not be available.
 1256  
      * Later changes of this {@link Map} will lead to changes to the current
 1257  
      * {@link AttributeHolder}.
 1258  
      * 
 1259  
      * @param attributes - map of name/value pairs
 1260  
      */
 1261  
     public void setAttributes(Map<String, Object> attributes) {
 1262  0
         this.attributes = attributes;
 1263  0
     }
 1264  
 
 1265  
 
 1266  
     /**
 1267  
      * Return a {@link Map} of attribute name/value pairs.
 1268  
      * Updates, performed on the returned {@link Map} will be reflected in
 1269  
      * this {@link AttributeHolder}
 1270  
      * 
 1271  
      * @return - {@link Map} of attribute name/value pairs
 1272  
      */
 1273  
     public Map<String, Object> getAttributes() {
 1274  0
         return attributes;
 1275  
     }
 1276  
     
 1277  
     
 1278  
     /**
 1279  
      * Return the Controller which is handling the {@link Handler}
 1280  
      * @param handler The handler (like {@link SelectorHandler})
 1281  
      * @return The Controller associated with the Handler, or null if not 
 1282  
      * associated.
 1283  
      */
 1284  
     public static Controller getHandlerController(Handler handler){
 1285  167
         if (handler instanceof SelectorHandler){
 1286  167
             for (Controller controller: controllers){
 1287  7296
                 if (controller.getSelectorHandlers().contains(handler)){
 1288  167
                     return controller;
 1289  
                 }
 1290  
             }
 1291  
         } 
 1292  0
         return null;
 1293  
     }
 1294  
 }