dev@grizzly.java.net

Re: ProtocolParser Issue with Reallocation of ByteBuffer

From: Parker Lord <plord_at_seecontrol.com>
Date: Wed, 6 May 2009 16:23:23 -0700 (PDT)

The full set of files for my test case is included in the post.

The last file is the client used to invoke the server. The first set of
files makes up the server.
Let me know if you need me to zip them up and attach them.
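
For anyone following along without Grizzly on the classpath, here is a
minimal sketch of the buffer-doubling step using only java.nio. It is an
illustration under stated assumptions, not the parser's actual code: the
class name BufferGrowthSketch and the grow() helper are made up for this
example, and ByteBuffer.allocate stands in for
ByteBufferFactory.allocateView. The main method also shows, purely as an
observation about java.nio, that flipping the grown buffer twice leaves it
at pos=0 lim=0, which is the state reported in the exception quoted below.
Whether that is what actually happens inside the filter chain is not
established here.

```java
import java.nio.ByteBuffer;

class BufferGrowthSketch {

    /**
     * Copies the readable bytes of src (position..limit) into a new buffer
     * with twice the capacity. The result is left in write mode, so later
     * channel reads append after the copied bytes.
     */
    static ByteBuffer grow(ByteBuffer src) {
        ByteBuffer bigger = ByteBuffer.allocate(src.capacity() * 2);
        // Copy from a duplicate so src's own position and limit stay untouched.
        bigger.put(src.duplicate());
        return bigger;
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.allocate(8192);
        while (small.hasRemaining()) {
            small.put((byte) 'x');
        }
        small.flip();                                 // read mode: pos=0 lim=8192

        ByteBuffer big = grow(small);
        System.out.println("after grow:  " + big);    // pos=8192 lim=16384 cap=16384

        big.flip();                                   // pos=0 lim=8192
        System.out.println("one flip:    " + big);
        big.flip();                                   // pos=0 lim=0, no room to read into
        System.out.println("second flip: " + big);
    }
}
```

If the buffer handed back to the worker thread is flipped more often than
expected between reads, a check like the one in ReadFilter would see no
remaining space even though the capacity has doubled.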



Jeanfrancois Arcand-2 wrote:
>
> Salut,
>
> Parker Lord wrote:
>> I have been struggling with an issue for a few days now and am at a loss
>> as to what is wrong. I am trying to build a ProtocolParser to parse XML
>> messages which could be larger than 8K. I have read all the relevant
>> postings on the forum here and have tried two different approaches, both
>> of which exhibit the same behavior. I upgraded to 1.9.15 today as well,
>> and still no resolution. My issue is that everything works when I run in
>> the debugger and have a breakpoint set in the code. If I disable
>> breakpoints, I get this error:
>>
>> May 6, 2009 2:39:29 PM atlantes.connector.test.SampleProtocolParser
>> hasNextMessage
>> INFO: GrizzlyWorker-3 before expand: buffer java.nio.HeapByteBuffer[pos=0
>> lim=8192 cap=8192] isExpectingMoreData false hasMoreBytesToParse false
>> May 6, 2009 2:39:29 PM atlantes.connector.test.SampleProtocolParser
>> hasNextMessage
>> INFO: GrizzlyWorker-3 after expand: buffer
>> java.nio.HeapByteBuffer[pos=8192 lim=16384 cap=16384] isExpectingMoreData
>> true hasMoreBytesToParse true
>> May 6, 2009 2:39:29 PM com.sun.grizzly.DefaultProtocolChain
>> executeProtocolFilter
>> SEVERE: ProtocolChain exception
>> java.lang.IllegalStateException: ByteBuffer is full:
>> java.nio.HeapByteBuffer[pos=0 lim=0 cap=16384]
>> at com.sun.grizzly.filter.ReadFilter.execute(ReadFilter.java:143)
>> at com.sun.grizzly.filter.ReadFilter.execute(ReadFilter.java:103)
>> at
>> com.sun.grizzly.filter.ParserProtocolFilter.superExecute(ParserProtocolFilter.java:207)
>> at
>> com.sun.grizzly.filter.ParserProtocolFilter.execute(ParserProtocolFilter.java:131)
>> at
>> com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:136)
>> at
>> com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:103)
>> at
>> com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:89)
>> at
>> com.sun.grizzly.ProtocolChainContextTask.doCall(ProtocolChainContextTask.java:53)
>> at
>> com.sun.grizzly.SelectionKeyContextTask.call(SelectionKeyContextTask.java:57)
>> at com.sun.grizzly.ContextTask.run(ContextTask.java:69)
>> at
>> com.sun.grizzly.util.FixedThreadPool$BasicWorker.dowork(FixedThreadPool.java:335)
>> at
>> com.sun.grizzly.util.FixedThreadPool$BasicWorker.run(FixedThreadPool.java:320)
>> at java.lang.Thread.run(Thread.java:595)
>>
>>
>> When I have a breakpoint set (or in a few cases when it works), the
>> output looks like this:
>>
>> May 6, 2009 2:38:41 PM atlantes.connector.test.SampleProtocolParser
>> hasNextMessage
>> INFO: GrizzlyWorker-2 before expand: buffer java.nio.HeapByteBuffer[pos=0
>> lim=8192 cap=8192] isExpectingMoreData false hasMoreBytesToParse false
>> May 6, 2009 2:38:41 PM atlantes.connector.test.SampleProtocolParser
>> hasNextMessage
>> INFO: GrizzlyWorker-2 after expand: buffer
>> java.nio.HeapByteBuffer[pos=8192 lim=16384 cap=16384] isExpectingMoreData
>> true hasMoreBytesToParse true
>> May 6, 2009 2:38:42 PM atlantes.connector.test.SampleProtocolParser
>> extractMessage
>> INFO: GrizzlyWorker-2 before extract: buffer
>> java.nio.HeapByteBuffer[pos=0 lim=10265 cap=16384] isExpectingMoreData
>> true hasMoreBytesToParse true
>> May 6, 2009 2:38:42 PM atlantes.connector.test.SampleProtocolParser
>> extractMessage
>> INFO: GrizzlyWorker-2 after extract: buffer
>> java.nio.HeapByteBuffer[pos=10265 lim=10265 cap=16384]
>> isExpectingMoreData true hasMoreBytesToParse true
>>
>> It seems that after hasNextMessage() returns, having set a larger
>> buffer on the current thread, the buffer has become full by the time it
>> is next retrieved from the thread and checked. Some sort of race
>> condition or thread-switching issue seems to be going on, but I can't
>> test for it, since the code works whenever I debug it.
>
> Can you share a test case?
>
>
>> Another problem I noticed, and I am not sure if it is related: in the
>> examples I have seen here, when the buffer is reallocated,
>> hasMoreBytesToParse is set to false. If I set this to false,
>> hasNextMessage() does not get called again. You can see this clearly in
>> the ParserProtocolFilter postExecute() method:
>>
>> public boolean postExecute(Context context) throws IOException {
>> ProtocolParser parser = (ProtocolParser)
>> context.getAttribute(ProtocolParser.PARSER);
>>
>> if (parser == null) {
>> return true;
>> }
>>
>> if (parser.hasMoreBytesToParse()) {
>> // Need to say that we read successfully since bytes are left
>>
>> context.setAttribute(ProtocolChain.PROTOCOL_CHAIN_POST_INSTRUCTION,
>> ProtocolChainInstruction.REINVOKE);
>> return true;
>> }
>>
>> and then in DefaultProtocolChain postExecuteProtocolFilter() :
>>
>> ProtocolChainInstruction postInstruction =
>> (ProtocolChainInstruction) ctx.removeAttribute(
>> PROTOCOL_CHAIN_POST_INSTRUCTION);
>>
>> if (postInstruction != null &&
>> postInstruction == ProtocolChainInstruction.REINVOKE) {
>> reinvokeChain = true;
>> } else if (continousExecution
>> && currentPosition == protocolFilters.size() -1
>> &&
>> (Boolean)ctx.removeAttribute(ProtocolFilter.SUCCESSFUL_READ)
>> == Boolean.TRUE) {
>> reinvokeChain = true;
>> }
>>
>> return reinvokeChain;
>>
>>
>> The only chance to get reinvoked is to have hasMoreBytesToParse be set
>> to true, not false. Any help would be appreciated. Here is all the
>> source code:
>
> Great I will take a look. How do you stress the server?
>
> Thanks!
>
> -- Jeanfrancois
>
>
>>
>> /* Main Class */
>> package atlantes.connector.test;
>>
>> public class SampleMain
>> {
>> public enum ConnectorProtocol { TCP, UDP };
>> public static int defaultPort = 50100;
>> public static ConnectorProtocol defaultProtocol =
>> ConnectorProtocol.TCP;
>>
>> public static void main (String[] args) throws Exception
>> {
>> ConnectorProtocol protocol;
>> int port;
>>
>> protocol = defaultProtocol;
>> port = defaultPort;
>>
>> new SampleListener ().service(protocol, port);
>> }
>> }
>>
>> /* Sample Listener */
>> package atlantes.connector.test;
>>
>> import java.io.IOException;
>> import java.util.concurrent.ExecutorService;
>>
>> import atlantes.connector.test.SampleMain.ConnectorProtocol;
>>
>> import com.sun.grizzly.*;
>> import com.sun.grizzly.util.PipelineThreadPool;
>>
>> public class SampleListener
>> {
>> public void service(ConnectorProtocol protocol, int port)
>> {
>> final Controller controller = new Controller();
>>
>> final SampleProtocolFilter parser = new SampleProtocolFilter();
>> final SampleMessageFilter filter = new SampleMessageFilter();
>>
>> final ProtocolChain protocolChain = new DefaultProtocolChain();
>> protocolChain.addFilter(parser);
>> protocolChain.addFilter(filter);
>> ((DefaultProtocolChain)protocolChain).setContinuousExecution(true);
>>
>> ProtocolChainInstanceHandler pciHandler = new
>> DefaultProtocolChainInstanceHandler()
>> {
>> public ProtocolChain poll()
>> {
>> return protocolChain;
>> }
>>
>> public boolean offer(ProtocolChain protocolChain)
>> {
>> return false;
>> }
>> };
>>
>> controller.setProtocolChainInstanceHandler(pciHandler);
>>
>> if (protocol.equals(ConnectorProtocol.TCP))
>> {
>> TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
>> tcpHandler.setPort(port);
>> controller.addSelectorHandler(tcpHandler);
>> }
>> else if (protocol.equals(ConnectorProtocol.UDP))
>> {
>> UDPSelectorHandler udpHandler = new UDPSelectorHandler();
>> udpHandler.setPort(port);
>> controller.addSelectorHandler(udpHandler);
>> }
>>
>> ExecutorService threadPool = new PipelineThreadPool();
>> controller.setThreadPool(threadPool);
>>
>> try
>> {
>> controller.start();
>> }
>> catch (IOException e)
>> {
>> e.printStackTrace();
>> }
>> }
>> }
>>
>> /* SampleProtocolFilter */
>>
>> package atlantes.connector.test;
>>
>>
>> import com.sun.grizzly.ProtocolParser;
>> import com.sun.grizzly.filter.ParserProtocolFilter;
>>
>> public class SampleProtocolFilter extends ParserProtocolFilter {
>>
>> @SuppressWarnings("unchecked")
>> @Override
>> public ProtocolParser newProtocolParser() {
>> //return new SolidicaProtocolParser();
>> return new SampleProtocolParser();
>>
>> }
>> }
>>
>> /* SampleProtocolParser */
>> package atlantes.connector.test;
>>
>> import com.sun.grizzly.ProtocolParser;
>> import com.sun.grizzly.util.ByteBufferFactory;
>> import com.sun.grizzly.util.WorkerThread;
>> import java.nio.ByteBuffer;
>> import java.nio.charset.CharacterCodingException;
>> import java.nio.charset.Charset;
>> import java.nio.charset.CharsetDecoder;
>> import java.util.logging.Logger;
>>
>> public class SampleProtocolParser implements ProtocolParser
>> {
>> protected CharsetDecoder f_asciiDecoder =
>> Charset.forName("ISO-8859-1").newDecoder();
>>
>> Logger syslog = Logger.getLogger(SampleProtocolParser.class.getName());
>>
>> private ByteBuffer byteBuffer;
>> private boolean isExpectingMoreData, hasMoreBytesToParse, doCompact =
>> true;
>> private String completeMessage;
>> private boolean logIt = true;
>>
>> public SampleProtocolParser()
>> {};
>>
>> public boolean isExpectingMoreData()
>> {
>> return isExpectingMoreData;
>> }
>> public boolean hasMoreBytesToParse()
>> {
>> return hasMoreBytesToParse;
>> }
>>
>> public String getNextMessage()
>> {
>> String ip = completeMessage;
>> completeMessage = null;
>> return ip;
>> }
>>
>> public boolean hasNextMessage()
>> {
>> byteBuffer.mark();
>>
>> completeMessage = extractMessage(byteBuffer);
>>
>> if (completeMessage != null)
>> {
>> isExpectingMoreData = false;
>> // releaseBuffer() is only called with hasMoreBytesToParse == false
>> // which means compact() would never be called!
>> hasMoreBytesToParse = byteBuffer.hasRemaining();
>> if (hasMoreBytesToParse)
>> {
>> doCompact = true;
>> }
>> else
>> {
>> byteBuffer.clear();
>> doCompact = false;
>> }
>>
>> return true;
>> }
>> else
>> {
>> if (logIt)
>> syslog.info(((WorkerThread)Thread.currentThread()).getName()
>> + " before expand: buffer " + byteBuffer.toString() + " isExpectingMoreData "
>> + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>>
>> ByteBuffer newBuffer =
>> ByteBufferFactory.allocateView(byteBuffer.capacity() * 2, false);
>> byteBuffer.reset();
>> newBuffer.put(byteBuffer);
>> ((WorkerThread)Thread.currentThread()).setByteBuffer(byteBuffer =
>> newBuffer);
>>
>> // If hasMoreBytesToParse is false, we do not get re-invoked!!!
>> //hasMoreBytesToParse = false;
>> hasMoreBytesToParse = true;
>> isExpectingMoreData = true;
>> doCompact = false;
>>
>> syslog.info(((WorkerThread)Thread.currentThread()).getName()
>> + " after expand: buffer " + byteBuffer.toString() + " isExpectingMoreData "
>> + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>>
>> }
>>
>> return !isExpectingMoreData;
>> } // hasNextMessage
>>
>> public void startBuffer(ByteBuffer bb)
>> {
>> byteBuffer = bb;
>> byteBuffer.flip();
>> } // startBuffer
>>
>> public boolean releaseBuffer()
>> {
>> if (doCompact)
>> {
>> byteBuffer.compact();
>> }
>> byteBuffer = null;
>>
>> return false;
>> } // releaseBuffer
>>
>> private String extractMessage(ByteBuffer buffer)
>> {
>> final String startOfMessage = "Once upon a time";
>> final String endOfMessage = "The End.";
>> // decode the buffer
>> // Make a copy and convert it to a string to perform our check to see
>> // if there is a message present
>> String msg = null;
>> String returnMsg = null;
>> try
>> {
>> ByteBuffer tmp = buffer.duplicate();
>> msg = f_asciiDecoder.decode(tmp).toString();
>> }
>> catch (CharacterCodingException e)
>> {
>> e.printStackTrace();
>> }
>>
>> // Check to see if we have a message terminator
>> int startIndex = msg.indexOf(startOfMessage);
>> int endIndex = msg.indexOf(endOfMessage);
>> if (startIndex > -1 && endIndex > -1) // msg found
>> {
>> if (logIt)
>> syslog.info(((WorkerThread)Thread.currentThread()).getName()
>> + " before extract: buffer " + buffer.toString() + " isExpectingMoreData "
>> + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>>
>> // Extract the bytes from the buffer
>> endIndex += endOfMessage.length();
>> byte[] bytes = new byte[endIndex - startIndex];
>> buffer.get(bytes, startIndex, endIndex);
>>
>> returnMsg = new String(bytes);
>>
>> //buffer.compact();
>>
>> if (logIt)
>> syslog.info(((WorkerThread)Thread.currentThread()).getName()
>> + " after extract: buffer " + buffer.toString() + " isExpectingMoreData "
>> + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>> }
>>
>> return returnMsg;
>> }
>> }
>>
>> /* SampleMessageFilter */
>> package atlantes.connector.test;
>>
>> import java.io.IOException;
>>
>> import com.sun.grizzly.Context;
>> import com.sun.grizzly.ProtocolFilter;
>> import com.sun.grizzly.ProtocolParser;
>>
>> public class SampleMessageFilter implements ProtocolFilter
>> {
>> public boolean execute(Context context)
>> {
>> String message = (String)
>> context.removeAttribute(ProtocolParser.MESSAGE);
>>
>> System.out.println(message);
>>
>> return false;
>> }
>>
>> public boolean postExecute(Context context) throws IOException
>> {
>> return true;
>> }
>> }
>>
>> /* ClientSampleMain */
>> package atlantes.connector.test;
>>
>> import java.io.*;
>> import java.net.*;
>>
>> public class ClientSampleMain
>> {
>>
>> public static void main(String[] args) throws IOException
>> {
>>
>> int listenPort = atlantes.connector.test.SampleMain.defaultPort;
>>
>> StringBuffer sb = new StringBuffer();
>>
>> // Create the message which is larger than 8K
>>
>> sb.append("Once upon a time,");
>>
>> for (int i = 0; i < 1024; ++i)
>> {
>> sb.append("0123456789");
>> }
>>
>> sb.append("The End.");
>>
>> // Open a socket to the server and send the message
>> try
>> {
>> Socket socket = new Socket("localhost", listenPort);
>> if (socket == null)
>> {
>> System.out.println("Unable to open socket");
>> return;
>> }
>>
>> PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
>> out.print(sb.toString());
>>
>> out.close();
>> socket.close();
>> socket = null;
>> }
>> catch (IllegalArgumentException ex)
>> {
>> ex.printStackTrace();
>> }
>>
>> return;
>> }
>> }
>>
>>
>>
>> ------------------------------------------------------------------------
>> View this message in context: ProtocolParser Issue with Reallocation of
>> ByteBuffer
>> <http://www.nabble.com/ProtocolParser-Issue-with-Reallocation-of-ByteBuffer-tp23416360p23416360.html>
>> Sent from the Grizzly - Development mailing list archive
>> <http://www.nabble.com/Grizzly---Development-f23248.html> at Nabble.com.
>
>
>
>
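
A side note on the extractMessage() method quoted above:
ByteBuffer.get(byte[] dst, int offset, int length) interprets offset and
length relative to the destination array, so
buffer.get(bytes, startIndex, endIndex) only succeeds when the message
starts at index 0, as it does in this test. The sketch below is one
possible way to pull a delimited message out of a buffer by byte offset.
It is not Grizzly code: the class name DelimitedMessageSketch and the
extract() helper are hypothetical, and it assumes a single-byte charset
(ISO-8859-1) so that string indices equal byte offsets.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class DelimitedMessageSketch {

    static final String START = "Once upon a time";
    static final String END = "The End.";

    /**
     * Returns the first complete message in the readable region of buffer
     * and advances the position past it. Returns null and leaves the buffer
     * untouched when no complete message is present yet.
     */
    static String extract(ByteBuffer buffer) {
        // Read through a duplicate so the caller's position is not moved
        // unless a whole message is found.
        byte[] readable = new byte[buffer.remaining()];
        buffer.duplicate().get(readable);
        String text = new String(readable, StandardCharsets.ISO_8859_1);

        int start = text.indexOf(START);
        if (start < 0) {
            return null;
        }
        int end = text.indexOf(END, start + START.length());
        if (end < 0) {
            return null;
        }
        end += END.length();

        // Consume everything up to and including the terminator.
        buffer.position(buffer.position() + end);
        return text.substring(start, end);
    }

    public static void main(String[] args) {
        byte[] data = "Once upon a time, a test. The End.extra"
                .getBytes(StandardCharsets.ISO_8859_1);
        ByteBuffer buffer = ByteBuffer.wrap(data);
        System.out.println(extract(buffer));
        System.out.println("bytes left over: " + buffer.remaining());
    }
}
```

Any bytes left after the terminator stay in the buffer, which is the case
where hasMoreBytesToParse would be true in the parser.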

-- 
View this message in context: http://www.nabble.com/ProtocolParser-Issue-with-Reallocation-of-ByteBuffer-tp23416360p23417416.html
Sent from the Grizzly - Development mailing list archive at Nabble.com.