Coverage Report - com.sun.grizzly.TCPConnectorHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
TCPConnectorHandler
70 %
119/170
57 %
33/58
0
TCPConnectorHandler$1
60 %
3/5
N/A
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 package com.sun.grizzly;
 39  
 
 40  
 import com.sun.grizzly.async.AsyncQueueDataProcessor;
 41  
 import com.sun.grizzly.async.AsyncQueueReadable;
 42  
 import com.sun.grizzly.async.AsyncReadCallbackHandler;
 43  
 import com.sun.grizzly.async.AsyncReadCondition;
 44  
 import com.sun.grizzly.async.AsyncWriteCallbackHandler;
 45  
 import com.sun.grizzly.async.AsyncQueueWritable;
 46  
 import com.sun.grizzly.util.InputReader;
 47  
 import com.sun.grizzly.util.OutputWriter;
 48  
 import java.io.IOException;
 49  
 import java.net.Socket;
 50  
 import java.net.SocketAddress;
 51  
 import java.net.SocketException;
 52  
 import java.nio.ByteBuffer;
 53  
 import java.nio.channels.AlreadyConnectedException;
 54  
 import java.nio.channels.NotYetConnectedException;
 55  
 import java.nio.channels.SelectableChannel;
 56  
 import java.nio.channels.SelectionKey;
 57  
 import java.nio.channels.SocketChannel;
 58  
 import java.util.concurrent.CountDownLatch;
 59  
 import java.util.concurrent.TimeUnit;
 60  
 import java.util.logging.Level;
 61  
 /**
 62  
  * Non blocking TCP Connector Handler. The recommended way to use this class
 63  
  * is by creating an external Controller and share the same SelectorHandler
 64  
  * instance.
 65  
  * <p>
 66  
  * Recommended
 67  
  * -----------
 68  
  * </p><p><pre><code>
 69  
  * Controller controller = new Controller();
 70  
  * // new TCPSelectorHandler(true) means the Selector will be used only
 71  
  * // for client operation (OP_READ, OP_WRITE, OP_CONNECT).
 72  
  * TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true);
 73  
  * controller.setSelectorHandler(tcpSelectorHandler);
 74  
  * TCPConnectorHandler tcpConnectorHandler = new TCPConnectorHandler();
 75  
  * tcpConnectorHandler.connect(localhost,port, new CallbackHandler(){...},
 76  
  *                             tcpSelectorHandler);
 77  
  * TCPConnectorHandler tcpConnectorHandler2 = new TCPConnectorHandler();
 78  
  * tcpConnectorHandler2.connect(localhost,port, new CallbackHandler(){...},
 79  
  *                             tcpSelectorHandler);
 80  
  * </code></pre></p><p>
 81  
  * Not recommended (but still works)
 82  
  * ---------------------------------
 83  
  * </p><p><pre><code>
 84  
  * TCPConnectorHandler tcpConnectorHandler = new TCPConnectorHandler();
 85  
  * tcpConnectorHandler.connect(localhost,port);
 86  
  * </code></pre></p><p>
 87  
  *
 88  
  * Internally, a new Controller will be created everytime connect(localhost,port)
 89  
  * is invoked, which has an impact on performance.
 90  
  *
 91  
  * @author Jeanfrancois Arcand
 92  
  */
 93  34
 public class TCPConnectorHandler implements 
 94  
         ConnectorHandler<TCPSelectorHandler, CallbackHandler>, 
 95  
         AsyncQueueWritable, AsyncQueueReadable {
 96  
     
 97  
     /**
 98  
      * default TCP channel connection timeout in milliseconds
 99  
      */
 100  
     private static final int DEFAULT_CONNECTION_TIMEOUT = 30 * 1000;
 101  
     
 102  
     /**
 103  
      * The underlying TCPSelectorHandler used to mange SelectionKeys.
 104  
      */
 105  
     protected TCPSelectorHandler selectorHandler;
 106  
     
 107  
     
 108  
     /**
 109  
      * A {@link CallbackHandler} handler invoked by the TCPSelectorHandler
 110  
      * when a non blocking operation is ready to be processed.
 111  
      */
 112  
     private CallbackHandler callbackHandler;
 113  
     
 114  
     
 115  
     /**
 116  
      * A blocking {@link InputStream} that use a pool of Selector
 117  
      * to execute a blocking read operation.
 118  
      */
 119  
     private InputReader inputStream;
 120  
     
 121  
     
 122  
     /**
 123  
      * The connection's SocketChannel.
 124  
      */
 125  
     private SocketChannel socketChannel;
 126  
     
 127  
     
 128  
     /**
 129  
      * Is the connection established.
 130  
      */
 131  
     private volatile boolean isConnected;
 132  
     
 133  
     
 134  
     /**
 135  
      * The internal Controller used (in case not specified).
 136  
      */
 137  
     private Controller controller;
 138  
     
 139  
     
 140  
     /**
 141  
      * IsConnected Latch related
 142  
      */
 143  
     private CountDownLatch isConnectedLatch;
 144  
     
 145  
     
 146  
     /**
 147  
      * Are we creating a controller every run.
 148  
      */
 149  25
     private boolean isStandalone = false;
 150  
     
 151  
     
 152  
     /**
 153  
      * The socket tcpDelay.
 154  
      * 
 155  
      * Default value for tcpNoDelay.
 156  
      */
 157  25
     protected boolean tcpNoDelay = true;
 158  
     
 159  
     
 160  
     /**
 161  
      * The socket reuseAddress
 162  
      */
 163  25
     protected boolean reuseAddress = true;
 164  
     
 165  
     
 166  
     /**
 167  
      * The socket linger.
 168  
      */
 169  25
     protected int linger = -1;    
 170  
     
 171  
     /**
 172  
      * Connection timeout is milliseconds
 173  
      */
 174  25
     protected int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
 175  
     
 176  
     
 177  
     /**
 178  
      * Connect to hostname:port. When an aysnchronous event happens (e.g
 179  
      * OP_READ or OP_WRITE), the {@link Controller} will invoke
 180  
      * the CallBackHandler.
 181  
      * @param remoteAddress remote address to connect
 182  
      * @param callbackHandler the handler invoked by its associated {@link SelectorHandler} when
 183  
      *        a non blocking operation is ready to be handled. When null, all 
 184  
      *        read and write operation will be delegated to the default
 185  
      *        {@link ProtocolChain} and its list of {@link ProtocolFilter} 
 186  
      *        . When null, this {@link ConnectorHandler} will create an instance of {@link DefaultCallbackHandler}.
 187  
      * @throws java.io.IOException
 188  
      */
 189  
     public void connect(SocketAddress remoteAddress,
 190  
             CallbackHandler callbackHandler) throws IOException {
 191  
         
 192  32
         connect(remoteAddress,null,callbackHandler);
 193  32
     }
 194  
     
 195  
     
 196  
     /**
 197  
      * Connect to hostname:port. When an aysnchronous event happens (e.g
 198  
      * OP_READ or OP_WRITE), the {@link Controller} will invoke
 199  
      * the CallBackHandler.
 200  
      * @param remoteAddress remote address to connect
 201  
      * @param localAddress local address to bind
 202  
      * @param callbackHandler the handler invoked by its associated {@link SelectorHandler} when
 203  
      *        a non blocking operation is ready to be handled. When null, all 
 204  
      *        read and write operation will be delegated to the default
 205  
      *        {@link ProtocolChain} and its list of {@link ProtocolFilter} 
 206  
      *        . When null, this {@link ConnectorHandler} will create an instance of {@link DefaultCallbackHandler}.
 207  
      * @throws java.io.IOException
 208  
      */
 209  
     public void connect(SocketAddress remoteAddress, SocketAddress localAddress,
 210  
             CallbackHandler callbackHandler) throws IOException {
 211  
         
 212  32
         if (controller == null){
 213  0
             throw new IllegalStateException("Controller cannot be null");
 214  
         }
 215  
         
 216  32
         connect(remoteAddress,localAddress,callbackHandler,
 217  
                 (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
 218  32
     }
 219  
     
 220  
     
 221  
     /**
 222  
      * Connect to hostname:port. When an aysnchronous event happens (e.g
 223  
      * OP_READ or OP_WRITE), the {@link Controller} will invoke
 224  
      * the CallBackHandler.
 225  
      * @param remoteAddress remote address to connect
 226  
      * @param callbackHandler the handler invoked by its associated {@link SelectorHandler} when
 227  
      *        a non blocking operation is ready to be handled. When null, all 
 228  
      *        read and write operation will be delegated to the default
 229  
      *        {@link ProtocolChain} and its list of {@link ProtocolFilter} 
 230  
      *        . When null, this {@link ConnectorHandler} will create an instance of {@link DefaultCallbackHandler}.
 231  
      * @param selectorHandler an instance of SelectorHandler.
 232  
      * @throws java.io.IOException
 233  
      */
 234  
     public void connect(SocketAddress remoteAddress,
 235  
             CallbackHandler callbackHandler,
 236  
             TCPSelectorHandler selectorHandler) throws IOException {
 237  
         
 238  0
         connect(remoteAddress,null,callbackHandler,selectorHandler);
 239  0
     }
 240  
     
 241  
     /**
 242  
      * Connect to hostname:port. When an aysnchronous event happens (e.g
 243  
      * OP_READ or OP_WRITE), the {@link Controller} will invoke
 244  
      * the CallBackHandler.
 245  
      * @param remoteAddress remote address to connect
 246  
      * @param localAddress local address to bin
 247  
      * @param callbackHandler the handler invoked by its associated {@link SelectorHandler} when
 248  
      *        a non blocking operation is ready to be handled. When null, all 
 249  
      *        read and write operation will be delegated to the default
 250  
      *        {@link ProtocolChain} and its list of {@link ProtocolFilter} 
 251  
      *        . When null, this {@link ConnectorHandler} will create an instance of {@link DefaultCallbackHandler}.
 252  
      * @param selectorHandler an instance of SelectorHandler.
 253  
      * @throws java.io.IOException
 254  
      */
 255  
     public void connect(SocketAddress remoteAddress, SocketAddress localAddress,
 256  
             CallbackHandler callbackHandler,
 257  
             TCPSelectorHandler selectorHandler) throws IOException {
 258  
         
 259  52
         if (isConnected){
 260  0
             throw new AlreadyConnectedException();
 261  
         }
 262  
         
 263  52
         if (controller == null){
 264  0
             throw new IllegalStateException("Controller cannot be null");
 265  
         }
 266  
         
 267  52
         if (selectorHandler == null){
 268  0
             throw new IllegalStateException("SelectorHandler cannot be null");
 269  
         }
 270  
         
 271  52
         this.selectorHandler = selectorHandler;
 272  
         
 273  52
         if (callbackHandler == null){
 274  0
             callbackHandler = new DefaultCallbackHandler(this); 
 275  
         } else {
 276  52
             this.callbackHandler = callbackHandler;
 277  
         }
 278  
         
 279  
         // Wait for the onConnect to be invoked.
 280  52
         isConnectedLatch = new CountDownLatch(1);
 281  
         
 282  52
         selectorHandler.connect(remoteAddress,localAddress,callbackHandler);
 283  52
         inputStream = new InputReader();
 284  
         
 285  
         try {
 286  52
             isConnectedLatch.await(connectionTimeout, TimeUnit.MILLISECONDS);
 287  0
         } catch (InterruptedException ex) {
 288  0
             throw new IOException(ex.getMessage());
 289  52
         }
 290  52
     }
 291  
     
 292  
     
 293  
     /**
 294  
      * Connect to hostname:port. Internally an instance of Controller and
 295  
      * its default SelectorHandler will be created everytime this method is
 296  
      * called. This method should be used only and only if no external
 297  
      * Controller has been initialized.
 298  
      * @param remoteAddress remote address to connect
 299  
      * @throws java.io.IOException
 300  
      */
 301  
     public void connect(SocketAddress remoteAddress)
 302  
             throws IOException {
 303  20
         connect(remoteAddress,(SocketAddress)null);
 304  20
     }
 305  
     
 306  
     
 307  
     /**
 308  
      * Connect to hostname:port. Internally an instance of Controller and
 309  
      * its default SelectorHandler will be created everytime this method is
 310  
      * called. This method should be used only and only if no external
 311  
      * Controller has been initialized.
 312  
      * @param remoteAddress remote address to connect
 313  
      * @throws java.io.IOException
 314  
      * @param localAddress local address to bin
 315  
      */
 316  
     public void connect(SocketAddress remoteAddress, SocketAddress localAddress)
 317  
             throws IOException {
 318  
         
 319  20
         if (isConnected){
 320  0
             throw new AlreadyConnectedException();
 321  
         }
 322  
         
 323  20
         if (controller == null){
 324  20
             isStandalone = true;
 325  20
             controller = new Controller();
 326  20
             controller.setSelectorHandler(new TCPSelectorHandler(true));
 327  20
             DefaultPipeline pipeline = new DefaultPipeline();
 328  20
             pipeline.initPipeline();
 329  20
             pipeline.startPipeline();
 330  20
             controller.setPipeline(pipeline);
 331  
             
 332  20
             final CountDownLatch latch = new CountDownLatch(1);
 333  20
             controller.addStateListener(new ControllerStateListenerAdapter() {
 334  
                 @Override
 335  
                 public void onReady() {
 336  20
                     latch.countDown();
 337  20
                 }
 338  
                 
 339  
                 @Override
 340  
                 public void onException(Throwable e) {
 341  0
                     latch.countDown();
 342  0
                 }
 343  
             });
 344  
 
 345  20
             callbackHandler = new DefaultCallbackHandler(this,false);
 346  
             
 347  20
             new Thread(controller, "GrizzlyTCPConnectorHandler-Controller").start();
 348  
             
 349  
             try {
 350  20
                 latch.await();
 351  0
             } catch (InterruptedException ex) {
 352  20
             }
 353  
         }
 354  
         
 355  20
         if (null == callbackHandler) {
 356  0
             callbackHandler = new DefaultCallbackHandler(this);
 357  
         }
 358  
         
 359  20
         connect(remoteAddress,localAddress,callbackHandler,
 360  
                 (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
 361  20
     }
 362  
 
 363  
 
 364  
     /**
 365  
      * Read bytes. If blocking is set to <tt>true</tt>, a pool of temporary
 366  
      * {@link Selector} will be used to read bytes.
 367  
      * @param byteBuffer The byteBuffer to store bytes.
 368  
      * @param blocking <tt>true</tt> if a a pool of temporary Selector
 369  
      *        is required to handle a blocking read.
 370  
      * @return number of bytes read
 371  
      * @throws java.io.IOException
 372  
      */
 373  
     public long read(ByteBuffer byteBuffer, boolean blocking) throws IOException {
 374  1605
         if (!isConnected){
 375  0
             throw new NotYetConnectedException();
 376  
         }
 377  
         
 378  1605
         SelectionKey key = socketChannel.keyFor(selectorHandler.getSelector());
 379  1605
         if (blocking){
 380  312
             inputStream.setSelectionKey(key);
 381  312
             return inputStream.read(byteBuffer);
 382  
         } else {
 383  1293
             if (callbackHandler == null){
 384  0
                 throw new IllegalStateException
 385  
                         ("Non blocking read needs a CallbackHandler");
 386  
             }
 387  1293
             int nRead = socketChannel.read(byteBuffer);
 388  
             
 389  1293
             if (nRead == 0){
 390  1097
                 selectorHandler.register(key,SelectionKey.OP_READ);
 391  
             }
 392  1293
             return nRead;
 393  
         }
 394  
     }
 395  
     
 396  
     
 397  
     /**
 398  
      * Writes bytes. If blocking is set to <tt>true</tt>, a pool of temporary
 399  
      * {@link Selector} will be used to writes bytes.
 400  
      * @param byteBuffer The byteBuffer to write.
 401  
      * @param blocking <tt>true</tt> if a a pool of temporary Selector
 402  
      *        is required to handle a blocking write.
 403  
      * @return number of bytes written
 404  
      * @throws java.io.IOException
 405  
      */
 406  
     public long write(ByteBuffer byteBuffer, boolean blocking) throws IOException {
 407  1414
         if (!isConnected){
 408  0
             throw new NotYetConnectedException();
 409  
         }
 410  
         
 411  1414
         if (blocking){
 412  313
             return OutputWriter.flushChannel(socketChannel,byteBuffer);
 413  
         } else {
 414  
             
 415  1101
             if (callbackHandler == null){
 416  0
                 throw new IllegalStateException
 417  
                         ("Non blocking write needs a CallbackHandler");
 418  
             }
 419  
             
 420  1101
             SelectionKey key = socketChannel.keyFor(selectorHandler.getSelector());
 421  1101
             int nWrite = 1;
 422  1101
             int totalWriteBytes = 0;
 423  2202
             while (nWrite > 0 && byteBuffer.hasRemaining()){
 424  1101
                 nWrite = socketChannel.write(byteBuffer);
 425  1101
                 totalWriteBytes += nWrite;
 426  
             }
 427  
             
 428  1101
             if (totalWriteBytes == 0 && byteBuffer.hasRemaining()){
 429  0
                 selectorHandler.register(key,SelectionKey.OP_WRITE);
 430  
             }
 431  1101
             return totalWriteBytes;
 432  
         }
 433  
     }
 434  
     
 435  
     
 436  
     /**
 437  
      * {@inheritDoc}
 438  
      */
 439  
     public void readFromAsyncQueue(ByteBuffer buffer, 
 440  
             AsyncReadCallbackHandler callbackHandler) throws IOException {
 441  100
         readFromAsyncQueue(buffer, callbackHandler, null);
 442  100
     }
 443  
 
 444  
     /**
 445  
      * {@inheritDoc}
 446  
      */
 447  
     public void readFromAsyncQueue(ByteBuffer buffer, 
 448  
             AsyncReadCallbackHandler callbackHandler, 
 449  
             AsyncReadCondition condition) throws IOException {
 450  100
         readFromAsyncQueue(buffer, callbackHandler, condition, null);
 451  100
     }
 452  
 
 453  
     /**
 454  
      * {@inheritDoc}
 455  
      */
 456  
     public void readFromAsyncQueue(ByteBuffer buffer, 
 457  
             AsyncReadCallbackHandler callbackHandler, 
 458  
             AsyncReadCondition condition, 
 459  
             AsyncQueueDataProcessor readPostProcessor) throws IOException {
 460  100
         selectorHandler.getAsyncQueueReader().read(
 461  
                 socketChannel.keyFor(selectorHandler.getSelector()), buffer, 
 462  
                 callbackHandler, condition, readPostProcessor);
 463  100
     }
 464  
 
 465  
     
 466  
     /**
 467  
      * {@inheritDoc}
 468  
      */
 469  
     public void writeToAsyncQueue(ByteBuffer buffer) throws IOException {
 470  200000
         writeToAsyncQueue(buffer, null);
 471  200000
     }
 472  
 
 473  
 
 474  
     /**
 475  
      * {@inheritDoc}
 476  
      */
 477  
     public void writeToAsyncQueue(ByteBuffer buffer, 
 478  
             AsyncWriteCallbackHandler callbackHandler) throws IOException {
 479  200000
         writeToAsyncQueue(buffer, callbackHandler, null);
 480  200000
     }
 481  
 
 482  
 
 483  
     /**
 484  
      * {@inheritDoc}
 485  
      */
 486  
     public void writeToAsyncQueue(ByteBuffer buffer, 
 487  
             AsyncWriteCallbackHandler callbackHandler,
 488  
             AsyncQueueDataProcessor writePreProcessor) throws IOException {
 489  200000
         writeToAsyncQueue(buffer, callbackHandler, writePreProcessor, false);
 490  200000
     }
 491  
 
 492  
     
 493  
     /**
 494  
      * {@inheritDoc}
 495  
      */
 496  
     public void writeToAsyncQueue(ByteBuffer buffer, 
 497  
             AsyncWriteCallbackHandler callbackHandler,
 498  
             AsyncQueueDataProcessor writePreProcessor,
 499  
             boolean isCloneByteBuffer) throws IOException {
 500  200000
         selectorHandler.getAsyncQueueWriter().write(
 501  
                 socketChannel.keyFor(selectorHandler.getSelector()), buffer,
 502  
                 callbackHandler, writePreProcessor, isCloneByteBuffer);
 503  200000
     }
 504  
 
 505  
     
 506  
     /**
 507  
      * {@inheritDoc}
 508  
      */
 509  
     public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
 510  
             throws IOException {
 511  0
         writeToAsyncQueue(dstAddress, buffer, null);
 512  0
     }
 513  
 
 514  
 
 515  
     /**
 516  
      * {@inheritDoc}
 517  
      */
 518  
     public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, 
 519  
             AsyncWriteCallbackHandler callbackHandler) throws IOException {
 520  0
         writeToAsyncQueue(dstAddress, buffer, callbackHandler, null);
 521  0
     }
 522  
 
 523  
 
 524  
     /**
 525  
      * {@inheritDoc}
 526  
      */
 527  
     public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, 
 528  
             AsyncWriteCallbackHandler callbackHandler, 
 529  
             AsyncQueueDataProcessor writePreProcessor) throws IOException {
 530  0
         writeToAsyncQueue(dstAddress, buffer, callbackHandler, writePreProcessor,
 531  
                 false);
 532  0
     }
 533  
 
 534  
     
 535  
     /**
 536  
      * {@inheritDoc}
 537  
      */
 538  
     public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, 
 539  
             AsyncWriteCallbackHandler callbackHandler, 
 540  
             AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer)
 541  
             throws IOException {
 542  0
         selectorHandler.getAsyncQueueWriter().write(
 543  
                 socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
 544  
                 buffer, callbackHandler, writePreProcessor, isCloneByteBuffer);
 545  0
     }
 546  
 
 547  
     
 548  
     /**
 549  
      * Close the underlying connection.
 550  
      */
 551  
     public void close() throws IOException{
 552  51
         if (socketChannel != null){
 553  51
             if (selectorHandler != null){
 554  51
                 SelectionKey key =
 555  
                         socketChannel.keyFor(selectorHandler.getSelector());
 556  
                 
 557  51
                 if (key == null) return;
 558  
                 
 559  51
                 selectorHandler.getSelectionKeyHandler().cancel(key);
 560  51
             } else {
 561  0
                 socketChannel.close();
 562  
             }
 563  
         }
 564  
         
 565  51
         if (controller != null && isStandalone){
 566  20
             controller.stop();
 567  20
             controller = null;
 568  
         }
 569  
         
 570  51
         isStandalone = false;
 571  51
         isConnected = false;
 572  51
         connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
 573  51
     }
 574  
     
 575  
     
 576  
     /**
 577  
      * Finish handling the OP_CONNECT interest ops.
 578  
      * @param key - a {@link SelectionKey}
 579  
      */
 580  
     public void finishConnect(SelectionKey key) throws IOException{
 581  
         try{
 582  52
             if (Controller.logger().isLoggable(Level.FINE)) {
 583  0
                 Controller.logger().log(Level.FINE, "Finish connect");
 584  
             }
 585  
             
 586  52
             socketChannel = (SocketChannel)key.channel();
 587  52
             socketChannel.finishConnect();
 588  52
             isConnected = socketChannel.isConnected();
 589  52
             configureChannel(socketChannel);
 590  
             
 591  52
             if (Controller.logger().isLoggable(Level.FINE)) {
 592  0
                 Controller.logger().log(Level.FINE, "isConnected: " + isConnected);
 593  
             }
 594  0
         } catch (IOException ex){
 595  0
             throw ex;
 596  
         } finally {
 597  52
             isConnectedLatch.countDown();
 598  52
         }
 599  52
     }
 600  
     
 601  
     
 602  
     /**
 603  
      * {@inheritDoc}
 604  
      */
 605  
     public void configureChannel(SelectableChannel channel) throws IOException{
 606  52
         Socket socket = ((SocketChannel) channel).socket();
 607  
 
 608  
         try{
 609  52
             if(linger >= 0 ) {
 610  0
                 socket.setSoLinger( true, linger);
 611  
             }
 612  0
         } catch (SocketException ex){
 613  0
             Controller.logger().log(Level.WARNING,
 614  
                     "setSoLinger exception ",ex);
 615  52
         }
 616  
         
 617  
         try{
 618  52
             socket.setTcpNoDelay(tcpNoDelay);
 619  0
         } catch (SocketException ex){
 620  0
             Controller.logger().log(Level.WARNING,
 621  
                     "setTcpNoDelay exception ",ex);
 622  52
         }
 623  
         
 624  
         try{
 625  52
             socket.setReuseAddress(reuseAddress);
 626  0
         } catch (SocketException ex){
 627  0
             Controller.logger().log(Level.WARNING,
 628  
                     "setReuseAddress exception ",ex);
 629  52
         }
 630  52
     }
 631  
     
 632  
     
 633  
     /**
 634  
      * A token decribing the protocol supported by an implementation of this
 635  
      * interface
 636  
      * @return this {@link ConnectorHandler}'s protocol
 637  
      */
 638  
     public Controller.Protocol protocol(){
 639  83
         return Controller.Protocol.TCP;
 640  
     }
 641  
     
 642  
     
 643  
     /**
 644  
      * Is the underlying SocketChannel connected.
 645  
      * @return <tt>true</tt> if connected, otherwise <tt>false</tt>
 646  
      */
 647  
     public boolean isConnected(){
 648  10
         return isConnected && socketChannel.isOpen();
 649  
     }
 650  
     
 651  
     
 652  
     /**
 653  
      * Return the  {@link Controller}
 654  
      * @return the  {@link Controller}
 655  
      */
 656  
     public Controller getController() {
 657  0
         return controller;
 658  
     }
 659  
     
 660  
     
 661  
     /**
 662  
      * Set the {@link Controller} to use with this instance.
 663  
      * @param controller the {@link Controller} to use with this instance.
 664  
      */
 665  
     public void setController(Controller controller) {
 666  64
         this.controller = controller;
 667  64
     }
 668  
     
 669  
     
 670  
     /**
 671  
      * Return the current {@link SocketChannel} used.
 672  
      * @return the current {@link SocketChannel} used.
 673  
      */
 674  
     public SelectableChannel getUnderlyingChannel() {
 675  9
         return socketChannel;
 676  
     }
 677  
     
 678  
     
 679  
     /**
 680  
      * Set the {@link SocketChannel}.
 681  
      * @param the {@link SocketChannel} to use.
 682  
      */  
 683  
     protected void setUnderlyingChannel(SocketChannel socketChannel){
 684  20
         this.socketChannel = socketChannel;
 685  20
     }   
 686  
     
 687  
     
 688  
     /**
 689  
      * Return the {@link CallbackHandler}. 
 690  
      * @return the {@link CallbackHandler}. 
 691  
      */
 692  
     public CallbackHandler getCallbackHandler() {
 693  0
         return callbackHandler;
 694  
     }
 695  
     
 696  
 
 697  
     /**
 698  
      * Set the {@link CallbackHandler}. 
 699  
      * @param callbackHandler the {@link CallbackHandler}. 
 700  
      */   
 701  
     public void setCallbackHandler(CallbackHandler callbackHandler) {
 702  9
         this.callbackHandler = callbackHandler;
 703  9
     }
 704  
     
 705  
     
 706  
     /**
 707  
      * Return the associated {@link SelectorHandler}.
 708  
      * @return the associated {@link SelectorHandler}.
 709  
      */
 710  
     public TCPSelectorHandler getSelectorHandler() {
 711  9
         return selectorHandler;
 712  
     }
 713  
     
 714  
     
 715  
     /**
 716  
      * Return the tcpNoDelay value used by the underlying accepted Sockets.
 717  
      * 
 718  
      * Also see setTcpNoDelay(boolean tcpNoDelay)
 719  
      */
 720  
     public boolean isTcpNoDelay() {
 721  0
         return tcpNoDelay;
 722  
     }
 723  
     
 724  
     
 725  
     /**
 726  
      * Enable (true) or disable (false) the underlying Socket's
 727  
      * tcpNoDelay.
 728  
      * 
 729  
      * Default value for tcpNoDelay is disabled (set to false).
 730  
      * 
 731  
      * Disabled by default since enabling tcpNoDelay for most applications
 732  
      * can cause packets to appear to arrive in a fragmented fashion where it 
 733  
      * takes multiple OP_READ events (i.e. multiple calls to read small 
 734  
      * messages). The common behaviour seen when this occurs is that often times
 735  
      * a small number of bytes, as small as 1 byte at a time is read per OP_READ
 736  
      * event dispatch.  This results in a large number of system calls to
 737  
      * read(), system calls to enable and disable interest ops and potentially
 738  
      * a large number of thread context switches between a thread doing the
 739  
      * Select(ing) and a worker thread doing the read.
 740  
      * 
 741  
      * The Connector side should also set tcpNoDelay the same as it is set here 
 742  
      * whenever possible.
 743  
      */
 744  
     public void setTcpNoDelay(boolean tcpNoDelay) {
 745  0
         this.tcpNoDelay = tcpNoDelay;
 746  0
     }
 747  
     
 748  
     
 749  
     /**
 750  
      * @see java.net.Socket#setLinger()
 751  
      * @return the linger value.
 752  
      */
 753  
     public int getLinger() {
 754  0
         return linger;
 755  
     }
 756  
     
 757  
     
 758  
     /**
 759  
      * @see java.net.Socket#setLinger()
 760  
      */    
 761  
     public void setLinger(int linger) {
 762  0
         this.linger = linger;
 763  0
     }
 764  
 
 765  
     
 766  
     /**
 767  
      * Get TCP channel connection timeout in milliseconds
 768  
      * @return TCP channel connection timeout in milliseconds
 769  
      */
 770  
     public int getConnectionTimeout() {
 771  0
         return connectionTimeout;
 772  
     }
 773  
 
 774  
     
 775  
     /**
 776  
      * Set TCP channel connection timeout in milliseconds
 777  
      * @param connectionTimeout TCP channel connection timeout in milliseconds
 778  
      */
 779  
     public void setConnectionTimeout(int connectionTimeout) {
 780  0
         this.connectionTimeout = connectionTimeout;
 781  0
     }
 782  
     
 783  
     
 784  
     /**
 785  
      * @see java.net.Socket#setReuseAddress()
 786  
      */      
 787  
     public boolean isReuseAddress() {
 788  0
         return reuseAddress;
 789  
     }
 790  
     
 791  
     
 792  
     /**
 793  
      * @see java.net.Socket#setReuseAddress()
 794  
      */      
 795  
     public void setReuseAddress(boolean reuseAddress) {
 796  0
         this.reuseAddress = reuseAddress;
 797  0
     }
 798  
 }