dev@grizzly.java.net

Re: About recycling the server channel in XXXSelectorHandler

From: Bongjae Chang <carryel_at_korea.com>
Date: Thu, 18 Jun 2009 22:05:55 +0900

Hi,

I created new SelectorHandler like ReusableTCP|UDPSelectorHandler and SelectorHanderFactory#createSelectorHandler( Protocol, boolean reusable ).

And TCP|UDPSelectorHandler was modified a little bit. Of course, original logic of TCP|UDPSelectorHandler was preserved.

Could you review this again?

Thanks.
--
Bongjae Chang


----- Original Message -----
From: "Bongjae Chang" <carryel_at_korea.com>
To: <dev_at_grizzly.dev.java.net>
Sent: Tuesday, June 16, 2009 10:24 PM
Subject: Re: About recycling the server channel in XXXSelectorHandler


> Hi Jeanfrancois,
>
> I agree with you completely.
>
> I like (2), too.
>
> So I will try to modify the proposed patch and reply it again.
>
> Thanks.
>
> --
> Bongjae Chang
>
>
> ----- Original Message -----
> From: "Jeanfrancois Arcand" <Jeanfrancois.Arcand_at_Sun.COM>
> To: <dev_at_grizzly.dev.java.net>
> Sent: Tuesday, June 16, 2009 1:22 AM
> Subject: Re: About recycling the server channel in XXXSelectorHandler
>
>
>>
>> Salut,
>> Bongjae Chang wrote:
>>> Hi,
>>>
>>> I wrote the proposed patch about this issue experimentally.
>>>
>>> In UDP, the patch is simple because it overrides some TCPSelectorHandler's methods.
>>>
>>> But in TCP, if only accepted socket channel existed and the socket's remote address was equal to each other(ConnectorHandler's remote address), I recycled it.
>>>
>>> I used a Map and a List for caching accepted socket channels and client socket channels which ConnectorHandler used for connecting in TCPSelectorHandler.
>>>
>>> But frankly speaking, I didn't like it completely and I didn't find any better ideas in TCP.
>>
>> It looks OK except I'm afraid of a little performance regression due to
>> the Concurrent Map used. Also I think calling
>>
>> acceptedSocket.getRemoteSocketAddress();
>>
>> for every request may introduce performance regression as well. We need
>> to test, but I do think it will be a hit.
>>
>>
>>>
>>> Once, I attached the patch and a simple unit test case.
>>>
>>> When I tested it, it worked well and all unit tests passed locally.
>>>
>>> But I think that more tests needed and it should be reviewed carefully because any regressions which I have not thought could exist.
>>>
>>> I look forward to your feedback or any ideas.
>>
>> see inline
>>
>> Since we never got request for such mechanism, I think we should explore
>> the possible solutions offered:
>>
>> * (1) Apply the current patch
>> * (2) Create new SelectorHandler like RecyclableTCP|UDPSelectorHandler
>> * (3) Don't enable the mechanism by default, use a property
>>
>> I would think (2) might be the cleaner solution. We can possibly add a
>> new Factory for creating SelectorHandler like:
>>
>> * SelectorHandlerFactory.new(Protocol, boolean recycled);
>>
>> which return the proper instance of SelectorHandler.
>>
>> What do you think?
>>
>> A+
>>
>> -Jeanfrancois
>>
>>
>>>
>>> Thanks!
>>>
>>> --
>>> Bongjae Chang
>>>
>>>
>>> ----- Original Message -----
>>> From: "Bongjae Chang" <carryel_at_korea.com>
>>> To: <dev_at_grizzly.dev.java.net>
>>> Sent: Saturday, June 13, 2009 5:26 PM
>>> Subject: Re: About recycling the server channel in XXXSelectorHandler
>>>
>>>
>>>> Hi Jeanfrancois,
>>>>
>>>> Jeanfrancois wrote:
>>>>> I like the idea. I still need to catch up on the "Strange behavior..."
>>>>> but if the above solution is implemented, would it fix Minoru issue? At
>>>>> least you discovered an issue with the BindingException (duplicated). So
>>>>> I'm all for fixing that.
>>>> Then, I will try to implement recycling the server channel once.
>>>>
>>>> I am not sure whether this solution will fix Minoru issue or not, but I think that at least it may be able to help the similar case with workaround.
>>>>
>>>> For fixing the issue with recycling the server channel, I think that Minoru application should be modified in order to avoid the duplicated binding situation.
>>>>
>>>> i.g. Should use only one Controller.
>>>>
>>>> So I also think that it is helpful for me that you catch up on the issue because I don't know how Grizzly can prevent a user from binding duplicated address completely.
>>>>
>>>> After finishing the impl and testing it, I will reply for being reviewed.
>>>>
>>>> Thanks!
>>>> --
>>>> Bongjae Chang
>>>>
>>>>
>>>> ----- Original Message -----
>>>> From: "Jeanfrancois Arcand" <Jeanfrancois.Arcand_at_Sun.COM>
>>>> To: <dev_at_grizzly.dev.java.net>
>>>> Sent: Saturday, June 13, 2009 5:02 AM
>>>> Subject: Re: About recycling the server channel in XXXSelectorHandler
>>>>
>>>>
>>>>> Salut,
>>>>>
>>>>> Bongjae Chang wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I have seen Minoru's issue("Strange behavior of Grizzly") for several days.
>>>>>>
>>>>>> Meanwhile, I thought that sometimes it was possible that a user would
>>>>>> like to use the Grizzly for both server and client.
>>>>>>
>>>>>> Of course, XXXSelectorHandler( Role.CLIENT_SERVER ) is used for both
>>>>>> server and client purpose.
>>>>>>
>>>>>> See the following use case.
>>>>>>
>>>>>> ---
>>>>>> ..
>>>>>> XXXSelectorHandler selectorHandler = new XXXSelectorHandler(
>>>>>> Role.CLIENT_SERVER );
>>>>>> selectorHandler.setPort( port );
>>>>>> selectorHandler.setInet( localAddress );
>>>>>> controller.addSelectorHandler( selectorHandler );
>>>>>> ...
>>>>>> ConnectorHandler connectorHandler = controller.acquireConnectorHandler(
>>>>>> Controller.Protocol.XXX );
>>>>>> connectorHandler.connect( remoteSocketAddress, new InetSocketAddress(
>>>>>> localAddress, port ); ---- (1)
>>>>>> connectorHandler.write(...);
>>>>>> ...
>>>>>> ---
>>>>>>
>>>>>> At above (1), the following method is called finally. (TCP is similar to
>>>>>> UDP)
>>>>>>
>>>>>> In UDPSelectorHandler.java
>>>>>> ---
>>>>>> protected void connect(SocketAddress remoteAddress, SocketAddress
>>>>>> localAddress, CallbackHandler callbackHandler) throws IOException {
>>>>>> DatagramChannel newDatagramChannel = DatagramChannel.open();
>>>>>> newDatagramChannel.socket().setReuseAddress(reuseAddress);
>>>>>> if( localAddress != null ) {
>>>>>> newDatagramChannel.socket().bind(localAddress);
>>>>>> }
>>>>>> ...
>>>>>> }
>>>>>> ---
>>>>>>
>>>>>> If user made XXXSelectorHandler with Role.CLIENT_SERVER, and user used
>>>>>> setInet( localAddress ), binding operation will be duplicated because
>>>>>> XXXSelectorHandler#initSelector() makes the server channel which binds
>>>>>> the localAddress.
>>>>>>
>>>>>> If setReuseAddress set to be false, maybe user can't use the specific
>>>>>> address for sending through Controller#acquireConnectorHandler() and
>>>>>> ConnectorHandler#connect(remote, local). But user can use only the any
>>>>>> address like "0.0.0.0" for sending.
>>>>>>
>>>>>> If setReuseAddress set to be true, default is true, maybe at least above
>>>>>> case, there is no problem.
>>>>>>
>>>>>> But if user used one more Controller locally, unexpected result could be
>>>>>> occurred according to platform(See the "Strange behavior of Grizzly"
>>>>>> issue of dev mailing).
>>>>>>
>>>>>> So I think that it is better that Grizzly can provide a client with the
>>>>>> way of being able to use server's channel which already has been created.
>>>>>>
>>>>>> How about recycling the server channel in XXXSelectorHandler#connect()
>>>>>> if it exists?
>>>>> I like the idea. I still need to catch up on the "Strange behavior..."
>>>>> but if the above solution is implemented, would it fix Minoru issue? At
>>>>> least you discovered an issue with the BindingException (duplicated). So
>>>>> I'm all for fixing that.
>>>>>
>>>>>>
>>>>>> Then, could any problems be occurred?
>>>>> I don't think. On the contrary, I think it will help.
>>>>>
>>>>> Great work!
>>>>>
>>>>> -- Jeanfrancois
>>>>>
>>>>>>
>>>>>> Please advice me.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>
>>> Index: test/java/com/sun/grizzly/RecycledChannelTest.java
>>> ===================================================================
>>> --- test/java/com/sun/grizzly/RecycledChannelTest.java (revision 0)
>>> +++ test/java/com/sun/grizzly/RecycledChannelTest.java (revision 0)
>>> @@ -0,0 +1,143 @@
>>> +package com.sun.grizzly;
>>> +
>>> +import junit.framework.TestCase;
>>> +
>>> +import java.net.InetAddress;
>>> +import java.net.InetSocketAddress;
>>> +import java.net.UnknownHostException;
>>> +import java.net.Socket;
>>> +import java.net.BindException;
>>> +import java.net.SocketAddress;
>>> +import java.io.IOException;
>>> +import java.util.logging.Level;
>>> +
>>> +import com.sun.grizzly.utils.ControllerUtils;
>>> +
>>> +/**
>>> + * @author Bongjae Chang
>>> + * @date 2009. 6. 15
>>> + */
>>> +public class RecycledChannelTest extends TestCase {
>>> +
>>> + private static final int PORT = 17520;
>>> + private static final int SLEEP_TIME = 3000; // ms
>>> +
>>> + private final InetAddress localInetAddress;
>>> + private final InetSocketAddress localInetSocketAddress;
>>> +
>>> + public RecycledChannelTest() throws UnknownHostException {
>>> + localInetAddress = InetAddress.getLocalHost();
>>> + localInetSocketAddress = new InetSocketAddress( localInetAddress, PORT );
>>> + }
>>> +
>>> + public void testSimpleTCPConnect() throws IOException {
>>> + final Controller controller = new Controller();
>>> + SelectorHandler selectorHandler = new TCPSelectorHandler();
>>> + ( (TCPSelectorHandler)selectorHandler ).setPort( PORT );
>>> + ( (TCPSelectorHandler)selectorHandler ).setInet( localInetAddress );
>>> + ( (TCPSelectorHandler)selectorHandler ).setReuseAddress( false );
>>> + controller.addSelectorHandler( selectorHandler );
>>> +
>>> + Socket clientSocket = null;
>>> + try {
>>> + ControllerUtils.startController( controller );
>>> +
>>> + boolean result = false;
>>> + Controller.logger().log( Level.INFO, "Try to get a connector handler with the local address which already has been bound." );
>>> + try {
>>> + tryToConnect( controller, Controller.Protocol.TCP, null, localInetSocketAddress );
>>> + } catch( IOException ie ) {
>>> + if( ie instanceof BindException ) {
>>> + result = true;
>>> + Controller.logger().log( Level.INFO, "Got the expected BindException." );
>>> + assertTrue( "Got the expected BindException.", true );
>>> + } else {
>>> + Controller.logger().log( Level.INFO, "Got the unexpected error.", ie );
>>> + assertTrue( "Got the unexpected error.", false );
>>> + }
>>> + }
>>> + if( !result )
>>> + assertTrue( "The BindException was expected.", false );
>>> +
>>> + Controller.logger().log( Level.INFO, "Try to connect the local server." );
>>> + clientSocket = new Socket( localInetAddress, PORT );
>>> + Controller.logger().log( Level.INFO, "Wait for " + SLEEP_TIME + "(ms)" );
>>> + try {
>>> + Thread.sleep( SLEEP_TIME );
>>> + } catch( InterruptedException e ) {
>>> + }
>>> +
>>> + Controller.logger().log( Level.INFO, "Try to get a connector handler with the local address which already has been bound again." );
>>> + try {
>>> + tryToConnect( controller, Controller.Protocol.TCP, clientSocket.getLocalSocketAddress(), localInetSocketAddress );
>>> + } catch( IOException ie ) {
>>> + Controller.logger().log( Level.INFO, "Got the unexpected error.", ie );
>>> + assertTrue( "Got the unexpected error.", false );
>>> + throw ie;
>>> + }
>>> + } finally {
>>> + if( clientSocket != null ) {
>>> + try {
>>> + clientSocket.shutdownInput();
>>> + } catch( IOException e ) {
>>> + }
>>> + try {
>>> + clientSocket.shutdownOutput();
>>> + } catch( IOException e ) {
>>> + }
>>> + try {
>>> + clientSocket.close();
>>> + } catch( IOException e ) {
>>> + }
>>> + }
>>> + controller.stop();
>>> + }
>>> + }
>>> +
>>> + public void testSimpleUDPConnect() throws IOException {
>>> + final Controller controller = new Controller();
>>> + SelectorHandler selectorHandler = new UDPSelectorHandler();
>>> + ( (UDPSelectorHandler)selectorHandler ).setPort( PORT );
>>> + ( (UDPSelectorHandler)selectorHandler ).setInet( localInetAddress );
>>> + ( (UDPSelectorHandler)selectorHandler ).setReuseAddress( false );
>>> + controller.addSelectorHandler( selectorHandler );
>>> +
>>> + try {
>>> + ControllerUtils.startController( controller );
>>> +
>>> + Controller.logger().log( Level.INFO, "Try to get a connector handler with the local address which already has been bound." );
>>> + try {
>>> + tryToConnect( controller, Controller.Protocol.UDP, localInetSocketAddress, localInetSocketAddress );
>>> + } catch( IOException ie ) {
>>> + Controller.logger().log( Level.INFO, "Got the unexpected error.", ie );
>>> + assertTrue( "Got the unexpected error.", false );
>>> + throw ie;
>>> + }
>>> + } finally {
>>> + controller.stop();
>>> + }
>>> + }
>>> +
>>> + private void tryToConnect( Controller controller, Controller.Protocol protocol, SocketAddress remote, SocketAddress local ) throws IOException {
>>> + ConnectorHandler connectorHandler = null;
>>> + try {
>>> + connectorHandler = controller.acquireConnectorHandler( protocol );
>>> + connectorHandler.connect( remote, local );
>>> + } finally {
>>> + if( connectorHandler != null ) {
>>> + try {
>>> + connectorHandler.close();
>>> + } catch( IOException e ) {
>>> + e.printStackTrace();
>>> + }
>>> + controller.releaseConnectorHandler( connectorHandler );
>>> + }
>>> + }
>>> + }
>>> +
>>> + public static void main( String[] args ) throws IOException {
>>> + RecycledChannelTest test = new RecycledChannelTest();
>>> + test.testSimpleTCPConnect();
>>> + test.testSimpleUDPConnect();
>>> + }
>>> +}
>>> Index: main/java/com/sun/grizzly/UDPSelectorHandler.java
>>> ===================================================================
>>> --- main/java/com/sun/grizzly/UDPSelectorHandler.java (revision 3298)
>>> +++ main/java/com/sun/grizzly/UDPSelectorHandler.java (working copy)
>>> @@ -38,11 +38,9 @@
>>>
>>> package com.sun.grizzly;
>>>
>>> -import com.sun.grizzly.SelectionKeyOP.ConnectSelectionKeyOP;
>>> import com.sun.grizzly.async.UDPAsyncQueueReader;
>>> import com.sun.grizzly.async.UDPAsyncQueueWriter;
>>> import com.sun.grizzly.util.Copyable;
>>> -import com.sun.grizzly.util.State;
>>> import java.io.IOException;
>>> import java.net.BindException;
>>> import java.net.DatagramSocket;
>>> @@ -138,9 +136,9 @@
>>> ConcurrentQueueDelegateCIH(
>>> getConnectorInstanceHandlerDelegate());
>>>
>>> - datagramChannel = DatagramChannel.open();
>>> selector = Selector.open();
>>> if (role != Role.CLIENT){
>>> + datagramChannel = DatagramChannel.open();
>>> datagramSocket = datagramChannel.socket();
>>> datagramSocket.setReuseAddress(reuseAddress);
>>> if (inet == null)
>>> @@ -159,35 +157,22 @@
>>> }
>>> }
>>>
>>> -
>>> - /**
>>> - * Register a CallBackHandler to this Selector.
>>> - *
>>> - * @param remoteAddress remote address to connect
>>> - * @param localAddress local address to bin
>>> - * @param callbackHandler {_at_link CallbackHandler}
>>> - * @throws java.io.IOException
>>> - */
>>> @Override
>>> - protected void connect(SocketAddress remoteAddress, SocketAddress localAddress,
>>> - CallbackHandler callbackHandler) throws IOException {
>>
>> Can you explain why we need to remove that method? Is it because it is
>> never used? We should leave it there as some application might still use it.
>>
>>
>>> + protected SelectableChannel getUsedSelectableChannel( SocketAddress remoteAddress ) {
>>
>> Do we need to make a difference between getNew and getUsed? Since this
>> API is not exposed directly to the user, should we just have
>> getSelectableChannel() (like you did below for TCPSelectorHandler()).
>>
>> Remaining looks good.
>>
>> A+
>>
>> - jeanfrancois
>>
>>
>>> + if( role != Role.CLIENT && datagramChannel != null && datagramSocket != null )
>>> + return datagramChannel;
>>> + else
>>> + return null;
>>> + }
>>>
>>> + @Override
>>> + protected SelectableChannel getNewSelectableChannel( SocketAddress localAddress ) throws IOException {
>>> DatagramChannel newDatagramChannel = DatagramChannel.open();
>>> newDatagramChannel.socket().setReuseAddress(reuseAddress);
>>> - if (localAddress != null) {
>>> + if (localAddress != null)
>>> newDatagramChannel.socket().bind(localAddress);
>>> - }
>>> -
>>> newDatagramChannel.configureBlocking(false);
>>> -
>>> - SelectionKeyOP.ConnectSelectionKeyOP keyOP = new ConnectSelectionKeyOP();
>>> -
>>> - keyOP.setOp(SelectionKey.OP_CONNECT);
>>> - keyOP.setChannel(newDatagramChannel);
>>> - keyOP.setRemoteAddress(remoteAddress);
>>> - keyOP.setCallbackHandler(callbackHandler);
>>> - opToRegister.offer(keyOP);
>>> - selector.wakeup();
>>> + return newDatagramChannel;
>>> }
>>>
>>> /**
>>> @@ -196,19 +181,19 @@
>>> @Override
>>> protected void onConnectOp(Context ctx,
>>> SelectionKeyOP.ConnectSelectionKeyOP selectionKeyOp) throws IOException {
>>> - DatagramChannel newDatagramChannel = (DatagramChannel) selectionKeyOp.getChannel();
>>> + DatagramChannel datagramChannel = (DatagramChannel) selectionKeyOp.getChannel();
>>> SocketAddress remoteAddress = selectionKeyOp.getRemoteAddress();
>>> CallbackHandler callbackHandler = selectionKeyOp.getCallbackHandler();
>>>
>>> CallbackHandlerSelectionKeyAttachment attachment =
>>> new CallbackHandlerSelectionKeyAttachment(callbackHandler);
>>>
>>> - SelectionKey key = newDatagramChannel.register(selector,
>>> + SelectionKey key = datagramChannel.register(selector,
>>> SelectionKey.OP_READ | SelectionKey.OP_WRITE, attachment);
>>> attachment.associateKey(key);
>>>
>>> try {
>>> - newDatagramChannel.connect(remoteAddress);
>>> + datagramChannel.connect(remoteAddress);
>>> } catch(Exception e) {
>>> if (logger.isLoggable(Level.FINE)) {
>>> logger.log(Level.FINE, "Exception occured when tried to connect datagram channel", e);
>>> @@ -224,44 +209,26 @@
>>> */
>>> @Override
>>> public void shutdown(){
>>> - // If shutdown was called for this SelectorHandler
>>> - if (isShutDown.getAndSet(true)) return;
>>> -
>>> - stateHolder.setState(State.STOPPED);
>>> -
>>> + super.shutdown();
>>> try {
>>> - if ( datagramSocket != null )
>>> + if ( datagramSocket != null ) {
>>> datagramSocket.close();
>>> + datagramSocket = null;
>>> + }
>>> } catch (Throwable ex){
>>> Controller.logger().log(Level.SEVERE,
>>> "closeSocketException",ex);
>>> }
>>>
>>> try{
>>> - if ( datagramChannel != null)
>>> + if ( datagramChannel != null) {
>>> datagramChannel.close();
>>> + datagramChannel = null;
>>> + }
>>> } catch (Throwable ex){
>>> Controller.logger().log(Level.SEVERE,
>>> "closeSocketException",ex);
>>> }
>>> -
>>> - try{
>>> - if ( selector != null)
>>> - selector.close();
>>> - } catch (Throwable ex){
>>> - Controller.logger().log(Level.SEVERE,
>>> - "closeSocketException",ex);
>>> - }
>>> -
>>> - if (asyncQueueReader != null) {
>>> - asyncQueueReader.close();
>>> - asyncQueueReader = null;
>>> - }
>>> -
>>> - if (asyncQueueWriter != null) {
>>> - asyncQueueWriter.close();
>>> - asyncQueueWriter = null;
>>> - }
>>> }
>>>
>>>
>>> @@ -385,6 +352,8 @@
>>>
>>> @Override
>>> public void closeChannel(SelectableChannel channel) {
>>> + if( datagramChannel == channel )
>>> + return;
>>> try{
>>> channel.close();
>>> } catch (IOException ex){
>>> Index: main/java/com/sun/grizzly/TCPSelectorHandler.java
>>> ===================================================================
>>> --- main/java/com/sun/grizzly/TCPSelectorHandler.java (revision 3298)
>>> +++ main/java/com/sun/grizzly/TCPSelectorHandler.java (working copy)
>>> @@ -72,6 +72,8 @@
>>> import java.util.Set;
>>> import java.util.concurrent.Callable;
>>> import java.util.concurrent.ExecutorService;
>>> +import java.util.concurrent.ConcurrentHashMap;
>>> +import java.util.concurrent.CopyOnWriteArrayList;
>>> import java.util.concurrent.atomic.AtomicBoolean;
>>> import java.util.logging.Level;
>>> import java.util.logging.Logger;
>>> @@ -274,7 +276,12 @@
>>>
>>> private long lastSpinTimestamp;
>>> private int emptySpinCounter;
>>> -
>>> +
>>> + private final ConcurrentHashMap<SocketAddress, SocketChannel> acceptedSocketChannelMap =
>>> + new ConcurrentHashMap<SocketAddress, SocketChannel>();
>>> + private final CopyOnWriteArrayList<SocketChannel> recycledSocketChannels =
>>> + new CopyOnWriteArrayList<SocketChannel>();
>>> +
>>> public TCPSelectorHandler(){
>>> this(Role.CLIENT_SERVER);
>>> }
>>> @@ -397,15 +404,13 @@
>>>
>>> serverSocketChannel.configureBlocking(false);
>>> serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
>>> +
>>> + serverSocket.setSoTimeout(serverTimeout);
>>> }
>>> ctx.getController().notifyReady();
>>> } catch (SocketException ex){
>>> throw new BindException(ex.getMessage() + ": " + port + "=" + this);
>>> }
>>> -
>>> - if (role != Role.CLIENT){
>>> - serverSocket.setSoTimeout(serverTimeout);
>>> - }
>>> }
>>>
>>> /**
>>> @@ -531,7 +536,7 @@
>>> }else{
>>> ((Runnable)obj).run();
>>> }
>>> - }catch(Throwable t){
>>> + }catch(Throwable t){
>>> logger.log(Level.FINEST, "doExecutePendiongIO failed.", t);
>>> }
>>> }
>>> @@ -567,7 +572,7 @@
>>> * Register a SelectionKey to this Selector.<br>
>>> * Storing each interest type in different queues removes the need of wrapper (SelectionKeyOP)
>>> * while lowering thread contention due to the load is spread out on different queues.
>>> - *
>>> + *
>>> * @param key
>>> * @param ops
>>> */
>>> @@ -595,11 +600,11 @@
>>> opToRegister.offer(new SelectionKeyOP(null,ops,channel));
>>> wakeUp();
>>> }
>>> -
>>> +
>>> /**
>>> * Workaround for NIO issue 6524172
>>> */
>>> - private void wakeUp(){
>>> + protected void wakeUp(){
>>> try{
>>> selector.wakeup();
>>> } catch (NullPointerException ne){
>>> @@ -617,25 +622,52 @@
>>> */
>>> protected void connect(SocketAddress remoteAddress, SocketAddress localAddress,
>>> CallbackHandler callbackHandler) throws IOException {
>>> -
>>> - SocketChannel socketChannel = SocketChannel.open();
>>> - socketChannel.socket().setReuseAddress(reuseAddress);
>>> - if (localAddress != null) {
>>> - socketChannel.socket().bind(localAddress);
>>> - }
>>> -
>>> - socketChannel.configureBlocking(false);
>>> -
>>> + SelectableChannel selectableChannel = getSelectableChannel( remoteAddress, localAddress );
>>> SelectionKeyOP.ConnectSelectionKeyOP keyOP = new ConnectSelectionKeyOP();
>>> -
>>> keyOP.setOp(SelectionKey.OP_CONNECT);
>>> - keyOP.setChannel(socketChannel);
>>> + keyOP.setChannel(selectableChannel);
>>> keyOP.setRemoteAddress(remoteAddress);
>>> keyOP.setCallbackHandler(callbackHandler);
>>> opToRegister.offer(keyOP);
>>> wakeUp();
>>> }
>>>
>>> + private SelectableChannel getSelectableChannel( SocketAddress remoteAddress, SocketAddress localAddress ) throws IOException {
>>> + SelectableChannel selectableChannel = null;
>>> + if( localAddress != null ) {
>>> + if( inet != null && localAddress instanceof InetSocketAddress ) {
>>> + InetSocketAddress inetSocketAddress = (InetSocketAddress)localAddress;
>>> + if( inet.equals( inetSocketAddress.getAddress() ) )
>>> + selectableChannel = getUsedSelectableChannel( remoteAddress );
>>> + }
>>> + } else {
>>> + selectableChannel = getUsedSelectableChannel( remoteAddress );
>>> + }
>>> + if( selectableChannel == null )
>>> + selectableChannel = getNewSelectableChannel( localAddress );
>>> + return selectableChannel;
>>> + }
>>> +
>>> + protected SelectableChannel getUsedSelectableChannel( SocketAddress remoteAddress ) {
>>> + if( remoteAddress != null ) {
>>> + SocketChannel acceptedSocketChannel = acceptedSocketChannelMap.get( remoteAddress );
>>> + if( acceptedSocketChannel != null )
>>> + recycledSocketChannels.add( acceptedSocketChannel );
>>> + return acceptedSocketChannel;
>>> + } else {
>>> + return null;
>>> + }
>>> + }
>>> +
>>> + protected SelectableChannel getNewSelectableChannel( SocketAddress localAddress ) throws IOException {
>>> + SocketChannel newSocketChannel = SocketChannel.open();
>>> + newSocketChannel.socket().setReuseAddress( reuseAddress );
>>> + if( localAddress != null )
>>> + newSocketChannel.socket().bind( localAddress );
>>> + newSocketChannel.configureBlocking(false);
>>> + return newSocketChannel;
>>> + }
>>> +
>>> /**
>>> * {_at_inheritDoc}
>>> */
>>> @@ -691,16 +723,20 @@
>>> }
>>>
>>> try{
>>> - if (serverSocket != null)
>>> + if (serverSocket != null) {
>>> serverSocket.close();
>>> + serverSocket = null;
>>> + }
>>> } catch (Throwable ex){
>>> Controller.logger().log(Level.SEVERE,
>>> "serverSocket.close",ex);
>>> }
>>>
>>> try{
>>> - if (serverSocketChannel != null)
>>> + if (serverSocketChannel != null) {
>>> serverSocketChannel.close();
>>> + serverSocketChannel = null;
>>> + }
>>> } catch (Throwable ex){
>>> Controller.logger().log(Level.SEVERE,
>>> "serverSocketChannel.close",ex);
>>> @@ -727,6 +763,8 @@
>>> readOpToRegister.clear();
>>> writeOpToRegister.clear();
>>> opToRegister.clear();
>>> + acceptedSocketChannelMap.clear();
>>> + recycledSocketChannels.clear();
>>>
>>> attributes = null;
>>> }
>>> @@ -735,7 +773,16 @@
>>> * {_at_inheritDoc}
>>> */
>>> public SelectableChannel acceptWithoutRegistration(SelectionKey key) throws IOException {
>>> - return ((ServerSocketChannel) key.channel()).accept();
>>> + SocketChannel acceptedSocketChannel = ((ServerSocketChannel) key.channel()).accept();
>>> + if( acceptedSocketChannel != null ) {
>>> + SocketAddress remoteSocketAddress = null;
>>> + Socket acceptedSocket = acceptedSocketChannel.socket();
>>> + if( acceptedSocket != null )
>>> + remoteSocketAddress = acceptedSocket.getRemoteSocketAddress();
>>> + if( remoteSocketAddress != null )
>>> + acceptedSocketChannelMap.put( remoteSocketAddress, acceptedSocketChannel );
>>> + }
>>> + return acceptedSocketChannel;
>>> }
>>>
>>> /**
>>> @@ -845,7 +892,7 @@
>>> // Added because of incompatibility with Grizzly 1.6.0
>>> context.setSelectorHandler(this);
>>>
>>> - CallbackHandlerContextTask task =
>>> + CallbackHandlerContextTask task =
>>> context.getCallbackHandlerContextTask(callbackHandler);
>>> boolean isRunInSeparateThread = true;
>>>
>>> @@ -1224,7 +1271,15 @@
>>> public void closeChannel(SelectableChannel channel) {
>>> // channel could be either SocketChannel or ServerSocketChannel
>>> if (channel instanceof SocketChannel) {
>>> - Socket socket = ((SocketChannel) channel).socket();
>>> + SocketChannel socketChannel = (SocketChannel)channel;
>>> + if( recycledSocketChannels.remove( socketChannel ) )
>>> + return;
>>> + Socket socket = socketChannel.socket();
>>> + SocketAddress remoteSocketAddress = null;
>>> + if( socket != null )
>>> + remoteSocketAddress = socket.getRemoteSocketAddress();
>>> + if( remoteSocketAddress != null )
>>> + acceptedSocketChannelMap.remove( remoteSocketAddress );
>>>
>>> try {
>>> if (!socket.isInputShutdown()) socket.shutdownInput();
>>> @@ -1268,14 +1323,14 @@
>>> * @return {_at_link Context}
>>> */
>>> protected NIOContext pollContext(final Context serverContext,
>>> - final SelectionKey key, final Context.OpType opType) {
>>> + final SelectionKey key, final Context.OpType opType) {
>>> Controller c = serverContext.getController();
>>> ProtocolChain protocolChain = instanceHandler != null ?
>>> instanceHandler.poll() :
>>> c.getProtocolChainInstanceHandler().poll();
>>>
>>> final NIOContext context = (NIOContext)c.pollContext();
>>> - c.configureContext(key, opType, context, this);
>>> + c.configureContext(key, opType, context, this);
>>> context.setProtocolChain(protocolChain);
>>> return context;
>>> }
>>> @@ -1379,7 +1434,7 @@
>>> * {_at_inheritDoc}
>>> */
>>> public void resetSpinCounter(){
>>> - emptySpinCounter = 0;
>>> + emptySpinCounter = 0;
>>> }
>>>
>>> /**
>>>
>>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>
>>
>>
>>