users@grizzly.java.net

Re: Need help implementing ProtocolParser

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Thu, 24 Apr 2008 10:01:19 -0400

Salut,

Would you be interested in blogging about your experience? Or would you
agree to us using your class as a sample? What I would need to do is
document your class and explain what it does, and then we are going to
refer people to it :-)

Thanks!

-- Jeanfrancois

ash2k! wrote:
> Hello, it's me again)
>
> I want to thank everyone who helped me write this class. Let this code be
> an example of how packet-based interaction can be implemented using Grizzly:
>
> import com.sun.grizzly.ProtocolParser;
> import com.sun.grizzly.util.ByteBufferFactory;
> import com.sun.grizzly.util.WorkerThread;
> import java.nio.ByteBuffer;
> import java.util.logging.Level;
>
> /**
>  * {@link ProtocolParser} that cuts a byte stream into {@link IncomingPacket}s.
>  *
>  * <p>Every packet starts with an 18-byte header that is read in this order:
>  * a 4-byte start mark ({@link #STARTMARK}), a 2-byte value handed to the
>  * {@code IncomingPacket} constructor, an 8-byte attachment and a 4-byte
>  * payload length. The payload follows the header.
>  *
>  * @author Ash2k
>  */
> public class A1ProtocolParser implements ProtocolParser<IncomingPacket> {
>
>     /** Size in bytes of the fixed packet header (4 + 2 + 8 + 4). */
>     static final int HEADERLENGTH = 18;
>     /** Value every packet header has to start with. */
>     static final int STARTMARK = 0x12345678;
>     /** Largest accepted packet size in bytes, header included. */
>     public static volatile int maximumPacketSize = 1024 * 1024;
>
>     private ByteBuffer byteBuffer;
>     private boolean isExpectingMoreData;
>     private boolean hasMoreBytesToParse;
>     /** Whether releaseBuffer() has to compact the buffer. */
>     private boolean doCompact = true;
>     private IncomingPacket finishedPacket;
>     private final ProgressCallback progressCallback;
>
>     /**
>      * @param progressCallback receives the log messages of this parser
>      */
>     public A1ProtocolParser(final ProgressCallback progressCallback) {
>         this.progressCallback = progressCallback;
>     } //A1ProtocolParser
>
>     /** @return the flag computed by the last hasNextMessage() call */
>     public boolean isExpectingMoreData() {
>         return isExpectingMoreData;
>     } //isExpectingMoreData
>
>     /** @return the flag computed by the last hasNextMessage() call */
>     public boolean hasMoreBytesToParse() {
>         return hasMoreBytesToParse;
>     } //hasMoreBytesToParse
>
>     /**
>      * Hands out the packet found by hasNextMessage() and forgets it.
>      *
>      * @return the finished packet, or null when none is pending
>      */
>     public IncomingPacket getNextMessage() {
>         IncomingPacket ip = finishedPacket;
>         finishedPacket = null;
>         return ip;
>     } //getNextMessage
>
>     /**
>      * Tries to parse one packet at the current buffer position and updates
>      * the isExpectingMoreData / hasMoreBytesToParse flags.
>      *
>      * @return true when a complete packet is ready for getNextMessage()
>      */
>     public boolean hasNextMessage() {
>         if (byteBuffer.remaining() < HEADERLENGTH) {
>             // Not even a full header yet. A partial header has to survive
>             // until the next read, so releaseBuffer() must compact; a
>             // stale doCompact == false would leave the buffer in read mode.
>             isExpectingMoreData = true;
>             hasMoreBytesToParse = byteBuffer.hasRemaining();
>             doCompact = true;
>             return false;
>         }
>         // Remember where the packet starts so it can be rewound to.
>         byteBuffer.mark();
>         if (byteBuffer.getInt() != STARTMARK) {
>             //TODO: close connection
>             return rejectPacket("Bad STARTMARK!");
>         }
>         IncomingPacket incomingPacket =
>                 new IncomingPacket(byteBuffer.getShort());
>         incomingPacket.attachment = byteBuffer.getLong();
>         int len = byteBuffer.getInt();
>         if (len == 0) {
>             // Header-only packet: data is left unassigned.
>             return completePacket(incomingPacket);
>         }
>         if (len < 0) {
>             //TODO: close connection
>             return rejectPacket(
>                     String.format("Received packet size is %d", len));
>         }
>         // Add in long: len + HEADERLENGTH overflows int for lengths close
>         // to Integer.MAX_VALUE and would slip past the size check.
>         long packetSize = (long) len + HEADERLENGTH;
>         // Read the volatile limit once so check and message agree.
>         int maximum = maximumPacketSize;
>         if (packetSize > maximum) {
>             //TODO: close connection
>             return rejectPacket(String.format(
>                     "Received packet size (%d) is greater than maximum "
>                     + "allowed packet size (%d)",
>                     packetSize,
>                     maximum));
>         }
>         if (byteBuffer.remaining() >= len) {
>             //we have enough data in buffer to fill this message
>             incomingPacket.data = new byte[len];
>             byteBuffer.get(incomingPacket.data);
>             return completePacket(incomingPacket);
>         }
>         if (byteBuffer.capacity() < packetSize) {
>             //the packet can never fit - lets allocate a bigger buffer
>             ByteBuffer newBuffer =
>                     ByteBufferFactory.allocateView((int) packetSize, false);
>             // Copy only this packet's bytes (from the mark to the limit).
>             byteBuffer.reset();
>             newBuffer.put(byteBuffer);
>             byteBuffer = newBuffer;
>             ((WorkerThread) Thread.currentThread()).setByteBuffer(newBuffer);
>         } else {
>             //we just hadn't received all the data yet
>             // Move the partial packet to offset 0 and switch to append
>             // mode. Without this, packets that were already delivered
>             // would still sit in front of it and be parsed again after
>             // the next startBuffer() flip.
>             byteBuffer.reset();
>             byteBuffer.compact();
>         }
>         // Both branches leave the buffer ready for appending.
>         doCompact = false;
>         hasMoreBytesToParse = false;
>         isExpectingMoreData = true;
>         return false;
>     } //hasNextMessage
>
>     /**
>      * Publishes a fully parsed packet and records what is left in the buffer.
>      *
>      * @return always true
>      */
>     private boolean completePacket(IncomingPacket packet) {
>         finishedPacket = packet;
>         isExpectingMoreData = false;
>         hasMoreBytesToParse = byteBuffer.hasRemaining();
>         if (hasMoreBytesToParse) {
>             // Unparsed bytes follow: releaseBuffer() has to keep them.
>             doCompact = true;
>         } else {
>             // Everything was consumed: reuse the buffer from the start.
>             byteBuffer.clear();
>             doCompact = false;
>         }
>         return true;
>     } //completePacket
>
>     /**
>      * Logs a protocol violation and drops the buffered bytes, so that no
>      * flag or buffer state of an earlier call is reported for this one.
>      *
>      * @return always false
>      */
>     private boolean rejectPacket(String message) {
>         progressCallback.log(Level.SEVERE, message);
>         isExpectingMoreData = false;
>         hasMoreBytesToParse = false;
>         byteBuffer.clear();
>         doCompact = false;
>         return false;
>     } //rejectPacket
>
>     /**
>      * Takes over the buffer to parse and flips it for reading.
>      *
>      * @param bb buffer holding the received bytes, still in write mode
>      */
>     public void startBuffer(ByteBuffer bb) {
>         byteBuffer = bb;
>         byteBuffer.flip();
>     } //startBuffer
>
>     /**
>      * Compacts the buffer when unparsed bytes have to be kept, then drops
>      * the reference to it.
>      *
>      * @return always false
>      */
>     public boolean releaseBuffer() {
>         if (doCompact) {
>             byteBuffer.compact();
>         }
>         byteBuffer = null;
>
>         return false;
>     } //releaseBuffer
> }
>
>
>
> Erik Svensson-3 wrote:
>> Howdy!
>>
>>> Hello Erik,
>>>>> As a workaround, I can propose that you do not keep retain_buffer in
>>>>> your ProtocolParser.
>>>>> This change will make your ProtocolParser stateless, so it does not
>>>>> depend on the fact that the same instance will be called for the next
>>>>> request on the same channel.
>>>>>
>>>>> The idea is to use just a single buffer (saved). Please take a look at
>>>>> the code below. I didn't test it, as I don't have your testcase, but
>>>>> you can see the idea there.
>>>> In other words you move the state into the byte buffer associated
>>>> with
>>>> the current thread.
>>> Correct. We cannot guarantee that the ProtocolFilter will be the same,
>>> but the ByteBuffer will be.
>>> Also, I hope the solution with just a single buffer can perform faster,
>>> as fewer copy operations will be executed :)
>>>
>>>>
>>>>> If you will need more help - please send your complete testcase
>>>>> code,
>>>>> so I'll be able to run it.
>>>> I'll test it out with my code. If the currentThread is the same
>>>> thread that
>>>> receives the next read from the relevant socket it will work.
>>> Hope so :)
>> I've now tested it and it works. I didn't copy your code straight
>> off; I rewrote it a bit (attached).
>> I avoided the compact() in extractMessage(), doing it in
>> releaseBuffer() instead.
>> It works with my test code. I've sent messages 20k and 25k in size,
>> and the parser extends the buffer
>> each time. In the 25k example it extends the buffer twice.
>>
>> Here's the code:
>>
>> package se.phlogiston.grizzly.overflow;
>>
>> import se.phlogiston.grizzly.tutorial2.SimpleMessage;
>>
>> import java.nio.ByteBuffer;
>>
>> import com.sun.grizzly.util.ByteBufferFactory;
>> import com.sun.grizzly.util.WorkerThread;
>>
>> /**
>>  * Grizzly protocol parser that extracts {@link SimpleMessage}s from the
>>  * byte buffer handed to {@link #startBuffer(ByteBuffer)}. The buffer is
>>  * the only place where incomplete input is kept; when it is full and a
>>  * message is still incomplete, it is replaced by one of twice the
>>  * capacity.
>>  *
>>  * <p>Created by erik, Mar 14, 2008.
>>  */
>> public class ProtocolParser implements
>> com.sun.grizzly.ProtocolParser<SimpleMessage> {
>>
>>     /** Buffer currently being parsed. */
>>     private ByteBuffer saved;
>>     /** Buffer position recorded before the last parse attempt. */
>>     private int position;
>>     /** Not used by any method of this class. */
>>     private int limit;
>>     /** Message parsed by hasNextMessage() but not handed out yet. */
>>     private SimpleMessage mess;
>>     private boolean wants_more_data = false;
>>     private boolean has_unparsed_data = false;
>>
>>     public ProtocolParser() {
>>     }
>>
>>     /** @return true when the last parse attempt found an incomplete message */
>>     public boolean isExpectingMoreData() {
>>         return wants_more_data;
>>     }
>>
>>     /** @return true when bytes were left after the last parsed message */
>>     public boolean hasMoreBytesToParse() {
>>         return has_unparsed_data;
>>     }
>>
>>     /**
>>      * Returns the pending message and forgets it.
>>      *
>>      * @return the next message, or null when none could be extracted
>>      */
>>     public SimpleMessage getNextMessage() {
>>         // the call pattern should be:
>>         // hasNextMessage() == true
>>         // getNextMessage()
>>         // but if not:
>>         if (null == mess) {
>>             mess = extractMessage();
>>         }
>>         SimpleMessage tmp = mess;
>>         mess = null;
>>
>>         return tmp;
>>     }
>>
>>     /**
>>      * Parses the next message unless one is already pending.
>>      *
>>      * @return true when a message is available for getNextMessage()
>>      */
>>     public boolean hasNextMessage() {
>>
>>         if (null == mess) {
>>             mess = extractMessage();
>>         }
>>
>>         return ( mess != null);
>>     }
>>
>>     /**
>>      * Takes over the buffer to parse.
>>      *
>>      * @param bb buffer holding the received bytes
>>      */
>>     public void startBuffer(ByteBuffer bb) {
>>         saved = bb;
>>         // flip the byte buffer for reading. (should have been done
>>         // already)
>>         saved.flip();
>>     }
>>
>>     /**
>>      * Prepares the buffer for the next read: unparsed bytes are kept when
>>      * more data is expected, otherwise the buffer is emptied.
>>      *
>>      * @return whether more data is expected
>>      */
>>     public boolean releaseBuffer() {
>>         if (wants_more_data) {
>>             saved.compact();
>>             // check to see if we made room in the buffer...
>>             // compact sets the position at the end of the written/moved
>>             // bytes so remaining()
>>             // shows how much more we can stuff into the buffer.
>>             if (saved.remaining() == 0 ) {
>>                 reallocateSaved();
>>             }
>>         } else {
>>             saved.clear();
>>         }
>>         // Grizzly doesn't seem to care (and doesn't when you look at
>>         // the code) but we return a factual value anyhoo
>>         return wants_more_data;
>>     }
>>
>>
>>     /**
>>      * Replaces the full buffer by one of twice the capacity, copies the
>>      * buffered bytes over and installs the new buffer on the current
>>      * WorkerThread.
>>      */
>>     private void reallocateSaved() {
>>         // Rewind to the position recorded by extractMessage() so that
>>         // put() copies everything from there up to the limit.
>>         saved.position(position);
>>         ByteBuffer tmpBuffer =
>>                 ByteBufferFactory.allocateView(saved.capacity() * 2,
>>                         false);
>>         tmpBuffer.put(saved);
>>         saved = tmpBuffer;
>>         ((WorkerThread) Thread.currentThread()).setByteBuffer(saved);
>>     }
>>
>>     /**
>>      * Tries to parse one message at the current buffer position and
>>      * updates wants_more_data / has_unparsed_data.
>>      *
>>      * @return the parsed message, or null when the buffer is empty or
>>      *         holds only part of a message
>>      */
>>     private SimpleMessage extractMessage() {
>>         SimpleMessage message = null;
>>
>>         if (!saved.hasRemaining()) {
>>             // there are no more readable bytes in the buffer
>>             // this should mean that we've read all messages in the buffer
>>             wants_more_data = false;
>>             has_unparsed_data = false;
>>             // clean up our byte buffer. No one else will do it for us
>>             saved.clear();
>>             return message;
>>         }
>>         // save the position before we send it off to the parser
>>         position = saved.position();
>>         message = SimpleMessage.parse(saved);
>>         if (message == null) {
>>             // not enough bytes for a message
>>             // but we know there are bytes so there must be
>>             // an incomplete message there
>>             if (saved.position() != position) {
>>                 saved.position(position);
>>             }
>>             wants_more_data = true;
>>             has_unparsed_data = false;
>>         } else {
>>             // here we have a complete message
>>             if (saved.limit() == saved.position()) {
>>                 // the end of the buffer is reached.
>>                 wants_more_data = false;
>>                 has_unparsed_data = false;
>>             } else {
>>                 has_unparsed_data = true;
>>                 // here we can't accuratly set wants_more_data.
>>                 // since we don't know if the bytes in the buffer is a
>>                 // complete message or not
>>                 // NOTE(review): wants_more_data keeps its previous
>>                 // value here; confirm callers cope with a stale true.
>>             }
>>         }
>>         return message;
>>     }
>> }
>>
>>
>> cheers
>>
>> /Erik
>>
>>
>>
>>
>>
>