users@grizzly.java.net

Re: Implementing custom StreamAlgorithm

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Fri, 22 Jun 2007 10:26:36 -0400

Hi Eric

Eric Heaton wrote:
> Jeanfrancois,
>
> As it turns out, I decided to follow D.J.'s suggestion and instead store
> my partially-read message as an attachment on the SelectionKey. This
> didn't work initially, because the standard cleartext ReadFilter that I
> was using earlier in the ProtocolChain was doing a key.attach(null)
> first thing in its execute() method. So, I replaced this ReadFilter with
> another version of a "read" filter that didn't remove the SelectionKey
> attachment, and things are working just peachy now.
>
> I don't know if my ProtocolFilter is ready for prime-time yet, since
> I've got all of my protocol-specific parsing code inside the
> MesasgeFilter. Other than that, it looks very much like D.J.'s
> implementation (thanks again, D.J.!). The ideal thing might be to
> extract all of the protocol-specific parsing into a MessageParser
> interface that could be injected into the MessageFilter... I'll try and
> think about this after my deadline.
>
> I wonder if the "correct" thing for me to be doing is to store state
> across calls to execute() in the SelectionKey?

Yes it is, but make sure the object attached are removed when the key is
canceled, to avoid potential memory leak:

http://weblogs.java.net/blog/jfarcand/archive/2006/06/tricks_and_tips.html

  My initial idea to use a
> custom WorkerThreadImpl that held on to the state wouldn't work, right,
> because the WorkerThreads are not assigned to a specific SelectionKey
> (or are they?).

Yes that's a good idea. I will propose a generic way today to handle
partial read and attachment to SelectionKey. If all goes well I should
have it be the end of the day so you should be able to try it. The
SailFin project is asking for weeks about that feature :-)

Thanks for the feedback!

-- Jeanfrancois


>
> -Eric
>
> ----- Original Message ----- From: "Jeanfrancois Arcand"
> <Jeanfrancois.Arcand_at_Sun.COM>
> To: <users_at_grizzly.dev.java.net>
> Sent: Wednesday, June 20, 2007 8:43 AM
> Subject: Re: Implementing custom StreamAlgorithm
>
>
>> Hi Eric,
>>
>> Eric Heaton wrote:
>>> D.J.,
>>>
>>> This is exactly the response I needed! I'll start with the Controller
>>> from the 'grizzly' framework and implement a custom ProtocolFiler as
>>> you describe below. I don't need SSL support, so for partial reads, I
>>> think I'll instead try a custom Pipeline that creates a custom
>>> WorkerThreadImpl that contains the partially read message header/body.
>>
>> Let us know the result and if you think your Filter can be donated to
>> the community, feel free to send it :-)
>>
>> Thanks!
>>
>> -- Jeanfrancois
>>
>>
>>>
>>> Thanks!
>>> -Eric
>>>
>>> ----- Original Message ----- From: "D. J. Hagberg (Sun)"
>>> <Dj.Hagberg_at_Sun.COM>
>>> To: <users_at_grizzly.dev.java.net>
>>> Sent: Tuesday, June 19, 2007 4:28 PM
>>> Subject: Re: Implementing custom StreamAlgorithm
>>>
>>>
>>>> Eric Heaton wrote:
>>>> > I wish to implement a custom StreamAlgorithm whose parse() method can
>>>> > potentially return false if the incoming request has not been
>>>> received
>>>> > completely into the input buffer. (For the curious, I'm trying to
>>>> > write a custom protocol that handles the Typed Parameter Language
>>>> > (tpl.sourceforge.net)).
>>>> >
>>>> > However, when my StreamAlgorithm's parse() method returns false, the
>>>> > request is not processed. I tried inspecting the
>>>> > ContentLengthAlgorithm's parse() method, but found that this does not
>>>> > seem to work either. I create a simple SelectorThread for the
>>>> > ContentLengthAlgorithm and POST a request that's greater than the
>>>> > default buffer size of 8192 bytes. The request is never processed.
>>>>
>>>> Eric --
>>>>
>>>> I hope to post my design work up on my blog soon
>>>> (http://blogs.sun.com/djhagberg) but I decided to cut out the
>>>> StreamAlgorithm pattern in my application (it seemed like it was
>>>> more tightly coupled to http-type protocols) and went with the
>>>> lower-level ProtocolFilter API. I have the SSLReadFilter in front
>>>> of this to decode SSL into cleartext bytes...
>>>>
>>>> My Message's have a fixed-length MessageHeader followed by a
>>>> variable-length byte[] content.
>>>>
>>>> There are a few methods here that deal with partial message reads:
>>>>
>>>> - prepareParseBuffer prepends any remaining bytes from the last read
>>>> onto the current ByteBuffer.
>>>>
>>>> - checkRemainderHeader checks for a previously-read MessageHeader
>>>>
>>>> - clearRemainder does what it says
>>>>
>>>> - preserveRemainder preserves any leftover MessageHeader or partially-
>>>> read bytes.
>>>>
>>>> I can't say this code has passed all regression/unit tests yet but
>>>> it might help.
>>>>
>>>> Note it also depends on an SSLSession to act as a Map for partial
>>>> reads. If you don't have an SSLSession, you may have something else
>>>> attached to or associated with your SelectionKey that could be used
>>>> to stow remaining stuff...
>>>>
>>>> public class MessageProtocolFilter implements ProtocolFilter {
>>>> . . . statics, logger, etc. . . .
>>>> public static final String REMBUF = "REMBUF";
>>>> public static final Stirng REMHEAD = "REMHEAD";
>>>>
>>>> /**
>>>> * Decode raw cleartext bytes into one or more Message's
>>>> * if possible, and hand off to dispatcher to do real work.
>>>> */
>>>> public boolean execute(Context ctx) {
>>>> WorkerThreadImpl workerThread = (WorkerThreadImpl)Thread.
>>>> currentThread();
>>>> SSLSession session = workerThread.getSSLEngine().getSession();
>>>> ByteBuffer buf = prepareParseBuffer(workerThread, session);
>>>>
>>>> // Keep parsing while there is a possibility of decoding msgs
>>>> int remain;
>>>> boolean hadError = false;
>>>> Message m = null;
>>>> MessageHeader mh = checkRemainderHeader(session);
>>>> while( (remain=buf.limit()-buf.position()) > 0 ) {
>>>>
>>>> // Have enough bytes to read a new header?
>>>> if( mh == null && remain < MessageHeader.SIZE ) {
>>>> break;
>>>>
>>>> // Read a new header if we need it
>>>> } else if( tmh == null ) {
>>>> mh = MessageHeader.readHeader(buf);
>>>> // check if un-parseable header
>>>> if( !isValidHeader(mh) ) {
>>>> log.severe("Invalid msg header: "+mh);
>>>> hadError = true;
>>>> break;
>>>> }
>>>> }
>>>>
>>>> // check if remainder is too small for message contents
>>>> remain = buf.limit()-buf.position();
>>>> if( remain < mh.getContentLength() ) {
>>>> break;
>>>> }
>>>>
>>>> // build message with header content bytes, dispatch msg
>>>> byte[] body = new byte[mh.getContentLength()];
>>>> buf.get(body);
>>>> m = new Message(mh, body);
>>>> if( !dispatcher.handleMessage(ctx.getSelectionKey(), m) ) {
>>>> hadError = true;
>>>> break;
>>>> }
>>>>
>>>> m = null;
>>>> mh = null;
>>>> }
>>>>
>>>> // If we had an error, need to shut down socket connection
>>>> if( hadError ) {
>>>> ctx.setKeyRegistrationState(Context.
>>>> KeyRegistrationState.CANCEL);
>>>> clearRemainder(session);
>>>> } else {
>>>> ctx.setKeyRegistrationState(Context.
>>>> KeyRegistrationState.REGISTER);
>>>> preserveRemainder(session, buf, mh);
>>>> }
>>>>
>>>> // Ensure the "working" buffer is cleared before returning
>>>> workerThread.getByteBuffer().clear();
>>>>
>>>> return !hadError;
>>>> }
>>>>
>>>> public boolean postExecute(Context ctx){
>>>> return true;
>>>> }
>>>>
>>>> /**
>>>> * Check if there are any remaining bytes from a previous read
>>>> * that just had a partial message header or message content.
>>>> */
>>>> private ByteBuffer prepareParseBuffer(WorkerThreadImpl
>>>> workerThread,
>>>> SSLSession session) {
>>>> ByteBuffer bb = workerThread.getByteBuffer();
>>>> ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>> if( rem != null && rem.position() > 0 ) {
>>>> // concatenate remainder with new bytes to be parsed
>>>> bb.flip();
>>>> int cnt = bb.limit() - bb.position();
>>>> int space = rem.limit() - rem.position();
>>>> // Make remainder buffer bigger if necessary
>>>> if( space < cnt ) {
>>>> ByteBuffer rem2 = ByteBuffer.allocate(bb.capacity() +
>>>> cnt + 512);
>>>> rem.flip();
>>>> rem2.put(rem);
>>>> rem.clear();
>>>> rem = rem2;
>>>> session.putValue(REMBUF, rem);
>>>> }
>>>> rem.put(bb);
>>>> bb.clear();
>>>> bb = rem;
>>>> }
>>>> bb.flip(); // prepare for read
>>>> return bb;
>>>> }
>>>>
>>>> /**
>>>> * Check if last time we were able to decode a header but not
>>>> * able to read the full message contents.
>>>> */
>>>> private MessageHeader checkRemainderHeader(SSLSession session) {
>>>> MessageHeader mh;
>>>> mh = (MessageHeader)session.getValue(REMHEAD);
>>>> if( mh != null ) {
>>>> session.removeValue(REMHEAD);
>>>> }
>>>> return mh;
>>>> }
>>>>
>>>> /**
>>>> * Remove any "remainder" objects from the SSL session,
>>>> * in preparation for shutting down the connection.
>>>> */
>>>> private void clearRemainder(SSLSession session) {
>>>> session.removeValue(REMHEAD);
>>>> ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>> if( rem != null ) {
>>>> rem.clear();
>>>> session.removeValue(REMBUF);
>>>> }
>>>> }
>>>>
>>>> /**
>>>> * Preserve any partially-read bytes or header and partial content.
>>>> */
>>>> private void preserveRemainder(SSLSession session, ByteBuffer bb,
>>>> MessageHeader mh) {
>>>> if( mh == null ) {
>>>> session.removeValue(REMHEAD);
>>>> } else {
>>>> session.putValue(REMHEAD, mh);
>>>> }
>>>> int remainCnt = bb.limit()-bb.position();
>>>> ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>> if( rem != null && rem == bb ) {
>>>> bb.compact(); // automatically prepares for next append
>>>> } else if( rem != null && remainCnt == 0 ) {
>>>> rem.clear();
>>>> } else if( rem != null ) {
>>>> if( rem.limit() != rem.capacity() ) {
>>>> rem.flip(); // prepare for appending
>>>> }
>>>> if( remainCnt > (rem.limit()-rem.position()) ) {
>>>> ByteBuffer rem2 = ByteBuffer.allocate(rem.capacity() +
>>>> remainCnt + 1024);
>>>> rem.flip(); // switch to read mode
>>>> rem2.put(rem);
>>>> rem.clear();
>>>> rem = rem2;
>>>> }
>>>> rem.put(bb);
>>>> } else if( remainCnt > 0 ) {
>>>> int alloc = 16 * 1024;
>>>> if( alloc < remainCnt ) {
>>>> alloc = remainCnt + 2*1024;
>>>> }
>>>> rem = ByteBuffer.allocate(alloc);
>>>> rem.put(bb);
>>>> }
>>>> if( rem != null ) {
>>>> session.putValue(REMBUF, rem);
>>>> }
>>>> }
>>>>
>>>> . . . More housekeeping methods, logging, etc ...
>>>> }
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>
>>>>
>>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>
>>
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>