users@grizzly.java.net

Re: Asynchronous Request Processing with TCPIP

From: Vishnuvardhan <Vishnuvardhan.Piskalaramesh_at_Sun.COM>
Date: Fri, 04 Jul 2008 16:51:52 +0530

Hi Jeanfrancois,

I tried SuspendableFilter and I am getting the error below:

ProtocolChain exception
java.lang.IllegalStateException: Invalid state
        at com.sun.grizzly.util.Utils.findBytes(Utils.java:219)
        at com.sun.grizzly.suspendable.SuspendableFilter.execute(SuspendableFilter.java:299)
        at com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:137)
        at com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:104)
        at com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:90)
        at com.sun.grizzly.ProtocolChainContextTask.doCall(ProtocolChainContextTask.java:67)
        at com.sun.grizzly.SelectionKeyContextTask.call(SelectionKeyContextTask.java:56)
        at com.sun.grizzly.util.WorkerThreadImpl.run(WorkerThreadImpl.java:169)

This is the code I have added:

            SuspendableFilter suspendableFilter = new SuspendableFilter();
            // "*" is the match string, -1 means no timeout, and no
            // attachment is passed. Suspend.AFTER suspends after the next
            // filter in the chain has executed.
            final Suspendable suspendable = suspendableFilter.suspend(
                    "*", -1, null, new SuspendableHandler() {
                public void interupted(Object attachment) {
                }
                public void resumed(Object attachment) {
                    System.out.println("Resumed ...");
                }
                public void expired(Object attachment) {
                }
            }, Suspend.AFTER);

            protocolChain.addFilter(suspendableFilter);

Setting aside this error, here are some difficulties I found when I
tried to implement this for my HL7 filter.

It is difficult to write data to the connection from inside the
resumed() method. I had to use a workaround to bring the Context and the
responseBuffer into resumed() so that I could use them for writing. It
would be better if resumed() itself received the Context as one of its
arguments.

The attachment is not very useful either. What I want is to have the
Context object available inside resumed(), but the Context is available
only from the ProtocolChain. The ProtocolChain is not available when I
invoke suspend(), so I cannot pass the Context as the attachment. I hope
that makes sense.
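
One possible shape for such a workaround, as a minimal sketch using only
the JDK: the request thread, which has the connection state in scope,
registers a small "writer" callback under a correlation id, and the
response thread later looks it up and hands it the response.
PendingResponses, the correlation id and the Consumer<ByteBuffer> writer
are all hypothetical names invented for this illustration and are not
part of Grizzly. In a real filter the writer would wrap whatever call
actually writes on the connection.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper, not part of Grizzly: maps a correlation id to the
// code that knows how to write on the originating connection.
final class PendingResponses {
    private final Map<String, Consumer<ByteBuffer>> writers =
            new ConcurrentHashMap<>();

    // Called on the request thread, where the connection state is in scope.
    void register(String correlationId, Consumer<ByteBuffer> writer) {
        writers.put(correlationId, writer);
    }

    // Called on the response thread. Removes the writer so that it runs at
    // most once, and returns false if nothing was waiting under that id.
    boolean complete(String correlationId, ByteBuffer response) {
        Consumer<ByteBuffer> writer = writers.remove(correlationId);
        if (writer == null) {
            return false;
        }
        writer.accept(response);
        return true;
    }
}
```

The map is concurrent because register() and complete() are expected to
run on different threads. How the correlation id is chosen (for example
a message control id taken from the parsed message) is left open here.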

It would be much better if suspendable.resume() took an object as a
parameter and passed it on to resumed(). I could then use it to carry
the response data from the place where the response becomes available to
the place where it can be written.
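
To make the suggestion concrete, here is a minimal sketch of what a
payload-carrying resume could look like. PayloadHandler and
PayloadSuspendable are invented for this illustration and do not exist
in Grizzly; the sketch only shows the proposed contract, not how the
library would implement it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical callback: like resumed(), but it also receives a payload.
interface PayloadHandler<A, P> {
    void resumed(A attachment, P payload);
}

// Hypothetical handle for one suspended connection.
final class PayloadSuspendable<A, P> {
    private final A attachment;
    private final PayloadHandler<A, P> handler;
    private final AtomicBoolean resumed = new AtomicBoolean(false);

    PayloadSuspendable(A attachment, PayloadHandler<A, P> handler) {
        this.attachment = attachment;
        this.handler = handler;
    }

    // Hands the payload to the handler. Only the first call has any
    // effect; later calls return false without invoking the handler.
    boolean resume(P payload) {
        if (!resumed.compareAndSet(false, true)) {
            return false;
        }
        handler.resumed(attachment, payload);
        return true;
    }
}
```

With a contract like this, the thread that produces the response would
call resume(responseBuffer), and the handler would receive both the
attachment and the response in one place.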

Let me know if there is a simpler way. Your help is much needed.

Thanks
Vishnu


Vishnuvardhan wrote:
>
> Thank you very much. I will let you know once it is done.
>
>
> Jeanfrancois Arcand wrote:
>> Salut,
>>
>> Vishnuvardhan wrote:
>>>
>>> Just to be more specific, can you please tell me if the below will
>>> work:
>>>
>>> // suspend() is invoked with ctx as the attachment, and ctx is
>>> // retrieved again inside the resumed() method.
>>> public void resumed(Object attachment) {
>>> Context ctx = (Context) attachment;
>>>
>>> ctx.getAsyncQueueWritable().writeToAsyncQueue(responseBuffer);
>>> }
>>
>> Yes
>>
>>>
>>> P.S. I need to get as much info as I can before you go for the long
>>> weekend. Please don't mind.
>>
>> I'm Quebecois not American ;-), so nooo long week-end :-)
>>
>> A+
>>
>> -- Jeanfrancois
>>
>>>
>>> Thanks
>>> Vishnu
>>>
>>> Jeanfrancois Arcand wrote:
>>>> Salut,
>>>>
>>>> Vishnuvardhan wrote:
>>>>> Hi Jeanfrancois,
>>>>>
>>>>> Thank you very much. I think that will help.
>>>>>
>>>>> A couple of doubts:
>>>>>
>>>>> 1. If I want to suspend indefinitely until resume() is invoked,
>>>>> can I pass 0 as the timeout value?
>>>>
>>>> Set it to -1.
>>>>
>>>>
>>>>>
>>>>> 2. Can I write the response back from resumed() method?
>>>>
>>>> Yes. You can always write when a connection is suspended/resumed.
>>>>
>>>>> What is the
>>>>> correct way to write the response back? Also when I write the data
>>>>> back, the HL7 specific filter should be invoked for OP_WRITE so
>>>>> that I can wrap the data with hl7 protocol specific characters.
>>>>
>>>> You can probably use the Async Queue Writer so you don't need to
>>>> handle OP_WRITE directly:
>>>>
>>>> http://blogs.sun.com/oleksiys/entry/grizzly_1_7_0_presents
>>>>
>>>> A+
>>>>
>>>> - Jeanfrancois
>>>>
>>>>
>>>>>
>>>>> -Vishnu
>>>>>
>>>>>
>>>>>
>>>>> Jeanfrancois Arcand wrote:
>>>>>> Salut,
>>>>>>
>>>>>> Vishnuvardhan wrote:
>>>>>>> Hi Jeanfrancois
>>>>>>>
>>>>>>> Thanks for your help. I looked at the SuspendableFilter. This
>>>>>>> filter suspends the execution based on the match of the string
>>>>>>> received. In my case, the input can be any HL7 message. I need
>>>>>>> to suspend after I receive the HL7 message.
>>>>>>>
>>>>>>> I need to have something like this:
>>>>>>>
>>>>>>> class SuspendableFilter
>>>>>>>
>>>>>>> //inside execute()
>>>>>>>
>>>>>>> //read the string received
>>>>>>> this.suspend();
>>>>>>> //continue execution after resuming..
>>>>>>>
>>>>>>> From some other method.
>>>>>>>
>>>>>>> suspendableFilter.resume().
>>>>>>>
>>>>>>> Again, thank you very much for considering my request.
>>>>>>
>>>>>> I think you can do that by extending the SuspendableFilter and
>>>>>> suspend on all requests, independently of the matching String.
>>>>>> Just make sure you are telling the SuspendableFilter to suspend
>>>>>> *After* executing the next ProtocolFilter in the protocol chain:
>>>>>>
>>>>>>> final ProtocolFilter readFilter = new ReadFilter();
>>>>>>> final SuspendableFilter suspendFilter = new
>>>>>>> SuspendableFilter();
>>>>>>> final ProtocolFilter hl7Filter = new HL7Filter();
>>>>>>> suspendable = suspendFilter.suspend("*", timeout, null,
>>>>>>> new SuspendableHandler() {
>>>>>>>
>>>>>>> public void interupted(Object attachment) {
>>>>>>> }
>>>>>>>
>>>>>>> public void resumed(Object attachment) {
>>>>>>> }
>>>>>>>
>>>>>>> public void expired(Object attachment) {
>>>>>>> }
>>>>>>> }, Suspend.AFTER);
>>>>>>
>>>>>> What this code snippet will do is read bytes, invoke your
>>>>>> HL7Filter, then suspend the connection.
>>>>>>
>>>>>> Would that help?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> -- Jeanfrancois
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Vishnu
>>>>>>>
>>>>>>>
>>>>>>> Jeanfrancois Arcand wrote:
>>>>>>>> Salut,
>>>>>>>>
>>>>>>>> [removing the dev alias]
>>>>>>>>
>>>>>>>> Vishnuvardhan wrote:
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> I am working on the Open ESB HL7 binding component. We are in the
>>>>>>>>> process of migrating the component from the Apache Mina
>>>>>>>>> framework to Grizzly. I would like to know if I can do
>>>>>>>>> Asynchronous Request Processing with TCP using Grizzly.
>>>>>>>>>
>>>>>>>>> This is what I want to do.
>>>>>>>>> 1. Listen on a particular port.
>>>>>>>>> 2. Read the incoming message.
>>>>>>>>> 3. Parse the incoming message.
>>>>>>>>> 4. Submit the parsed message to the business processing layer.
>>>>>>>>> 5. Once a response is available, send back the response
>>>>>>>>> message to the same socket.
>>>>>>>>
>>>>>>>> Yes, you can. Take a look at the SuspendableFilter:
>>>>>>>>
>>>>>>>> https://grizzly.dev.java.net/nonav/apidocs/com/sun/grizzly/suspendable/SuspendableFilter.html
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is what I have done.
>>>>>>>>> 1. Created a Controller.
>>>>>>>>> 2. Added a protocol chain with ReadFilter and a parser filter.
>>>>>>>>> 3. Once the message is available, submitting the message to
>>>>>>>>> the business processing layer.
>>>>>>>>> The thread execution ends here.
>>>>>>>>>
>>>>>>>>> The response will be available from another thread. Now, I
>>>>>>>>> want to write the response message back to the same socket. I
>>>>>>>>> tried to cache the context from the first thread and use the
>>>>>>>>> same context for writing. But when I call
>>>>>>>>> ctx.getSelectionKey().channel(), the program hangs there.
>>>>>>>>>
>>>>>>>>> I went through the post on Asynchronous Request Processing on
>>>>>>>>> Jean-Francois Arcand's blog
>>>>>>>>> <http://weblogs.java.net/blog/jfarcand/>. The framework seems
>>>>>>>>> to support only the HTTP protocol.
>>>>>>>>>
>>>>>>>>> Can somebody guide me on how to achieve this over TCP/IP?
>>>>>>>>
>>>>>>>> Take a look at the SuspendableFilter as I think this is really
>>>>>>>> what you need. You can see some tests here:
>>>>>>>>
>>>>>>>> https://grizzly.dev.java.net/nonav/xref-test/com/sun/grizzly/SuspendableTest.html
>>>>>>>>
>>>>>>>>
>>>>>>>> Let us know if that doesn't work. Your scenario is quite common
>>>>>>>> and I can certainly help improve the SuspendableFilter to
>>>>>>>> make it work for you :-)
>>>>>>>>
>>>>>>>> A+
>>>>>>>>
>>>>>>>> -- Jeanfrancois
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks in advance,
>>>>>>>>> Vishnu
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Vishnuvardhan Piskalaramesh
>>>>>>>>> Sun's Open ESB Community (http://open-esb.org)
>>>>>>>>
>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>
>>>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>


-- 
Vishnuvardhan Piskalaramesh
Sun's Open ESB Community (http://open-esb.org)