dev@grizzly.java.net

Re: Building a Proxy/Load Balancer (was Re: [ANN] Port unification now available in Project Grizzly)

From: Ramesh Parthasarathy <Ramesh.Parthasarathy_at_Sun.COM>
Date: Thu, 27 Sep 2007 11:45:39 +0000

Hi JeanFrancois,
        I tried to extend the PU mechanism in Grizzly 1.0 ([1]) and implement a
way by which we can park the request (release the resources and keep the
channel alive). Could you please review the changes? I have attached them.
Briefly, I have propagated the SelectorThread through the
ProtocolInfo object so that the callback handler can take the decision
of canceling/registering the key back on the selector thread once the
response is complete. Also, I have refactored DefaultReadTask so that
the resources can be released appropriately.
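For reference, the parking decision this patch enables can be sketched with simplified stand-ins (the real ProtocolInfo lives under com.sun.enterprise.web.portunif.util; only the parkRequest and selectorThread fields mirror the attached diff, everything else here is hypothetical and illustrative only):

```java
// Simplified stand-in for ProtocolInfo: only parkRequest and selectorThread
// mirror the fields added by the attached patch; the rest is omitted.
class ProtocolInfoSketch {
    Object selectorThread;   // propagated by the PU pipeline in the patch
    boolean parkRequest;     // true => release the read task, keep the channel

    void recycle() {
        selectorThread = null;
        parkRequest = false;
    }
}

// Hypothetical handler logic: park the request when it must be forwarded to
// a remote instance, so the response can be written back asynchronously.
class ParkingHandlerSketch {
    boolean handle(ProtocolInfoSketch info, boolean remoteTermination) {
        info.parkRequest = remoteTermination;
        return true; // keep the channel alive either way
    }
}
```

In the patch, PortUnificationPipeline then checks parkRequest after handle() returns: if set, it calls releaseContext()/returnTask() instead of registering the key for OP_READ.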


Thanks
-Ramesh


Jeanfrancois Arcand wrote On 09/25/07 17:45,:
> Hi Ramesh,
>
> thanks for not having private discussions :-)
>
> Ramesh wrote:
>
>>Hi JeanFrancois,
>> please find responses inline
>>(cc'ed dev alias)
>>
>>
>>Jeanfrancois Arcand wrote:
>>
>>>Hi Ramesh,
>>>
>>>removing mridul, adding Alexey. Can we have this discussion on the
>>>dev alias? I'm sure a lot of people can collaborate :-)
>>>
>>>Ramesh wrote:
>>>
>>>>Hi JeanFrancois,
>>>> I looked at the ARP code flow. The DefaultProcessorTask is always
>>>>initialized (in AsyncReadTask) and the AsyncProcessorTask is just a
>>>>wrapper around the DefaultProcessorTask. I guess it would be
>>>>inefficient to initialize a ProcessorTask even before we know if we
>>>>will actually be processing the request in the local container.
>>>
>>>Yes, but I'm no longer following what you are trying to achieve ;-)
>>>
>>>
>>
>>Our intention has always been to intercept an HTTP request at the
>>earliest possible opportunity. If we had our own implementation of
>>AsyncHandler and AsyncFilter, the only concern was that we would be
>>doing the interception after some of the artifacts have been created
>>(or initialized), which actually may not be required if this request
>>was going to be forwarded and not processed locally.
>>
>>
>>>>If we intercept the request at the AsyncFilter, would it not be
>>>>expensive, because we would be creating ProcessorTasks (which in
>>>>turn initialize request, response...) for every request?
>>>
>>>Ouf..and good you care about performance :-) I recommended PU at the
>>>beginning of the project and that still applies :-) But if you need
>>>to park connections, you just need to write an AsyncFilter and enable
>>>ARP (only in that case) from your PU Handler. So you take the ARP way
>>>only when required.
>>>
>>
>>I am sorry, I did not understand this (still learning ARP :-)). Are
>>you suggesting we still use the PU pipeline along with our implementation
>>of the ARP classes (AsyncHandler and AsyncFilter)? Would this enable
>>the handler to control the task execution (park/resume)?
>
>
> ARP can be combined with PU. To enable ARP, you just call
> SelectorThread.setAsyncExecution(true). Now a possible solution for you
> would consist of writing your own AsyncHandler to decide if the request
> needs to be parked or not. To recap, if you need to park requests, you
> have four solutions:
>
> [1] Extend the Port Unification mechanism (like you already did) and
> write your own way of parking the request.
> [2] Extend Port Unification and programmatically configure ARP (you can
> take a look at how OpenESB is doing it) to park only the requests you
> are interested in.
> [3] Extend ARP's AsyncHandler (you can replace the default
> programmatically using the SelectorThread API).
> [4] Write your own Grizzly 1.0 Pipeline (similar to the Port Unification
> Pipeline). Inside that Pipeline, first detect if the request has to be
> serviced locally or remotely. If locally, just call super.addTask(). If
> remotely, get an AsyncReadTask and execute it (assuming you have written
> your AsyncHandler).
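The routing decision in option [4] can be sketched with a self-contained stand-in (the real classes are Grizzly 1.0's Pipeline and AsyncReadTask, and ARP is enabled via SelectorThread.setAsyncExecution(true) as noted above; the class and method names below are illustrative, not Grizzly's):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch of a routing Pipeline: tasks serviced locally take the
// normal synchronous path (super.addTask() in the real code); tasks bound
// for a remote instance take the ARP path (an AsyncReadTask executed under
// a custom AsyncHandler in the real code).
class RoutingPipelineSketch {
    final List<String> processed = new ArrayList<String>();

    void addTask(String request, boolean servicedLocally) {
        if (servicedLocally) {
            // Real code: super.addTask(task) -- normal pipeline processing.
            processed.add("local:" + request);
        } else {
            // Real code: obtain an AsyncReadTask and execute it, assuming a
            // custom AsyncHandler was installed on the SelectorThread.
            processed.add("async:" + request);
        }
    }
}
```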
>
>
>>Also, right now ARP can be enabled by setting system properties; is
>>there a programmatic way (API) for setting the ARP implementation
>>classes on the selector thread?
>
> Yes, you can configure the AsyncHandler, AsyncExecutor and AsyncFilter
> from the SelectorThread.
>
> Hope that helps.
>
> -- Jeanfrancois
>
>
>
>>
>>-Ramesh
>>
>>>Hope that helps....
>>>
>>>Thanks
>>>
>>>-- Jeanfrancois
>>>
>>>
>>>>Thanks
>>>>-Ramesh
>>>>
>>>>Mridul Muralidharan wrote:
>>>>
>>>>>Hi,
>>>>>
>>>>> I had plugged it into grizzly as part of the http processing ...
>>>>>after the request has been read, and before invoking the servlet
>>>>>container.
>>>>>
>>>>>Also, I was using what was exposed through grizzly (actually,
>>>>>whatever was in daily builds then ... glassfish2 b54 or thereabouts
>>>>>- grizzly 1.6 I guess).
>>>>>
>>>>>
>>>>>Regards,
>>>>>Mridul
>>>>>
>>>>>Ramesh wrote:
>>>>>
>>>>>>Hi JeanFrancois,
>>>>>> The requirements we have from the http proxy/clb in sailfin are
>>>>>>
>>>>>>1. Intercept every HTTP request at the earliest possible time;
>>>>>>this would be using Grizzly 1.0, I don't think we are planning to
>>>>>>move to 1.6 on the server side.
>>>>>>2. Parse the request line and headers.
>>>>>>3. Support a pluggable invocation model so that interceptors can
>>>>>>be invoked on the request.
>>>>>>4. The HA CLB (being one of the interceptors) takes a routing
>>>>>>decision based on the headers and request line. It identifies the
>>>>>>instance that needs to process this request. And the action taken
>>>>>>could be one of (5) or (6)
>>>>>>5. This instance identified could be the local instance (the
>>>>>>container in the same instance) in which case it has to be treated
>>>>>>like a normal http request to a glassfish instance... (nothing
>>>>>>fancy needs to be done here)
>>>>>>6. The instance is a remote instance in which case it has to be
>>>>>>forwarded to a remote instance, this would be using Grizzly 1.6
>>>>>>client APIs, the client channel has to be parked and a new
>>>>>>connection opened with the remote instance, the remote instance
>>>>>>might respond immediately or after some time, so we need to
>>>>>>asynchronously write data to the client channel whenever we
>>>>>>receive the response.
>>>>>>7. We also have to support SSL from the http client to the front
>>>>>>end server, from the clb/proxy to any of the target instances
>>>>>>internally, we would not be using SSL. Basically the SSL off-
>>>>>>loading takes place at the CLB and we only propagate the cert and
>>>>>>auth information through http headers (just like we do currently
>>>>>>in the native-lb)
>>>>>>8. Needless to say, the solution has to be scalable and
>>>>>>performant :-).
>>>>>>
>>>>>>Could you please advise if using ARP is the best way to satisfy
>>>>>>these requirements, and not PU?
>>>>>>Especially, can (5) and (7) be satisfied with ARP? Also, the
>>>>>>clb/proxy feature can be turned off via configuration (in
>>>>>>domain.xml). Would it be possible to turn off ARP in such a case?
>>>>>>
>>>>>>Thanks
>>>>>>-Ramesh
>>>>>>
>>>>>>Jeanfrancois Arcand wrote:
>>>>>>
>>>>>>>Hi Ramesh, Mridul
>>>>>>>
>>>>>>>Mridul, do you still have the code you wrote when you extended
>>>>>>>Grizzly ARP? I know your final version is using Comet, but Ramesh
>>>>>>>is interested in ARP and would like to see your code, if possible.
>>>>>>>
>>>>>>>Ramesh, checkout the JRuby module under Grizzly which contains a
>>>>>>>very simple example of an ARP AsyncFilter implementation.
>>>>>>>
>>>>>>>BTW I'm in Santa Clara this week for AjaxWorld and I will be in
>>>>>>>and out.
>>>>>>>
>>>>>>>Thanks
>>>>>>>
>>>>>>>-- Jeanfrancois
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>Ramesh wrote:
>>>>>>>
>>>>>>>>Hi JeanFrancois,
>>>>>>>>
>>>>>>>> Could you please point me to the code (Sun Instant Messenger)
>>>>>>>>that uses ARP.
>>>>>>>>
>>>>>>>>Pls note: I have not cc'ed dev_at_grizzly, because I thought the Sun
>>>>>>>>Messenger code is not open source.
>>>>>>>>
>>>>>>>>Thanks
>>>>>>>>-Ramesh
>>>>>>>>
>>>>>>>>Jeanfrancois Arcand wrote:
>>>>>>>>
>>>>>>>>>Hi Ramesh,
>>>>>>>>>
>>>>>>>>>Ramesh wrote:
>>>>>>>>>
>>>>>>>>>>Hi Alexey,
>>>>>>>>>>
>>>>>>>>>> >Currently you can propagate custom data using Context
>>>>>>>>>>attributes. Is it enough?
>>>>>>>>>>
>>>>>>>>>>Yes, it should be enough in 1.6.1.
>>>>>>>>>>
>>>>>>>>>> > I didn't think about the possibility to dynamically switch the
>>>>>>>>>>protocol during the connection lifecycle. But I think it's not a
>>>>>>>>>>problem to add this feature.
>>>>>>>>>>
>>>>>>>>>>It depends on which lifecycle we choose to associate a
>>>>>>>>>>handler with; it could be for the lifetime of a SelectionKey or it
>>>>>>>>>>could be for the lifetime of a request. That's why I propose to
>>>>>>>>>>leave this to the handler itself so that the design is
>>>>>>>>>>flexible. We could add another attribute in the
>>>>>>>>>>PUProtocolRequest through which we can propagate whether or
>>>>>>>>>>not to map the handler back to the key, and this can be
>>>>>>>>>>set/reset by the handler itself.
>>>>>>>>>>
>>>>>>>>>> > Do you have any specific usecase?
>>>>>>>>>>Yes, if we are implementing an HTTP proxy behavior in the
>>>>>>>>>>handler, where we are going to forward the received bytes to
>>>>>>>>>>another server: after we have sent the entire data, we
>>>>>>>>>>just have to wait for the response from the server. IMO it is
>>>>>>>>>>inefficient to just wait and block at the handler until the
>>>>>>>>>>response is received from the server; we could register a
>>>>>>>>>>callback to handle the response when it arrives, but if we
>>>>>>>>>>returned from the handler we have to ensure the channel is
>>>>>>>>>>kept alive so that the response can be sent back on it.
>>>>>>>>>
>>>>>>>>>In that case you should explore Grizzly ARP (Asynchronous
>>>>>>>>>Request Processing) [1]. This extension was done exactly to
>>>>>>>>>support that use case. Port unification is not the place to
>>>>>>>>>implement such a feature IMO, as its role is to:
>>>>>>>>>
>>>>>>>>>1. Find the protocol
>>>>>>>>>2. Redirect the request to the proper endpoint.
>>>>>>>>>
>>>>>>>>>In your case, you should 'leave' the Handler and enter the
>>>>>>>>>Grizzly ARP mechanism (like Comet is doing). You can try it by
>>>>>>>>>playing with the Sun Instant Messenger tool internally. It uses
>>>>>>>>>ARP/Comet to park requests.
>>>>>>>>>
>>>>>>>>>>That is why it would be good if we can
>>>>>>>>>
>>>>>>>>>>separate out the readkey registering from the keepalive logic
>>>>>>>>>>so that we can just keep the channel alive without registering
>>>>>>>>>>a OP_READ. We should be able to register a OP_READ once the
>>>>>>>>>>response has been written back completely.
>>>>>>>>>
>>>>>>>>>Comet[2] is the solution, not Port Unification IMO.
>>>>>>>>>
>>>>>>>>>Thanks
>>>>>>>>>
>>>>>>>>>-- Jeanfrancois
>>>>>>>>>
>>>>>>>>>[1]http://weblogs.java.net/blog/jfarcand/archive/2006/02/grizzly_part_ii.html
>>>>>>>>>
>>>>>>>>>[2]http://weblogs.java.net/blog/jfarcand/archive/2006/07/the_grizzly_com.html
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>Thanks
>>>>>>>>>>-Ramesh
>>>>>>>>>>
>>>>>>>>>>Oleksiy Stashok wrote:
>>>>>>>>>>
>>>>>>>>>>>Hello Ramesh,
>>>>>>>>>>>
>>>>>>>>>>>Ramesh Parthasarathy wrote:
>>>>>>>>>>>
>>>>>>>>>>>>I will try to write a doc on how i have used PU, but it
>>>>>>>>>>>>might take some
>>>>>>>>>>>>time , so here are some initial thoughts
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>It could be very useful! Thanks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>1. Though the Finder and the Handler are meant for specific
>>>>>>>>>>>>purposes,
>>>>>>>>>>>>the ProtocolInfo/PUProtocolRequest (or the Context in Grizzly
>>>>>>>>>>>>1.6.1)
>>>>>>>>>>>>object is the only standard means of sharing data between
>>>>>>>>>>>>them, it would
>>>>>>>>>>>>be good if we could propagate custom data from the Finder to
>>>>>>>>>>>>the Handler
>>>>>>>>>>>>so that we don't have to redo work we might have already done
>>>>>>>>>>>>in the
>>>>>>>>>>>>Finder. This custom object could be propagated through the
>>>>>>>>>>>>ProtocolInfo
>>>>>>>>>>>>or any other means.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>Currently you can propagate custom data using Context
>>>>>>>>>>>attributes. Is it enough?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>2. The logic in the PUpipeline/PUFilter that determines
>>>>>>>>>>>>whether the
>>>>>>>>>>>>Finder has to be invoked. If I understand correctly, for a
>>>>>>>>>>>>selection key,
>>>>>>>>>>>>once the Finder returns with a protocol, we create a map of
>>>>>>>>>>>>the key and
>>>>>>>>>>>>the handler and this holds good until the key is
>>>>>>>>>>>>valid/channel is alive.
>>>>>>>>>>>>So, any further data that is read on that channel will
>>>>>>>>>>>>result in the
>>>>>>>>>>>>invocation of the same handler. E.g. the data could be
>>>>>>>>>>>>more data for
>>>>>>>>>>>>the same http request or it could be a new http request on
>>>>>>>>>>>>the same
>>>>>>>>>>>>channel (if keep-alive). I guess it would be good if we
>>>>>>>>>>>>allowed the
>>>>>>>>>>>>handler to control the association of the key and the
>>>>>>>>>>>>handler, so that
>>>>>>>>>>>>the logic of determining whether the finder is invoked on
>>>>>>>>>>>>the next chunk
>>>>>>>>>>>>of data can be controlled by user logic.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>Hmm. I was looking at port unification as a mechanism by which a
>>>>>>>>>>>newly accepted connection could be classified, so that in future it
>>>>>>>>>>>will be processed by a certain (single) ProtocolHandler.
>>>>>>>>>>>I didn't think about the possibility to dynamically switch the
>>>>>>>>>>>protocol during the connection lifecycle. But I think it's not a
>>>>>>>>>>>problem to add this feature.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>3. The handler basically has to keep the channel alive
>>>>>>>>>>>>(return true),
>>>>>>>>>>>>until it is done using the channel. There may be
>>>>>>>>>>>>circumstances where the
>>>>>>>>>>>>handler is finished reading the entire data from the
>>>>>>>>>>>>channel, but it is
>>>>>>>>>>>>waiting (for some async event) to write more data into the
>>>>>>>>>>>>channel. In
>>>>>>>>>>>>such a case the channel has to be kept alive (else the
>>>>>>>>>>>>cancelKey closes the
>>>>>>>>>>>>channel). And if the channel is kept alive the key is
>>>>>>>>>>>>registered for
>>>>>>>>>>>>OP_READ? Is there a way we can keep the channel alive and
>>>>>>>>>>>>indicate
>>>>>>>>>>>>somehow to the selector thread we are not expecting any data
>>>>>>>>>>>>to be read
>>>>>>>>>>>>as of now... I am not sure if this is possible in 1.6.1
>>>>>>>>>>>>(sorry, I haven't
>>>>>>>>>>>>gone through the 1.6.1 code in detail) , but it was
>>>>>>>>>>>>certainly difficult
>>>>>>>>>>>>in 1.0.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>You're right, in 1.6.1 PU works the same way. PUFilter should
>>>>>>>>>>>usually be added after the ReadFilter, and ReadFilter, by
>>>>>>>>>>>default, reregisters connection for reading when postProcess
>>>>>>>>>>>is called.
>>>>>>>>>>>But again, if it's required, we can think about the possibility
>>>>>>>>>>>of avoiding reregistering on OP_READ.
>>>>>>>>>>>Do you have any specific usecase?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>Thank you!!!
>>>>>>>>>>>
>>>>>>>>>>>WBR,
>>>>>>>>>>>Alexey.
>>>>>>>>>>>
>>>>>>>>>>>>Please correct me if any of my observations are not correct.
>>>>>>>>>>>>
>>>>>>>>>>>>Thanks
>>>>>>>>>>>>-Ramesh
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>Jeanfrancois Arcand wrote On 09/19/07 18:02,:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>>Kudos to Alexey: the Grizzly port unification[1] used by
>>>>>>>>>>>>>GlassFish v2 (Grizzly 1.0) is now available as a sub-module
>>>>>>>>>>>>>in Project Grizzly.
>>>>>>>>>>>>>
>>>>>>>>>>>>>Port unification allows any server-based application to
>>>>>>>>>>>>>support more than one protocol using a single TCP/UDP/TLS
>>>>>>>>>>>>>port. As an example, GlassFish v2 by default listens on port
>>>>>>>>>>>>>8080 for HTTP, HTTPS and SOAP over TCP requests.
>>>>>>>>>>>>>
>>>>>>>>>>>>>Any volunteers to write docs or a tutorial? :-) :-)
>>>>>>>>>>>>>
>>>>>>>>>>>>>Port Unification is currently available with version
>>>>>>>>>>>>>1.6-SNAPSHOT and soon with 1.6.1.
>>>>>>>>>>>>>
>>>>>>>>>>>>>-- Jeanfrancois
>>>>>>>>>>>>>
>>>>>>>>>>>>>http://weblogs.java.net/blog/jfarcand/archive/2006/11/one_port_to_rul.html
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>---------------------------------------------------------------------
>>>>>>>>>>>>>
>>>>>>>>>>>>>To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>>>>>>>>>>>>>For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>
>>
>
>
>



Index: src/java/com/sun/enterprise/web/connector/grizzly/DefaultReadTask.java
===================================================================
RCS file: /cvs/glassfish/appserv-http-engine/src/java/com/sun/enterprise/web/connector/grizzly/DefaultReadTask.java,v
retrieving revision 1.10.6.1
diff -u -w -r1.10.6.1 DefaultReadTask.java
--- src/java/com/sun/enterprise/web/connector/grizzly/DefaultReadTask.java 6 Jul 2007 10:37:04 -0000 1.10.6.1
+++ src/java/com/sun/enterprise/web/connector/grizzly/DefaultReadTask.java 27 Sep 2007 04:55:33 -0000
@@ -349,13 +349,22 @@
     /**
      * Return this object to the pool
      */
- protected void returnTask(){
+ public void returnTask(){
         if (recycle) {
             recycle();
             selectorThread.returnTask(this);
         }
     }
     
+ public void releaseContext(){
+ try{
+ if (taskContext != null){
+ taskContext.recycle();
+ }
+ } catch (IOException ioe){
+ ;
+ }
+ }
 
     public void taskEvent(TaskEvent event){
         if (event.getStatus() == TaskEvent.COMPLETED
@@ -417,13 +426,7 @@
         if (SelectorThread.logger().isLoggable(Level.FINEST))
             SelectorThread.logger().log(Level.FINEST,"finishConnection");
         
- try{
- if (taskContext != null){
- taskContext.recycle();
- }
- } catch (IOException ioe){
- ;
- }
+ releaseContext();
         
         selectorThread.cancelKey(key);
     }
Index: src/java/com/sun/enterprise/web/portunif/PortUnificationPipeline.java
===================================================================
RCS file: /cvs/glassfish/appserv-http-engine/src/java/com/sun/enterprise/web/portunif/PortUnificationPipeline.java,v
retrieving revision 1.22.2.2
diff -u -w -r1.22.2.2 PortUnificationPipeline.java
--- src/java/com/sun/enterprise/web/portunif/PortUnificationPipeline.java 20 Aug 2007 11:28:57 -0000 1.22.2.2
+++ src/java/com/sun/enterprise/web/portunif/PortUnificationPipeline.java 27 Sep 2007 04:55:33 -0000
@@ -153,6 +153,38 @@
         loadHandlers();
     }
     
+ /**
+ * Hack for the LB proxy. This method is required because, when the LB proxy
+ * is enabled, we want every new request, though from the same channel, to go
+ * through the finder.
+ * The method will return true if the LB proxy is enabled.
+ *
+ * 1. For a Http request that is processed by the local container,
+ * when the lb/http finder is enabled, every new request, though coming
+ * from the same channel has to go through the Finder.
+ * 2. For a Http request to a remote instance the handler determines if the
+ * new request has to go through the finder by removing itself from
+ * the handler map.
+ */
+ private boolean getLbProxy(SelectionKey key) {
+ ProtocolHandler protocolHandler = protocolHandlers.get("lb/http");
+ ProtocolHandler lbhandler = mappedProtocols.get(key);
+ Class lbHandlerClass = null;
+ try {
+ lbHandlerClass = Class.forName(
+ "org.jvnet.glassfish.comms.clb.proxy.http.LoadBalancerProxyHandler");
+ } catch (ClassNotFoundException ex) {
+ //ex.printStackTrace();
+ }
+ boolean proxy = false;
+ if ((protocolHandler != null) || ((lbhandler != null)
+ &&(lbHandlerClass != null)
+ && (lbhandler.getClass()
+ == lbHandlerClass))) {
+ proxy = true;
+ }
+ return proxy;
+ }
     
     /**
      * Seek for the TCP protocol used. First all ProtocolFinder will be invoked.
@@ -173,11 +205,13 @@
         SelectorThread selectorThread = task.getSelectorThread();
         KeepAlivePipeline kap = selectorThread == null ? null :
                                         selectorThread.getKeepAlivePipeline();
+ boolean lbproxy = getLbProxy(task.getSelectionKey());
         if (protocolFinders.isEmpty()
                 || kap == null
                 || protocolHandlers.isEmpty()
                 || task.getType() != Task.READ_TASK
- || (kap.isKeepAlive(task.getSelectionKey()) && !cachedHandler)
+ || (kap.isKeepAlive(task.getSelectionKey()) && !cachedHandler
+ && !lbproxy)
                 ){
             super.addTask(task);
             return;
@@ -291,6 +325,7 @@
             }
             protocolInfo.sslEngine = sslEngine;
             protocolInfo.key = key;
+ protocolInfo.selectorThread = readTask.getSelectorThread();
             protocolInfo.isRequestedTransportSecure = isRequestedTransportSecure;
             protocolInfo.mappedProtocols = mappedProtocols;
             if (isRequestedTransportSecure){
@@ -354,10 +389,16 @@
                                         mappedProtocols.put(key,protocolHandler);
                                     }
                                 }
+ if (protocolInfo.parkRequest){
+ // means we release the task
+ readTask.releaseContext();
+ readTask.returnTask();
+ } else {
                                 if (protocolInfo.keepAlive){
                                     readTask.registerKey();
                                 }
                                 readTask.terminate(protocolInfo.keepAlive);
+ }
                             } else {
                                 if ( isRequestedTransportSecure ) {
                                     ((SSLReadTask)readTask).setHandshake(false);
@@ -392,9 +433,14 @@
                     }
                 } else {
                     protocolHandler.handle(protocolInfo);
+ if (protocolInfo.parkRequest){
+ readTask.releaseContext();
+ readTask.returnTask();
+ } else {
                     readTask.registerKey();
                     readTask.terminate(protocolInfo.keepAlive);
                 }
+ }
             } catch (Throwable ex){
                 notFound = false;
                 if (logger.isLoggable(Level.WARNING)){
Index: src/java/com/sun/enterprise/web/portunif/util/ProtocolInfo.java
===================================================================
RCS file: /cvs/glassfish/appserv-http-engine/src/java/com/sun/enterprise/web/portunif/util/ProtocolInfo.java,v
retrieving revision 1.7.6.2
diff -u -w -r1.7.6.2 ProtocolInfo.java
--- src/java/com/sun/enterprise/web/portunif/util/ProtocolInfo.java 20 Aug 2007 11:28:57 -0000 1.7.6.2
+++ src/java/com/sun/enterprise/web/portunif/util/ProtocolInfo.java 27 Sep 2007 04:55:34 -0000
@@ -36,6 +36,7 @@
 
 package com.sun.enterprise.web.portunif.util;
 
+import com.sun.enterprise.web.connector.grizzly.SelectorThread;
 import com.sun.enterprise.web.portunif.ProtocolHandler;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
@@ -86,6 +87,10 @@
     
     public boolean cacheHandler = true;
     
+ public SelectorThread selectorThread = null;
+
+ public boolean parkRequest = false;
+
     public void recycle(){
         sslContext = null;
         key = null;
@@ -103,6 +108,8 @@
         handshake = true;
         cacheHandler = true;
         object = null;
+ selectorThread = null;
+ parkRequest = false;
     }
     
 }


Index: http/LoadBalancerProxyFinder.java
===================================================================
RCS file: /cvs/sailfin/clb/src/main/java/org/jvnet/glassfish/comms/clb/proxy/http/LoadBalancerProxyFinder.java,v
retrieving revision 1.3
diff -u -r1.3 LoadBalancerProxyFinder.java
--- http/LoadBalancerProxyFinder.java 6 Sep 2007 13:21:13 -0000 1.3
+++ http/LoadBalancerProxyFinder.java 27 Sep 2007 11:34:52 -0000
@@ -24,28 +24,15 @@
 
 package org.jvnet.glassfish.comms.clb.proxy.http;
 
-import com.sun.enterprise.web.connector.grizzly.ByteBufferInputStream;
+
 import com.sun.enterprise.web.portunif.*;
 import com.sun.enterprise.web.portunif.util.ProtocolInfo;
-import com.sun.grizzly.connectioncache.spi.concurrent.ConcurrentQueue;
-import com.sun.grizzly.connectioncache.spi.concurrent.ConcurrentQueueFactory;
-import org.jvnet.glassfish.comms.clb.proxy.http.util.HttpHeaderParser;
-import org.jvnet.glassfish.comms.clb.proxy.http.util.HttpRequestWrapper;
 import org.jvnet.glassfish.comms.clb.proxy.http.util.ObjectManager;
-import org.jvnet.glassfish.comms.clb.proxy.http.util.PayloadInfo;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.jvnet.glassfish.comms.clb.proxy.DefaultEndpoint;
+import org.jvnet.glassfish.comms.clb.proxy.ProxyTask;
 import org.jvnet.glassfish.comms.clb.proxy.api.Endpoint;
-import org.jvnet.glassfish.comms.clb.proxy.api.Protocols;
-import org.jvnet.glassfish.comms.clb.proxy.config.LoadBalancerProxyConstants;
 import org.jvnet.glassfish.comms.clb.proxy.config.ProxyConfig;
-import org.jvnet.glassfish.comms.clb.proxy.config.RoundRobinPolicy;
 
 /**
  * A <code>ProtocolFinder</code> implementation that parse the available
@@ -102,126 +89,27 @@
             }
             /* We have the byte buffer, feel free to parse and get the headers
              */
-
- ByteBuffer tmpBuffer = (ByteBuffer) protocolInfo.byteBuffer.flip();
- ByteBufferInputStream is = new ByteBufferInputStream(tmpBuffer);
- is.setByteBuffer(tmpBuffer);
- is.setSelectionKey(protocolInfo.key);
- is.setReadTimeout(LoadBalancerProxyConstants.DEFAULT_READ_TIMEOUT);
- HttpHeaderParser parser = objManager.pollParser();
- parser.setInputStream(is);
- try {
- parser.parseRequestLine();
- if (_logger.isLoggable(Level.FINE)) {
- _logger.log(Level.FINE,"Request line ");
- dumpBuffer(parser);
- }
- parser.parseHeaders();
- if (_logger.isLoggable(Level.FINE)) {
- _logger.log(Level.FINE,"Headers ");
- dumpBuffer(parser);
- }
- } catch(IOException e) {
- e.printStackTrace();
- // ignore
- } finally {
- ;
- }
-
- if ((parser.getHeaders().getHeader(LoadBalancerProxyConstants.
- HTTP_PROXY_HEADER) != null)) {
- _logger.log(Level.FINE," FELB header in message " +
- parser.getHeaders().getHeader(
- LoadBalancerProxyConstants.HTTP_PROXY_HEADER));
- tmpBuffer.clear();
- tmpBuffer.put(parser.getBytes(), tmpBuffer.position(),
- parser.lastValidPos());
- protocolInfo.inputBB = tmpBuffer;
- protocolInfo.bytesRead = tmpBuffer.position();
- parser.recycle();
- objManager.offerParser(parser);
- return false;
- }
- /* Invoke the CLB API after creating the request wrapper, we have the
- * headers.
- */
-
- HttpRequestWrapper wrapper = objManager.pollWrapper();
- wrapper.setParser(parser);
-
- Socket socket = ((SocketChannel)protocolInfo.key.channel()).socket();
- /*
- * Until the HA CLB Apis are implemented this is what we have.
- */
- InetSocketAddress remoteHost = RoundRobinPolicy.getInetSocketAddress(wrapper);
- wrapper.recycle();
- objManager.offerWrapper(wrapper);
-
- if (remoteHost.getAddress().isLoopbackAddress() &&
- (remoteHost.getPort() == ProxyConfig.getInstance().getLocalPort())) {
- _logger.log(Level.INFO,"Local Termination request" );
- tmpBuffer.clear();
- tmpBuffer.put(parser.getBytes(), tmpBuffer.position(),
- parser.lastValidPos());
- protocolInfo.inputBB = tmpBuffer;
- protocolInfo.bytesRead = tmpBuffer.position();
- parser.recycle();
- objManager.offerParser(parser);
-
+ ProxyTask task = objManager.pollTask();
+ task.setByteBuffer(protocolInfo.byteBuffer);
+ task.setSelectionKey(protocolInfo.key);
+ task.doTask();
+ protocolInfo.inputBB = task.getBuffer();
+ protocolInfo.bytesRead = task.getBuffer().position();
+ Endpoint remoteHost = task.getEndpoint();
+ if (remoteHost != null && remoteHost.isLocal()) {
+ _logger.log(Level.INFO,"Local Termination request" );
+ task.recycle();
+ objManager.offerTask(task);
                 return false;
             } else {
                 _logger.log(Level.INFO,"Remote Termination request" );
             }
- parser.addHeader(LoadBalancerProxyConstants.HTTP_PROXY_HEADER,
- "ConvergedProxy");
- if (_logger.isLoggable(Level.FINE)) {
- _logger.log(Level.FINE,"Headers after adding ");
- dumpBuffer(parser);
- }
- _logger.log(Level.FINE,"Last position " + parser.lastValidPos());
- _logger.log(Level.FINE,"Length of buffer " + parser.getBytes().length);
- tmpBuffer.clear();
- tmpBuffer.put(parser.getBytes(), tmpBuffer.position(),
- parser.lastValidPos());
- protocolInfo.inputBB = tmpBuffer;
- protocolInfo.bytesRead = tmpBuffer.position();
- _logger.log(Level.FINE,"Protocol Info bytesRead " + protocolInfo.bytesRead);
- Endpoint remoteEndpoint = new DefaultEndpoint(remoteHost,
- Protocols.TCP);
- String contentlength = parser.getHeaders().getHeader("content-length");
- long clength = 0;
- try {
- clength = Long.parseLong(contentlength);
- }catch (Exception e) {
- ;
- }
- PayloadInfo payload = objManager.pollPayload();
- payload.setEndpoint(remoteEndpoint);
- payload.setProtocol(Protocols.HTTP);
- payload.setTransportProtocol(Protocols.TCP);
- payload.setPayloadLength(clength + parser.getHeaderLength());
- _logger.log(Level.FINE,"Header length " + parser.getHeaderLength());
- _logger.log(Level.FINE,"Content length " + clength);
- protocolInfo.object = payload;
- _logger.log(Level.FINE,"Remote Host " + remoteHost.getAddress().toString() + ":"+
- remoteHost.getPort());
- _logger.log(Level.FINE,"Local Host " + socket.getLocalAddress().toString() + ":" +
- ProxyConfig.getInstance().getLocalPort());
- parser.recycle();
- objManager.offerParser(parser);
+ _logger.log(Level.FINE,"Protocol Info bytesRead " + protocolInfo.bytesRead);
+ _logger.log(Level.FINE,"Payload length " + task.getPayloadLength());
+ protocolInfo.object = task;
         } catch (Exception e){
             e.printStackTrace();
         }
         return true;
     }
-
- private void dumpBuffer(HttpHeaderParser parser){
- StringBuffer sbuf = new StringBuffer();
- for (int i=0; i < parser.lastValidPos(); i++) {
- sbuf.append((char)parser.getBytes()[i]);
- }
- _logger.log(Level.FINE,sbuf.toString());
-
- }
-
 }
Index: http/LoadBalancerProxyHandler.java
===================================================================
RCS file: /cvs/sailfin/clb/src/main/java/org/jvnet/glassfish/comms/clb/proxy/http/LoadBalancerProxyHandler.java,v
retrieving revision 1.3
diff -u -r1.3 LoadBalancerProxyHandler.java
--- http/LoadBalancerProxyHandler.java 6 Sep 2007 13:21:13 -0000 1.3
+++ http/LoadBalancerProxyHandler.java 27 Sep 2007 11:34:53 -0000
@@ -24,33 +24,30 @@
 
 package org.jvnet.glassfish.comms.clb.proxy.http;
 
-import com.sun.enterprise.web.connector.grizzly.ByteBufferInputStream;
 import com.sun.enterprise.web.portunif.*;
 import com.sun.enterprise.web.portunif.util.ProtocolInfo;
-import com.sun.org.apache.xalan.internal.xsltc.compiler.Parser;
-import org.jvnet.glassfish.comms.clb.proxy.http.util.HttpHeaderParser;
-import org.jvnet.glassfish.comms.clb.proxy.http.util.HttpRequestWrapper;
+import com.sun.grizzly.http.SocketChannelOutputBuffer;
+import com.sun.grizzly.tcp.Request;
+import com.sun.grizzly.tcp.Response;
+import com.sun.grizzly.tcp.http11.InternalInputBuffer;
+import com.sun.grizzly.tcp.http11.filters.IdentityOutputFilter;
+import com.sun.grizzly.tcp.http11.filters.VoidOutputFilter;
+import com.sun.grizzly.util.buf.MessageBytes;
+import com.sun.grizzly.util.http.FastHttpDateFormat;
+import com.sun.grizzly.util.http.MimeHeaders;
+import org.jvnet.glassfish.comms.clb.proxy.ProxyTask;
+import org.jvnet.glassfish.comms.clb.proxy.http.util.HttpRequest;
 import org.jvnet.glassfish.comms.clb.proxy.http.util.ObjectManager;
-import org.jvnet.glassfish.comms.clb.proxy.http.util.PayloadInfo;
-import com.sun.grizzly.Controller;
-import org.jvnet.glassfish.comms.clb.proxy.DefaultEndpoint;
-import org.jvnet.glassfish.comms.clb.proxy.ConvergedProxy;
+import org.jvnet.glassfish.comms.clb.proxy.HttpProxy;
 import org.jvnet.glassfish.comms.clb.proxy.api.Endpoint;
-import org.jvnet.glassfish.comms.clb.proxy.api.Protocols;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.jvnet.glassfish.comms.clb.proxy.config.LoadBalancerProxyConstants;
 import org.jvnet.glassfish.comms.clb.proxy.config.ProxyConfig;
-import org.jvnet.glassfish.comms.clb.proxy.config.RoundRobinPolicy;
 
 /**
  * Redirect the request to the proper protocol, which can be http or https.
@@ -64,15 +61,15 @@
      */
     protected String[] protocols = {"lb/http", "lb/https"};
     
- private ConvergedProxy proxy ;
+ private HttpProxy proxy ;
     
     private Logger _logger = null;
     
- private ObjectManager objManager;
+ private ObjectManager objManager;
     
     public LoadBalancerProxyHandler() {
         _logger = ProxyConfig.getInstance().getLogger();
- proxy = ConvergedProxy.getInstance();
+ proxy = HttpProxy.getInstance();
         objManager = ObjectManager.getInstance();
     }
     
@@ -94,12 +91,32 @@
     
     
     private void _handle(ProtocolInfo protocolInfo) throws IOException {
- Object payload = protocolInfo.object;
+ Object ptask = protocolInfo.object;
         _logger.log(Level.FINE,"Inside Protocol Handler " + protocolInfo.key);
-
+ ProxyTask task = null;
         Endpoint endpoint = null;
- if (payload != null){
- endpoint = ((PayloadInfo)payload).getEndpoint();
+ Request request = null;
+ Response response = null;
+ proxy.setSelectorThread(protocolInfo.selectorThread);
+ if (ptask != null){
+ task = (ProxyTask)ptask;
+ response = task.getResponse();
+ request = task.getRequest();
+ if (response.getStatus() != 200){
+ _logger.log(Level.SEVERE, "Sending error response in Handler");
+ // force "Connection: close" so the client does not reuse
+ // this channel after an error response
+ response.setHeader("connection", "close");
+ sendResponse(request, response);
+ _logger.log(Level.SEVERE, "Sent error response in Handler");
+ task.recycle();
+ objManager.offerTask(task);
+ _logger.log(Level.SEVERE, "Released resources");
+ protocolInfo.keepAlive = false;
+ return;
+ }
+ endpoint = task.getEndpoint();
             _logger.log(Level.FINE,"LoadBalancer protocol handler invoked with " +
                     " endpoint " + endpoint);
         } else {
@@ -121,7 +138,7 @@
          * in WSTCPProtocolHandler
          */
         protocolInfo.cacheHandler =
- proxy.doProxyHttp((PayloadInfo) payload,
+ proxy.doProxyHttp(task,
                 protocolInfo.byteBuffer, protocolInfo.key);
         _logger.log(Level.INFO,"doProxyHttp returned " + protocolInfo.cacheHandler);
          /* If we return false here the grizzly 1.0 controller will cancel key and close
@@ -136,18 +153,18 @@
         
         if (!protocolInfo.cacheHandler ){
             protocolInfo.mappedProtocols.remove(protocolInfo.key);
- // offer the payload
- ((PayloadInfo)payload).recycle();
- objManager.offerPayload((PayloadInfo)payload);
- }
-
-
- if (_logger.isLoggable(Level.FINE)) {
- _logger.log(Level.FINE,"Mapped protocols dump : After " );
- dump(protocolInfo.mappedProtocols);
+ // offer the payload
+ task.recycle();
+ objManager.offerTask(task);
         }
         
+ // TODO: decide how keepAlive should be handled here; always keep
+ // alive for now, because the key is cancelled through the callback
        protocolInfo.keepAlive = true;
+ // parkRequest must be true unless we know the channel can be closed;
+ // the channel has to be closed once the response is written back
+ protocolInfo.parkRequest = true;
         _logger.log(Level.INFO,"Load Balancer Handler returning keep alive "
                 + protocolInfo.keepAlive );
     }
@@ -174,15 +191,38 @@
         proxy.expireKey(key);
         return true;
     }
-
- private void dump(Map<SelectionKey, ProtocolHandler> mappedProtocols) {
- Set keys = mappedProtocols.keySet();
- Iterator iter = keys.iterator();
- while (iter.hasNext()){
- SelectionKey key = (SelectionKey) iter.next();
- _logger.log(Level.FINE," Key " + key);
- _logger.log(Level.FINE,"Handler " + mappedProtocols.get(key));
+ /**
+ * When committing the response, we have to validate the set of headers, as
+ * well as setup the response filters.
+ */
+ protected void sendResponse(Request request, Response response) {
+
+ SocketChannelOutputBuffer outputBuffer = (SocketChannelOutputBuffer)
+ response.getOutputBuffer();
+
+ MimeHeaders headers = response.getMimeHeaders();
+ // Add date header
+ if (! response.containsHeader("Date")){
+ String date = FastHttpDateFormat.getCurrentDate();
+ response.addHeader("Date", date);
+ }
+ headers.setValue("Connection").setString("close");
+
+ // Build the response header
+ outputBuffer.sendStatus();
+
+ int size = headers.size();
+ for (int i = 0; i < size; i++) {
+ outputBuffer.sendHeader(headers.getName(i), headers.getValue(i));
+ }
+ outputBuffer.endHeaders();
+ try {
+ outputBuffer.endRequest();
+ outputBuffer.flush();
+ outputBuffer.commit();
+ } catch (IOException ex) {
+ ex.printStackTrace();
         }
     }
+
 }
 
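[Editor's note] The `sendResponse` method added above drives Grizzly's `SocketChannelOutputBuffer` through the usual framing steps (status line, headers, blank line). A minimal sketch of that shape in plain strings, assuming a hypothetical `buildHead` helper, makes the wire format visible:

```java
public class ErrorResponseSketch {
    // Build an HTTP/1.1 response head the way sendResponse() does:
    // status line, Date header, forced "Connection: close", blank line.
    static String buildHead(int status, String reason, String date) {
        StringBuilder sb = new StringBuilder();
        sb.append("HTTP/1.1 ").append(status).append(' ')
          .append(reason).append("\r\n");
        sb.append("Date: ").append(date).append("\r\n");
        // the handler forces close so the client does not reuse a
        // channel whose back-end selection failed
        sb.append("Connection: close\r\n");
        sb.append("\r\n");
        return sb.toString();
    }
}
```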
Index: http/util/ObjectManager.java
===================================================================
RCS file: /cvs/sailfin/clb/src/main/java/org/jvnet/glassfish/comms/clb/proxy/http/util/ObjectManager.java,v
retrieving revision 1.1
diff -u -r1.1 ObjectManager.java
--- http/util/ObjectManager.java 6 Sep 2007 13:22:03 -0000 1.1
+++ http/util/ObjectManager.java 27 Sep 2007 11:34:53 -0000
@@ -11,6 +11,9 @@
 
 import com.sun.grizzly.connectioncache.spi.concurrent.ConcurrentQueue;
 import com.sun.grizzly.connectioncache.spi.concurrent.ConcurrentQueueFactory;
+import org.jvnet.glassfish.comms.clb.proxy.ProxyTask;
+import org.jvnet.glassfish.comms.clb.proxy.config.LoadBalancerProxyConstants;
+
 import org.jvnet.glassfish.comms.clb.proxy.config.ProxyConfig;
 
 /**
@@ -19,14 +22,9 @@
  */
 public class ObjectManager {
     
- private static ObjectManager _thisInstance = null;
-
- private ConcurrentQueue<HttpHeaderParser> parsers;
-
- private ConcurrentQueue<PayloadInfo> payloads;
-
- private ConcurrentQueue<HttpRequestWrapper> wrappers;
+ private static ObjectManager _thisInstance = null;
     
+ private ConcurrentQueue<ProxyTask> tasks;
     /** Creates a new instance of ObjectManager */
     private ObjectManager() {
     }
@@ -39,39 +37,18 @@
     }
     
     public void initialize() {
- parsers = ConcurrentQueueFactory.makeConcurrentQueue();
- for (int i = 0; i < ProxyConfig.HTTP_PARSERS_POOL_SIZE; i++){
- parsers.offer(new HttpHeaderParser());
- }
- payloads = ConcurrentQueueFactory.makeConcurrentQueue();
- for (int i = 0; i < ProxyConfig.HTTP_PAYLOAD_POOL_SIZE; i++){
- payloads.offer(new PayloadInfo());
- }
- wrappers = ConcurrentQueueFactory.makeConcurrentQueue();
+ tasks = ConcurrentQueueFactory.makeConcurrentQueue();
         for (int i = 0; i < ProxyConfig.HTTP_PAYLOAD_POOL_SIZE; i++){
- wrappers.offer(new HttpRequestWrapper());
+ tasks.offer(new ProxyTask());
         }
     }
     
- public HttpHeaderParser pollParser(){
- return parsers.poll();
- }
-
- public void offerParser(HttpHeaderParser parser){
- parsers.offer(parser);
- }
- public HttpRequestWrapper pollWrapper(){
- return wrappers.poll();
- }
-
- public void offerWrapper(HttpRequestWrapper wrapper){
- wrappers.offer(wrapper);
- }
- public PayloadInfo pollPayload(){
- return payloads.poll();
+
+ public ProxyTask pollTask(){
+ return tasks.poll();
     }
     
- public void offerPayload(PayloadInfo payload){
- payloads.offer(payload);
+ public void offerTask(ProxyTask task){
+ tasks.offer(task);
     }
 }
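[Editor's note] The ObjectManager change above replaces three per-type pools with one pool of `ProxyTask` objects using the same poll/offer recycling pattern. A self-contained sketch of that pattern, written against `java.util.concurrent.ConcurrentLinkedQueue` instead of Grizzly's `ConcurrentQueue` (the `PoolTask` type and pool size are stand-ins):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class TaskPool {
    static class PoolTask {
        void recycle() { /* reset per-request state before reuse */ }
    }

    private final ConcurrentLinkedQueue<PoolTask> tasks =
            new ConcurrentLinkedQueue<>();

    public TaskPool(int size) {
        for (int i = 0; i < size; i++) {
            tasks.offer(new PoolTask());
        }
    }

    // Falls back to a fresh allocation when the pool is exhausted;
    // the patch's pollTask() may instead return null to the caller.
    public PoolTask pollTask() {
        PoolTask t = tasks.poll();
        return (t != null) ? t : new PoolTask();
    }

    public void offerTask(PoolTask t) {
        t.recycle(); // the patch recycles before offering, as here
        tasks.offer(t);
    }
}
```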
Index: outbound/AsyncResponseCallBackHandler.java
===================================================================
RCS file: /cvs/sailfin/clb/src/main/java/org/jvnet/glassfish/comms/clb/proxy/outbound/AsyncResponseCallBackHandler.java,v
retrieving revision 1.3
diff -u -r1.3 AsyncResponseCallBackHandler.java
--- outbound/AsyncResponseCallBackHandler.java 6 Sep 2007 13:21:13 -0000 1.3
+++ outbound/AsyncResponseCallBackHandler.java 27 Sep 2007 11:34:53 -0000
@@ -28,8 +28,7 @@
 * The contentlength algo of GRIZZLY 1.5 has not been staged yet
  * this should be updated to 1.5 when the staging is done.
  */
-import com.sun.enterprise.web.connector.grizzly.algorithms.ContentLengthAlgorithm;
-import org.jvnet.glassfish.comms.clb.proxy.http.util.PayloadInfo;
+
 import com.sun.grizzly.CallbackHandler;
 import com.sun.grizzly.ConnectorHandler;
 import com.sun.grizzly.Controller;
@@ -38,9 +37,10 @@
 import com.sun.grizzly.filter.ReadFilter;
 import com.sun.grizzly.util.OutputWriter;
 import com.sun.grizzly.util.WorkerThread;
+import com.sun.grizzly.http.algorithms.ContentLengthAlgorithm;
 import java.util.logging.Logger;
 import java.util.logging.Level;
-import org.jvnet.glassfish.comms.clb.proxy.ConvergedProxy;
+import org.jvnet.glassfish.comms.clb.proxy.HttpProxy;
 import org.jvnet.glassfish.comms.clb.proxy.config.ProxyConfig;
 import org.jvnet.glassfish.comms.clb.proxy.api.Endpoint;
 import org.jvnet.glassfish.comms.clb.proxy.api.Protocols;
@@ -54,36 +54,42 @@
  * Handler that will be invoked by the Grizzly 1.5 pipeline code when one
 * of the events READ, WRITE or CONNECT occurs for the selection key corresponding
  * to this handler.
- *
+ *
  * @author rampsarathy
  */
 public class AsyncResponseCallBackHandler implements CallbackHandler<Context> {
-
+
     private ConnectorHandler connectorHandler;
     private Logger _logger = null;
- private SocketChannel clientChannel;
+ private SelectionKey clientKey;
     
     private ReadFilter readFilter;
- private int protocol;
     private ContentLengthAlgorithm algorithm;
     private Endpoint ep;
+ private boolean keepAlive = false;
+ private ConnectionManager connectionManager;
     
     /**
      * This handler requires the connectorhandler that is handing this connection
- * the client socket channel to send the response back to and
+ * the client socket channel to send the response back to and
      * the endpoint.
      */
     public AsyncResponseCallBackHandler(
             Endpoint ep, ConnectorHandler connectorHandler,
- SocketChannel client) {
+ SelectionKey clientkey, boolean keepalive) {
         _logger = ProxyConfig.getInstance().getLogger();
         this.connectorHandler = connectorHandler;
         this.ep = ep;
- this.protocol = ep.getProtocolOrdinal();
- this.clientChannel = client;
+ this.clientKey = clientkey;
+ this.keepAlive = keepalive;
+ this.connectionManager = HttpProxy.getInstance().getConnectionManager();
         algorithm = new ContentLengthAlgorithm();
     }
     
+ public void setKeepAlive(boolean keepalive){
+ this.keepAlive = keepalive;
+ }
+
     /**
      * Called when the connection has been established.
      */
@@ -101,23 +107,21 @@
             connectorHandler.finishConnect(key);
         } catch (Exception e) {
             e.printStackTrace();
- return;
+ // TODO: clean up the connection state on connect failure
+ return;
         }
         _logger.log(Level.FINE,"Connection created with " + key);
- if (this.protocol == Protocols.TCP) {
- ioEvent.attachment().getController().registerKey(key,
- SelectionKey.OP_READ,
- Controller.Protocol.TCP);
- }else {
- ioEvent.attachment().getController().registerKey(key,
- SelectionKey.OP_READ,
- Controller.Protocol.UDP);
- }
+
+ ioEvent.attachment().getController().registerKey(key,
+ SelectionKey.OP_READ,
+ Controller.Protocol.TCP);
+
     }
     
- public void onRead(IOEvent<Context> ioEvent) {
+ public void onRead(IOEvent<Context> ioEvent) {
         Context ctx = ioEvent.attachment();
         try {
+ SocketChannel clientChannel= (SocketChannel)clientKey.channel();
             readFilter = new ReadFilter();
             readFilter.execute(ctx);
             WorkerThread workerThread = (WorkerThread)Thread.currentThread();
@@ -125,8 +129,8 @@
             algorithm.setSocketChannel((SocketChannel) ctx.getSelectionKey().channel());
             algorithm.preParse(byteBuffer);
             /**
- * Improve the logic below to write as much as we have and
- * then continue parsing in the next read, we should not
+ * Improve the logic below to write as much as we have and
+ * then continue parsing in the next read, we should not
              * wait for the entire message before we start writing.
              */
             if (algorithm.parse(byteBuffer)) {
@@ -136,7 +140,6 @@
                         clientChannel.socket().getRemoteSocketAddress());
                 if ((clientChannel != null) && (clientChannel.isConnected())){
                     OutputWriter.flushChannel(clientChannel, byteBuffer);
- clientChannel.close();
                 } else {
                     _logger.log(Level.FINE,"Client closed channel, " +
                             "cannot write response");
@@ -144,31 +147,45 @@
                 }
                 /**
                  * There are problems in re-using the same client channel
- * for further requests , so closing client
+ * for further requests, so closing client
                  * and server channels for now, Need to improve this..
                  */
- ConvergedProxy.getInstance().getConnectionManager().
- removeClientEndpoint(clientChannel);
- ConvergedProxy.getInstance().getConnectionManager().
- releaseConnection(this.connectorHandler);
- // ctx.getController().cancelKey(ctx.getSelectionKey());
+ // check for keep-alive and then close the channel,
+ // check the connection header
+ connectionManager.releaseConnection(this.connectorHandler);
+ if (!keepAlive){
+ connectionManager.removeClientEndpoint(clientKey);
+ } else{
+ connectionManager.registerClientKey(clientKey);
+ }
+
+ // ctx.getController().cancelKey(ctx.getSelectionKey());
             } else {
                 SelectionKey key = ctx.getSelectionKey();
+ if ((clientChannel != null) && (clientChannel.isConnected())){
+ OutputWriter.flushChannel(clientChannel, byteBuffer);
+ } else {
+ _logger.log(Level.FINE,"Client closed channel, " +
+ "cannot write response");
+ throw new Exception("Socket channel closed");
+ }
                 _logger.log(Level.FINE,"Algorithm says there is more data to write to " +
                         clientChannel.socket().getRemoteSocketAddress());
                 ctx.getController().registerKey(key, SelectionKey.OP_READ);
             }
             algorithm.postParse(byteBuffer);
- } catch (Throwable e) {
+ } catch (Throwable e) {
             ctx.getController().cancelKey(ctx.getSelectionKey());
- ConvergedProxy.getInstance().getConnectionManager().
- releaseConnection(this.connectorHandler);
+ HttpProxy.getInstance().getConnectionManager().
+ releaseConnection(this.connectorHandler);
+ connectionManager.removeClientEndpoint(clientKey);
             e.printStackTrace();
         }
     }
     
     public void onWrite(IOEvent<Context> ioEvent) {
- throw new IllegalStateException("Should not be here!");
+ throw new IllegalStateException("Should not be here!");
     }
     
 }
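[Editor's note] The callback handler above uses a `ContentLengthAlgorithm` to decide when the back-end response is complete, because the proxy cannot hand the client channel back (or close it) until the whole message has been copied. A byte-counting sketch of that decision, with illustrative names (Grizzly's algorithm does the real header parsing):

```java
public class ResponseFramer {
    private final long expected; // header length + Content-Length value
    private long seen;

    public ResponseFramer(long expected) {
        this.expected = expected;
    }

    // Feed the number of bytes just read from the back end; returns
    // true once the full message has been seen and the response can
    // be considered complete.
    public boolean onBytes(int n) {
        seen += n;
        return seen >= expected;
    }
}
```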
Index: outbound/ConnectionManager.java
===================================================================
RCS file: /cvs/sailfin/clb/src/main/java/org/jvnet/glassfish/comms/clb/proxy/outbound/ConnectionManager.java,v
retrieving revision 1.3
diff -u -r1.3 ConnectionManager.java
--- outbound/ConnectionManager.java 6 Sep 2007 13:21:13 -0000 1.3
+++ outbound/ConnectionManager.java 27 Sep 2007 11:34:53 -0000
@@ -24,7 +24,16 @@
 
 package org.jvnet.glassfish.comms.clb.proxy.outbound;
 
-import org.jvnet.glassfish.comms.clb.proxy.http.util.PayloadInfo;
+
+// Grizzly 1.0 APIs
+import com.sun.enterprise.web.connector.grizzly.SelectorThread;
+import com.sun.enterprise.web.connector.grizzly.WorkerThreadImpl;
+import java.nio.channels.SelectionKey;
+
+ import org.jvnet.glassfish.comms.clb.proxy.ProxyTask;
+
+ // Grizzly 1.x.x APIs
 import com.sun.grizzly.CallbackHandler;
 import com.sun.grizzly.ConnectorHandler;
 import com.sun.grizzly.ConnectorHandlerPool;
@@ -56,7 +65,7 @@
 
 /**
  * Manages the outbound connections.
- * @author rampsarathy
+ * @author
  */
 public class ConnectionManager {
     
@@ -68,8 +77,8 @@
     /**
 * Table for holding the client to proxy channel mapping.
      */
- private ConcurrentHashMap<SocketChannel, PayloadInfo > clientToproxy =
- new ConcurrentHashMap<SocketChannel, PayloadInfo>();
+ private ConcurrentHashMap<SelectionKey, ProxyTask > clientToproxy =
+ new ConcurrentHashMap<SelectionKey, ProxyTask>();
     /**
      * Cacheable pool of handlers.
      */
@@ -104,6 +113,9 @@
      * Logger
      */
     private Logger _logger = null;
+
+ private SelectorThread selectorThread = null;
+
     /** Creates a new instance of ConnectionManager */
     public ConnectionManager() {
         _logger = ProxyConfig.getInstance().getLogger();
@@ -112,7 +124,7 @@
         numberToReclaim =
             ProxyConfig.getInstance().RECLAIM_CONNECTIONS;
         maxParallelConnections =
- ProxyConfig.getInstance().MAX_PARALLEL_CONNECTIONS;
+ ProxyConfig.getInstance().MAX_PARALLEL_CONNECTIONS;
     }
     
     /**
@@ -122,6 +134,7 @@
         
         controller = new Controller();
         selectorHandler = new TCPSelectorHandler(true);
+
         /*cacheableHandlerPool= new CacheableConnectorHandlerPool(
                 controller, highWaterMark,
                 numberToReclaim, maxParallelConnections);
@@ -144,16 +157,17 @@
     /**
 * Create a TCP client connection handler.
      */
- private ConnectorHandler createHandlerTCP(PayloadInfo payload, SocketChannel clientchnl) throws IOException {
+ private ConnectorHandler createHandlerTCP(ProxyTask payload, SelectionKey clientkey,
+ boolean keepalive) throws IOException {
         Endpoint ep = payload.getEndpoint();
         SocketAddress remote = ep.getSocketAddress();
         final ConnectorHandler connectorHandler =
                controller.acquireConnectorHandler(Controller.Protocol.TCP);
         CallbackHandler callbackHandler = new AsyncResponseCallBackHandler
- (payload.getEndpoint(),
- connectorHandler, clientchnl);
+ (payload.getEndpoint(), connectorHandler,
+ clientkey, keepalive);
         try {
- connectorHandler.connect( remote, callbackHandler );
+ connectorHandler.connect( remote, callbackHandler );
         } catch (Exception ex) {
             //ex.printStackTrace();
         }
@@ -180,23 +194,22 @@
     /**
      * Utility method for sending a byte buffer over a channel.
      */
- public long send( ByteBuffer bb, PayloadInfo payload,
- SocketChannel clientChannel) throws IOException {
+ public long send( ByteBuffer bb, ProxyTask payload,
+ SelectionKey key, boolean keepalive) throws IOException {
         long bytesWritten = 0;
         int retrycount = 0;
         Endpoint ep = payload.getEndpoint();
- if( ( ep.getProtocolOrdinal() == Protocols.TCP ) ||
- ( ep.getProtocolOrdinal() == Protocols.TLS) ) { //TODO move Protocols
+ if(!ep.isSecure()) { //TODO move Protocols
             _logger.log(Level.FINE,"Get Endpoint " + ep);
             ConnectorHandler handler = proxyTobackend.get(ep);
             _logger.log(Level.FINE,"Handler for " + ep + " is " + handler);
             if(handler == null){
- handler = createHandlerTCP(payload, clientChannel);
+ handler = createHandlerTCP(payload, key, keepalive);
             }
             while (!handler.getUnderlyingChannel().isOpen() &&
                         retrycount < ProxyConfig.MAX_SEND_RETRY){
                 retrycount++;
- handler = createHandlerTCP(payload, clientChannel);
+ handler = createHandlerTCP(payload, key, keepalive);
             }
             
             bb.flip();
@@ -214,16 +227,20 @@
         return bytesWritten;
     }
     
- public void registerBEEnpoint(SocketChannel chnl, PayloadInfo beEndpoint) {
- clientToproxy.put(chnl, beEndpoint);
+ public void setSelectorThread(SelectorThread thread){
+ this.selectorThread = thread;
+ }
+
+ public void registerServerEndpoint(SelectionKey key, ProxyTask beEndpoint) {
+ clientToproxy.put(key, beEndpoint);
     }
     
- public PayloadInfo getBEEndpoint(SocketChannel chnl) {
- return clientToproxy.get(chnl);
+ public ProxyTask getServerEndpoint(SelectionKey key) {
+ return clientToproxy.get(key);
     }
     
- public PayloadInfo removeClientEndpoint(SocketChannel chnl) {
- return clientToproxy.remove(chnl);
+ public ProxyTask removeClientEndpoint(SelectionKey key) {
+ return clientToproxy.remove(key);
     }
     
     public void releaseConnection(ConnectorHandler handler) {
@@ -237,6 +254,18 @@
         }
     }
     
+ public void cancelClientKey(SelectionKey key){
+ if (key.isValid() && (selectorThread != null)){
+ selectorThread.cancelKey(key);
+ }
+ }
+
+ public void registerClientKey(SelectionKey key){
+ if (key.isValid() && (selectorThread != null)){
+ selectorThread.registerKey(key);
+ }
+ }
+
     public void setHighWaterMark(int mark){
         this.highWaterMark = mark;
     }
@@ -256,5 +285,4 @@
     public void setMaxParallelConnections(int maxparallel) {
         this.maxParallelConnections = maxparallel;
     }
-
 }
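[Editor's note] The ConnectionManager above caches one outbound `ConnectorHandler` per back-end `Endpoint` (`proxyTobackend`), creating on demand and reusing thereafter. A generic sketch of that endpoint-keyed reuse, assuming stand-in type parameters rather than the Grizzly types:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class OutboundCache<E, H> {
    private final ConcurrentHashMap<E, H> byEndpoint =
            new ConcurrentHashMap<>();
    private final Function<E, H> factory; // e.g. createHandlerTCP(...)

    public OutboundCache(Function<E, H> factory) {
        this.factory = factory;
    }

    // Reuse the cached handler for this endpoint if one exists,
    // otherwise connect a new one atomically.
    public H acquire(E endpoint) {
        return byEndpoint.computeIfAbsent(endpoint, factory);
    }

    // Drop the mapping, e.g. when the channel is found closed.
    public void release(E endpoint) {
        byEndpoint.remove(endpoint);
    }
}
```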