Coverage Report - com.sun.grizzly.TCPSelectorHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
TCPSelectorHandler
77 %
284/370
69 %
79/114
0
TCPSelectorHandler$1
100 %
2/2
N/A
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 
 39  
 package com.sun.grizzly;
 40  
 
 41  
 import com.sun.grizzly.async.AsyncQueueReader;
 42  
 import com.sun.grizzly.async.AsyncQueueReaderContextTask;
 43  
 import com.sun.grizzly.async.AsyncQueueWriter;
 44  
 import com.sun.grizzly.async.TCPAsyncQueueWriter;
 45  
 import com.sun.grizzly.async.AsyncQueueWriterContextTask;
 46  
 import com.sun.grizzly.async.TCPAsyncQueueReader;
 47  
 import com.sun.grizzly.util.CallbackHandlerSelectionKeyAttachment;
 48  
 import com.sun.grizzly.util.Cloner;
 49  
 import com.sun.grizzly.util.Copyable;
 50  
 import com.sun.grizzly.util.SelectionKeyAttachment;
 51  
 import com.sun.grizzly.util.SelectionKeyOP;
 52  
 import com.sun.grizzly.util.SelectionKeyOP.ConnectSelectionKeyOP;
 53  
 import com.sun.grizzly.util.State;
 54  
 import com.sun.grizzly.util.StateHolder;
 55  
 import java.io.IOException;
 56  
 import java.net.BindException;
 57  
 import java.net.InetAddress;
 58  
 import java.net.ServerSocket;
 59  
 import java.net.Socket;
 60  
 import java.net.InetSocketAddress;
 61  
 import java.net.SocketAddress;
 62  
 import java.net.SocketException;
 63  
 import java.nio.channels.CancelledKeyException;
 64  
 import java.nio.channels.ClosedChannelException;
 65  
 import java.nio.channels.ClosedSelectorException;
 66  
 import java.nio.channels.SelectableChannel;
 67  
 import java.nio.channels.SelectionKey;
 68  
 import java.nio.channels.Selector;
 69  
 import java.nio.channels.ServerSocketChannel;
 70  
 import java.nio.channels.SocketChannel;
 71  
 import java.util.ConcurrentModificationException;
 72  
 import java.util.HashMap;
 73  
 import java.util.Iterator;
 74  
 import java.util.Map;
 75  
 import java.util.Set;
 76  
 import java.util.concurrent.Callable;
 77  
 import java.util.concurrent.ConcurrentLinkedQueue;
 78  
 import java.util.concurrent.atomic.AtomicBoolean;
 79  
 import java.util.logging.Level;
 80  
 import java.util.logging.Logger;
 81  
 
 82  
 /**
 83  
  * A SelectorHandler handles all java.nio.channels.Selector operations.
 84  
  * One or more instance of a Selector are handled by SelectorHandler.
 85  
  * The logic for processing of SelectionKey interest (OP_ACCEPT,OP_READ, etc.)
 86  
  * is usually defined using an instance of SelectorHandler.
 87  
  *
 88  
  * This class represents a TCP implementation of a SelectorHandler.
 89  
  * This class first bind a ServerSocketChannel to a TCP port and then start
 90  
  * waiting for NIO events.
 91  
  *
 92  
  * @author Jeanfrancois Arcand
 93  
  */
 94  
 public class TCPSelectorHandler implements SelectorHandler {
 95  
     
 96  
     
 97  
     /**
 98  
      * The ConnectorInstanceHandler used to return a new or pooled
 99  
      * ConnectorHandler
 100  
      */
 101  
     protected ConnectorInstanceHandler connectorInstanceHandler;
 102  
     
 103  
     
 104  
     /**
 105  
      * The list of {@link SelectionKeyOP} to register next time the 
 106  
      * Selector.select is invoked.
 107  
      */
 108  
     protected ConcurrentLinkedQueue<SelectionKeyOP> opToRegister;
 109  
     
 110  
     /**
 111  
      * The socket tcpDelay.
 112  
      * 
 113  
      * Default value for tcpNoDelay is disabled (set to true).
 114  
      */
 115  141
     protected boolean tcpNoDelay = true;
 116  
     
 117  
     
 118  
     /**
 119  
      * The socket reuseAddress
 120  
      */
 121  141
     protected boolean reuseAddress = true;
 122  
     
 123  
     
 124  
     /**
 125  
      * The socket linger.
 126  
      */
 127  141
     protected int linger = -1;
 128  
     
 129  
     
 130  
     /**
 131  
      * The socket time out
 132  
      */
 133  141
     protected int socketTimeout = -1;
 134  
     
 135  
     
 136  
     protected Logger logger;
 137  
     
 138  
     
 139  
     /**
 140  
      * The server socket time out
 141  
      */
 142  141
     protected int serverTimeout = 0;
 143  
     
 144  
     
 145  
     /**
 146  
      * The inet address to use when binding.
 147  
      */
 148  
     protected InetAddress inet;
 149  
     
 150  
     
 151  
     /**
 152  
      * The default TCP port.
 153  
      */
 154  141
     protected int port = 18888;
 155  
     
 156  
     
 157  
     /**
 158  
      * The ServerSocket instance.
 159  
      */
 160  
     protected ServerSocket serverSocket;
 161  
     
 162  
     
 163  
     /**
 164  
      * The ServerSocketChannel.
 165  
      */
 166  
     protected ServerSocketChannel serverSocketChannel;
 167  
     
 168  
     
 169  
     /**
 170  
      * The single Selector.
 171  
      */
 172  
     protected Selector selector;
 173  
     
 174  
     
 175  
     /**
 176  
      * The Selector time out.
 177  
      */
 178  141
     protected long selectTimeout = 1000L;
 179  
     
 180  
     
 181  
     /**
 182  
      * Server socket backlog.
 183  
      */
 184  141
     protected int ssBackLog = 4096;
 185  
     
 186  
     
 187  
     /**
 188  
      * Is this used for client only or client/server operation.
 189  
      */
 190  141
     protected boolean isClient = false;
 191  
     
 192  
     
 193  
     /**
 194  
      * The SelectionKeyHandler associated with this SelectorHandler.
 195  
      */
 196  
     protected SelectionKeyHandler selectionKeyHandler;
 197  
     
 198  
     
 199  
     /**
 200  
      * The ProtocolChainInstanceHandler used by this instance. If not set, and instance
 201  
      * of the DefaultInstanceHandler will be created.
 202  
      */
 203  
     protected ProtocolChainInstanceHandler instanceHandler;
 204  
     
 205  
 
 206  
     /**
 207  
      * The {@link Pipeline} used by this instance. If null - 
 208  
      * {@link Controller}'s {@link Pipeline} will be used
 209  
      */
 210  
     protected Pipeline pipeline;
 211  
 
 212  
     
 213  
     /**
 214  
      * {@link AsyncQueueWriter}
 215  
      */
 216  
     protected AsyncQueueWriter asyncQueueWriter;
 217  
     
 218  
     
 219  
     /**
 220  
      * {@link AsyncQueueWriter}
 221  
      */
 222  
     protected AsyncQueueReader asyncQueueReader;
 223  
 
 224  
     
 225  
     /**
 226  
      * Attributes, associated with the {@link SelectorHandler} instance
 227  
      */
 228  
     protected Map<String, Object> attributes;
 229  
     
 230  
     /**
 231  
      * This {@link SelectorHandler} StateHolder, which is shared among
 232  
      * SelectorHandler and its clones
 233  
      */
 234  141
     protected StateHolder<State> stateHolder = new StateHolder<State>(true);
 235  
     
 236  
     /**
 237  
      * Flag, which shows whether shutdown was called for this {@link SelectorHandler}
 238  
      */
 239  141
     protected AtomicBoolean isShutDown = new AtomicBoolean(false);
 240  
 
 241  
     public TCPSelectorHandler(){
 242  62
         this(false);
 243  62
     }
 244  
     
 245  
     
 246  
     /**
 247  
      * Create a TCPSelectorHandler only used with ConnectorHandler.
 248  
      * 
 249  
      * @param isClient true if this SelectorHandler is only used 
 250  
      * to handle ConnectorHandler.
 251  
      */
 252  141
     public TCPSelectorHandler(boolean isClient) {
 253  141
         this.isClient = isClient;
 254  141
         logger = Controller.logger();
 255  141
     }
 256  
     
 257  
     
 258  
     public void copyTo(Copyable copy) {
 259  15
         TCPSelectorHandler copyHandler = (TCPSelectorHandler) copy;
 260  15
         copyHandler.selector = selector;
 261  15
         if (selectionKeyHandler != null) {
 262  4
             copyHandler.setSelectionKeyHandler(Cloner.clone(selectionKeyHandler));
 263  
         }
 264  
         
 265  15
         copyHandler.attributes = attributes;
 266  15
         copyHandler.selectTimeout = selectTimeout;
 267  15
         copyHandler.serverTimeout = serverTimeout;
 268  15
         copyHandler.inet = inet;
 269  15
         copyHandler.port = port;
 270  15
         copyHandler.ssBackLog = ssBackLog;
 271  15
         copyHandler.tcpNoDelay = tcpNoDelay;
 272  15
         copyHandler.linger = linger;
 273  15
         copyHandler.socketTimeout = socketTimeout;
 274  15
         copyHandler.logger = logger;
 275  15
         copyHandler.reuseAddress = reuseAddress;
 276  15
         copyHandler.connectorInstanceHandler = connectorInstanceHandler;
 277  15
         copyHandler.stateHolder = stateHolder;
 278  15
     }
 279  
     
 280  
     
 281  
     /**
 282  
      * Return the set of SelectionKey registered on this Selector.
 283  
      */
 284  
     public Set<SelectionKey> keys(){
 285  249800
         if (selector != null){
 286  249800
             return selector.keys();
 287  
         } else {
 288  0
             throw new IllegalStateException("Selector is not created!");
 289  
         }
 290  
     }
 291  
     
 292  
     
 293  
     /**
 294  
      * Is the Selector open.
 295  
      */
 296  
     public boolean isOpen(){
 297  249207
         if (selector != null){
 298  249207
             return selector.isOpen();
 299  
         } else {
 300  0
             return false;
 301  
         }
 302  
     }
 303  
     
 304  
     
 305  
     /**
 306  
      * Before invoking Selector.select(), make sure the ServerScoketChannel
 307  
      * has been created. If true, then register all SelectionKey to the Selector.
 308  
      * @param ctx {@link Context}
 309  
      */
 310  
     public void preSelect(Context ctx) throws IOException {
 311  197404
         initOpRegistriesIfRequired();
 312  
         
 313  197404
         if (asyncQueueReader == null) {
 314  130
             asyncQueueReader = new TCPAsyncQueueReader(this);
 315  
         }
 316  
 
 317  197404
         if (asyncQueueWriter == null) {
 318  130
             asyncQueueWriter = new TCPAsyncQueueWriter(this);
 319  
         }
 320  
                 
 321  197404
         if (selector == null){
 322  
             try {
 323  113
                 isShutDown.set(false);
 324  
                 
 325  113
                 connectorInstanceHandler = new ConnectorInstanceHandler.
 326  
                         ConcurrentQueueDelegateCIH(
 327  
                         getConnectorInstanceHandlerDelegate());
 328  
                 
 329  
                 // Create the socket listener
 330  113
                 selector = Selector.open();
 331  
                 
 332  113
                 if (!isClient){
 333  34
                     serverSocketChannel = ServerSocketChannel.open();
 334  34
                     serverSocket = serverSocketChannel.socket();
 335  34
                     serverSocket.setReuseAddress(reuseAddress);
 336  34
                     if ( inet == null)
 337  34
                         serverSocket.bind(new InetSocketAddress(port),ssBackLog);
 338  
                     else
 339  0
                         serverSocket.bind(new InetSocketAddress(inet,port),ssBackLog);
 340  
                     
 341  34
                     serverSocketChannel.configureBlocking(false);
 342  34
                     serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
 343  
                 }
 344  113
                 ctx.getController().notifyReady();
 345  0
             } catch (SocketException ex){
 346  0
                 throw new BindException(ex.getMessage() + ": " + port + "=" + this);
 347  113
             }
 348  
             
 349  113
             if (!isClient){
 350  34
                 serverSocket.setSoTimeout(serverTimeout);
 351  
             }
 352  
         } else {
 353  197291
             processPendingOperations(ctx);
 354  
         }
 355  197404
     }
 356  
     
 357  
     
 358  
     protected void processPendingOperations(Context ctx) throws IOException {
 359  249681
         if (!opToRegister.isEmpty()) {
 360  
             SelectionKeyOP operation;
 361  133911
             Iterator<SelectionKeyOP> opIterator = opToRegister.iterator();
 362  560422
             while(opIterator.hasNext()) {
 363  426511
                 operation = opIterator.next();
 364  426511
                 opIterator.remove();
 365  
                 
 366  426511
                 if ((operation.getOp() & SelectionKey.OP_CONNECT) != 0) {
 367  197
                     onConnectOp(ctx, 
 368  
                             (SelectionKeyOP.ConnectSelectionKeyOP) operation);
 369  
                 } else {
 370  426314
                     if ((operation.getOp() & SelectionKey.OP_READ) != 0) {
 371  153520
                         onReadOp(operation);
 372  
                     }
 373  
 
 374  426314
                     if ((operation.getOp() & SelectionKey.OP_WRITE) != 0) {
 375  272794
                         onWriteOp(operation);
 376  
                     }
 377  
                 }
 378  
 
 379  426511
                 SelectionKeyOP.releaseSelectionKeyOP(operation);
 380  
             }
 381  
         }
 382  249681
     }
 383  
 
 384  
     /**
 385  
      * Handle new OP_READ ops.
 386  
      */
 387  
     protected void onReadOp(SelectionKeyOP selectionKeyOp) 
 388  
             throws ClosedChannelException {
 389  153520
         SelectionKey key = selectionKeyOp.getSelectionKey();
 390  153520
         if (key != null) {
 391  153488
             selectionKeyHandler.register(key, SelectionKey.OP_READ);
 392  
         } else {
 393  32
             selectionKeyHandler.register(selectionKeyOp.getChannel(), 
 394  
                     SelectionKey.OP_READ);
 395  
         }
 396  153520
     }
 397  
     
 398  
     
 399  
     /**
 400  
      * Handle new OP_WRITE ops.
 401  
      */
 402  
     protected void onWriteOp(SelectionKeyOP selectionKeyOp) 
 403  
             throws ClosedChannelException {
 404  272794
         SelectionKey key = selectionKeyOp.getSelectionKey();
 405  272794
         if (key != null) {
 406  272794
             selectionKeyHandler.register(key, SelectionKey.OP_WRITE);
 407  
         } else {
 408  0
             selectionKeyHandler.register(selectionKeyOp.getChannel(), 
 409  
                     SelectionKey.OP_WRITE);
 410  
         }
 411  272794
     }
 412  
     
 413  
     
 414  
     /**
 415  
      * Handle new OP_CONNECT ops.
 416  
      */
 417  
     protected void onConnectOp(Context ctx, 
 418  
             SelectionKeyOP.ConnectSelectionKeyOP selectionKeyOp) throws IOException {
 419  166
         SocketAddress remoteAddress = selectionKeyOp.getRemoteAddress();
 420  166
         SocketAddress localAddress = selectionKeyOp.getLocalAddress();
 421  166
         CallbackHandler callbackHandler = selectionKeyOp.getCallbackHandler();
 422  
 
 423  166
         SocketChannel socketChannel = SocketChannel.open();
 424  166
         socketChannel.socket().setReuseAddress(reuseAddress);
 425  166
         if (localAddress != null) {
 426  0
             socketChannel.socket().bind(localAddress);
 427  
         }
 428  
 
 429  166
         socketChannel.configureBlocking(false);
 430  166
         SelectionKey key = socketChannel.register(selector, 
 431  
                 SelectionKey.OP_CONNECT);
 432  166
         key.attach(CallbackHandlerSelectionKeyAttachment.create(key, callbackHandler));
 433  
         
 434  
         boolean isConnected;
 435  
         try {
 436  166
             isConnected = socketChannel.connect(remoteAddress);
 437  0
         } catch(Exception e) {
 438  0
             if (logger.isLoggable(Level.FINE)) {
 439  0
                 logger.log(Level.FINE, "Exception occured when tried to connect socket", e);
 440  
             }
 441  
             
 442  
             // set isConnected to true to let callback handler to know about the problem happened
 443  0
             isConnected = true;
 444  166
         }
 445  
         
 446  
         // if channel was connected immediately or exception occured
 447  166
         if (isConnected) {
 448  0
             onConnectInterest(key, ctx);
 449  
         }
 450  166
     }
 451  
     
 452  
     
 453  
     /**
 454  
      * Execute the Selector.select(...) operations.
 455  
      * @param ctx {@link Context}
 456  
      * @return{@link Set} of {@link Context}
 457  
      */
 458  
     public Set<SelectionKey> select(Context ctx) throws IOException{        
 459  249800
         selector.select(selectTimeout);
 460  249800
         return selector.selectedKeys();
 461  
     }
 462  
     
 463  
     
 464  
     /**
 465  
      * Invoked after Selector.select().
 466  
      * @param ctx {@link Context}
 467  
      */
 468  
     public void postSelect(Context ctx) {
 469  249800
         Set<SelectionKey> readyKeys = keys();
 470  249800
         if (readyKeys.isEmpty()){
 471  593
             return;
 472  
         }
 473  
 
 474  249207
         if (isOpen()) {
 475  249207
             selectionKeyHandler.expire(readyKeys.iterator());
 476  
         }            
 477  249207
     }
 478  
     
 479  
     
 480  
     /**
 481  
      * Register a SelectionKey to this Selector.
 482  
      */
 483  
     public void register(SelectionKey key, int ops) {
 484  426260
         SelectionKeyOP keyOP = SelectionKeyOP.aquireSelectionKeyOP(ops);
 485  426292
         keyOP.setSelectionKey(key);
 486  426291
         keyOP.setOp(ops);
 487  426294
         initOpRegistriesIfRequired();
 488  425883
         opToRegister.offer(keyOP);
 489  426292
         selector.wakeup();
 490  426292
     }
 491  
     
 492  
     public void register(SelectableChannel channel, int ops) {
 493  32
         SelectionKeyOP keyOP = SelectionKeyOP.aquireSelectionKeyOP(ops);
 494  32
         keyOP.setChannel(channel);
 495  32
         keyOP.setOp(ops);
 496  32
         initOpRegistriesIfRequired();
 497  32
         opToRegister.offer(keyOP);
 498  32
         selector.wakeup();
 499  32
     }
 500  
     
 501  
     /**
 502  
      * Register a CallBackHandler to this Selector.
 503  
      *
 504  
      * @param remoteAddress remote address to connect
 505  
      * @param localAddress local address to bin
 506  
      * @param callBackHandler {@link CallbackHandler}
 507  
      * @throws java.io.IOException
 508  
      */
 509  
     protected void connect(SocketAddress remoteAddress, SocketAddress localAddress,
 510  
             CallbackHandler callBackHandler) throws IOException {
 511  
         
 512  197
         SelectionKeyOP.ConnectSelectionKeyOP keyOP = 
 513  
                 (ConnectSelectionKeyOP) SelectionKeyOP.aquireSelectionKeyOP(SelectionKey.OP_CONNECT);
 514  
         
 515  197
         keyOP.setOp(SelectionKey.OP_CONNECT);
 516  197
         keyOP.setRemoteAddress(remoteAddress);
 517  197
         keyOP.setLocalAddress(localAddress);
 518  197
         keyOP.setCallbackHandler(callBackHandler);
 519  197
         initOpRegistriesIfRequired();
 520  197
         opToRegister.offer(keyOP);
 521  197
         selector.wakeup();
 522  197
     }
 523  
         
 524  
     /**
 525  
      * {@inheritDoc}
 526  
      */
 527  
     public void pause() {
 528  1
         stateHolder.setState(State.PAUSED);
 529  1
     }
 530  
 
 531  
     /**
 532  
      * {@inheritDoc}
 533  
      */
 534  
     public void resume() {
 535  1
         if (!State.PAUSED.equals(stateHolder.getState(false))) {
 536  0
             throw new IllegalStateException("SelectorHandler is not in PAUSED state, but: " +
 537  
                     stateHolder.getState(false));
 538  
         }
 539  
 
 540  1
         stateHolder.setState(State.STARTED);
 541  1
     }
 542  
 
 543  
     /**
 544  
      * {@inheritDoc}
 545  
      */
 546  
     public StateHolder<State> getStateHolder() {
 547  250005
         return stateHolder;
 548  
     }
 549  
 
 550  
     /**
 551  
      * Shuntdown this instance by closing its Selector and associated channels.
 552  
      */
 553  
     public void shutdown() {
 554  
         // If shutdown was called for this SelectorHandler
 555  137
         if (isShutDown.getAndSet(true)) return;
 556  
         
 557  136
         stateHolder.setState(State.STOPPED);
 558  
         
 559  136
         if (selector != null) {
 560  
             try {
 561  130
                 for (SelectionKey selectionKey : selector.keys()) {
 562  76
                     selectionKeyHandler.close(selectionKey);
 563  
                 } 
 564  0
             } catch (ClosedSelectorException e) {
 565  
                 // If Selector is already closed - OK
 566  1
             } catch (ConcurrentModificationException e) {
 567  
                 // Someone else works with keys. Create copy
 568  1
                 Object[] keys = selector.keys().toArray();
 569  2
                 for (Object selectionKey : keys) {
 570  1
                     selectionKeyHandler.close((SelectionKey) selectionKey);
 571  
                 } 
 572  
                 
 573  129
             }
 574  
         }
 575  
         
 576  
         try{
 577  136
             if (serverSocket != null)
 578  34
                 serverSocket.close();
 579  0
         } catch (Throwable ex){
 580  0
             Controller.logger().log(Level.SEVERE,
 581  
                     "serverSocket.close",ex);
 582  136
         }
 583  
         
 584  
         try{
 585  136
             if (serverSocketChannel != null)
 586  34
                 serverSocketChannel.close();
 587  0
         } catch (Throwable ex){
 588  0
             Controller.logger().log(Level.SEVERE,
 589  
                     "serverSocketChannel.close",ex);
 590  136
         }
 591  
         
 592  
         try{
 593  136
             if (selector != null)
 594  130
                 selector.close();
 595  0
         } catch (Throwable ex){
 596  0
             Controller.logger().log(Level.SEVERE,
 597  
                     "selector.close",ex);
 598  136
         }
 599  
         
 600  136
         if (asyncQueueReader != null) {
 601  130
             asyncQueueReader.close();
 602  130
             asyncQueueReader = null;
 603  
         }
 604  
 
 605  136
         if (asyncQueueWriter != null) {
 606  130
             asyncQueueWriter.close();
 607  130
             asyncQueueWriter = null;
 608  
         }
 609  
         
 610  136
         attributes = null;
 611  136
     }
 612  
     
 613  
     /**
 614  
      * {@inheritDoc}
 615  
      */
 616  
     public SelectableChannel acceptWithoutRegistration(SelectionKey key)
 617  
     throws IOException {
 618  191
         ServerSocketChannel server = (ServerSocketChannel) key.channel();
 619  191
         SocketChannel channel = server.accept();
 620  191
         return channel;
 621  
     }
 622  
     
 623  
     /**
 624  
      * Handle OP_ACCEPT.
 625  
      * @param ctx {@link Context}
 626  
      * @return always returns false
 627  
      */
 628  
     public boolean onAcceptInterest(SelectionKey key,
 629  
             Context ctx) throws IOException{
 630  161
         SelectableChannel channel = acceptWithoutRegistration(key);
 631  
         
 632  161
         if (channel != null) {
 633  161
             configureChannel(channel);
 634  161
             SelectionKey readKey =
 635  
                     channel.register(selector, SelectionKey.OP_READ);
 636  161
             readKey.attach(System.currentTimeMillis());
 637  
         }
 638  161
         return false;
 639  
     }
 640  
     
 641  
     /**
 642  
      * Handle OP_READ.
 643  
      * @param ctx {@link Context}
 644  
      * @param key {@link SelectionKey}
 645  
      * @return false if handled by a {@link CallbackHandler}, otherwise true
 646  
      */
 647  
     public boolean onReadInterest(final SelectionKey key,final Context ctx)
 648  
     throws IOException{
 649  
         // disable OP_READ on key before doing anything else
 650  153611
         key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
 651  153611
         Object attach = SelectionKeyAttachment.getAttachment(key);
 652  
         
 653  153611
         if (asyncQueueReader.isAsyncQueueReaderEnabledFor(key)) {
 654  9737
             final Context context = pollContext(ctx, key, Context.OpType.OP_READ);
 655  9737
             invokeAsyncQueueReader(context);
 656  9737
             return false;
 657  143874
         } else if (attach instanceof CallbackHandler){
 658  61260
             final Context context = pollContext(ctx, key, Context.OpType.OP_READ);
 659  61260
             invokeCallbackHandler((CallbackHandler) attach, context);
 660  61260
             return false;
 661  
         } else {
 662  82614
             return true;
 663  
         }
 664  
     }
 665  
     
 666  
     
 667  
     /**
 668  
      * Handle OP_WRITE.
 669  
      *
 670  
      * @param key {@link SelectionKey}
 671  
      * @param ctx {@link Context}
 672  
      */
 673  
     public boolean onWriteInterest(final SelectionKey key,final Context ctx)
 674  
     throws IOException{
 675  
         // disable OP_WRITE on key before doing anything else
 676  2401
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 677  
         
 678  
         Object attach;
 679  2401
         if (asyncQueueWriter.hasReadyAsyncWriteData(key)) {
 680  2310
             final Context context = pollContext(ctx, key, Context.OpType.OP_WRITE);
 681  2310
              invokeAsyncQueueWriter(context);
 682  2310
             return false;
 683  91
         } else if ((attach = SelectionKeyAttachment.getAttachment(key)) 
 684  
                 instanceof CallbackHandler){
 685  63
             final Context context = pollContext(ctx, key, Context.OpType.OP_WRITE);
 686  63
             invokeCallbackHandler((CallbackHandler) attach, context);
 687  63
             return false;
 688  
         } else {
 689  28
             return true;
 690  
         }
 691  
     }
 692  
     
 693  
     
 694  
     /**
 695  
      * Handle OP_CONNECT.
 696  
      * @param key {@link SelectionKey}
 697  
      * @param ctx {@link Context}
 698  
      */
 699  
     public boolean onConnectInterest(final SelectionKey key, Context ctx)
 700  
     throws IOException{
 701  
         try {
 702  
             // disable OP_CONNECT on key before doing anything else
 703  197
             key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
 704  
             
 705  
             // No OP_READ nor OP_WRITE allowed yet.
 706  197
             key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 707  197
             key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
 708  0
         } catch(CancelledKeyException e) {
 709  
             // Even if key was cancelled - we need to notify CallBackHandler
 710  0
             if (logger.isLoggable(Level.FINE)) {
 711  0
                 logger.log(Level.FINE, "CancelledKeyException occured when tried to change key interests", e);
 712  
             }
 713  197
         }
 714  
         
 715  197
         Object attach = SelectionKeyAttachment.getAttachment(key);
 716  197
         if (attach instanceof CallbackHandler){
 717  197
             final Context context = pollContext(ctx, key, Context.OpType.OP_CONNECT);
 718  197
             invokeCallbackHandler((CallbackHandler) attach, context);
 719  
         }
 720  197
         return false;
 721  
     }
 722  
     
 723  
     
 724  
     /**
 725  
      * Invoke a CallbackHandler via a Context instance.
 726  
      * @param context {@link Context}
 727  
      * @throws java.io.IOException
 728  
      */
 729  
     protected void invokeCallbackHandler(CallbackHandler callbackHandler, 
 730  
             Context context) throws IOException {
 731  61520
         IOEvent<Context>ioEvent = new IOEvent.DefaultIOEvent<Context>(context);
 732  61520
         context.setIOEvent(ioEvent);
 733  
         
 734  
         // Added because of incompatibility with Grizzly 1.6.0
 735  61520
         context.setSelectorHandler(this);
 736  
         
 737  
         try {
 738  61520
             CallbackHandlerContextTask task = CallbackHandlerContextTask.poll();
 739  61520
             task.setCallBackHandler(callbackHandler);
 740  61520
             boolean isRunInSeparateThread = true;
 741  
             
 742  61520
             if (callbackHandler instanceof CallbackHandlerDescriptor) {
 743  0
                 isRunInSeparateThread = 
 744  
                         ((CallbackHandlerDescriptor) callbackHandler).
 745  
                         isRunInSeparateThread(context.getCurrentOpType());
 746  
             }
 747  61520
             context.execute(task, isRunInSeparateThread);
 748  0
         } catch (PipelineFullException ex) {
 749  0
             throw new IOException(ex.getMessage());
 750  61520
         }
 751  61520
     }
 752  
 
 753  
     
 754  
     /**
 755  
      * Invoke a {@link AsyncQueueReader}
 756  
      * @param context {@link Context}
 757  
      * @throws java.io.IOException
 758  
      */
 759  
     protected void invokeAsyncQueueReader(Context context) throws IOException {
 760  9737
         AsyncQueueReaderContextTask task = AsyncQueueReaderContextTask.poll();
 761  9737
         task.setAsyncQueueReader(asyncQueueReader);
 762  
         try {
 763  9737
             context.execute(task);
 764  0
         } catch (PipelineFullException ex) {
 765  0
             throw new IOException(ex.getMessage());
 766  9737
         }
 767  9737
     }
 768  
 
 769  
     
 770  
     /**
 771  
      * Invoke a {@link AsyncQueueWriter}
 772  
      * @param context {@link Context}
 773  
      * @throws java.io.IOException
 774  
      */
 775  
     protected void invokeAsyncQueueWriter(Context context) throws IOException {
 776  2310
         AsyncQueueWriterContextTask task = AsyncQueueWriterContextTask.poll();
 777  2310
         task.setAsyncQueueWriter(asyncQueueWriter);
 778  
         try {
 779  2310
             context.execute(task);
 780  0
         } catch (PipelineFullException ex) {
 781  0
             throw new IOException(ex.getMessage());
 782  2310
         }
 783  2310
     }
 784  
 
 785  
 
 786  
     /**
 787  
      * Return an instance of the default {@link ConnectorHandler},
 788  
      * which is the {@link TCPConnectorHandler}
 789  
      * @return {@link ConnectorHandler}
 790  
      */
 791  
     public ConnectorHandler acquireConnectorHandler(){
 792  167
         if (selector == null || !selector.isOpen()){
 793  0
             throw new IllegalStateException("SelectorHandler not yet started");
 794  
         }
 795  
         
 796  167
         ConnectorHandler connectorHandler = connectorInstanceHandler.acquire();
 797  167
         connectorHandler.setController(Controller.getHandlerController(this));
 798  167
         return connectorHandler;
 799  
     }
 800  
     
 801  
     
 802  
     /**
 803  
      * Release a ConnectorHandler.
 804  
      */
 805  
     public void releaseConnectorHandler(ConnectorHandler connectorHandler){
 806  166
         connectorInstanceHandler.release(connectorHandler);
 807  166
     }
 808  
     
 809  
     
 810  
     /**
 811  
      * A token decribing the protocol supported by an implementation of this
 812  
      * interface
 813  
      */
 814  
     public Controller.Protocol protocol(){
 815  132643
         return Controller.Protocol.TCP;
 816  
     }
 817  
     // ------------------------------------------------------ Utils ----------//
 818  
     
 819  
     
 820  
     /**
 821  
      * Initializes {@link SelectionKey} operation registries
 822  
      */
 823  
     protected void initOpRegistriesIfRequired() {
 824  676173
         if (opToRegister == null){
 825  135
             opToRegister = new ConcurrentLinkedQueue<SelectionKeyOP>();
 826  
         }
 827  676313
     }
 828  
 
 829  
     /**
 830  
      * {@inheritDoc}
 831  
      */
 832  
     public void configureChannel(SelectableChannel channel) throws IOException{
 833  191
         Socket socket = ((SocketChannel) channel).socket();
 834  
         
 835  191
         channel.configureBlocking(false);
 836  
         
 837  191
         if (!channel.isOpen()){
 838  0
             return;
 839  
         }
 840  
         
 841  
         try{
 842  191
             if(linger >= 0 ) {
 843  0
                 socket.setSoLinger( true, linger);
 844  
             }
 845  0
         } catch (SocketException ex){
 846  0
             if (logger.isLoggable(Level.FINE)){
 847  0
                 logger.log(Level.FINE,
 848  
                         "setSoLinger exception ",ex);
 849  
             }
 850  191
         }
 851  
         
 852  
         try{
 853  191
             socket.setTcpNoDelay(tcpNoDelay);
 854  0
         } catch (SocketException ex){
 855  0
             if (logger.isLoggable(Level.FINE)){
 856  0
                 logger.log(Level.FINE,
 857  
                         "setTcpNoDelay exception ",ex);
 858  
             }
 859  191
         }
 860  
         
 861  
         try{
 862  191
             socket.setReuseAddress(reuseAddress);
 863  0
         } catch (SocketException ex){
 864  0
             if (logger.isLoggable(Level.FINE)){
 865  0
                 logger.log(Level.FINE,
 866  
                         "setReuseAddress exception ",ex);
 867  
             }
 868  191
         }
 869  191
     }
 870  
     
 871  
     
 872  
     // ------------------------------------------------------ Properties -----//
 873  
     
 874  
     public final Selector getSelector() {
 875  777246
         return selector;
 876  
     }
 877  
     
 878  
     public final void setSelector(Selector selector) {
 879  18
         this.selector = selector;
 880  18
     }
 881  
     
 882  
     /**
 883  
      * {@inheritDoc}
 884  
      */
 885  
     public AsyncQueueReader getAsyncQueueReader() {
 886  82922
         return asyncQueueReader;
 887  
     }
 888  
 
 889  
     /**
 890  
      * {@inheritDoc}
 891  
      */
 892  
     public AsyncQueueWriter getAsyncQueueWriter() {
 893  813621
         return asyncQueueWriter;
 894  
     }
 895  
     
 896  
     public long getSelectTimeout() {
 897  0
         return selectTimeout;
 898  
     }
 899  
     
 900  
     public void setSelectTimeout(long selectTimeout) {
 901  0
         this.selectTimeout = selectTimeout;
 902  0
     }
 903  
     
 904  
     public int getServerTimeout() {
 905  0
         return serverTimeout;
 906  
     }
 907  
     
 908  
     public void setServerTimeout(int serverTimeout) {
 909  0
         this.serverTimeout = serverTimeout;
 910  0
     }
 911  
     
 912  
     public InetAddress getInet() {
 913  0
         return inet;
 914  
     }
 915  
     
 916  
     public void setInet(InetAddress inet) {
 917  0
         this.inet = inet;
 918  0
     }
 919  
     
 920  
     /**
 921  
      * Returns port number {@link SelectorHandler} is listening on
 922  
      * Similar to <code>getPort()</code>, but getting port number directly from
 923  
      * connection ({@link ServerSocket}, {@link DatagramSocket}).
 924  
      * So if default port number 0 was set during initialization, then <code>getPort()</code>
 925  
      * will return 0, but getPortLowLevel() will
 926  
      * return port number assigned by OS.
 927  
      *
 928  
      * @return port number or -1 if {@link SelectorHandler} was not initialized for accepting connections.
 929  
      */
 930  
     public int getPortLowLevel() {
 931  2
         if (serverSocket != null) {
 932  2
             return serverSocket.getLocalPort();
 933  
         }
 934  
         
 935  0
         return -1;
 936  
     }
 937  
     
 938  
     public int getPort() {
 939  3
         return port;
 940  
     }
 941  
     
 942  
     public void setPort(int port) {
 943  39
         this.port = port;
 944  39
     }
 945  
     
 946  
     public int getSsBackLog() {
 947  0
         return ssBackLog;
 948  
     }
 949  
     
 950  
     public void setSsBackLog(int ssBackLog) {
 951  0
         this.ssBackLog = ssBackLog;
 952  0
     }
 953  
   
 954  
     
 955  
     /**
 956  
      * Return the tcpNoDelay value used by the underlying accepted Sockets.
 957  
      * 
 958  
      * Also see setTcpNoDelay(boolean tcpNoDelay)
 959  
      */
 960  
     public boolean isTcpNoDelay() {
 961  0
         return tcpNoDelay;
 962  
     }
 963  
     
 964  
     
 965  
     /**
 966  
      * Enable (true) or disable (false) the underlying Socket's
 967  
      * tcpNoDelay.
 968  
      * 
 969  
      * Default value for tcpNoDelay is enabled (set to true), as according to 
 970  
      * the performance tests, it performs better for most cases.
 971  
      * 
 972  
      * The Connector side should also set tcpNoDelay the same as it is set here 
 973  
      * whenever possible.
 974  
      */
 975  
     public void setTcpNoDelay(boolean tcpNoDelay) {
 976  0
         this.tcpNoDelay = tcpNoDelay;
 977  0
     }
 978  
     
 979  
     public int getLinger() {
 980  0
         return linger;
 981  
     }
 982  
     
 983  
     public void setLinger(int linger) {
 984  0
         this.linger = linger;
 985  0
     }
 986  
     
 987  
     public int getSocketTimeout() {
 988  0
         return socketTimeout;
 989  
     }
 990  
     
 991  
     public void setSocketTimeout(int socketTimeout) {
 992  0
         this.socketTimeout = socketTimeout;
 993  0
     }
 994  
     
 995  
     public Logger getLogger() {
 996  0
         return logger;
 997  
     }
 998  
     
 999  
     public void setLogger(Logger logger) {
 1000  0
         this.logger = logger;
 1001  0
     }
 1002  
     
 1003  
     public boolean isReuseAddress() {
 1004  0
         return reuseAddress;
 1005  
     }
 1006  
     
 1007  
     public void setReuseAddress(boolean reuseAddress) {
 1008  0
         this.reuseAddress = reuseAddress;
 1009  0
     }
 1010  
     
 1011  
     /**
 1012  
      * {@inheritDoc}
 1013  
      */
 1014  
     public Pipeline pipeline(){
 1015  82622
         return pipeline;
 1016  
     }
 1017  
     
 1018  
     
 1019  
     /**
 1020  
      * {@inheritDoc}
 1021  
      */
 1022  
     public void setPipeline(Pipeline pipeline){
 1023  0
         this.pipeline = pipeline;
 1024  0
     }
 1025  
 
 1026  
     
 1027  
     /**
 1028  
      * {@inheritDoc}
 1029  
      */
 1030  
     public Class<? extends SelectionKeyHandler> getPreferredSelectionKeyHandler() {
 1031  238
         return DefaultSelectionKeyHandler.class;
 1032  
     }
 1033  
 
 1034  
     
 1035  
     /**
 1036  
      * Get the SelectionKeyHandler associated with this SelectorHandler.
 1037  
      */
 1038  
     public SelectionKeyHandler getSelectionKeyHandler() {
 1039  538439
         return selectionKeyHandler;
 1040  
     }
 1041  
     
 1042  
     
 1043  
     /**
 1044  
      * Set SelectionKeyHandler associated with this SelectorHandler.
 1045  
      */
 1046  
     public void setSelectionKeyHandler(SelectionKeyHandler selectionKeyHandler) {
 1047  135
         this.selectionKeyHandler = selectionKeyHandler;
 1048  135
         this.selectionKeyHandler.setSelectorHandler(this);
 1049  135
     }
 1050  
     
 1051  
     
 1052  
     /**
 1053  
      * Set the {@link ProtocolChainInstanceHandler} to use for
 1054  
      * creating instance of {@link ProtocolChain}.
 1055  
      */
 1056  
     public void setProtocolChainInstanceHandler(ProtocolChainInstanceHandler
 1057  
             instanceHandler){
 1058  0
         this.instanceHandler = instanceHandler;
 1059  0
     }
 1060  
     
 1061  
     
 1062  
     /**
 1063  
      * Return the {@link ProtocolChainInstanceHandler}
 1064  
      */
 1065  
     public ProtocolChainInstanceHandler getProtocolChainInstanceHandler(){
 1066  238810
         return instanceHandler;
 1067  
     }
 1068  
     
 1069  
     /**
 1070  
      * {@inheritDoc}
 1071  
      */
 1072  
     public void closeChannel(SelectableChannel channel) {
 1073  
         // channel could be either SocketChannel or ServerSocketChannel
 1074  392
         if (channel instanceof SocketChannel) {
 1075  358
             Socket socket = ((SocketChannel) channel).socket();
 1076  
             
 1077  
             try {
 1078  358
                 if (!socket.isInputShutdown()) socket.shutdownInput();
 1079  0
             } catch (IOException ex){
 1080  
                 ;
 1081  358
             }
 1082  
             
 1083  
             try {
 1084  358
                 if (!socket.isOutputShutdown()) socket.shutdownOutput();
 1085  0
             } catch (IOException ex){
 1086  
                 ;
 1087  358
             }
 1088  
             
 1089  
             try{
 1090  358
                 socket.close();
 1091  0
             } catch (IOException ex){
 1092  
                 ;
 1093  358
             }
 1094  
         }
 1095  
         
 1096  
         try{
 1097  392
             channel.close();
 1098  0
         } catch (IOException ex){
 1099  
             ; // LOG ME
 1100  392
         }
 1101  
         
 1102  392
         if (asyncQueueReader != null) {
 1103  392
             asyncQueueReader.onClose(channel);
 1104  
         }
 1105  
 
 1106  392
         if (asyncQueueWriter != null) {
 1107  392
             asyncQueueWriter.onClose(channel);
 1108  
         }
 1109  392
     }
 1110  
 
 1111  
     /**
 1112  
      * Polls {@link Context} from pool and initializes it.
 1113  
      * 
 1114  
      * @param serverContext {@link Controller} context
 1115  
      * @param key {@link SelectionKey}
 1116  
      * @return {@link Context}
 1117  
      */
 1118  
     protected Context pollContext(final Context serverContext, 
 1119  
             final SelectionKey key, final Context.OpType opType) {
 1120  73567
         ProtocolChain protocolChain = instanceHandler != null ? 
 1121  
             instanceHandler.poll() : 
 1122  
             serverContext.getController().getProtocolChainInstanceHandler().poll();
 1123  
         
 1124  73567
         final Context context = serverContext.getController().pollContext(key, opType);
 1125  73567
         context.setSelectionKey(key);
 1126  73567
         context.setSelectorHandler(this);
 1127  73567
         context.setAsyncQueueReader(asyncQueueReader);
 1128  73567
         context.setAsyncQueueWriter(asyncQueueWriter);
 1129  73567
         context.setProtocolChain(protocolChain);
 1130  73567
         return context;
 1131  
     }
 1132  
     
 1133  
     //--------------- ConnectorInstanceHandler -----------------------------
 1134  
     /**
 1135  
      * Return <Callable>factory<Callable> object, which knows how
 1136  
      * to create {@link ConnectorInstanceHandler} corresponding to the protocol
 1137  
      * @return <Callable>factory</code>
 1138  
      */
 1139  
     protected Callable<ConnectorHandler> getConnectorInstanceHandlerDelegate() {
 1140  103
         return new Callable<ConnectorHandler>() {
 1141  
             public ConnectorHandler call() throws Exception {
 1142  5
                 return new TCPConnectorHandler();
 1143  
             }
 1144  
         };
 1145  
     }
 1146  
 
 1147  
     // ----------- AttributeHolder interface implementation ----------- //  
 1148  
 
 1149  
     /**
 1150  
      * Remove a key/value object.
 1151  
      * Method is not thread safe
 1152  
      * 
 1153  
      * @param key - name of an attribute
 1154  
      * @return  attribute which has been removed
 1155  
      */
 1156  
     public Object removeAttribute(String key) {
 1157  0
         if (attributes == null) return null;
 1158  
         
 1159  0
         return attributes.remove(key);
 1160  
     }
 1161  
 
 1162  
     /**
 1163  
      * Set a key/value object.
 1164  
      * Method is not thread safe
 1165  
      * 
 1166  
      * @param key - name of an attribute
 1167  
      * @param value - value of named attribute
 1168  
      */
 1169  
     public void setAttribute(String key, Object value) {
 1170  0
         if (attributes == null) {
 1171  0
             attributes = new HashMap<String, Object>();
 1172  
         }
 1173  
         
 1174  0
         attributes.put(key, value);
 1175  0
     }
 1176  
 
 1177  
     /**
 1178  
      * Return an object based on a key.
 1179  
      * Method is not thread safe
 1180  
      * 
 1181  
      * @param key - name of an attribute
 1182  
      * @return - attribute value for the <tt>key</tt>, null if <tt>key</tt>
 1183  
      *           does not exist in <tt>attributes</tt>
 1184  
      */
 1185  
     public Object getAttribute(String key) {
 1186  0
         if (attributes == null) return null;
 1187  
 
 1188  0
         return attributes.get(key);
 1189  
     }
 1190  
     
 1191  
     
 1192  
     /**
 1193  
      * Set a {@link Map} of attribute name/value pairs.
 1194  
      * Old {@link AttributeHolder} values will not be available.
 1195  
      * Later changes of this {@link Map} will lead to changes to the current
 1196  
      * {@link AttributeHolder}.
 1197  
      * 
 1198  
      * @param attributes - map of name/value pairs
 1199  
      */
 1200  
     public void setAttributes(Map<String, Object> attributes) {
 1201  6
         this.attributes = attributes;
 1202  6
     }
 1203  
 
 1204  
 
 1205  
     /**
 1206  
      * Return a {@link Map} of attribute name/value pairs.
 1207  
      * Updates, performed on the returned {@link Map} will be reflected in
 1208  
      * this {@link AttributeHolder}
 1209  
      * 
 1210  
      * @return - {@link Map} of attribute name/value pairs
 1211  
      */
 1212  
     public Map<String, Object> getAttributes() {
 1213  6
         return attributes;
 1214  
     }
 1215  
 }