users@grizzly.java.net

Re: Asynchronous Request Processing with TCPIP

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Fri, 04 Jul 2008 11:53:24 -0400

Salut,

I've filed:

https://grizzly.dev.java.net/issues/show_bug.cgi?id=186

to track the issue. I will go ahead and work on the implementation. The
implementation will also help fix the SuspendableHandler issue you
noted, as the Context could then be used as an attachment.

Stay tuned...

-- Jeanfrancois

Jeanfrancois Arcand wrote:
> Salut,
>
> See, I'm not on holiday :-)
>
> Vishnuvardhan wrote:
>> Hi Jeanfrancois,
>>
>> I tried SuspendableFilter and I am getting the below error:
>>
>> ProtocolChain exception
>> java.lang.IllegalStateException: Invalid state
>> at com.sun.grizzly.util.Utils.findBytes(Utils.java:219)
>
> You get that exception because the ByteBuffer is empty. Can you give me
> more information about how you construct your ProtocolChain? I suspect
> you have:
>
> ReadFilter -> SuspendableFilter -> HL7Filter
>
>
>> at com.sun.grizzly.suspendable.SuspendableFilter.execute(SuspendableFilter.java:299)
>> at com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:137)
>> at com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:104)
>> at com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:90)
>> at com.sun.grizzly.ProtocolChainContextTask.doCall(ProtocolChainContextTask.java:67)
>> at com.sun.grizzly.SelectionKeyContextTask.call(SelectionKeyContextTask.java:56)
>> at com.sun.grizzly.util.WorkerThreadImpl.run(WorkerThreadImpl.java:169)
>>
>>
>> This is the code I have added:
>>
>> SuspendableFilter suspendableFilter = new SuspendableFilter();
>> final Suspendable suspendable =
>>     suspendableFilter.suspend("*", -1, null, new SuspendableHandler() {
>>         public void interupted(Object attachment) {
>>         }
>>         public void resumed(Object attachment) {
>>             System.out.println("Resumed ...");
>>         }
>>         public void expired(Object attachment) {
>>         }
>>     }, Suspend.AFTER);
>>
>> protocolChain.addFilter(suspendableFilter);
>>
>> Setting aside this error, here are some difficulties I found when I
>> tried to implement this for my HL7 filter.
>>
>> It is difficult to write data to the connection from inside the
>> resumed() method. I had to use a workaround to bring the Context and
>> the responseBuffer inside the resumed() method so that I can use those
>> objects for writing. I think it would be better if resumed() itself
>> received the Context as one of its arguments.
>
> I see. A workaround for you is probably to add a Filter in front of the
> SuspendableFilter, but I don't like it (see below).
>
>
>>
>> The attachment is also not very useful. What I wanted is to bring the
>> Context object inside the resumed() method. But it is available only
>> from ProtocolChain. The ProtocolChain is not available when I invoke
>> suspend, so I cannot pass the context as attachment. I hope you can
>> understand.
>
> Agreed. I need to work on supporting this inside the SuspendableFilter. I
> missed the use case where you want to use the SuspendableHandler for I/O
> operations. Let me think about it...
>
>>
>> If the suspendable.resume() takes an object as parameter and it is
>> passed to resumed() it will be much better. I can use it for passing
>> the response data from the place where the response is available to
>> where it can be written.
>>
>> Let me know if there is a much simpler way. Your help is much needed.
>
> OK, I don't want you to waste your time, so let me recap what you are
> trying to do and see if we can do it without being blocked by the
> "not-so-well-designed" SuspendableHandler. So for now, let's assume we
> aren't using the SuspendableFilter. So, we have:
>
> ReadFilter -> HL7Filter
>
> Inside your Filter, what is important is to tell the
> SelectorHandler not to register the SelectionKey for any OP_READ while
> you are in suspended mode:
>
> ctx.setKeyRegistrationState(KeyRegistrationState.NONE);
>
> The next problem is that as soon as the ProtocolChain completes the
> execution of its ProtocolFilters, the Context object is recycled and
> ready to be used by *another* transaction. To keep the "state" of the
> transaction, you can call WorkerThread.detach(), which returns a
> ThreadAttachment object:
>
> https://grizzly.dev.java.net/nonav/apidocs/com/sun/grizzly/util/ThreadAttachment.html
>
>
> So far so good, as inside your HL7Filter, when you need to suspend a
> connection, you might just have to do:
>
> ctx.setKeyRegistrationState(KeyRegistrationState.NONE);
> ThreadAttachment ta = workerThread.detach();
> SelectionKey key = ctx.getSelectionKey();
>
> Then, store the ThreadAttachment somewhere, and when you are ready to
> resume, you can get back your ThreadAttachment and your SelectionKey,
> and do:
>
> ((SocketChannel)key.channel()).write(...);
>
> But that's too low level. So you can always use the OutputWriter to
> flush the data for you:
>
> https://grizzly.dev.java.net/nonav/apidocs/com/sun/grizzly/util/OutputWriter.html
>
>
> But that's not what you want to do because the write operation will be
> blocking, meaning Grizzly will use a temporary Selector pool to make
> sure all the bytes are written. It works quite well for the HTTP
> protocol, but might not perform well for your protocol. That needs to be
> tested.
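
To make the cost of that blocking write concrete, here is a minimal sketch of the general technique using only java.nio. It is not Grizzly's OutputWriter code: the class name TemporarySelectorWrite, the method writeFully and the timeout parameter are hypothetical, and a fresh Selector is opened per call where a real implementation would borrow one from a pool.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;

// Hypothetical helper, not part of Grizzly: writes a whole buffer to a
// non-blocking channel, waiting on a temporary Selector when the channel
// cannot accept more bytes.
final class TemporarySelectorWrite {
    private TemporarySelectorWrite() {
    }

    static <C extends SelectableChannel & WritableByteChannel> void writeFully(
            C channel, ByteBuffer data, long timeoutMillis) throws IOException {
        Selector temporary = null;
        SelectionKey key = null;
        try {
            while (data.hasRemaining()) {
                if (channel.write(data) > 0) {
                    continue; // made progress, try to write the rest
                }
                // Nothing could be written: wait until the channel is writable.
                if (temporary == null) {
                    temporary = Selector.open();
                    key = channel.register(temporary, SelectionKey.OP_WRITE);
                }
                int ready = temporary.select(timeoutMillis);
                // Clear the selected set so the next select() reports the key again.
                temporary.selectedKeys().clear();
                if (ready == 0) {
                    throw new IOException("Timed out waiting for the channel to become writable");
                }
            }
        } finally {
            if (key != null) {
                key.cancel();
            }
            if (temporary != null) {
                temporary.close();
            }
        }
    }
}
```

The calling thread is parked inside select() for as long as the peer is slow to read, which is why this approach may not scale for a protocol with many slow connections.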
>
> The best solution would have been to use an Async Write Queue... the
> problem is that, since the Context is the only object that can be used for
> async writes, we cannot use the async queue :-(
>
> Long email to say that I will add a new API on the Context object that
> marks it as non-shareable among transactions. That way you don't have to
> do anything I've described above, but instead just do:
>
> Context.suspend()
>
> (Internally it will call
> ctx.setKeyRegistrationState(KeyRegistrationState.NONE))
>
> Then store the Context and re-use it once you are ready, which gives you
> access to the async queue, attributes, etc. that were associated with
> that object.
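
The "store it and re-use it once you are ready" step could be sketched as below, using only the JDK. This is an assumption-laden illustration, not a Grizzly API: SuspendedTransactions, park, resume and the correlation id are hypothetical names, and the type parameter S stands for whatever you keep per suspended connection (the Context once it can be suspended, or a ThreadAttachment plus SelectionKey pair today).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry, not part of Grizzly: keeps the state of suspended
// transactions until the business layer has a response for them.
final class SuspendedTransactions<S> {
    private final ConcurrentMap<String, S> parked = new ConcurrentHashMap<>();

    // Called from the filter (I/O thread) right after suspending the connection.
    void park(String correlationId, S state) {
        if (parked.putIfAbsent(correlationId, state) != null) {
            throw new IllegalStateException("Already suspended: " + correlationId);
        }
    }

    // Called from the thread that has the response. Returns the saved state,
    // or null if nothing is parked under that id (for example, already resumed).
    S resume(String correlationId) {
        return parked.remove(correlationId);
    }

    int size() {
        return parked.size();
    }
}
```

Because remove() hands the state to exactly one caller, two threads racing to answer the same message cannot both write to the connection.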
>
> Sorry long email :-)
>
> Thanks
>
> -- Jeanfrancois
>
>
>
>
>
>
>>
>> Thanks
>> Vishnu
>>
>>
>> Vishnuvardhan wrote:
>>>
>>> Thank you very much. I will let you know once it is done.
>>>
>>>
>>> Jeanfrancois Arcand wrote:
>>>> Salut,
>>>>
>>>> Vishnuvardhan wrote:
>>>>>
>>>>> Just to be more specific, can you please tell me if the below will
>>>>> work:
>>>>>
>>>>> // suspend() is invoked with ctx as the attachment; it is retrieved
>>>>> // inside the resumed() method.
>>>>> public void resumed(Object attachment) {
>>>>>     Context ctx = (Context) attachment;
>>>>>
>>>>>     ctx.getAsyncQueueWritable().writeToAsyncQueue(responseBuffer);
>>>>> }
>>>>
>>>> Yes
>>>>
>>>>>
>>>>> P.S I need to get as much info before you go for long weekend.
>>>>> Please don't mind.
>>>>
>>>> I'm Quebecois not American ;-), so nooo long week-end :-)
>>>>
>>>> A+
>>>>
>>>> -- Jeanfrancois
>>>>
>>>>>
>>>>> Thanks
>>>>> Vishnu
>>>>>
>>>>> Jeanfrancois Arcand wrote:
>>>>>> Salut,
>>>>>>
>>>>>> Vishnuvardhan wrote:
>>>>>>> Hi Jeanfrancois,
>>>>>>>
>>>>>>> Thank you very much. I think that will help.
>>>>>>>
>>>>>>> A couple of doubts:
>>>>>>>
>>>>>>> 1. If I want to suspend indefinitely until resume() is invoked,
>>>>>>> can I pass a timeout value of 0?
>>>>>>
>>>>>> Set it to -1.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> 2. Can I write the response back from resumed() method?
>>>>>>
>>>>>> Yes. You can always write when a connection is suspended/resumed.
>>>>>>
>>>>>> What is the
>>>>>>> correct way to write the response back? Also when I write the
>>>>>>> data back, the HL7 specific filter should be invoked for OP_WRITE
>>>>>>> so that I can wrap the data with hl7 protocol specific characters.
>>>>>>
>>>>>> You can probably use the Async Queue Writer so you don't need to
>>>>>> handle OP_WRITE directly:
>>>>>>
>>>>>> http://blogs.sun.com/oleksiys/entry/grizzly_1_7_0_presents
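
On the "HL7 protocol specific characters": if that means the usual MLLP framing (start block 0x0B before the message, end block 0x1C followed by carriage return 0x0D after it), which is an assumption since the thread does not name the framing, the wrapping can be applied to the buffer before it is handed to the writer, so no OP_WRITE filter is needed. A minimal sketch, where the class name MllpFraming and the ISO-8859-1 charset are also assumptions:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: wraps an HL7 message in MLLP framing bytes.
final class MllpFraming {
    static final byte START_BLOCK = 0x0B;     // <VT>
    static final byte END_BLOCK = 0x1C;       // <FS>
    static final byte CARRIAGE_RETURN = 0x0D; // <CR>

    private MllpFraming() {
    }

    // Returns a buffer ready for writing: <VT> message <FS><CR>.
    // The charset is an assumption; use whatever your HL7 peers agree on.
    static ByteBuffer frame(String hl7Message) {
        byte[] payload = hl7Message.getBytes(StandardCharsets.ISO_8859_1);
        ByteBuffer framed = ByteBuffer.allocate(payload.length + 3);
        framed.put(START_BLOCK);
        framed.put(payload);
        framed.put(END_BLOCK);
        framed.put(CARRIAGE_RETURN);
        framed.flip(); // switch from filling to draining
        return framed;
    }
}
```

The returned buffer could then be passed as the responseBuffer to whichever write path you end up using.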
>>>>>>
>>>>>> A+
>>>>>>
>>>>>> - Jeanfrancois
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> -Vishnu
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Jeanfrancois Arcand wrote:
>>>>>>>> Salut,
>>>>>>>>
>>>>>>>> Vishnuvardhan wrote:
>>>>>>>>> Hi Jeanfrancois
>>>>>>>>>
>>>>>>>>> Thanks for your help. I looked at the SuspendableFilter. This
>>>>>>>>> filter suspends the execution based on the match of the string
>>>>>>>>> received. In my case, the input can be any HL7 message. I need
>>>>>>>>> to suspend after I receive the HL7 message.
>>>>>>>>>
>>>>>>>>> I need to have something like this:
>>>>>>>>>
>>>>>>>>> class SuspendableFilter
>>>>>>>>>
>>>>>>>>> //inside execute()
>>>>>>>>>
>>>>>>>>> //read the string received
>>>>>>>>> this.suspend();
>>>>>>>>> //continue execution after resuming..
>>>>>>>>>
>>>>>>>>> From some other method.
>>>>>>>>>
>>>>>>>>> suspendableFilter.resume().
>>>>>>>>>
>>>>>>>>> Again, thank you very much for considering my request.
>>>>>>>>
>>>>>>>> I think you can do that by extending the SuspendableFilter and
>>>>>>>> suspend on all requests, independently of the matching String.
>>>>>>>> Just make sure you are telling the SuspendableFilter to suspend
>>>>>>>> *After* executing the next ProtocolFilter in the protocol chain:
>>>>>>>>
>>>>>>>>> final ProtocolFilter readFilter = new ReadFilter();
>>>>>>>>> final SuspendableFilter suspendFilter = new SuspendableFilter();
>>>>>>>>> final ProtocolFilter hl7Filter = new HL7Filter();
>>>>>>>>> suspendable = suspendFilter.suspend("*", timeout, null,
>>>>>>>>>     new SuspendableHandler() {
>>>>>>>>>
>>>>>>>>>         public void interupted(Object attachment) {
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         public void resumed(Object attachment) {
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         public void expired(Object attachment) {
>>>>>>>>>         }
>>>>>>>>>     }, Suspend.AFTER);
>>>>>>>>
>>>>>>>> What this code snippet will do is read bytes, invoke your
>>>>>>>> HL7Filter, then suspend the connection.
>>>>>>>>
>>>>>>>> Would that help?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> -- Jeanfrancois
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Vishnu
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Jeanfrancois Arcand wrote:
>>>>>>>>>> Salut,
>>>>>>>>>>
>>>>>>>>>> [removing the dev alias]
>>>>>>>>>>
>>>>>>>>>> Vishnuvardhan wrote:
>>>>>>>>>>> Hi
>>>>>>>>>>>
>>>>>>>>>>> I am working on the Open ESB HL7 binding component. We are in the
>>>>>>>>>>> process of migrating the component from the Apache Mina
>>>>>>>>>>> framework to Grizzly. I would like to know if I can do
>>>>>>>>>>> Asynchronous Request Processing with TCP using Grizzly.
>>>>>>>>>>>
>>>>>>>>>>> This is what I want to do.
>>>>>>>>>>> 1. Listen on a particular port.
>>>>>>>>>>> 2. Read the incoming message.
>>>>>>>>>>> 3. Parse the incoming message.
>>>>>>>>>>> 4. Submit the parsed message to the business processing layer.
>>>>>>>>>>> 5. Once a response is available, send back the response
>>>>>>>>>>> message to the same socket.
>>>>>>>>>>
>>>>>>>>>> Yes, you can. Take a look at the SuspendableFilter:
>>>>>>>>>>
>>>>>>>>>> https://grizzly.dev.java.net/nonav/apidocs/com/sun/grizzly/suspendable/SuspendableFilter.html
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is what I have done.
>>>>>>>>>>> 1. Created a Controller.
>>>>>>>>>>> 2. Added a protocol chain with ReadFilter and a parser filter.
>>>>>>>>>>> 3. Once the message is available, submitting the message to
>>>>>>>>>>> the business processing layer.
>>>>>>>>>>> The thread execution ends here.
>>>>>>>>>>>
>>>>>>>>>>> The response will be available from another thread. Now, I
>>>>>>>>>>> want to write the response message back to the same socket. I
>>>>>>>>>>> tried to cache the context from 1st thread and tried to use
>>>>>>>>>>> the same context for writing. But when I say
>>>>>>>>>>> ctx.getSelectionKey().channel(), the program hangs there.
>>>>>>>>>>>
>>>>>>>>>>> I went through the posts on Asynchronous Request Processing on
>>>>>>>>>>> Jean-Francois Arcand's Blog
>>>>>>>>>>> <http://weblogs.java.net/blog/jfarcand/>. The framework seems
>>>>>>>>>>> to support only the HTTP protocol.
>>>>>>>>>>>
>>>>>>>>>>> Can somebody guide me on how to achieve this over TCP/IP?
>>>>>>>>>>
>>>>>>>>>> Take a look at the SuspendableFilter as I think this is really
>>>>>>>>>> what you need. You can see some tests here:
>>>>>>>>>>
>>>>>>>>>> https://grizzly.dev.java.net/nonav/xref-test/com/sun/grizzly/SuspendableTest.html
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Let us know if that doesn't work. Your scenario is quite
>>>>>>>>>> common and I can certainly help improve the
>>>>>>>>>> SuspendableFilter to make it work for you :-)
>>>>>>>>>>
>>>>>>>>>> A+
>>>>>>>>>>
>>>>>>>>>> -- Jeanfrancois
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>> Vishnu
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Vishnuvardhan Piskalaramesh
>>>>>>>>>>> Sun's Open ESB Community (http://open-esb.org)
>>>>>>>>>>
>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>
>>>>>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net