Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
TCPConnectorHandler |
|
| 0.0;0 | ||||
TCPConnectorHandler$1 |
|
| 0.0;0 |
1 | /* | |
2 | * | |
3 | * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. | |
4 | * | |
5 | * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. | |
6 | * | |
7 | * The contents of this file are subject to the terms of either the GNU | |
8 | * General Public License Version 2 only ("GPL") or the Common Development | |
9 | * and Distribution License("CDDL") (collectively, the "License"). You | |
10 | * may not use this file except in compliance with the License. You can obtain | |
11 | * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html | |
12 | * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific | |
13 | * language governing permissions and limitations under the License. | |
14 | * | |
15 | * When distributing the software, include this License Header Notice in each | |
16 | * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. | |
17 | * Sun designates this particular file as subject to the "Classpath" exception | |
18 | * as provided by Sun in the GPL Version 2 section of the License file that | |
19 | * accompanied this code. If applicable, add the following below the License | |
20 | * Header, with the fields enclosed by brackets [] replaced by your own | |
21 | * identifying information: "Portions Copyrighted [year] | |
22 | * [name of copyright owner]" | |
23 | * | |
24 | * Contributor(s): | |
25 | * | |
26 | * If you wish your version of this file to be governed by only the CDDL or | |
27 | * only the GPL Version 2, indicate your decision by adding "[Contributor] | |
28 | * elects to include this software in this distribution under the [CDDL or GPL | |
29 | * Version 2] license." If you don't indicate a single choice of license, a | |
30 | * recipient has the option to distribute your version of this file under | |
31 | * either the CDDL, the GPL Version 2 or to extend the choice of license to | |
32 | * its licensees as provided above. However, if you add GPL Version 2 code | |
33 | * and therefore, elected the GPL Version 2 license, then the option applies | |
34 | * only if the new code is made subject to such option by the copyright | |
35 | * holder. | |
36 | * | |
37 | */ | |
38 | package com.sun.grizzly; | |
39 | ||
40 | import com.sun.grizzly.async.AsyncQueueDataProcessor; | |
41 | import com.sun.grizzly.async.AsyncQueueReadable; | |
42 | import com.sun.grizzly.async.AsyncReadCallbackHandler; | |
43 | import com.sun.grizzly.async.AsyncReadCondition; | |
44 | import com.sun.grizzly.async.AsyncWriteCallbackHandler; | |
45 | import com.sun.grizzly.async.AsyncQueueWritable; | |
46 | import com.sun.grizzly.util.InputReader; | |
47 | import com.sun.grizzly.util.OutputWriter; | |
48 | import java.io.IOException; | |
49 | import java.net.Socket; | |
50 | import java.net.SocketAddress; | |
51 | import java.net.SocketException; | |
52 | import java.nio.ByteBuffer; | |
53 | import java.nio.channels.AlreadyConnectedException; | |
54 | import java.nio.channels.NotYetConnectedException; | |
55 | import java.nio.channels.SelectableChannel; | |
56 | import java.nio.channels.SelectionKey; | |
57 | import java.nio.channels.SocketChannel; | |
58 | import java.util.concurrent.CountDownLatch; | |
59 | import java.util.concurrent.TimeUnit; | |
60 | import java.util.logging.Level; | |
61 | /** | |
62 | * Non blocking TCP Connector Handler. The recommended way to use this class | |
63 | * is by creating an external Controller and share the same SelectorHandler | |
64 | * instance. | |
65 | * <p> | |
66 | * Recommended | |
67 | * ----------- | |
68 | * </p><p><pre><code> | |
69 | * Controller controller = new Controller(); | |
70 | * // new TCPSelectorHandler(true) means the Selector will be used only | |
71 | * // for client operation (OP_READ, OP_WRITE, OP_CONNECT). | |
72 | * TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true); | |
73 | * controller.setSelectorHandler(tcpSelectorHandler); | |
74 | * TCPConnectorHandler tcpConnectorHandler = new TCPConnectorHandler(); | |
75 | * tcpConnectorHandler.connect(localhost,port, new CallbackHandler(){...}, | |
76 | * tcpSelectorHandler); | |
77 | * TCPConnectorHandler tcpConnectorHandler2 = new TCPConnectorHandler(); | |
78 | * tcpConnectorHandler2.connect(localhost,port, new CallbackHandler(){...}, | |
79 | * tcpSelectorHandler); | |
80 | * </code></pre></p><p> | |
81 | * Not recommended (but still works) | |
82 | * --------------------------------- | |
83 | * </p><p><pre><code> | |
84 | * TCPConnectorHandler tcpConnectorHandler = new TCPConnectorHandler(); | |
85 | * tcpConnectorHandler.connect(localhost,port); | |
86 | * </code></pre></p><p> | |
87 | * | |
88 | * Internally, a new Controller will be created everytime connect(localhost,port) | |
89 | * is invoked, which has an impact on performance. | |
90 | * | |
91 | * @author Jeanfrancois Arcand | |
92 | */ | |
93 | 34 | public class TCPConnectorHandler implements |
94 | ConnectorHandler<TCPSelectorHandler, CallbackHandler>, | |
95 | AsyncQueueWritable, AsyncQueueReadable { | |
96 | ||
97 | /** | |
98 | * default TCP channel connection timeout in milliseconds | |
99 | */ | |
100 | private static final int DEFAULT_CONNECTION_TIMEOUT = 30 * 1000; | |
101 | ||
102 | /** | |
103 | * The underlying TCPSelectorHandler used to mange SelectionKeys. | |
104 | */ | |
105 | protected TCPSelectorHandler selectorHandler; | |
106 | ||
107 | ||
108 | /** | |
109 | * A {@link CallbackHandler} handler invoked by the TCPSelectorHandler | |
110 | * when a non blocking operation is ready to be processed. | |
111 | */ | |
112 | private CallbackHandler callbackHandler; | |
113 | ||
114 | ||
115 | /** | |
116 | * A blocking {@link InputStream} that use a pool of Selector | |
117 | * to execute a blocking read operation. | |
118 | */ | |
119 | private InputReader inputStream; | |
120 | ||
121 | ||
122 | /** | |
123 | * The connection's SocketChannel. | |
124 | */ | |
125 | private SocketChannel socketChannel; | |
126 | ||
127 | ||
128 | /** | |
129 | * Is the connection established. | |
130 | */ | |
131 | private volatile boolean isConnected; | |
132 | ||
133 | ||
134 | /** | |
135 | * The internal Controller used (in case not specified). | |
136 | */ | |
137 | private Controller controller; | |
138 | ||
139 | ||
140 | /** | |
141 | * IsConnected Latch related | |
142 | */ | |
143 | private CountDownLatch isConnectedLatch; | |
144 | ||
145 | ||
146 | /** | |
147 | * Are we creating a controller every run. | |
148 | */ | |
149 | 25 | private boolean isStandalone = false; |
150 | ||
151 | ||
152 | /** | |
153 | * The socket tcpDelay. | |
154 | * | |
155 | * Default value for tcpNoDelay. | |
156 | */ | |
157 | 25 | protected boolean tcpNoDelay = true; |
158 | ||
159 | ||
160 | /** | |
161 | * The socket reuseAddress | |
162 | */ | |
163 | 25 | protected boolean reuseAddress = true; |
164 | ||
165 | ||
166 | /** | |
167 | * The socket linger. | |
168 | */ | |
169 | 25 | protected int linger = -1; |
170 | ||
171 | /** | |
172 | * Connection timeout is milliseconds | |
173 | */ | |
174 | 25 | protected int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; |
175 | ||
176 | ||
177 | /** | |
178 | * Connect to hostname:port. When an aysnchronous event happens (e.g | |
179 | * OP_READ or OP_WRITE), the {@link Controller} will invoke | |
180 | * the CallBackHandler. | |
181 | * @param remoteAddress remote address to connect | |
182 | * @param callbackHandler the handler invoked by its associated {@link SelectorHandler} when | |
183 | * a non blocking operation is ready to be handled. When null, all | |
184 | * read and write operation will be delegated to the default | |
185 | * {@link ProtocolChain} and its list of {@link ProtocolFilter} | |
186 | * . When null, this {@link ConnectorHandler} will create an instance of {@link DefaultCallbackHandler}. | |
187 | * @throws java.io.IOException | |
188 | */ | |
189 | public void connect(SocketAddress remoteAddress, | |
190 | CallbackHandler callbackHandler) throws IOException { | |
191 | ||
192 | 32 | connect(remoteAddress,null,callbackHandler); |
193 | 32 | } |
194 | ||
195 | ||
196 | /** | |
197 | * Connect to hostname:port. When an aysnchronous event happens (e.g | |
198 | * OP_READ or OP_WRITE), the {@link Controller} will invoke | |
199 | * the CallBackHandler. | |
200 | * @param remoteAddress remote address to connect | |
201 | * @param localAddress local address to bind | |
202 | * @param callbackHandler the handler invoked by its associated {@link SelectorHandler} when | |
203 | * a non blocking operation is ready to be handled. When null, all | |
204 | * read and write operation will be delegated to the default | |
205 | * {@link ProtocolChain} and its list of {@link ProtocolFilter} | |
206 | * . When null, this {@link ConnectorHandler} will create an instance of {@link DefaultCallbackHandler}. | |
207 | * @throws java.io.IOException | |
208 | */ | |
209 | public void connect(SocketAddress remoteAddress, SocketAddress localAddress, | |
210 | CallbackHandler callbackHandler) throws IOException { | |
211 | ||
212 | 32 | if (controller == null){ |
213 | 0 | throw new IllegalStateException("Controller cannot be null"); |
214 | } | |
215 | ||
216 | 32 | connect(remoteAddress,localAddress,callbackHandler, |
217 | (TCPSelectorHandler)controller.getSelectorHandler(protocol())); | |
218 | 32 | } |
219 | ||
220 | ||
221 | /** | |
222 | * Connect to hostname:port. When an aysnchronous event happens (e.g | |
223 | * OP_READ or OP_WRITE), the {@link Controller} will invoke | |
224 | * the CallBackHandler. | |
225 | * @param remoteAddress remote address to connect | |
226 | * @param callbackHandler the handler invoked by its associated {@link SelectorHandler} when | |
227 | * a non blocking operation is ready to be handled. When null, all | |
228 | * read and write operation will be delegated to the default | |
229 | * {@link ProtocolChain} and its list of {@link ProtocolFilter} | |
230 | * . When null, this {@link ConnectorHandler} will create an instance of {@link DefaultCallbackHandler}. | |
231 | * @param selectorHandler an instance of SelectorHandler. | |
232 | * @throws java.io.IOException | |
233 | */ | |
234 | public void connect(SocketAddress remoteAddress, | |
235 | CallbackHandler callbackHandler, | |
236 | TCPSelectorHandler selectorHandler) throws IOException { | |
237 | ||
238 | 0 | connect(remoteAddress,null,callbackHandler,selectorHandler); |
239 | 0 | } |
240 | ||
241 | /** | |
242 | * Connect to hostname:port. When an aysnchronous event happens (e.g | |
243 | * OP_READ or OP_WRITE), the {@link Controller} will invoke | |
244 | * the CallBackHandler. | |
245 | * @param remoteAddress remote address to connect | |
246 | * @param localAddress local address to bin | |
247 | * @param callbackHandler the handler invoked by its associated {@link SelectorHandler} when | |
248 | * a non blocking operation is ready to be handled. When null, all | |
249 | * read and write operation will be delegated to the default | |
250 | * {@link ProtocolChain} and its list of {@link ProtocolFilter} | |
251 | * . When null, this {@link ConnectorHandler} will create an instance of {@link DefaultCallbackHandler}. | |
252 | * @param selectorHandler an instance of SelectorHandler. | |
253 | * @throws java.io.IOException | |
254 | */ | |
255 | public void connect(SocketAddress remoteAddress, SocketAddress localAddress, | |
256 | CallbackHandler callbackHandler, | |
257 | TCPSelectorHandler selectorHandler) throws IOException { | |
258 | ||
259 | 52 | if (isConnected){ |
260 | 0 | throw new AlreadyConnectedException(); |
261 | } | |
262 | ||
263 | 52 | if (controller == null){ |
264 | 0 | throw new IllegalStateException("Controller cannot be null"); |
265 | } | |
266 | ||
267 | 52 | if (selectorHandler == null){ |
268 | 0 | throw new IllegalStateException("SelectorHandler cannot be null"); |
269 | } | |
270 | ||
271 | 52 | this.selectorHandler = selectorHandler; |
272 | ||
273 | 52 | if (callbackHandler == null){ |
274 | 0 | callbackHandler = new DefaultCallbackHandler(this); |
275 | } else { | |
276 | 52 | this.callbackHandler = callbackHandler; |
277 | } | |
278 | ||
279 | // Wait for the onConnect to be invoked. | |
280 | 52 | isConnectedLatch = new CountDownLatch(1); |
281 | ||
282 | 52 | selectorHandler.connect(remoteAddress,localAddress,callbackHandler); |
283 | 52 | inputStream = new InputReader(); |
284 | ||
285 | try { | |
286 | 52 | isConnectedLatch.await(connectionTimeout, TimeUnit.MILLISECONDS); |
287 | 0 | } catch (InterruptedException ex) { |
288 | 0 | throw new IOException(ex.getMessage()); |
289 | 52 | } |
290 | 52 | } |
291 | ||
292 | ||
293 | /** | |
294 | * Connect to hostname:port. Internally an instance of Controller and | |
295 | * its default SelectorHandler will be created everytime this method is | |
296 | * called. This method should be used only and only if no external | |
297 | * Controller has been initialized. | |
298 | * @param remoteAddress remote address to connect | |
299 | * @throws java.io.IOException | |
300 | */ | |
301 | public void connect(SocketAddress remoteAddress) | |
302 | throws IOException { | |
303 | 20 | connect(remoteAddress,(SocketAddress)null); |
304 | 20 | } |
305 | ||
306 | ||
307 | /** | |
308 | * Connect to hostname:port. Internally an instance of Controller and | |
309 | * its default SelectorHandler will be created everytime this method is | |
310 | * called. This method should be used only and only if no external | |
311 | * Controller has been initialized. | |
312 | * @param remoteAddress remote address to connect | |
313 | * @throws java.io.IOException | |
314 | * @param localAddress local address to bin | |
315 | */ | |
316 | public void connect(SocketAddress remoteAddress, SocketAddress localAddress) | |
317 | throws IOException { | |
318 | ||
319 | 20 | if (isConnected){ |
320 | 0 | throw new AlreadyConnectedException(); |
321 | } | |
322 | ||
323 | 20 | if (controller == null){ |
324 | 20 | isStandalone = true; |
325 | 20 | controller = new Controller(); |
326 | 20 | controller.setSelectorHandler(new TCPSelectorHandler(true)); |
327 | 20 | DefaultPipeline pipeline = new DefaultPipeline(); |
328 | 20 | pipeline.initPipeline(); |
329 | 20 | pipeline.startPipeline(); |
330 | 20 | controller.setPipeline(pipeline); |
331 | ||
332 | 20 | final CountDownLatch latch = new CountDownLatch(1); |
333 | 20 | controller.addStateListener(new ControllerStateListenerAdapter() { |
334 | @Override | |
335 | public void onReady() { | |
336 | 20 | latch.countDown(); |
337 | 20 | } |
338 | ||
339 | @Override | |
340 | public void onException(Throwable e) { | |
341 | 0 | latch.countDown(); |
342 | 0 | } |
343 | }); | |
344 | ||
345 | 20 | callbackHandler = new DefaultCallbackHandler(this,false); |
346 | ||
347 | 20 | new Thread(controller, "GrizzlyTCPConnectorHandler-Controller").start(); |
348 | ||
349 | try { | |
350 | 20 | latch.await(); |
351 | 0 | } catch (InterruptedException ex) { |
352 | 20 | } |
353 | } | |
354 | ||
355 | 20 | if (null == callbackHandler) { |
356 | 0 | callbackHandler = new DefaultCallbackHandler(this); |
357 | } | |
358 | ||
359 | 20 | connect(remoteAddress,localAddress,callbackHandler, |
360 | (TCPSelectorHandler)controller.getSelectorHandler(protocol())); | |
361 | 20 | } |
362 | ||
363 | ||
364 | /** | |
365 | * Read bytes. If blocking is set to <tt>true</tt>, a pool of temporary | |
366 | * {@link Selector} will be used to read bytes. | |
367 | * @param byteBuffer The byteBuffer to store bytes. | |
368 | * @param blocking <tt>true</tt> if a a pool of temporary Selector | |
369 | * is required to handle a blocking read. | |
370 | * @return number of bytes read | |
371 | * @throws java.io.IOException | |
372 | */ | |
373 | public long read(ByteBuffer byteBuffer, boolean blocking) throws IOException { | |
374 | 1620 | if (!isConnected){ |
375 | 0 | throw new NotYetConnectedException(); |
376 | } | |
377 | ||
378 | 1620 | SelectionKey key = socketChannel.keyFor(selectorHandler.getSelector()); |
379 | 1620 | if (blocking){ |
380 | 312 | inputStream.setSelectionKey(key); |
381 | 312 | return inputStream.read(byteBuffer); |
382 | } else { | |
383 | 1308 | if (callbackHandler == null){ |
384 | 0 | throw new IllegalStateException |
385 | ("Non blocking read needs a CallbackHandler"); | |
386 | } | |
387 | 1308 | int nRead = socketChannel.read(byteBuffer); |
388 | ||
389 | 1308 | if (nRead == 0){ |
390 | 993 | selectorHandler.register(key,SelectionKey.OP_READ); |
391 | } | |
392 | 1308 | return nRead; |
393 | } | |
394 | } | |
395 | ||
396 | ||
397 | /** | |
398 | * Writes bytes. If blocking is set to <tt>true</tt>, a pool of temporary | |
399 | * {@link Selector} will be used to writes bytes. | |
400 | * @param byteBuffer The byteBuffer to write. | |
401 | * @param blocking <tt>true</tt> if a a pool of temporary Selector | |
402 | * is required to handle a blocking write. | |
403 | * @return number of bytes written | |
404 | * @throws java.io.IOException | |
405 | */ | |
406 | public long write(ByteBuffer byteBuffer, boolean blocking) throws IOException { | |
407 | 1414 | if (!isConnected){ |
408 | 0 | throw new NotYetConnectedException(); |
409 | } | |
410 | ||
411 | 1414 | if (blocking){ |
412 | 313 | return OutputWriter.flushChannel(socketChannel,byteBuffer); |
413 | } else { | |
414 | ||
415 | 1101 | if (callbackHandler == null){ |
416 | 0 | throw new IllegalStateException |
417 | ("Non blocking write needs a CallbackHandler"); | |
418 | } | |
419 | ||
420 | 1101 | SelectionKey key = socketChannel.keyFor(selectorHandler.getSelector()); |
421 | 1101 | int nWrite = 1; |
422 | 1101 | int totalWriteBytes = 0; |
423 | 2202 | while (nWrite > 0 && byteBuffer.hasRemaining()){ |
424 | 1101 | nWrite = socketChannel.write(byteBuffer); |
425 | 1101 | totalWriteBytes += nWrite; |
426 | } | |
427 | ||
428 | 1101 | if (totalWriteBytes == 0 && byteBuffer.hasRemaining()){ |
429 | 0 | selectorHandler.register(key,SelectionKey.OP_WRITE); |
430 | } | |
431 | 1101 | return totalWriteBytes; |
432 | } | |
433 | } | |
434 | ||
435 | ||
436 | /** | |
437 | * {@inheritDoc} | |
438 | */ | |
439 | public void readFromAsyncQueue(ByteBuffer buffer, | |
440 | AsyncReadCallbackHandler callbackHandler) throws IOException { | |
441 | 100 | readFromAsyncQueue(buffer, callbackHandler, null); |
442 | 100 | } |
443 | ||
444 | /** | |
445 | * {@inheritDoc} | |
446 | */ | |
447 | public void readFromAsyncQueue(ByteBuffer buffer, | |
448 | AsyncReadCallbackHandler callbackHandler, | |
449 | AsyncReadCondition condition) throws IOException { | |
450 | 100 | readFromAsyncQueue(buffer, callbackHandler, condition, null); |
451 | 100 | } |
452 | ||
453 | /** | |
454 | * {@inheritDoc} | |
455 | */ | |
456 | public void readFromAsyncQueue(ByteBuffer buffer, | |
457 | AsyncReadCallbackHandler callbackHandler, | |
458 | AsyncReadCondition condition, | |
459 | AsyncQueueDataProcessor readPostProcessor) throws IOException { | |
460 | 100 | selectorHandler.getAsyncQueueReader().read( |
461 | socketChannel.keyFor(selectorHandler.getSelector()), buffer, | |
462 | callbackHandler, condition, readPostProcessor); | |
463 | 100 | } |
464 | ||
465 | ||
466 | /** | |
467 | * {@inheritDoc} | |
468 | */ | |
469 | public void writeToAsyncQueue(ByteBuffer buffer) throws IOException { | |
470 | 200000 | writeToAsyncQueue(buffer, null); |
471 | 200000 | } |
472 | ||
473 | ||
474 | /** | |
475 | * {@inheritDoc} | |
476 | */ | |
477 | public void writeToAsyncQueue(ByteBuffer buffer, | |
478 | AsyncWriteCallbackHandler callbackHandler) throws IOException { | |
479 | 200000 | writeToAsyncQueue(buffer, callbackHandler, null); |
480 | 200000 | } |
481 | ||
482 | ||
483 | /** | |
484 | * {@inheritDoc} | |
485 | */ | |
486 | public void writeToAsyncQueue(ByteBuffer buffer, | |
487 | AsyncWriteCallbackHandler callbackHandler, | |
488 | AsyncQueueDataProcessor writePreProcessor) throws IOException { | |
489 | 200000 | writeToAsyncQueue(buffer, callbackHandler, writePreProcessor, false); |
490 | 200000 | } |
491 | ||
492 | ||
493 | /** | |
494 | * {@inheritDoc} | |
495 | */ | |
496 | public void writeToAsyncQueue(ByteBuffer buffer, | |
497 | AsyncWriteCallbackHandler callbackHandler, | |
498 | AsyncQueueDataProcessor writePreProcessor, | |
499 | boolean isCloneByteBuffer) throws IOException { | |
500 | 200000 | selectorHandler.getAsyncQueueWriter().write( |
501 | socketChannel.keyFor(selectorHandler.getSelector()), buffer, | |
502 | callbackHandler, writePreProcessor, isCloneByteBuffer); | |
503 | 200000 | } |
504 | ||
505 | ||
506 | /** | |
507 | * {@inheritDoc} | |
508 | */ | |
509 | public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer) | |
510 | throws IOException { | |
511 | 0 | writeToAsyncQueue(dstAddress, buffer, null); |
512 | 0 | } |
513 | ||
514 | ||
515 | /** | |
516 | * {@inheritDoc} | |
517 | */ | |
518 | public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, | |
519 | AsyncWriteCallbackHandler callbackHandler) throws IOException { | |
520 | 0 | writeToAsyncQueue(dstAddress, buffer, callbackHandler, null); |
521 | 0 | } |
522 | ||
523 | ||
524 | /** | |
525 | * {@inheritDoc} | |
526 | */ | |
527 | public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, | |
528 | AsyncWriteCallbackHandler callbackHandler, | |
529 | AsyncQueueDataProcessor writePreProcessor) throws IOException { | |
530 | 0 | writeToAsyncQueue(dstAddress, buffer, callbackHandler, writePreProcessor, |
531 | false); | |
532 | 0 | } |
533 | ||
534 | ||
535 | /** | |
536 | * {@inheritDoc} | |
537 | */ | |
538 | public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, | |
539 | AsyncWriteCallbackHandler callbackHandler, | |
540 | AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer) | |
541 | throws IOException { | |
542 | 0 | selectorHandler.getAsyncQueueWriter().write( |
543 | socketChannel.keyFor(selectorHandler.getSelector()), dstAddress, | |
544 | buffer, callbackHandler, writePreProcessor, isCloneByteBuffer); | |
545 | 0 | } |
546 | ||
547 | ||
548 | /** | |
549 | * Close the underlying connection. | |
550 | */ | |
551 | public void close() throws IOException{ | |
552 | 51 | if (socketChannel != null){ |
553 | 51 | if (selectorHandler != null){ |
554 | 51 | SelectionKey key = |
555 | socketChannel.keyFor(selectorHandler.getSelector()); | |
556 | ||
557 | 51 | if (key == null) return; |
558 | ||
559 | 51 | selectorHandler.getSelectionKeyHandler().cancel(key); |
560 | 51 | } else { |
561 | 0 | socketChannel.close(); |
562 | } | |
563 | } | |
564 | ||
565 | 51 | if (controller != null && isStandalone){ |
566 | 20 | controller.stop(); |
567 | 20 | controller = null; |
568 | } | |
569 | ||
570 | 51 | isStandalone = false; |
571 | 51 | isConnected = false; |
572 | 51 | connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; |
573 | 51 | } |
574 | ||
575 | ||
576 | /** | |
577 | * Finish handling the OP_CONNECT interest ops. | |
578 | * @param key - a {@link SelectionKey} | |
579 | */ | |
580 | public void finishConnect(SelectionKey key) throws IOException{ | |
581 | try{ | |
582 | 52 | if (Controller.logger().isLoggable(Level.FINE)) { |
583 | 0 | Controller.logger().log(Level.FINE, "Finish connect"); |
584 | } | |
585 | ||
586 | 52 | socketChannel = (SocketChannel)key.channel(); |
587 | 52 | socketChannel.finishConnect(); |
588 | 52 | isConnected = socketChannel.isConnected(); |
589 | 52 | configureChannel(socketChannel); |
590 | ||
591 | 52 | if (Controller.logger().isLoggable(Level.FINE)) { |
592 | 0 | Controller.logger().log(Level.FINE, "isConnected: " + isConnected); |
593 | } | |
594 | 0 | } catch (IOException ex){ |
595 | 0 | throw ex; |
596 | } finally { | |
597 | 52 | isConnectedLatch.countDown(); |
598 | 52 | } |
599 | 52 | } |
600 | ||
601 | ||
602 | /** | |
603 | * {@inheritDoc} | |
604 | */ | |
605 | public void configureChannel(SelectableChannel channel) throws IOException{ | |
606 | 52 | Socket socket = ((SocketChannel) channel).socket(); |
607 | ||
608 | try{ | |
609 | 52 | if(linger >= 0 ) { |
610 | 0 | socket.setSoLinger( true, linger); |
611 | } | |
612 | 0 | } catch (SocketException ex){ |
613 | 0 | Controller.logger().log(Level.WARNING, |
614 | "setSoLinger exception ",ex); | |
615 | 52 | } |
616 | ||
617 | try{ | |
618 | 52 | socket.setTcpNoDelay(tcpNoDelay); |
619 | 0 | } catch (SocketException ex){ |
620 | 0 | Controller.logger().log(Level.WARNING, |
621 | "setTcpNoDelay exception ",ex); | |
622 | 52 | } |
623 | ||
624 | try{ | |
625 | 52 | socket.setReuseAddress(reuseAddress); |
626 | 0 | } catch (SocketException ex){ |
627 | 0 | Controller.logger().log(Level.WARNING, |
628 | "setReuseAddress exception ",ex); | |
629 | 52 | } |
630 | 52 | } |
631 | ||
632 | ||
633 | /** | |
634 | * A token decribing the protocol supported by an implementation of this | |
635 | * interface | |
636 | * @return this {@link ConnectorHandler}'s protocol | |
637 | */ | |
638 | public Controller.Protocol protocol(){ | |
639 | 83 | return Controller.Protocol.TCP; |
640 | } | |
641 | ||
642 | ||
643 | /** | |
644 | * Is the underlying SocketChannel connected. | |
645 | * @return <tt>true</tt> if connected, otherwise <tt>false</tt> | |
646 | */ | |
647 | public boolean isConnected(){ | |
648 | 10 | return isConnected && socketChannel.isOpen(); |
649 | } | |
650 | ||
651 | ||
652 | /** | |
653 | * Return the {@link Controller} | |
654 | * @return the {@link Controller} | |
655 | */ | |
656 | public Controller getController() { | |
657 | 0 | return controller; |
658 | } | |
659 | ||
660 | ||
661 | /** | |
662 | * Set the {@link Controller} to use with this instance. | |
663 | * @param controller the {@link Controller} to use with this instance. | |
664 | */ | |
665 | public void setController(Controller controller) { | |
666 | 64 | this.controller = controller; |
667 | 64 | } |
668 | ||
669 | ||
670 | /** | |
671 | * Return the current {@link SocketChannel} used. | |
672 | * @return the current {@link SocketChannel} used. | |
673 | */ | |
674 | public SelectableChannel getUnderlyingChannel() { | |
675 | 9 | return socketChannel; |
676 | } | |
677 | ||
678 | ||
679 | /** | |
680 | * Set the {@link SocketChannel}. | |
681 | * @param the {@link SocketChannel} to use. | |
682 | */ | |
683 | protected void setUnderlyingChannel(SocketChannel socketChannel){ | |
684 | 20 | this.socketChannel = socketChannel; |
685 | 20 | } |
686 | ||
687 | ||
688 | /** | |
689 | * Return the {@link CallbackHandler}. | |
690 | * @return the {@link CallbackHandler}. | |
691 | */ | |
692 | public CallbackHandler getCallbackHandler() { | |
693 | 0 | return callbackHandler; |
694 | } | |
695 | ||
696 | ||
697 | /** | |
698 | * Set the {@link CallbackHandler}. | |
699 | * @param callbackHandler the {@link CallbackHandler}. | |
700 | */ | |
701 | public void setCallbackHandler(CallbackHandler callbackHandler) { | |
702 | 9 | this.callbackHandler = callbackHandler; |
703 | 9 | } |
704 | ||
705 | ||
706 | /** | |
707 | * Return the associated {@link SelectorHandler}. | |
708 | * @return the associated {@link SelectorHandler}. | |
709 | */ | |
710 | public TCPSelectorHandler getSelectorHandler() { | |
711 | 9 | return selectorHandler; |
712 | } | |
713 | ||
714 | ||
715 | /** | |
716 | * Return the tcpNoDelay value used by the underlying accepted Sockets. | |
717 | * | |
718 | * Also see setTcpNoDelay(boolean tcpNoDelay) | |
719 | */ | |
720 | public boolean isTcpNoDelay() { | |
721 | 0 | return tcpNoDelay; |
722 | } | |
723 | ||
724 | ||
725 | /** | |
726 | * Enable (true) or disable (false) the underlying Socket's | |
727 | * tcpNoDelay. | |
728 | * | |
729 | * Default value for tcpNoDelay is disabled (set to false). | |
730 | * | |
731 | * Disabled by default since enabling tcpNoDelay for most applications | |
732 | * can cause packets to appear to arrive in a fragmented fashion where it | |
733 | * takes multiple OP_READ events (i.e. multiple calls to read small | |
734 | * messages). The common behaviour seen when this occurs is that often times | |
735 | * a small number of bytes, as small as 1 byte at a time is read per OP_READ | |
736 | * event dispatch. This results in a large number of system calls to | |
737 | * read(), system calls to enable and disable interest ops and potentially | |
738 | * a large number of thread context switches between a thread doing the | |
739 | * Select(ing) and a worker thread doing the read. | |
740 | * | |
741 | * The Connector side should also set tcpNoDelay the same as it is set here | |
742 | * whenever possible. | |
743 | */ | |
744 | public void setTcpNoDelay(boolean tcpNoDelay) { | |
745 | 0 | this.tcpNoDelay = tcpNoDelay; |
746 | 0 | } |
747 | ||
748 | ||
749 | /** | |
750 | * @see java.net.Socket#setLinger() | |
751 | * @return the linger value. | |
752 | */ | |
753 | public int getLinger() { | |
754 | 0 | return linger; |
755 | } | |
756 | ||
757 | ||
758 | /** | |
759 | * @see java.net.Socket#setLinger() | |
760 | */ | |
761 | public void setLinger(int linger) { | |
762 | 0 | this.linger = linger; |
763 | 0 | } |
764 | ||
765 | ||
766 | /** | |
767 | * Get TCP channel connection timeout in milliseconds | |
768 | * @return TCP channel connection timeout in milliseconds | |
769 | */ | |
770 | public int getConnectionTimeout() { | |
771 | 0 | return connectionTimeout; |
772 | } | |
773 | ||
774 | ||
775 | /** | |
776 | * Set TCP channel connection timeout in milliseconds | |
777 | * @param connectionTimeout TCP channel connection timeout in milliseconds | |
778 | */ | |
779 | public void setConnectionTimeout(int connectionTimeout) { | |
780 | 0 | this.connectionTimeout = connectionTimeout; |
781 | 0 | } |
782 | ||
783 | ||
784 | /** | |
785 | * @see java.net.Socket#setReuseAddress() | |
786 | */ | |
787 | public boolean isReuseAddress() { | |
788 | 0 | return reuseAddress; |
789 | } | |
790 | ||
791 | ||
792 | /** | |
793 | * @see java.net.Socket#setReuseAddress() | |
794 | */ | |
795 | public void setReuseAddress(boolean reuseAddress) { | |
796 | 0 | this.reuseAddress = reuseAddress; |
797 | 0 | } |
798 | } |