users@grizzly.java.net

Re: Asynchronous Request Processing with TCPIP

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Fri, 04 Jul 2008 11:42:05 -0400

Salut,

see, I'm not on an Holiday :-)

Vishnuvardhan wrote:
> Hi Jeanfrancois,
>
> I tried SuspendableFilter and I am getting the below error:
>
> ProtocolChain exception
> java.lang.IllegalStateException: Invalid state
> at com.sun.grizzly.util.Utils.findBytes(Utils.java:219)

You get that exception because the bytebuffer is empty. Can you give me
more information about how you construct your ProtocolChain? I suspect
you have:

ReadFilter -> SuspendableFilter -> HL7Filter


> at
> com.sun.grizzly.suspendable.SuspendableFilter.execute(SuspendableFilter.java:299)
>
> at
> com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:137)
>
> at
> com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:104)
> at
> com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:90)
> at
> com.sun.grizzly.ProtocolChainContextTask.doCall(ProtocolChainContextTask.java:67)
>
> at
> com.sun.grizzly.SelectionKeyContextTask.call(SelectionKeyContextTask.java:56)
>
> at
> com.sun.grizzly.util.WorkerThreadImpl.run(WorkerThreadImpl.java:169)
>
> This is the code I have added:
>
> SuspendableFilter suspendableFilter = new SuspendableFilter();
> final Suspendable suspendable =
> suspendableFilter.suspend("*",-1,null,new SuspendableHandler(){
> public void interupted(Object attachment) {
> }
> public void resumed(Object attachment) {
> System.out.println("Resumed ...");
> }
> public void expired(Object attachment) {
> }
> },Suspend.AFTER);
>
> protocolChain.addFilter(suspendableFilter);
>
> Setting aside this error, here are some difficulties I found when I
> tried to implement this for my HL7 filter.
>
> It is difficult to write some data to the connection from inside the
> resumed() method. I had to do some workaround to bring the Context, the
> responseBuffer inside the resumed() method so that I can use those
> objects for writing. I thought it will be better if the resumed() itself
> passes the Context as one of its argument.

I see. A workaround for you is to probably add a Filter in front of the
SuspendableFilter, but I don't like it (see below)


>
> The attachment is also not much useful. What I wanted is to bring the
> Context object inside the resumed() method. But it is available only
> from ProtocolChain. The ProtocolChain is not available when I invoke
> suspend, so I cannot pass the context as attachment. I hope you can
> understand.

Agree. I need to work to support this inside the SuspendableFilter. I've
missed that use case when you wabt to use the SuspendableHandler for I/O
operation. Let me think of it...

>
> If the suspendable.resume() takes an object as parameter and it is
> passed to resumed() it will be much better. I can use it for passing the
> response data from the place where the response is available to where it
> can be written.
>
> Let me know if there is much simpler way. Your help is much needed

OK I don't want you to waste your time, so let me recap what you are
trying to do and see if we can do it without being blocked by the
"no-so-well-designed-SuspendedHandler". So for now, let's assume we
aren't using the SuspendFilter. So, we have:

ReadFilter -> HL7Filter

Inside your Filter, what is important for you to do is to tell the
SelectorHandler to not register the SelectionKey for any OP_READ when
you are under a suspended more:

ctx.setKeyRegistrationState(KeyRegistrationState.NONE);

Now next problem is that as soon as the ProtocolChain complete the
execution of its ProtocolFilter, the Context object is recycled and
ready to be used by *another* transaction. To keep the "state" of the
transaction, you can do WorkerThread.detach(), which return a
ThreadAttachment object:

https://grizzly.dev.java.net/nonav/apidocs/com/sun/grizzly/util/ThreadAttachment.html

So far so good, as inside your HL7Filter, when you need to suspend a
connection, you might just have to do:

ctx.setKeyRegistrationState(KeyRegistrationState.NONE);
ThreadAttachment ta = workerThread.detach();
SelectionKey key = ctx.getSelectionKey();

Then, store the ThreadAttachment somewhere, and when you are ready to
resume, you can get back your ThreadAttachment and your SelectionKey,
and do:

((SocketChannel)key.channel()).write(...);

But that's too low level. So you can always use the OutputWriter to
flush the data for you:

https://grizzly.dev.java.net/nonav/apidocs/com/sun/grizzly/util/OutputWriter.html

But that's not what you want to do because the write operation will be
blocking, meaning Grizzly will use a temporary Selector pool to make
sure all the bytes are written. It works quite well for the http
protocol, but might not be performant for your protocol. That need to be
tested.

Now the best solution would have been to use an Async Write Queue...the
problem is since the Context is the only Object that can be used for
doing async write, we cannot use async queue :-(

Long email to say that I will add a new API on the Context object that
mark it none shareable amongst transaction. That way you don't have to
do anything I've described above, but instead just do:

Context.suspend()

(Internally it will call
ctx.setKeyRegistrationState(KeyRegistrationState.NONE))

Then when store the Context and re-use it once you are ready, and get
access to the async queue, attributes, etc that were associated with
that object.

Sorry long email :-)

Thanks

-- Jeanfrancois






>
> Thanks
> Vishnu
>
>
> Vishnuvardhan wrote:
>>
>> Thank you very much. I will let you know once it is done.
>>
>>
>> Jeanfrancois Arcand wrote:
>>> Salut,
>>>
>>> Vishnuvardhan wrote:
>>>>
>>>> Just to be more specific, can you please tell me if the below will
>>>> work:
>>>>
>>>> //suspend() method is invoked with ctx as attachment. getting back
>>>> inside the resumed method
>>>> //inside the resumed method().
>>>> public void resumed(Object attachment) {
>>>> Context ctx = (Context) attachment;
>>>>
>>>> ctx.getAsyncQueueWritable().writeToAsyncQueue(responseBuffer);
>>>> }
>>>
>>> Yes
>>>
>>>>
>>>> P.S I need to get as much info before you go for long weekend.
>>>> Please don't mind.
>>>
>>> I'm Quebecois not American ;-), so nooo long week-end :-)
>>>
>>> A+
>>>
>>> -- Jeanfrancois
>>>
>>>>
>>>> Thanks
>>>> Vishnu
>>>>
>>>> Jeanfrancois Arcand wrote:
>>>>> Salut,
>>>>>
>>>>> Vishnuvardhan wrote:
>>>>>> Hi Jeanfrancois,
>>>>>>
>>>>>> Thank you very much. I think that will help.
>>>>>>
>>>>>> A couple of doubts:
>>>>>>
>>>>>> 1. If I want to suspend indefinitely until resume() is invoked,
>>>>>> can I send the timeout value as 0.
>>>>>
>>>>> Set it to -1.
>>>>>
>>>>>
>>>>>>
>>>>>> 2. Can I write the response back from resumed() method?
>>>>>
>>>>> Yes. You can always write when a connection is suspected/resumed.
>>>>>
>>>>> What is the
>>>>>> correct way to write the response back? Also when I write the data
>>>>>> back, the HL7 specific filter should be invoked for OP_WRITE so
>>>>>> that I can wrap the data with hl7 protocol specific characters.
>>>>>
>>>>> You can probably use the Async Queue Writer so you don't need to
>>>>> handle OP_WRITE directly:
>>>>>
>>>>> http://blogs.sun.com/oleksiys/entry/grizzly_1_7_0_presents
>>>>>
>>>>> A+
>>>>>
>>>>> - Jeanfrancois
>>>>>
>>>>>
>>>>>>
>>>>>> -Vishnu
>>>>>>
>>>>>>
>>>>>>
>>>>>> Jeanfrancois Arcand wrote:
>>>>>>> Salut,
>>>>>>>
>>>>>>> Vishnuvardhan wrote:
>>>>>>>> Hi Jeanfrancois
>>>>>>>>
>>>>>>>> Thanks for your help. I looked at the SuspendableFilter. This
>>>>>>>> filter suspends the execution based on the match of the string
>>>>>>>> received. In my case, the input can be any HL7 message. I need
>>>>>>>> to suspend after I receive the HL7 message.
>>>>>>>>
>>>>>>>> I need to have something like this:
>>>>>>>>
>>>>>>>> class SuspendableFilter
>>>>>>>>
>>>>>>>> //inside execute()
>>>>>>>>
>>>>>>>> //read the string received
>>>>>>>> this.suspend();
>>>>>>>> //continue execution after resuming..
>>>>>>>>
>>>>>>>> From some other method.
>>>>>>>>
>>>>>>>> suspendableFilter.resume().
>>>>>>>>
>>>>>>>> Again, thank you very much for considering my request.
>>>>>>>
>>>>>>> I think you can do that by extending the SuspendableFilter and
>>>>>>> suspend on all requests, independently of the matching String.
>>>>>>> Just make sure you are telling the SuspendableFilter to suspend
>>>>>>> *After* executing the next ProtocolFilter in the protocol chain:
>>>>>>>
>>>>>>>> final ProtocolFilter readFilter = new ReadFilter();
>>>>>>>> final SuspendableFilter suspendFilter = new
>>>>>>>> SuspendableFilter();
>>>>>>>> final ProtocolFilter hl7Filter = new HL7Filter();
>>>>>>>> suspendable = suspendFilter.suspend("*", timeout, null,
>>>>>>>> new SuspendableHandler() {
>>>>>>>>
>>>>>>>> public void interupted(Object attachment) {
>>>>>>>> }
>>>>>>>>
>>>>>>>> public void resumed(Object attachment) {
>>>>>>>> }
>>>>>>>>
>>>>>>>> public void expired(Object attachment) {
>>>>>>>> }
>>>>>>>> }, Suspend.AFTER);
>>>>>>>
>>>>>>> What this code snipped will do is read bytes, invoke your
>>>>>>> HL7Filter, then suspend the connection.
>>>>>>>
>>>>>>> Would that help?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> -- Jeanfrancois
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> Vishnu
>>>>>>>>
>>>>>>>>
>>>>>>>> Jeanfrancois Arcand wrote:
>>>>>>>>> Salut,
>>>>>>>>>
>>>>>>>>> [removing the dev alias]
>>>>>>>>>
>>>>>>>>> Vishnuvardhan wrote:
>>>>>>>>>> Hi
>>>>>>>>>>
>>>>>>>>>> I am working on Open ESB HL7 binding component. We are in the
>>>>>>>>>> processing of migrating the component from Apache Mina
>>>>>>>>>> framework to Grizzly. I would like to know if I can do
>>>>>>>>>> Asynchronous Request Processing with TCP using Grizzly.
>>>>>>>>>>
>>>>>>>>>> This is what I want to do.
>>>>>>>>>> 1. Listen on a particular port.
>>>>>>>>>> 2. Read the incoming message.
>>>>>>>>>> 3. Parse the incoming message.
>>>>>>>>>> 4. Submit the parsed message to the business processing layer.
>>>>>>>>>> 5. Once a response is available, send back the response
>>>>>>>>>> message to the same socket.
>>>>>>>>>
>>>>>>>>> Yes, you can. Take a look at the SuspendableFilter:
>>>>>>>>>
>>>>>>>>> https://grizzly.dev.java.net/nonav/apidocs/com/sun/grizzly/suspendable/SuspendableFilter.html
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This is what I have done.
>>>>>>>>>> 1. Created a Controller.
>>>>>>>>>> 2. Added a protocol chain with ReadFilter and a parser filter.
>>>>>>>>>> 3. Once the message is available, submitting the message to
>>>>>>>>>> the business processing layer.
>>>>>>>>>> The thread execution ends here.
>>>>>>>>>>
>>>>>>>>>> The response will be available from another thread. Now, I
>>>>>>>>>> want to write the response message back to the same socket. I
>>>>>>>>>> tried to cache the context from 1st thread and tried to use
>>>>>>>>>> the same context for writing. But when I say
>>>>>>>>>> ctx.getSelectionKey().channel(), the program hangs there.
>>>>>>>>>>
>>>>>>>>>> I went through the blog on Asynchronous Request Processing by
>>>>>>>>>> Jean-Francois Arcand's Blog
>>>>>>>>>> <http://weblogs.java.net/blog/jfarcand/>. The framework seems
>>>>>>>>>> to be supporting only HTTP protocol.
>>>>>>>>>>
>>>>>>>>>> Can somebody guide me how to achieve this in TCP IP.
>>>>>>>>>
>>>>>>>>> Take a look at the SuspendableFilter as I think this is really
>>>>>>>>> what you need. You can see some tests here:
>>>>>>>>>
>>>>>>>>> https://grizzly.dev.java.net/nonav/xref-test/com/sun/grizzly/SuspendableTest.html
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Let us know if that doesn't work. Your scenario is quite common
>>>>>>>>> and I can certainly help improving the SuspendableFilter to
>>>>>>>>> makes it work for you :-)
>>>>>>>>>
>>>>>>>>> A+
>>>>>>>>>
>>>>>>>>> -- Jeanfrancois
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks in advance,
>>>>>>>>>> Vishnu
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Vishnuvardhan Piskalaramesh
>>>>>>>>>> Sun's Open ESB Community (http://open-esb.org)
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>
>>>>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>>
>>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>>
>>>>>
>>>>
>>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>
>>>
>>
>>
>
>