users@grizzly.java.net

Re: WG: Re: Asynchronous Request Processing with TCPIP

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Fri, 04 Jul 2008 12:44:22 -0400

Salut,

the docs are hard to read inline, so I'm attaching the diff.

Thanks.

-- Jeanfrancois

Jeanfrancois Arcand wrote:
> Salut,
>
> John ROM wrote:
>> Hi Jeanfrancois,
>> just curious if I could also profit (-:
>>
>> I have a similar use case. I would also like to call the Context from
>> a different Thread and use the AsyncWriter, AttributeHolder...
>>
>> But I need the SelectionKey to still get OP_READ events.
>>
>> So more of a Context.detach(). I wonder if you could do something like:
>> suspend() {
>> detach(); ctx.setKeyRegistrationState(KeyRegistrationState.NONE);
>
> Here you mean KeyRegistrationState.REGISTER, right?
>
>
>> }
>> ....
>>
>> Later I would give the Context back to the Context pool...
>
> So far here is what I'm proposing for the API:
>
>> /**
>>  * Suspend the execution of this {@link Context}. Suspending the
>>  * execution allows an application to store the current instance and
>>  * re-use it later, not only from the Thread that called suspend, but
>>  * also from any other Thread.
>>  * A suspended Context will not be re-used by any other transaction
>>  * or Thread. A suspended Context will keep its current state intact,
>>  * meaning its SelectionKey, attributes, SelectorHandler, etc., will
>>  * not change. Internally, the Context will not be recycled and will
>>  * not be re-used by any Thread.
>>  *
>>  * When invoked, this method will automatically set the key
>>  * registration state (see {@link Context#setKeyRegistrationState})
>>  * to KeyRegistrationState.NONE.
>>  *
>>  * Invoking this method many times has no effect once suspended.
>>  */
>> public void suspend();
>>
>> /**
>>  * Return <tt>true</tt> if this Context has been suspended by
>>  * invoking {@link #suspend}. When suspended, invoking
>>  * {@link Context#recycle} will throw an {@link IllegalStateException}.
>>  * @return <tt>true</tt> if this Context has been suspended
>>  */
>> public boolean isSuspended();
>>
>> /**
>>  * Resume a {@link #suspend}ed {@link Context}. Invoking this method
>>  * will automatically clean the state of this Context and mark it as
>>  * a candidate for being re-used by another Thread and connection.
>>  *
>>  * When invoked, this method will automatically set the key
>>  * registration state (see {@link Context#setKeyRegistrationState})
>>  * to KeyRegistrationState.REGISTER and automatically re-enable read
>>  * and write operations.
>>  */
>> public void resume();
>>
>> /**
>>  * Cancel a {@link #suspend}ed {@link Context}. Invoking this method
>>  * will automatically clean the state of this Context and mark it as
>>  * a candidate for being re-used by another Thread and connection.
>>  *
>>  * When invoked, this method will automatically close the underlying
>>  * connection (represented by its {@link SelectionKey}).
>>  */
>> public void cancel();
>
> So in your case I would think you can do:
>
> Context.suspend();
> Thread.detach();
>
> // We suspend, but we still want to get OP_READ event
> ctx.setKeyRegistrationState(KeyRegistrationState.REGISTER);
>
>
> How does that sound? Let me know, as I will make sure I can support your
> scenario before committing the code :-)
>
> Thanks
>
> -- Jeanfrancois
>
>
>
>
>
>
>
>
>>
>>>> do anything I've described above, but instead just do:
>>>>
>>>> Context.suspend()
>>>>
>>>> (Internally it will call
>>>> ctx.setKeyRegistrationState(KeyRegistrationState.NONE))
>>>>
>>>> Then store the Context and re-use it once you are ready; you
>>>> still get access to the async queue, attributes, etc. that were
>>>> associated with that object.
>>>>
>>>> Sorry long email :-)
>>>>
>>>> Thanks
>>>>
>>>> -- Jeanfrancois
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>> Thanks
>>>>> Vishnu
>>>>>
>>>>>
>>>>> Vishnuvardhan wrote:
>>>>>> Thank you very much. I will let you know once it is done.
>>>>>>
>>>>>>
>>>>>> Jeanfrancois Arcand wrote:
>>>>>>> Salut,
>>>>>>>
>>>>>>> Vishnuvardhan wrote:
>>>>>>>> Just to be more specific, can you please tell me if the below will
>>>>>>>> work:
>>>>>>>>
>>>>>>>> // suspend() is invoked with ctx as the attachment; getting it back
>>>>>>>> // inside the resumed() method.
>>>>>>>> public void resumed(Object attachment) {
>>>>>>>> Context ctx = (Context) attachment;
>>>>>>>>
>>>>>>>> ctx.getAsyncQueueWritable().writeToAsyncQueue(responseBuffer);
>>>>>>>> }
>>>>>>> Yes
>>>>>>>
>>>>>>>> P.S I need to get as much info before you go for long weekend.
>>>>>>>> Please don't mind.
>>>>>>> I'm Quebecois not American ;-), so nooo long week-end :-)
>>>>>>>
>>>>>>> A+
>>>>>>>
>>>>>>> -- Jeanfrancois
>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Vishnu
>>>>>>>>
>>>>>>>> Jeanfrancois Arcand wrote:
>>>>>>>>> Salut,
>>>>>>>>>
>>>>>>>>> Vishnuvardhan wrote:
>>>>>>>>>> Hi Jeanfrancois,
>>>>>>>>>>
>>>>>>>>>> Thank you very much. I think that will help.
>>>>>>>>>>
>>>>>>>>>> A couple of doubts:
>>>>>>>>>>
>>>>>>>>>> 1. If I want to suspend indefinitely until resume() is
>>>>>>>>>> invoked, can I send the timeout value as 0.
>>>>>>>>> Set it to -1.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> 2. Can I write the response back from resumed() method?
>>>>>>>>> Yes. You can always write when a connection is suspended/resumed.
>>>>>>>>>
>>>>>>>>>> What is the
>>>>>>>>>> correct way to write the response back? Also when I write the
>>>>>>>>>> data back, the HL7 specific filter should be invoked for OP_WRITE
>>>>>>>>>> so that I can wrap the data with hl7 protocol specific characters.
>>>>>>>>> You can probably use the Async Queue Writer so you don't need
>>>>>>>>> to handle OP_WRITE directly:
>>>>>>>>>
>>>>>>>>> http://blogs.sun.com/oleksiys/entry/grizzly_1_7_0_presents
>>>>>>>>>
>>>>>>>>> A+
>>>>>>>>>
>>>>>>>>> - Jeanfrancois
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> -Vishnu
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Jeanfrancois Arcand wrote:
>>>>>>>>>>> Salut,
>>>>>>>>>>>
>>>>>>>>>>> Vishnuvardhan wrote:
>>>>>>>>>>>> Hi Jeanfrancois
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your help. I looked at the SuspendableFilter. This
>>>>>>>>>>>> filter suspends the execution based on the match of the string
>>>>>>>>>>>> received. In my case, the input can be any HL7 message. I need
>>>>>>>>>>>> to suspend after I receive the HL7 message.
>>>>>>>>>>>>
>>>>>>>>>>>> I need to have something like this:
>>>>>>>>>>>>
>>>>>>>>>>>> class SuspendableFilter
>>>>>>>>>>>>
>>>>>>>>>>>> //inside execute()
>>>>>>>>>>>>
>>>>>>>>>>>> //read the string received
>>>>>>>>>>>> this.suspend();
>>>>>>>>>>>> //continue execution after resuming..
>>>>>>>>>>>>
>>>>>>>>>>>> From some other method.
>>>>>>>>>>>>
>>>>>>>>>>>> suspendableFilter.resume().
>>>>>>>>>>>>
>>>>>>>>>>>> Again, thank you very much for considering my request.
>>>>>>>>>>> I think you can do that by extending the SuspendableFilter
>>>>>>>>>>> and suspend on all requests, independently of the matching
>>>>>>>>>>> String. Just make sure you are telling the SuspendableFilter
>>>>>>>>>>> to suspend *after* executing the next ProtocolFilter in the
>>>>>>>>>>> protocol chain:
>>>>>>>>>>>> final ProtocolFilter readFilter = new ReadFilter();
>>>>>>>>>>>> final SuspendableFilter suspendFilter = new SuspendableFilter();
>>>>>>>>>>>> final ProtocolFilter hl7Filter = new HL7Filter();
>>>>>>>>>>>> suspendable = suspendFilter.suspend("*", timeout, null,
>>>>>>>>>>>> new SuspendableHandler() {
>>>>>>>>>>>>
>>>>>>>>>>>> public void interupted(Object attachment) {
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> public void resumed(Object attachment) {
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> public void expired(Object attachment) {
>>>>>>>>>>>> }
>>>>>>>>>>>> }, Suspend.AFTER);
>>>>>>>>>>> What this code snippet will do is read bytes, invoke your
>>>>>>>>>>> HL7Filter, then suspend the connection.
>>>>>>>>>>>
>>>>>>>>>>> Would that help?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> -- Jeanfrancois
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Vishnu
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Jeanfrancois Arcand wrote:
>>>>>>>>>>>>> Salut,
>>>>>>>>>>>>>
>>>>>>>>>>>>> [removing the dev alias]
>>>>>>>>>>>>>
>>>>>>>>>>>>> Vishnuvardhan wrote:
>>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am working on the Open ESB HL7 binding component. We are in
>>>>>>>>>>>>>> the process of migrating the component from the Apache Mina
>>>>>>>>>>>>>> framework to Grizzly. I would like to know if I can do
>>>>>>>>>>>>>> Asynchronous Request Processing with TCP using Grizzly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is what I want to do.
>>>>>>>>>>>>>> 1. Listen on a particular port.
>>>>>>>>>>>>>> 2. Read the incoming message.
>>>>>>>>>>>>>> 3. Parse the incoming message.
>>>>>>>>>>>>>> 4. Submit the parsed message to the business processing layer.
>>>>>>>>>>>>>> 5. Once a response is available, send back the response
>>>>>>>>>>>>>> message to the same socket.
>>>>>>>>>>>>> Yes, you can. Take a look at the SuspendableFilter:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://grizzly.dev.java.net/nonav/apidocs/com/sun/grizzly/suspendable/SuspendableFilter.html
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is what I have done.
>>>>>>>>>>>>>> 1. Created a Controller.
>>>>>>>>>>>>>> 2. Added a protocol chain with ReadFilter and a parser filter.
>>>>>>>>>>>>>> 3. Once the message is available, submitting the message
>>>>>>>>>>>>>> to the business processing layer.
>>>>>>>>>>>>>> The thread execution ends here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The response will be available from another thread. Now, I
>>>>>>>>>>>>>> want to write the response message back to the same socket.
>>>>>>>>>>>>>> I tried to cache the context from 1st thread and tried to
>>>>>>>>>>>>>> use the same context for writing. But when I say
>>>>>>>>>>>>>> ctx.getSelectionKey().channel(), the program hangs there.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I went through the post on Asynchronous Request Processing
>>>>>>>>>>>>>> on Jean-Francois Arcand's blog
>>>>>>>>>>>>>> <http://weblogs.java.net/blog/jfarcand/>. The framework
>>>>>>>>>>>>>> seems to support only the HTTP protocol.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can somebody guide me how to achieve this in TCP IP.
>>>>>>>>>>>>> Take a look at the SuspendableFilter as I think this is really
>>>>>>>>>>>>> what you need. You can see some tests here:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://grizzly.dev.java.net/nonav/xref-test/com/sun/grizzly/SuspendableTest.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let us know if that doesn't work. Your scenario is quite
>>>>>>>>>>>>> common and I can certainly help improve the
>>>>>>>>>>>>> SuspendableFilter to make it work for you :-)
>>>>>>>>>>>>>
>>>>>>>>>>>>> A+
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Jeanfrancois
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>>>>> Vishnu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Vishnuvardhan Piskalaramesh
>>>>>>>>>>>>>> Sun's Open ESB Community (http://open-esb.org)
>>>>>>>>>>>>>
>>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>>>>>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>>>>>>>>>>


Index: src/main/java/com/sun/grizzly/Context.java
===================================================================
--- src/main/java/com/sun/grizzly/Context.java (revision 1291)
+++ src/main/java/com/sun/grizzly/Context.java (working copy)
@@ -180,7 +180,14 @@
      */
     private AsyncQueueWritable asyncQueueWritable;
     
+
     /**
+     * Is this context suspended.
+     */
+    private boolean isSuspended = false;
+
+
+    /**
      * Constructor
      */
     public Context() {
@@ -343,6 +350,11 @@
      * Recycle this instance.
      */
     public void recycle(){
+        if (isSuspended){
+            throw new IllegalStateException("The Context has been marked as " +
+                    "suspended and cannot be recycled");
+        }
+
         getProtocolChainInstanceHandler().offer(protocolChain);
         key = null;
         keyRegistrationState = KeyRegistrationState.REGISTER;
@@ -354,6 +366,7 @@
         if (attributes != null) {
             attributes.clear();
         }
+        isSuspended = false;
     }
     
     
@@ -768,4 +781,78 @@
                     condition, readPostProcessor);
         }
     }
+
+
+    /**
+     * Suspend the execution of this {@link Context}. Suspending the execution
+     * allows an application to store the current instance and re-use it later,
+     * not only from the Thread that called suspend, but also from any other Thread.
+     * A suspended Context will not be re-used by any other transaction or Thread.
+     * A suspended Context will keep its current state intact, meaning its
+     * SelectionKey, attributes, SelectorHandler, etc., will not change. Internally,
+     * the Context will not be recycled and will not be re-used by any Thread.
+     *
+     * When invoked, this method will automatically set the key registration
+     * state (see {@link Context#setKeyRegistrationState}) to
+     * KeyRegistrationState.NONE.
+     *
+     * Invoking this method many times has no effect once suspended.
+     */
+    public void suspend(){
+        if (isSuspended) return;
+        isSuspended = true;
+        setKeyRegistrationState(KeyRegistrationState.NONE);
+    }
+
+
+    /**
+     * Return <tt>true</tt> if this Context has been suspended by
+     * invoking {@link #suspend}. When suspended, invoking {@link Context#recycle}
+     * will throw an {@link IllegalStateException}.
+     * @return <tt>true</tt> if this Context has been suspended
+     */
+    public boolean isSuspended(){
+        return isSuspended;
+    }
+
+
+    /**
+     * Resume a {@link #suspend}ed {@link Context}. Invoking this method will
+     * automatically clean the state of this Context and mark it as a candidate
+     * for being re-used by another Thread and connection.
+     *
+     * When invoked, this method will automatically set the key registration
+     * state (see {@link Context#setKeyRegistrationState}) to
+     * KeyRegistrationState.REGISTER and automatically re-enable read and
+     * write operations.
+     *
+     * If the Context hasn't been suspended, calling this method has no effect.
+     */
+    public void resume(){
+        if (!isSuspended) return;
+        isSuspended = false;
+        selectorHandler.register(key, SelectionKey.OP_READ);
+        recycle();
+        getController().returnContext(this);
+    }
+
+
+    /**
+     * Cancel a {@link #suspend}ed {@link Context}. Invoking this method will
+     * automatically clean the state of this Context and mark it as a candidate
+     * for being re-used by another Thread and connection.
+     *
+     * When invoked, this method will automatically close the underlying
+     * connection (represented by its {@link SelectionKey}).
+     *
+     * If the Context hasn't been suspended, calling this method has no effect.
+     */
+    public void cancel(){
+        if (!isSuspended) return;
+        isSuspended = false;
+        selectorHandler.getSelectionKeyHandler().cancel(key);
+        recycle();
+        getController().returnContext(this);
+    }
+
 }
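
The lifecycle in the patch above (suspend on one Thread, resume from another, recycle refused while suspended) can be tried without a Grizzly checkout using the rough sketch below. SketchContext is a hypothetical stand-in and not Grizzly's Context: it models only the suspended flag, the key registration state and the "returned to the pool" step, and leaves out the SelectorHandler and Controller calls entirely. The synchronized methods are an assumption of this sketch; the patch itself does not synchronize.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the patched Context (not the Grizzly class).
class SketchContext {
    enum KeyState { NONE, REGISTER }

    private boolean suspended = false;
    private KeyState keyState = KeyState.REGISTER;
    private boolean returnedToPool = false;

    // Assumption: synchronized so that another thread may safely call resume().
    synchronized void suspend() {
        if (suspended) return;        // repeated calls have no effect
        suspended = true;
        keyState = KeyState.NONE;     // mirrors setKeyRegistrationState(NONE)
    }

    synchronized void recycle() {
        if (suspended) {
            throw new IllegalStateException(
                    "The Context has been marked as suspended and cannot be recycled");
        }
        keyState = KeyState.REGISTER;
    }

    synchronized void resume() {
        if (!suspended) return;       // no effect unless suspended
        suspended = false;
        recycle();
        returnedToPool = true;        // stands in for returnContext(this)
    }

    synchronized boolean isSuspended() { return suspended; }
    synchronized KeyState keyState() { return keyState; }
    synchronized boolean isReturnedToPool() { return returnedToPool; }
}

class SuspendResumeSketch {
    // Suspend on the calling thread, then resume from a worker thread.
    static SketchContext suspendThenResumeElsewhere() {
        SketchContext ctx = new SketchContext();
        ctx.suspend();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.execute(ctx::resume);
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ctx;
    }

    public static void main(String[] args) {
        SketchContext ctx = suspendThenResumeElsewhere();
        System.out.println("suspended=" + ctx.isSuspended()
                + " keyState=" + ctx.keyState()
                + " returnedToPool=" + ctx.isReturnedToPool());
    }
}
```

In the real patch, the business-layer thread would hold the actual Context and write its response before calling resume(); the sketch only shows the ordering of the state changes.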