Hello, it's me again)
I want to thank everyone who helped me writing this class. Let this code be
an example of how packet based interaction can be implemented using grizzly:
import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.util.ByteBufferFactory;
import com.sun.grizzly.util.WorkerThread;
import java.nio.ByteBuffer;
import java.util.logging.Level;
/**
*
* @author Ash2k
*/
public class A1ProtocolParser implements ProtocolParser<IncomingPacket> {
static final int HEADERLENGTH = 18;
static final int STARTMARK = 0x12345678;
public static volatile int maximumPacketSize = 1024 * 1024;
private ByteBuffer byteBuffer;
private boolean isExpectingMoreData, hasMoreBytesToParse, doCompact =
true;
private IncomingPacket finishedPacket;
private final ProgressCallback progressCallback;
public A1ProtocolParser(final ProgressCallback progressCallback) {
this.progressCallback = progressCallback;
} //A1ProtocolParser
public boolean isExpectingMoreData() {
return isExpectingMoreData;
} //isExpectingMoreData
public boolean hasMoreBytesToParse() {
return hasMoreBytesToParse;
} //hasMoreBytesToParse
public IncomingPacket getNextMessage() {
IncomingPacket ip = finishedPacket;
finishedPacket = null;
return ip;
} //getNextMessage
public boolean hasNextMessage() {
if (byteBuffer.remaining() < HEADERLENGTH) {
isExpectingMoreData = true;
hasMoreBytesToParse = byteBuffer.hasRemaining();
return false;
}
byteBuffer.mark();
int startmark = byteBuffer.getInt();
if (startmark != STARTMARK) {
progressCallback.log(Level.SEVERE, "Bad STARTMARK!");
//TODO: close connection
return false;
}
ByteBuffer incomingPacketBuffer;
IncomingPacket incomingPacket;
incomingPacket = new IncomingPacket(byteBuffer.getShort());
incomingPacket.attachment = byteBuffer.getLong();
int len = byteBuffer.getInt();
if (len == 0) {
isExpectingMoreData = false;
hasMoreBytesToParse = byteBuffer.hasRemaining();
finishedPacket = incomingPacket;
return true;
} else if (len + HEADERLENGTH > maximumPacketSize) {
//TODO: close connection
progressCallback.log(Level.SEVERE, String.format(
"Received packet size (%d) is greater than maximum allowed packet size
(%d)",
len + HEADERLENGTH,
maximumPacketSize));
return false;
} else if (len < 0) {
//TODO: close connection
progressCallback.log(
Level.SEVERE,
String.format("Received packet size is %d", len));
return false;
}
if (byteBuffer.remaining() >= len) {
//we have enough data in buffer to fill this message
incomingPacket.data = new byte[len];
incomingPacketBuffer = ByteBuffer.wrap(incomingPacket.data);
int limit = byteBuffer.limit();
byteBuffer.limit(byteBuffer.position() + len);
incomingPacketBuffer.put(byteBuffer);
byteBuffer.limit(limit);
isExpectingMoreData = false;
finishedPacket = incomingPacket;
hasMoreBytesToParse = byteBuffer.hasRemaining();
if (hasMoreBytesToParse) {
doCompact = true;
} else {
byteBuffer.clear();
doCompact = false;
}
} else if (byteBuffer.capacity() - byteBuffer.position() < len) {
//we dont have enough room for data - lets allocate a bigger buffer
ByteBuffer newBuffer = ByteBufferFactory.allocateView(HEADERLENGTH +
len, false);
byteBuffer.reset();
newBuffer.put(byteBuffer);
((WorkerThread) Thread.currentThread()).setByteBuffer(byteBuffer =
newBuffer);
hasMoreBytesToParse = false;
isExpectingMoreData = true;
doCompact = false;
} else {
//we just hadn't received all the data yet
byteBuffer.position(byteBuffer.limit());
byteBuffer.limit(byteBuffer.capacity());
doCompact = false;
isExpectingMoreData = true;
hasMoreBytesToParse = false;
}
return !isExpectingMoreData;
} //hasNextMessage
public void startBuffer(ByteBuffer bb) {
byteBuffer = bb;
byteBuffer.flip();
} //startBuffer
public boolean releaseBuffer() {
if (doCompact) {
byteBuffer.compact();
}
byteBuffer = null;
return false;
} //releaseBuffer
}
Erik Svensson-3 wrote:
>
> Howdy!
>
>> Hello Erik,
>>>>
>>
>>>> As workaround, I can propose you to not keep retain_buffer in your
>>>> ProtocolParser.
>>>> This change will make your ProtocolParser stateless, and not
>>>> depend on
>>>> fact, that same instance will be called for next request on the same
>>>> channel.
>>>>
>>>> The idea is to use just single buffer (saved). Please take a look at
>>>> code bellow. I didn't test it, as don't have your testcase, but you
>>>> can see the idea there.
>>>
>>> In other words you move the state into the byte buffer associated
>>> with the
>>> the current thread.
>> Correct. We can not guarantee that ProtocolFilter will be the same,
>> but ByteBuffer will be.
>> Also, hope, solution with just single buffer can perform faster, as
>> less copy operations will be executed :)
>>
>>>
>>>
>>>> If you will need more help - please send your complete testcase
>>>> code,
>>>> so I'll be able to run it.
>>>
>>> I'll test it out with my code. If the currentThread is the same
>>> thread that
>>> receives the next read from the relevant socket it will work.
>> Hope so :)
>
> I've now tested it and it works. I didn't copy your code straight
> off, I rewrote it a bit (attaching)
> I avoided the compact() in extractMessage(), doing it in releaseBuffer
> () instead.
> But it works with my test code. I've sent messages 20k and 25k large
> and the parser extends the buffer
> each time. In the 25k example it extends the buffer twice.
>
> Here's the code:
>
> package se.phlogiston.grizzly.overflow;
>
> import se.phlogiston.grizzly.tutorial2.SimpleMessage;
>
> import java.nio.ByteBuffer;
>
> import com.sun.grizzly.util.ByteBufferFactory;
> import com.sun.grizzly.util.WorkerThread;
>
> /**
> * Created by IntelliJ IDEA.
> * User: erik
> * Date: Mar 14, 2008
> * Time: 10:05:24 PM
> * To change this template use File | Settings | File Templates.
> */
> public class ProtocolParser implements
> com.sun.grizzly.ProtocolParser<SimpleMessage> {
>
> private ByteBuffer saved;
> private int position;
> private int limit;
> private SimpleMessage mess;
> private boolean wants_more_data = false;
> private boolean has_unparsed_data = false;
>
> public ProtocolParser() {
> }
>
> public boolean isExpectingMoreData() {
> return wants_more_data;
> }
>
> public boolean hasMoreBytesToParse() {
> return has_unparsed_data;
> }
>
> public SimpleMessage getNextMessage() {
> // the call pattern should be:
> // hasNextMessage() == true
> // getNextMessage()
> // but if not:
> if (null == mess) {
> mess = extractMessage();
> }
> SimpleMessage tmp = mess;
> mess = null;
>
> return tmp;
> }
>
> public boolean hasNextMessage() {
>
> if (null == mess) {
> mess = extractMessage();
> }
>
> return ( mess != null);
> }
>
> public void startBuffer(ByteBuffer bb) {
> saved = bb;
> saved.flip(); // flip the byte buffer for reading. (should have
> been done already)
> }
>
> public boolean releaseBuffer() {
> if (wants_more_data) {
> saved.compact();
> // check to see if we made room in the buffer...
> // compact sets the position at the end of the written/moved
> bytes so remaining()
> // shows how much more we can stuff into the buffer.
> if (saved.remaining() == 0 ) {
> reallocateSaved();
> }
> } else {
> saved.clear();
> }
> // Grizzly doesn't seem to care (and doesn't when you look at
> the code) but we return a factual value anyhoo
> return wants_more_data;
> }
>
>
> private void reallocateSaved() {
> saved.position(position);
> ByteBuffer tmpBuffer =
> ByteBufferFactory.allocateView(saved.capacity() * 2,
> false);
> tmpBuffer.put(saved);
> saved = tmpBuffer;
> ((WorkerThread) Thread.currentThread()).setByteBuffer(saved);
> }
>
> private SimpleMessage extractMessage() {
> SimpleMessage message = null;
>
> if (!saved.hasRemaining()) {
> // there are no more readable bytes in the buffer
> // this should mean that we've read all messages in the buffer
> wants_more_data = false;
> has_unparsed_data = false;
> saved.clear(); // clean up our byte buffer. No one else will
> do it for us
> return message;
> }
> // save the position before we send it off to the parser
> position = saved.position();
> message = SimpleMessage.parse(saved);
> if (message == null) {
> // not enough bytes for a message
> // but we know there are bytes so there must be
> // an incomplete message there
> if (saved.position() != position) {
> saved.position(position);
> }
> wants_more_data = true;
> has_unparsed_data = false;
> } else {
> // here we have a complete message
> if (saved.limit() == saved.position()) {
> // the end of the buffer is reached.
> wants_more_data = false;
> has_unparsed_data = false;
> } else {
> has_unparsed_data = true;
> // here we can't accuratly set wants_more_data.
> // since we don't know if the bytes in the buffer is a
> // complete message or not
> }
> }
> return message;
> }
> }
>
>
> cheers
>
> /Erik
>
>
>
>
>
--
View this message in context: http://www.nabble.com/Need-help-implementing-ProtocolParser-tp16091789p16847242.html
Sent from the Grizzly - Users mailing list archive at Nabble.com.