users@grizzly.java.net

Re: Implementing custom StreamAlgorithm

From: Eric Heaton <eheaton_at_iii.com>
Date: Thu, 21 Jun 2007 09:55:29 -0700

Jeanfrancois,

As it turns out, I decided to follow D.J.'s suggestion and instead store my
partially-read message as an attachment on the SelectionKey. This didn't
work initially, because the standard cleartext ReadFilter that I was using
earlier in the ProtocolChain was doing a key.attach(null) first thing in its
execute() method. So, I replaced this ReadFilter with another version of a
"read" filter that didn't remove the SelectionKey attachment, and things are
working just peachy now.
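
For anyone following along, here is a minimal sketch of the attachment
approach. It uses only java.nio, not the Grizzly API: the PartialMessage
class and the stateFor() helper are made-up names for illustration, and a
Pipe stands in for a real socket so the example runs on its own. The last
two statements show why a filter that calls key.attach(null) wipes the
saved state.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class KeyAttachmentSketch {

    /** Hypothetical per-connection state: bytes received but not yet parsed. */
    static final class PartialMessage {
        final ByteBuffer pending = ByteBuffer.allocate(8192);
    }

    /** Returns the state attached to the key, creating it on first use. */
    static PartialMessage stateFor(SelectionKey key) {
        PartialMessage state = (PartialMessage) key.attachment();
        if (state == null) {
            state = new PartialMessage();
            key.attach(state);
        }
        return state;
    }

    public static void main(String[] args) throws IOException {
        // A Pipe gives us a selectable channel without opening a socket.
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        try (Selector selector = Selector.open()) {
            SelectionKey key =
                pipe.source().register(selector, SelectionKey.OP_READ);

            // First "execute": only part of a message has arrived.
            stateFor(key).pending.put(new byte[] {1, 2, 3});

            // Second "execute": the same state is still on the key.
            System.out.println(stateFor(key).pending.position()); // 3

            // An upstream filter that clears the attachment loses it.
            key.attach(null);
            System.out.println(stateFor(key).pending.position()); // 0
        } finally {
            pipe.source().close();
            pipe.sink().close();
        }
    }
}
```

Because the state hangs off the key, it follows the connection no matter
which thread happens to run the next execute().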

I don't know if my ProtocolFilter is ready for prime-time yet, since I've
got all of my protocol-specific parsing code inside the MessageFilter. Other
than that, it looks very much like D.J.'s implementation (thanks again,
D.J.!). The ideal thing might be to extract all of the protocol-specific
parsing into a MessageParser interface that could be injected into the
MessageFilter... I'll try and think about this after my deadline.
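
A rough sketch of what that split might look like. Everything here is
hypothetical: the MessageParser interface, the LineParser example protocol
(one message per newline-terminated line), and the simplified MessageFilter
class, which is a stand-in and does not implement Grizzly's ProtocolFilter.
The point is only that the filter drives the parser without knowing the
wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class MessageParserSketch {

    /** Hypothetical interface: protocol-specific parsing lives behind it. */
    interface MessageParser<T> {
        /**
         * Tries to decode one message from buf (in read mode). Returns null
         * and leaves buf's position untouched if no complete message is there.
         */
        T parse(ByteBuffer buf);
    }

    /** Example protocol: one message per line, terminated by '\n'. */
    static final class LineParser implements MessageParser<String> {
        @Override
        public String parse(ByteBuffer buf) {
            for (int i = buf.position(); i < buf.limit(); i++) {
                if (buf.get(i) == '\n') {
                    byte[] line = new byte[i - buf.position()];
                    buf.get(line);   // consume the line
                    buf.get();       // consume the terminator
                    return new String(line, StandardCharsets.US_ASCII);
                }
            }
            return null; // incomplete: wait for more bytes
        }
    }

    /** Protocol-agnostic stand-in for the filter: it only drives the parser. */
    static final class MessageFilter<T> {
        private final MessageParser<T> parser;

        MessageFilter(MessageParser<T> parser) {
            this.parser = parser;
        }

        /** Drains every complete message; leftover bytes stay in buf. */
        List<T> drain(ByteBuffer buf) {
            List<T> out = new ArrayList<>();
            T msg;
            while ((msg = parser.parse(buf)) != null) {
                out.add(msg);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        MessageFilter<String> filter = new MessageFilter<>(new LineParser());
        ByteBuffer buf = ByteBuffer.wrap(
            "one\ntwo\nthr".getBytes(StandardCharsets.US_ASCII));
        System.out.println(filter.drain(buf)); // [one, two]
        System.out.println(buf.remaining());   // 3 ("thr" waits for more data)
    }
}
```

Swapping protocols then means passing a different MessageParser to the
constructor; the partial-read bookkeeping stays in one place.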

I wonder if the "correct" thing for me to be doing is to store state across
calls to execute() in the SelectionKey? My initial idea to use a custom
WorkerThreadImpl that held on to the state wouldn't work, right, because the
WorkerThreads are not assigned to a specific SelectionKey (or are they?).

-Eric

----- Original Message -----
From: "Jeanfrancois Arcand" <Jeanfrancois.Arcand_at_Sun.COM>
To: <users_at_grizzly.dev.java.net>
Sent: Wednesday, June 20, 2007 8:43 AM
Subject: Re: Implementing custom StreamAlgorithm


> Hi Eric,
>
> Eric Heaton wrote:
>> D.J.,
>>
>> This is exactly the response I needed! I'll start with the Controller
>> from the 'grizzly' framework and implement a custom ProtocolFilter as you
>> describe below. I don't need SSL support, so for partial reads, I think
>> I'll instead try a custom Pipeline that creates a custom WorkerThreadImpl
>> that contains the partially read message header/body.
>
> Let us know the result and if you think your Filter can be donated to the
> community, feel free to send it :-)
>
> Thanks!
>
> -- Jeanfrancois
>
>
>>
>> Thanks!
>> -Eric
>>
>> ----- Original Message ----- From: "D. J. Hagberg (Sun)"
>> <Dj.Hagberg_at_Sun.COM>
>> To: <users_at_grizzly.dev.java.net>
>> Sent: Tuesday, June 19, 2007 4:28 PM
>> Subject: Re: Implementing custom StreamAlgorithm
>>
>>
>>> Eric Heaton wrote:
>>> > I wish to implement a custom StreamAlgorithm whose parse() method can
>>> > potentially return false if the incoming request has not been received
>>> > completely into the input buffer. (For the curious, I'm trying to
>>> > write a custom protocol that handles the Typed Parameter Language
>>> > (tpl.sourceforge.net)).
>>> >
>>> > However, when my StreamAlgorithm's parse() method returns false, the
>>> > request is not processed. I tried inspecting the
>>> > ContentLengthAlgorithm's parse() method, but found that this does not
>>> > seem to work either. I create a simple SelectorThread for the
>>> > ContentLengthAlgorithm and POST a request that's greater than the
>>> > default buffer size of 8192 bytes. The request is never processed.
>>>
>>> Eric --
>>>
>>> I hope to post my design work up on my blog soon
>>> (http://blogs.sun.com/djhagberg) but I decided to cut out the
>>> StreamAlgorithm pattern in my application (it seemed like it was more
>>> tightly coupled to http-type protocols) and went with the lower-level
>>> ProtocolFilter API. I have the SSLReadFilter in front of this to decode
>>> SSL into cleartext bytes...
>>>
>>> My Messages have a fixed-length MessageHeader followed by a
>>> variable-length byte[] content.
>>>
>>> There are a few methods here that deal with partial message reads:
>>>
>>> - prepareParseBuffer prepends any remaining bytes from the last read
>>> onto the current ByteBuffer.
>>>
>>> - checkRemainderHeader checks for a previously-read MessageHeader
>>>
>>> - clearRemainder does what it says
>>>
>>> - preserveRemainder preserves any leftover MessageHeader or partially-
>>> read bytes.
>>>
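The remainder-buffer idea behind the methods listed above can be shown in
isolation with plain java.nio. This is a simplified sketch, not the filter
from this thread: it assumes the header is nothing more than a 4-byte body
length, it peeks at the header instead of storing a parsed MessageHeader
between reads, and the class and method names are made up.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RemainderSketch {
    static final int HEADER_SIZE = 4; // assumed: header is a 4-byte body length

    // Kept in write mode between calls; holds bytes not yet parsed.
    private ByteBuffer rem = ByteBuffer.allocate(16);

    /** Appends freshly read bytes and returns every message now complete. */
    List<byte[]> feed(byte[] chunk) {
        if (rem.remaining() < chunk.length) {      // grow the remainder buffer
            ByteBuffer bigger =
                ByteBuffer.allocate(rem.position() + chunk.length + 16);
            rem.flip();
            bigger.put(rem);
            rem = bigger;
        }
        rem.put(chunk);
        rem.flip();                                // switch to read mode

        List<byte[]> messages = new ArrayList<>();
        while (rem.remaining() >= HEADER_SIZE) {
            int len = rem.getInt(rem.position());  // peek, do not consume yet
            if (len < 0) {
                throw new IllegalStateException("Invalid length: " + len);
            }
            if (rem.remaining() < HEADER_SIZE + len) {
                break;                             // body incomplete
            }
            rem.getInt();                          // consume the header
            byte[] body = new byte[len];
            rem.get(body);
            messages.add(body);
        }
        rem.compact();               // keep leftovers, back to write mode
        return messages;
    }

    public static void main(String[] args) {
        RemainderSketch r = new RemainderSketch();
        // One 5-byte message split across two reads.
        System.out.println(r.feed(new byte[] {0, 0, 0, 5, 'h', 'e'}).size()); // 0
        System.out.println(r.feed(new byte[] {'l', 'l', 'o'}).size());        // 1
    }
}
```

compact() is what makes this work: it moves the unread bytes to the front
of the buffer and leaves it ready for the next append.
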
>>> I can't say this code has passed all regression/unit tests yet but it
>>> might help.
>>>
>>> Note it also depends on an SSLSession to act as a Map for partial reads.
>>> If you don't have an SSLSession, you may have something else attached to
>>> or associated with your SelectionKey that could be used to stow
>>> remaining stuff...
>>>
>>> public class MessageProtocolFilter implements ProtocolFilter {
>>>     . . . statics, logger, etc. . . .
>>>     public static final String REMBUF = "REMBUF";
>>>     public static final String REMHEAD = "REMHEAD";
>>>
>>>     /**
>>>      * Decode raw cleartext bytes into one or more Messages
>>>      * if possible, and hand off to dispatcher to do real work.
>>>      */
>>>     public boolean execute(Context ctx) {
>>>         WorkerThreadImpl workerThread = (WorkerThreadImpl)Thread.
>>>             currentThread();
>>>         SSLSession session = workerThread.getSSLEngine().getSession();
>>>         ByteBuffer buf = prepareParseBuffer(workerThread, session);
>>>
>>>         // Keep parsing while there is a possibility of decoding msgs
>>>         int remain;
>>>         boolean hadError = false;
>>>         Message m = null;
>>>         MessageHeader mh = checkRemainderHeader(session);
>>>         while( (remain=buf.limit()-buf.position()) > 0 ) {
>>>
>>>             // Have enough bytes to read a new header?
>>>             if( mh == null && remain < MessageHeader.SIZE ) {
>>>                 break;
>>>
>>>             // Read a new header if we need it
>>>             } else if( mh == null ) {
>>>                 mh = MessageHeader.readHeader(buf);
>>>                 // check if un-parseable header
>>>                 if( !isValidHeader(mh) ) {
>>>                     log.severe("Invalid msg header: "+mh);
>>>                     hadError = true;
>>>                     break;
>>>                 }
>>>             }
>>>
>>>             // check if remainder is too small for message contents
>>>             remain = buf.limit()-buf.position();
>>>             if( remain < mh.getContentLength() ) {
>>>                 break;
>>>             }
>>>
>>>             // build message with header content bytes, dispatch msg
>>>             byte[] body = new byte[mh.getContentLength()];
>>>             buf.get(body);
>>>             m = new Message(mh, body);
>>>             if( !dispatcher.handleMessage(ctx.getSelectionKey(), m) ) {
>>>                 hadError = true;
>>>                 break;
>>>             }
>>>
>>>             m = null;
>>>             mh = null;
>>>         }
>>>
>>>         // If we had an error, need to shut down socket connection
>>>         if( hadError ) {
>>>             ctx.setKeyRegistrationState(Context.
>>>                 KeyRegistrationState.CANCEL);
>>>             clearRemainder(session);
>>>         } else {
>>>             ctx.setKeyRegistrationState(Context.
>>>                 KeyRegistrationState.REGISTER);
>>>             preserveRemainder(session, buf, mh);
>>>         }
>>>
>>>         // Ensure the "working" buffer is cleared before returning
>>>         workerThread.getByteBuffer().clear();
>>>
>>>         return !hadError;
>>>     }
>>>
>>>     public boolean postExecute(Context ctx){
>>>         return true;
>>>     }
>>>
>>>     /**
>>>      * Check if there are any remaining bytes from a previous read
>>>      * that just had a partial message header or message content.
>>>      */
>>>     private ByteBuffer prepareParseBuffer(WorkerThreadImpl workerThread,
>>>                                           SSLSession session) {
>>>         ByteBuffer bb = workerThread.getByteBuffer();
>>>         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>         if( rem != null && rem.position() > 0 ) {
>>>             // concatenate remainder with new bytes to be parsed
>>>             bb.flip();
>>>             int cnt = bb.limit() - bb.position();
>>>             int space = rem.limit() - rem.position();
>>>             // Make remainder buffer bigger if necessary. Size it from
>>>             // rem (not bb) so the old remainder plus cnt always fits.
>>>             if( space < cnt ) {
>>>                 ByteBuffer rem2 = ByteBuffer.allocate(rem.capacity() +
>>>                     cnt + 512);
>>>                 rem.flip();
>>>                 rem2.put(rem);
>>>                 rem.clear();
>>>                 rem = rem2;
>>>                 session.putValue(REMBUF, rem);
>>>             }
>>>             rem.put(bb);
>>>             bb.clear();
>>>             bb = rem;
>>>         }
>>>         bb.flip(); // prepare for read
>>>         return bb;
>>>     }
>>>
>>>     /**
>>>      * Check if last time we were able to decode a header but not
>>>      * able to read the full message contents.
>>>      */
>>>     private MessageHeader checkRemainderHeader(SSLSession session) {
>>>         MessageHeader mh;
>>>         mh = (MessageHeader)session.getValue(REMHEAD);
>>>         if( mh != null ) {
>>>             session.removeValue(REMHEAD);
>>>         }
>>>         return mh;
>>>     }
>>>
>>>     /**
>>>      * Remove any "remainder" objects from the SSL session,
>>>      * in preparation for shutting down the connection.
>>>      */
>>>     private void clearRemainder(SSLSession session) {
>>>         session.removeValue(REMHEAD);
>>>         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>         if( rem != null ) {
>>>             rem.clear();
>>>             session.removeValue(REMBUF);
>>>         }
>>>     }
>>>
>>>     /**
>>>      * Preserve any partially-read bytes or header and partial content.
>>>      */
>>>     private void preserveRemainder(SSLSession session, ByteBuffer bb,
>>>                                    MessageHeader mh) {
>>>         if( mh == null ) {
>>>             session.removeValue(REMHEAD);
>>>         } else {
>>>             session.putValue(REMHEAD, mh);
>>>         }
>>>         int remainCnt = bb.limit()-bb.position();
>>>         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>         if( rem != null && rem == bb ) {
>>>             bb.compact(); // automatically prepares for next append
>>>         } else if( rem != null && remainCnt == 0 ) {
>>>             rem.clear();
>>>         } else if( rem != null ) {
>>>             if( rem.limit() != rem.capacity() ) {
>>>                 rem.flip(); // prepare for appending
>>>             }
>>>             if( remainCnt > (rem.limit()-rem.position()) ) {
>>>                 ByteBuffer rem2 = ByteBuffer.allocate(rem.capacity() +
>>>                     remainCnt + 1024);
>>>                 rem.flip(); // switch to read mode
>>>                 rem2.put(rem);
>>>                 rem.clear();
>>>                 rem = rem2;
>>>             }
>>>             rem.put(bb);
>>>         } else if( remainCnt > 0 ) {
>>>             int alloc = 16 * 1024;
>>>             if( alloc < remainCnt ) {
>>>                 alloc = remainCnt + 2*1024;
>>>             }
>>>             rem = ByteBuffer.allocate(alloc);
>>>             rem.put(bb);
>>>         }
>>>         if( rem != null ) {
>>>             session.putValue(REMBUF, rem);
>>>         }
>>>     }
>>>
>>>     . . . More housekeeping methods, logging, etc ...
>>> }
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net