Index: test/java/com/sun/grizzly/RecycledChannelTest.java =================================================================== --- test/java/com/sun/grizzly/RecycledChannelTest.java (revision 0) +++ test/java/com/sun/grizzly/RecycledChannelTest.java (revision 0) @@ -0,0 +1,143 @@ +package com.sun.grizzly; + +import junit.framework.TestCase; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.net.Socket; +import java.net.BindException; +import java.net.SocketAddress; +import java.io.IOException; +import java.util.logging.Level; + +import com.sun.grizzly.utils.ControllerUtils; + +/** + * @author Bongjae Chang + * @date 2009. 6. 15 + */ +public class RecycledChannelTest extends TestCase { + + private static final int PORT = 17520; + private static final int SLEEP_TIME = 3000; // ms + + private final InetAddress localInetAddress; + private final InetSocketAddress localInetSocketAddress; + + public RecycledChannelTest() throws UnknownHostException { + localInetAddress = InetAddress.getLocalHost(); + localInetSocketAddress = new InetSocketAddress( localInetAddress, PORT ); + } + + public void testSimpleTCPConnect() throws IOException { + final Controller controller = new Controller(); + SelectorHandler selectorHandler = new TCPSelectorHandler(); + ( (TCPSelectorHandler)selectorHandler ).setPort( PORT ); + ( (TCPSelectorHandler)selectorHandler ).setInet( localInetAddress ); + ( (TCPSelectorHandler)selectorHandler ).setReuseAddress( false ); + controller.addSelectorHandler( selectorHandler ); + + Socket clientSocket = null; + try { + ControllerUtils.startController( controller ); + + boolean result = false; + Controller.logger().log( Level.INFO, "Try to get a connector handler with the local address which already has been bound." 
); + try { + tryToConnect( controller, Controller.Protocol.TCP, null, localInetSocketAddress ); + } catch( IOException ie ) { + if( ie instanceof BindException ) { + result = true; + Controller.logger().log( Level.INFO, "Got the expected BindException." ); + assertTrue( "Got the expected BindException.", true ); + } else { + Controller.logger().log( Level.INFO, "Got the unexpected error.", ie ); + assertTrue( "Got the unexpected error.", false ); + } + } + if( !result ) + assertTrue( "The BindException was expected.", false ); + + Controller.logger().log( Level.INFO, "Try to connect the local server." ); + clientSocket = new Socket( localInetAddress, PORT ); + Controller.logger().log( Level.INFO, "Wait for " + SLEEP_TIME + "(ms)" ); + try { + Thread.sleep( SLEEP_TIME ); + } catch( InterruptedException e ) { + } + + Controller.logger().log( Level.INFO, "Try to get a connector handler with the local address which already has been bound again." ); + try { + tryToConnect( controller, Controller.Protocol.TCP, clientSocket.getLocalSocketAddress(), localInetSocketAddress ); + } catch( IOException ie ) { + Controller.logger().log( Level.INFO, "Got the unexpected error.", ie ); + assertTrue( "Got the unexpected error.", false ); + throw ie; + } + } finally { + if( clientSocket != null ) { + try { + clientSocket.shutdownInput(); + } catch( IOException e ) { + } + try { + clientSocket.shutdownOutput(); + } catch( IOException e ) { + } + try { + clientSocket.close(); + } catch( IOException e ) { + } + } + controller.stop(); + } + } + + public void testSimpleUDPConnect() throws IOException { + final Controller controller = new Controller(); + SelectorHandler selectorHandler = new UDPSelectorHandler(); + ( (UDPSelectorHandler)selectorHandler ).setPort( PORT ); + ( (UDPSelectorHandler)selectorHandler ).setInet( localInetAddress ); + ( (UDPSelectorHandler)selectorHandler ).setReuseAddress( false ); + controller.addSelectorHandler( selectorHandler ); + + try { + 
ControllerUtils.startController( controller ); + + Controller.logger().log( Level.INFO, "Try to get a connector handler with the local address which already has been bound." ); + try { + tryToConnect( controller, Controller.Protocol.UDP, localInetSocketAddress, localInetSocketAddress ); + } catch( IOException ie ) { + Controller.logger().log( Level.INFO, "Got the unexpected error.", ie ); + assertTrue( "Got the unexpected error.", false ); + throw ie; + } + } finally { + controller.stop(); + } + } + + private void tryToConnect( Controller controller, Controller.Protocol protocol, SocketAddress remote, SocketAddress local ) throws IOException { + ConnectorHandler connectorHandler = null; + try { + connectorHandler = controller.acquireConnectorHandler( protocol ); + connectorHandler.connect( remote, local ); + } finally { + if( connectorHandler != null ) { + try { + connectorHandler.close(); + } catch( IOException e ) { + e.printStackTrace(); + } + controller.releaseConnectorHandler( connectorHandler ); + } + } + } + + public static void main( String[] args ) throws IOException { + RecycledChannelTest test = new RecycledChannelTest(); + test.testSimpleTCPConnect(); + test.testSimpleUDPConnect(); + } +} Index: main/java/com/sun/grizzly/UDPSelectorHandler.java =================================================================== --- main/java/com/sun/grizzly/UDPSelectorHandler.java (revision 3298) +++ main/java/com/sun/grizzly/UDPSelectorHandler.java (working copy) @@ -38,11 +38,9 @@ package com.sun.grizzly; -import com.sun.grizzly.SelectionKeyOP.ConnectSelectionKeyOP; import com.sun.grizzly.async.UDPAsyncQueueReader; import com.sun.grizzly.async.UDPAsyncQueueWriter; import com.sun.grizzly.util.Copyable; -import com.sun.grizzly.util.State; import java.io.IOException; import java.net.BindException; import java.net.DatagramSocket; @@ -138,9 +136,9 @@ ConcurrentQueueDelegateCIH( getConnectorInstanceHandlerDelegate()); - datagramChannel = DatagramChannel.open(); selector = 
Selector.open(); if (role != Role.CLIENT){ + datagramChannel = DatagramChannel.open(); datagramSocket = datagramChannel.socket(); datagramSocket.setReuseAddress(reuseAddress); if (inet == null) @@ -159,35 +157,22 @@ } } - - /** - * Register a CallBackHandler to this Selector. - * - * @param remoteAddress remote address to connect - * @param localAddress local address to bin - * @param callbackHandler {@link CallbackHandler} - * @throws java.io.IOException - */ @Override - protected void connect(SocketAddress remoteAddress, SocketAddress localAddress, - CallbackHandler callbackHandler) throws IOException { + protected SelectableChannel getUsedSelectableChannel( SocketAddress remoteAddress ) { + if( role != Role.CLIENT && datagramChannel != null && datagramSocket != null ) + return datagramChannel; + else + return null; + } + @Override + protected SelectableChannel getNewSelectableChannel( SocketAddress localAddress ) throws IOException { DatagramChannel newDatagramChannel = DatagramChannel.open(); newDatagramChannel.socket().setReuseAddress(reuseAddress); - if (localAddress != null) { + if (localAddress != null) newDatagramChannel.socket().bind(localAddress); - } - newDatagramChannel.configureBlocking(false); - - SelectionKeyOP.ConnectSelectionKeyOP keyOP = new ConnectSelectionKeyOP(); - - keyOP.setOp(SelectionKey.OP_CONNECT); - keyOP.setChannel(newDatagramChannel); - keyOP.setRemoteAddress(remoteAddress); - keyOP.setCallbackHandler(callbackHandler); - opToRegister.offer(keyOP); - selector.wakeup(); + return newDatagramChannel; } /** @@ -196,19 +181,19 @@ @Override protected void onConnectOp(Context ctx, SelectionKeyOP.ConnectSelectionKeyOP selectionKeyOp) throws IOException { - DatagramChannel newDatagramChannel = (DatagramChannel) selectionKeyOp.getChannel(); + DatagramChannel datagramChannel = (DatagramChannel) selectionKeyOp.getChannel(); SocketAddress remoteAddress = selectionKeyOp.getRemoteAddress(); CallbackHandler callbackHandler = 
selectionKeyOp.getCallbackHandler(); CallbackHandlerSelectionKeyAttachment attachment = new CallbackHandlerSelectionKeyAttachment(callbackHandler); - SelectionKey key = newDatagramChannel.register(selector, + SelectionKey key = datagramChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, attachment); attachment.associateKey(key); try { - newDatagramChannel.connect(remoteAddress); + datagramChannel.connect(remoteAddress); } catch(Exception e) { if (logger.isLoggable(Level.FINE)) { logger.log(Level.FINE, "Exception occured when tried to connect datagram channel", e); @@ -224,44 +209,26 @@ */ @Override public void shutdown(){ - // If shutdown was called for this SelectorHandler - if (isShutDown.getAndSet(true)) return; - - stateHolder.setState(State.STOPPED); - + super.shutdown(); try { - if ( datagramSocket != null ) + if ( datagramSocket != null ) { datagramSocket.close(); + datagramSocket = null; + } } catch (Throwable ex){ Controller.logger().log(Level.SEVERE, "closeSocketException",ex); } try{ - if ( datagramChannel != null) + if ( datagramChannel != null) { datagramChannel.close(); + datagramChannel = null; + } } catch (Throwable ex){ Controller.logger().log(Level.SEVERE, "closeSocketException",ex); } - - try{ - if ( selector != null) - selector.close(); - } catch (Throwable ex){ - Controller.logger().log(Level.SEVERE, - "closeSocketException",ex); - } - - if (asyncQueueReader != null) { - asyncQueueReader.close(); - asyncQueueReader = null; - } - - if (asyncQueueWriter != null) { - asyncQueueWriter.close(); - asyncQueueWriter = null; - } } @@ -385,6 +352,8 @@ @Override public void closeChannel(SelectableChannel channel) { + if( datagramChannel == channel ) + return; try{ channel.close(); } catch (IOException ex){ Index: main/java/com/sun/grizzly/TCPSelectorHandler.java =================================================================== --- main/java/com/sun/grizzly/TCPSelectorHandler.java (revision 3298) +++ 
main/java/com/sun/grizzly/TCPSelectorHandler.java (working copy) @@ -72,6 +72,8 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -274,7 +276,12 @@ private long lastSpinTimestamp; private int emptySpinCounter; - + + private final ConcurrentHashMap<SocketAddress, SocketChannel> acceptedSocketChannelMap = + new ConcurrentHashMap<SocketAddress, SocketChannel>(); + private final CopyOnWriteArrayList<SocketChannel> recycledSocketChannels = + new CopyOnWriteArrayList<SocketChannel>(); + public TCPSelectorHandler(){ this(Role.CLIENT_SERVER); } @@ -397,15 +404,13 @@ serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + + serverSocket.setSoTimeout(serverTimeout); } ctx.getController().notifyReady(); } catch (SocketException ex){ throw new BindException(ex.getMessage() + ": " + port + "=" + this); } - - if (role != Role.CLIENT){ - serverSocket.setSoTimeout(serverTimeout); - } } /** @@ -531,7 +536,7 @@ }else{ ((Runnable)obj).run(); } - }catch(Throwable t){ + }catch(Throwable t){ logger.log(Level.FINEST, "doExecutePendiongIO failed.", t); } } @@ -567,7 +572,7 @@ * Register a SelectionKey to this Selector.
* Storing each interest type in different queues removes the need of wrapper (SelectionKeyOP) * while lowering thread contention due to the load is spread out on different queues. - * + * * @param key * @param ops */ @@ -595,11 +600,11 @@ opToRegister.offer(new SelectionKeyOP(null,ops,channel)); wakeUp(); } - + /** * Workaround for NIO issue 6524172 */ - private void wakeUp(){ + protected void wakeUp(){ try{ selector.wakeup(); } catch (NullPointerException ne){ @@ -617,25 +622,52 @@ */ protected void connect(SocketAddress remoteAddress, SocketAddress localAddress, CallbackHandler callbackHandler) throws IOException { - - SocketChannel socketChannel = SocketChannel.open(); - socketChannel.socket().setReuseAddress(reuseAddress); - if (localAddress != null) { - socketChannel.socket().bind(localAddress); - } - - socketChannel.configureBlocking(false); - + SelectableChannel selectableChannel = getSelectableChannel( remoteAddress, localAddress ); SelectionKeyOP.ConnectSelectionKeyOP keyOP = new ConnectSelectionKeyOP(); - keyOP.setOp(SelectionKey.OP_CONNECT); - keyOP.setChannel(socketChannel); + keyOP.setChannel(selectableChannel); keyOP.setRemoteAddress(remoteAddress); keyOP.setCallbackHandler(callbackHandler); opToRegister.offer(keyOP); wakeUp(); } + private SelectableChannel getSelectableChannel( SocketAddress remoteAddress, SocketAddress localAddress ) throws IOException { + SelectableChannel selectableChannel = null; + if( localAddress != null ) { + if( inet != null && localAddress instanceof InetSocketAddress ) { + InetSocketAddress inetSocketAddress = (InetSocketAddress)localAddress; + if( inet.equals( inetSocketAddress.getAddress() ) ) + selectableChannel = getUsedSelectableChannel( remoteAddress ); + } + } else { + selectableChannel = getUsedSelectableChannel( remoteAddress ); + } + if( selectableChannel == null ) + selectableChannel = getNewSelectableChannel( localAddress ); + return selectableChannel; + } + + protected SelectableChannel 
getUsedSelectableChannel( SocketAddress remoteAddress ) { + if( remoteAddress != null ) { + SocketChannel acceptedSocketChannel = acceptedSocketChannelMap.get( remoteAddress ); + if( acceptedSocketChannel != null ) + recycledSocketChannels.add( acceptedSocketChannel ); + return acceptedSocketChannel; + } else { + return null; + } + } + + protected SelectableChannel getNewSelectableChannel( SocketAddress localAddress ) throws IOException { + SocketChannel newSocketChannel = SocketChannel.open(); + newSocketChannel.socket().setReuseAddress( reuseAddress ); + if( localAddress != null ) + newSocketChannel.socket().bind( localAddress ); + newSocketChannel.configureBlocking(false); + return newSocketChannel; + } + /** * {@inheritDoc} */ @@ -691,16 +723,20 @@ } try{ - if (serverSocket != null) + if (serverSocket != null) { serverSocket.close(); + serverSocket = null; + } } catch (Throwable ex){ Controller.logger().log(Level.SEVERE, "serverSocket.close",ex); } try{ - if (serverSocketChannel != null) + if (serverSocketChannel != null) { serverSocketChannel.close(); + serverSocketChannel = null; + } } catch (Throwable ex){ Controller.logger().log(Level.SEVERE, "serverSocketChannel.close",ex); @@ -727,6 +763,8 @@ readOpToRegister.clear(); writeOpToRegister.clear(); opToRegister.clear(); + acceptedSocketChannelMap.clear(); + recycledSocketChannels.clear(); attributes = null; } @@ -735,7 +773,16 @@ * {@inheritDoc} */ public SelectableChannel acceptWithoutRegistration(SelectionKey key) throws IOException { - return ((ServerSocketChannel) key.channel()).accept(); + SocketChannel acceptedSocketChannel = ((ServerSocketChannel) key.channel()).accept(); + if( acceptedSocketChannel != null ) { + SocketAddress remoteSocketAddress = null; + Socket acceptedSocket = acceptedSocketChannel.socket(); + if( acceptedSocket != null ) + remoteSocketAddress = acceptedSocket.getRemoteSocketAddress(); + if( remoteSocketAddress != null ) + acceptedSocketChannelMap.put( remoteSocketAddress, 
acceptedSocketChannel ); + } + return acceptedSocketChannel; } /** @@ -845,7 +892,7 @@ // Added because of incompatibility with Grizzly 1.6.0 context.setSelectorHandler(this); - CallbackHandlerContextTask task = + CallbackHandlerContextTask task = context.getCallbackHandlerContextTask(callbackHandler); boolean isRunInSeparateThread = true; @@ -1224,7 +1271,15 @@ public void closeChannel(SelectableChannel channel) { // channel could be either SocketChannel or ServerSocketChannel if (channel instanceof SocketChannel) { - Socket socket = ((SocketChannel) channel).socket(); + SocketChannel socketChannel = (SocketChannel)channel; + if( recycledSocketChannels.remove( socketChannel ) ) + return; + Socket socket = socketChannel.socket(); + SocketAddress remoteSocketAddress = null; + if( socket != null ) + remoteSocketAddress = socket.getRemoteSocketAddress(); + if( remoteSocketAddress != null ) + acceptedSocketChannelMap.remove( remoteSocketAddress ); try { if (!socket.isInputShutdown()) socket.shutdownInput(); @@ -1268,14 +1323,14 @@ * @return {@link Context} */ protected NIOContext pollContext(final Context serverContext, - final SelectionKey key, final Context.OpType opType) { + final SelectionKey key, final Context.OpType opType) { Controller c = serverContext.getController(); ProtocolChain protocolChain = instanceHandler != null ? instanceHandler.poll() : c.getProtocolChainInstanceHandler().poll(); final NIOContext context = (NIOContext)c.pollContext(); - c.configureContext(key, opType, context, this); + c.configureContext(key, opType, context, this); context.setProtocolChain(protocolChain); return context; } @@ -1379,7 +1434,7 @@ * {@inheritDoc} */ public void resetSpinCounter(){ - emptySpinCounter = 0; + emptySpinCounter = 0; } /**