users@grizzly.java.net

Re: Need help implementing ProtocolParser

From: ash2k! <ash2kk_at_gmail.com>
Date: Thu, 24 Apr 2008 01:17:41 -0700 (PDT)

Hello, it's me again)

I want to thank everyone who helped me write this class. Let this code be
an example of how packet-based interaction can be implemented using Grizzly:

import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.util.ByteBufferFactory;
import com.sun.grizzly.util.WorkerThread;
import java.nio.ByteBuffer;
import java.util.logging.Level;

/**
 *
 * @author Ash2k
 */
public class A1ProtocolParser implements ProtocolParser<IncomingPacket> {

    static final int HEADERLENGTH = 18;
    static final int STARTMARK = 0x12345678;
    public static volatile int maximumPacketSize = 1024 * 1024;
    private ByteBuffer byteBuffer;
    private boolean isExpectingMoreData, hasMoreBytesToParse, doCompact =
true;
    private IncomingPacket finishedPacket;
    private final ProgressCallback progressCallback;

    public A1ProtocolParser(final ProgressCallback progressCallback) {
        this.progressCallback = progressCallback;
    } //A1ProtocolParser

    public boolean isExpectingMoreData() {
        return isExpectingMoreData;
    } //isExpectingMoreData

    public boolean hasMoreBytesToParse() {
        return hasMoreBytesToParse;
    } //hasMoreBytesToParse

    public IncomingPacket getNextMessage() {
        IncomingPacket ip = finishedPacket;
        finishedPacket = null;
        return ip;
    } //getNextMessage

    public boolean hasNextMessage() {
        if (byteBuffer.remaining() < HEADERLENGTH) {
            isExpectingMoreData = true;
            hasMoreBytesToParse = byteBuffer.hasRemaining();
            return false;
        }
        byteBuffer.mark();
        int startmark = byteBuffer.getInt();
        if (startmark != STARTMARK) {
            progressCallback.log(Level.SEVERE, "Bad STARTMARK!");
            //TODO: close connection
            return false;
        }
        ByteBuffer incomingPacketBuffer;
        IncomingPacket incomingPacket;

        incomingPacket = new IncomingPacket(byteBuffer.getShort());
        incomingPacket.attachment = byteBuffer.getLong();
        int len = byteBuffer.getInt();
        if (len == 0) {
            isExpectingMoreData = false;
            hasMoreBytesToParse = byteBuffer.hasRemaining();
            finishedPacket = incomingPacket;

            return true;
        } else if (len + HEADERLENGTH > maximumPacketSize) {
            //TODO: close connection
            progressCallback.log(Level.SEVERE, String.format(
                    "Received packet size (%d) is greater than maximum allowed packet size
(%d)",
                    len + HEADERLENGTH,
                    maximumPacketSize));
            return false;
        } else if (len < 0) {
            //TODO: close connection
            progressCallback.log(
                    Level.SEVERE,
                    String.format("Received packet size is %d", len));

            return false;
        }
        if (byteBuffer.remaining() >= len) {
            //we have enough data in buffer to fill this message
            incomingPacket.data = new byte[len];
            incomingPacketBuffer = ByteBuffer.wrap(incomingPacket.data);

            int limit = byteBuffer.limit();
            byteBuffer.limit(byteBuffer.position() + len);
            incomingPacketBuffer.put(byteBuffer);
            byteBuffer.limit(limit);

            isExpectingMoreData = false;
            finishedPacket = incomingPacket;
            hasMoreBytesToParse = byteBuffer.hasRemaining();
            if (hasMoreBytesToParse) {
                doCompact = true;
            } else {
                byteBuffer.clear();
                doCompact = false;
            }
        } else if (byteBuffer.capacity() - byteBuffer.position() < len) {
            //we dont have enough room for data - lets allocate a bigger buffer
            ByteBuffer newBuffer = ByteBufferFactory.allocateView(HEADERLENGTH +
len, false);
            byteBuffer.reset();
            newBuffer.put(byteBuffer);
            ((WorkerThread) Thread.currentThread()).setByteBuffer(byteBuffer =
newBuffer);
            hasMoreBytesToParse = false;
            isExpectingMoreData = true;
            doCompact = false;
        } else {
            //we just hadn't received all the data yet
            byteBuffer.position(byteBuffer.limit());
            byteBuffer.limit(byteBuffer.capacity());
            doCompact = false;
            isExpectingMoreData = true;
            hasMoreBytesToParse = false;
        }
        return !isExpectingMoreData;
    } //hasNextMessage

    public void startBuffer(ByteBuffer bb) {
        byteBuffer = bb;
        byteBuffer.flip();
    } //startBuffer

    public boolean releaseBuffer() {
        if (doCompact) {
            byteBuffer.compact();
        }
        byteBuffer = null;

        return false;
    } //releaseBuffer
}



Erik Svensson-3 wrote:
>
> Howdy!
>
>> Hello Erik,
>>>>
>>
>>>> As workaround, I can propose you to not keep retain_buffer in your
>>>> ProtocolParser.
>>>> This change will make your ProtocolParser stateless, and not
>>>> depend on
>>>> fact, that same instance will be called for next request on the same
>>>> channel.
>>>>
>>>> The idea is to use just a single buffer (saved). Please take a look at
>>>> the code below. I didn't test it, as I don't have your testcase, but you
>>>> can see the idea there.
>>>
>>> In other words you move the state into the byte buffer associated
>>> with the
>>> the current thread.
>> Correct. We can not guarantee that ProtocolFilter will be the same,
>> but ByteBuffer will be.
>> Also, hope, solution with just single buffer can perform faster, as
>> less copy operations will be executed :)
>>
>>>
>>>
>>>> If you will need more help - please send your complete testcase
>>>> code,
>>>> so I'll be able to run it.
>>>
>>> I'll test it out with my code. If the currentThread is the same
>>> thread that
>>> receives the next read from the relevant socket it will work.
>> Hope so :)
>
> I've now tested it and it works. I didn't copy your code straight
> off, I rewrote it a bit (attaching)
> I avoided the compact() in extractMessage(), doing it in releaseBuffer
> () instead.
> But it works with my test code. I've sent messages 20k and 25k large
> and the parser extends the buffer
> each time. In the 25k example it extends the buffer twice.
>
> Here's the code:
>
> package se.phlogiston.grizzly.overflow;
>
> import se.phlogiston.grizzly.tutorial2.SimpleMessage;
>
> import java.nio.ByteBuffer;
>
> import com.sun.grizzly.util.ByteBufferFactory;
> import com.sun.grizzly.util.WorkerThread;
>
> /**
> * Created by IntelliJ IDEA.
> * User: erik
> * Date: Mar 14, 2008
> * Time: 10:05:24 PM
> * To change this template use File | Settings | File Templates.
> */
> public class ProtocolParser implements
> com.sun.grizzly.ProtocolParser<SimpleMessage> {
>
> private ByteBuffer saved;
> private int position;
> private int limit;
> private SimpleMessage mess;
> private boolean wants_more_data = false;
> private boolean has_unparsed_data = false;
>
> public ProtocolParser() {
> }
>
> public boolean isExpectingMoreData() {
> return wants_more_data;
> }
>
> public boolean hasMoreBytesToParse() {
> return has_unparsed_data;
> }
>
> public SimpleMessage getNextMessage() {
> // the call pattern should be:
> // hasNextMessage() == true
> // getNextMessage()
> // but if not:
> if (null == mess) {
> mess = extractMessage();
> }
> SimpleMessage tmp = mess;
> mess = null;
>
> return tmp;
> }
>
> public boolean hasNextMessage() {
>
> if (null == mess) {
> mess = extractMessage();
> }
>
> return ( mess != null);
> }
>
> public void startBuffer(ByteBuffer bb) {
> saved = bb;
> saved.flip(); // flip the byte buffer for reading. (should have
> been done already)
> }
>
> public boolean releaseBuffer() {
> if (wants_more_data) {
> saved.compact();
> // check to see if we made room in the buffer...
> // compact sets the position at the end of the written/moved
> bytes so remaining()
> // shows how much more we can stuff into the buffer.
> if (saved.remaining() == 0 ) {
> reallocateSaved();
> }
> } else {
> saved.clear();
> }
> // Grizzly doesn't seem to care (and doesn't when you look at
> the code) but we return a factual value anyhoo
> return wants_more_data;
> }
>
>
> private void reallocateSaved() {
> saved.position(position);
> ByteBuffer tmpBuffer =
> ByteBufferFactory.allocateView(saved.capacity() * 2,
> false);
> tmpBuffer.put(saved);
> saved = tmpBuffer;
> ((WorkerThread) Thread.currentThread()).setByteBuffer(saved);
> }
>
> private SimpleMessage extractMessage() {
> SimpleMessage message = null;
>
> if (!saved.hasRemaining()) {
> // there are no more readable bytes in the buffer
> // this should mean that we've read all messages in the buffer
> wants_more_data = false;
> has_unparsed_data = false;
> saved.clear(); // clean up our byte buffer. No one else will
> do it for us
> return message;
> }
> // save the position before we send it off to the parser
> position = saved.position();
> message = SimpleMessage.parse(saved);
> if (message == null) {
> // not enough bytes for a message
> // but we know there are bytes so there must be
> // an incomplete message there
> if (saved.position() != position) {
> saved.position(position);
> }
> wants_more_data = true;
> has_unparsed_data = false;
> } else {
> // here we have a complete message
> if (saved.limit() == saved.position()) {
> // the end of the buffer is reached.
> wants_more_data = false;
> has_unparsed_data = false;
> } else {
> has_unparsed_data = true;
> // here we can't accurately set wants_more_data.
> // since we don't know if the bytes in the buffer is a
> // complete message or not
> }
> }
> return message;
> }
> }
>
>
> cheers
>
> /Erik
>
>
>
>
>

-- 
View this message in context: http://www.nabble.com/Need-help-implementing-ProtocolParser-tp16091789p16847242.html
Sent from the Grizzly - Users mailing list archive at Nabble.com.