Hi,
Parker Lord wrote:
> I have been struggling with an issue for a few days now and am at a loss
> as to what is wrong. I am trying to build a ProtocolParser to parse XML
> messages which could be larger than 8K. I have read all the relevant
> postings on the forum here and have tried two different approaches, both
> of which exhibit the same behavior. I upgraded to 1.9.15 today as well,
> and still no resolution. My issue is that everything works when I run in
> the debugger and have a breakpoint set in the code. If I disable
> breakpoints, I get this error:
>
> May 6, 2009 2:39:29 PM atlantes.connector.test.SampleProtocolParser hasNextMessage
> INFO: GrizzlyWorker-3 before expand: buffer java.nio.HeapByteBuffer[pos=0 lim=8192 cap=8192] isExpectingMoreData false hasMoreBytesToParse false
> May 6, 2009 2:39:29 PM atlantes.connector.test.SampleProtocolParser hasNextMessage
> INFO: GrizzlyWorker-3 after expand: buffer java.nio.HeapByteBuffer[pos=8192 lim=16384 cap=16384] isExpectingMoreData true hasMoreBytesToParse true
> May 6, 2009 2:39:29 PM com.sun.grizzly.DefaultProtocolChain executeProtocolFilter
> SEVERE: ProtocolChain exception
> java.lang.IllegalStateException: ByteBuffer is full: java.nio.HeapByteBuffer[pos=0 lim=0 cap=16384]
> at com.sun.grizzly.filter.ReadFilter.execute(ReadFilter.java:143)
> at com.sun.grizzly.filter.ReadFilter.execute(ReadFilter.java:103)
> at com.sun.grizzly.filter.ParserProtocolFilter.superExecute(ParserProtocolFilter.java:207)
> at com.sun.grizzly.filter.ParserProtocolFilter.execute(ParserProtocolFilter.java:131)
> at com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:136)
> at com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:103)
> at com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:89)
> at com.sun.grizzly.ProtocolChainContextTask.doCall(ProtocolChainContextTask.java:53)
> at com.sun.grizzly.SelectionKeyContextTask.call(SelectionKeyContextTask.java:57)
> at com.sun.grizzly.ContextTask.run(ContextTask.java:69)
> at com.sun.grizzly.util.FixedThreadPool$BasicWorker.dowork(FixedThreadPool.java:335)
> at com.sun.grizzly.util.FixedThreadPool$BasicWorker.run(FixedThreadPool.java:320)
> at java.lang.Thread.run(Thread.java:595)
>
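For reference, the pos=0 lim=0 state in the exception message is what plain java.nio produces when flip() is applied to a buffer whose position is already 0, for example one that received no new bytes or one that was flipped twice. The sketch below only demonstrates that ByteBuffer behavior; it is not a diagnosis of the Grizzly code path, and the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;

// Illustration only: shows how flip() can leave a ByteBuffer with no remaining space.
final class FlipStateSketch {

    // Formats the three indices the same way the log lines above report them.
    static String state(ByteBuffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    // flip() on a buffer that nothing was written into: limit becomes the position (0).
    static ByteBuffer flipWithoutNewData(int capacity) {
        ByteBuffer b = ByteBuffer.allocate(capacity);
        b.flip();
        return b;
    }

    // Writes some bytes, then flips twice: the second flip sets limit to position 0.
    static ByteBuffer flipTwice(int capacity, int written) {
        ByteBuffer b = ByteBuffer.allocate(capacity);
        b.put(new byte[written]);
        b.flip(); // pos=0, lim=written
        b.flip(); // pos=0, lim=0
        return b;
    }

    public static void main(String[] args) {
        System.out.println(state(flipWithoutNewData(16384))); // pos=0 lim=0 cap=16384
        System.out.println(state(flipTwice(16384, 8192)));    // pos=0 lim=0 cap=16384
    }
}
```

In both cases hasRemaining() returns false, so any code that checks for free space before reading would treat the buffer as full.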
>
> When I have a breakpoint set (or in a few cases when it works), the
> output looks like this:
>
> May 6, 2009 2:38:41 PM atlantes.connector.test.SampleProtocolParser hasNextMessage
> INFO: GrizzlyWorker-2 before expand: buffer java.nio.HeapByteBuffer[pos=0 lim=8192 cap=8192] isExpectingMoreData false hasMoreBytesToParse false
> May 6, 2009 2:38:41 PM atlantes.connector.test.SampleProtocolParser hasNextMessage
> INFO: GrizzlyWorker-2 after expand: buffer java.nio.HeapByteBuffer[pos=8192 lim=16384 cap=16384] isExpectingMoreData true hasMoreBytesToParse true
> May 6, 2009 2:38:42 PM atlantes.connector.test.SampleProtocolParser extractMessage
> INFO: GrizzlyWorker-2 before extract: buffer java.nio.HeapByteBuffer[pos=0 lim=10265 cap=16384] isExpectingMoreData true hasMoreBytesToParse true
> May 6, 2009 2:38:42 PM atlantes.connector.test.SampleProtocolParser extractMessage
> INFO: GrizzlyWorker-2 after extract: buffer java.nio.HeapByteBuffer[pos=10265 lim=10265 cap=16384] isExpectingMoreData true hasMoreBytesToParse true
>
> It seems that when hasNextMessage() returns after setting a larger
> buffer on the current thread, the buffer has become full by the time it
> is next retrieved from the thread and checked. Some sort of race
> condition or thread-switching issue seems to be going on, but I can't
> test it because the code works whenever I debug it.
Can you share a test case?
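The "after expand" log line (pos=8192 lim=16384 cap=16384) is the state expected from copying a full 8K buffer into a new one of twice the size. A minimal stand-alone sketch of that step follows. It uses ByteBuffer.allocate() in place of Grizzly's ByteBufferFactory.allocateView(), and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Illustration only: grows a buffer by copying its readable bytes into a larger one.
final class ExpandBufferSketch {

    // Copies the bytes between position and limit of 'full' into a buffer twice the size.
    // The result is left in write mode: position sits right after the copied bytes.
    static ByteBuffer expand(ByteBuffer full) {
        ByteBuffer bigger = ByteBuffer.allocate(full.capacity() * 2);
        bigger.put(full);
        return bigger;
    }

    public static void main(String[] args) {
        ByteBuffer full = ByteBuffer.allocate(8192); // pos=0 lim=8192: 8192 readable bytes
        ByteBuffer bigger = expand(full);
        // pos=8192 lim=16384 cap=16384: 8192 bytes of free space remain for the next read
        System.out.println(bigger.position() + " " + bigger.limit() + " " + bigger.capacity());
        bigger.flip();
        // pos=0 lim=8192: a single flip exposes the copied bytes for parsing
        System.out.println(bigger.position() + " " + bigger.limit());
    }
}
```

The expanded buffer is in write mode, so the next read can append to it directly, and exactly one flip() is needed before parsing resumes.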
> I noticed another problem, and I am not sure whether it is related. In
> the examples I have seen here, hasMoreBytesToParse is set to false when
> the buffer is reallocated. If I set it to false, hasNextMessage() does
> not get called back. You can see this clearly in the postExecute()
> method of ParserProtocolFilter:
>
> public boolean postExecute(Context context) throws IOException {
> ProtocolParser parser = (ProtocolParser) context.getAttribute(ProtocolParser.PARSER);
>
> if (parser == null) {
> return true;
> }
>
> if (parser.hasMoreBytesToParse()) {
> // Need to say that we read successfully since bytes are left
> context.setAttribute(ProtocolChain.PROTOCOL_CHAIN_POST_INSTRUCTION,
> ProtocolChainInstruction.REINVOKE);
> return true;
> }
>
> and then in DefaultProtocolChain postExecuteProtocolFilter() :
>
> ProtocolChainInstruction postInstruction =
> (ProtocolChainInstruction) ctx.removeAttribute(
> PROTOCOL_CHAIN_POST_INSTRUCTION);
>
> if (postInstruction != null &&
> postInstruction == ProtocolChainInstruction.REINVOKE) {
> reinvokeChain = true;
> } else if (continousExecution
> && currentPosition == protocolFilters.size() -1
> && (Boolean)ctx.removeAttribute(ProtocolFilter.SUCCESSFUL_READ)
> == Boolean.TRUE) {
> reinvokeChain = true;
> }
>
> return reinvokeChain;
>
>
> The only way to get reinvoked is for hasMoreBytesToParse to be true,
> not false. Any help would be appreciated. Here is all the source code:
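Going only by the two snippets quoted above, the reinvoke decision can be modelled as follows. This is a simplified stand-alone sketch, not Grizzly code: the enum, class, and parameter names are hypothetical, and the SUCCESSFUL_READ attribute is assumed to be either Boolean.TRUE or absent (null).

```java
// Illustration only: a toy model of the reinvoke decision in the quoted snippets.
final class ReinvokeSketch {

    // Hypothetical stand-in for the post instruction stored in the context.
    enum Instruction { NONE, REINVOKE }

    static boolean shouldReinvoke(Instruction postInstruction,
                                  boolean continuousExecution,
                                  boolean isLastFilter,
                                  Boolean successfulRead) {
        // Path 1: postExecute() asked for it because hasMoreBytesToParse() was true.
        if (postInstruction == Instruction.REINVOKE) {
            return true;
        }
        // Path 2: continuous execution, last filter in the chain, read flagged successful.
        return continuousExecution && isLastFilter && Boolean.TRUE.equals(successfulRead);
    }

    public static void main(String[] args) {
        System.out.println(shouldReinvoke(Instruction.REINVOKE, false, false, null)); // true
        System.out.println(shouldReinvoke(null, true, true, Boolean.TRUE));           // true
        System.out.println(shouldReinvoke(null, true, true, null));                   // false
    }
}
```

In this model there is a second path besides hasMoreBytesToParse: continuous execution enabled, the filter last in the chain, and the read flagged as successful. Whether that flag is set in the scenario described here is not visible from the quoted code.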
Great, I will take a look. How do you stress the server?
Thanks!
-- Jeanfrancois
>
> /* Main Class */
> package atlantes.connector.test;
>
> public class SampleMain
> {
> public enum ConnectorProtocol { TCP, UDP };
> public static int defaultPort = 50100;
> public static ConnectorProtocol defaultProtocol = ConnectorProtocol.TCP;
>
> public static void main (String[] args) throws Exception
> {
> ConnectorProtocol protocol;
> int port;
>
> protocol = defaultProtocol;
> port = defaultPort;
>
> new SampleListener ().service(protocol, port);
> }
> }
>
> /* Sample Listener */
> package atlantes.connector.test;
>
> import java.io.IOException;
> import java.util.concurrent.ExecutorService;
>
> import atlantes.connector.test.SampleMain.ConnectorProtocol;
>
> import com.sun.grizzly.*;
> import com.sun.grizzly.util.PipelineThreadPool;
>
> public class SampleListener
> {
> public void service(ConnectorProtocol protocol, int port)
> {
> final Controller controller = new Controller();
>
> final SampleProtocolFilter parser = new SampleProtocolFilter();
> final SampleMessageFilter filter = new SampleMessageFilter();
>
> final ProtocolChain protocolChain = new DefaultProtocolChain();
> protocolChain.addFilter(parser);
> protocolChain.addFilter(filter);
> ((DefaultProtocolChain)protocolChain).setContinuousExecution(true);
>
> ProtocolChainInstanceHandler pciHandler = new DefaultProtocolChainInstanceHandler()
> {
> public ProtocolChain poll()
> {
> return protocolChain;
> }
>
> public boolean offer(ProtocolChain protocolChain)
> {
> return false;
> }
> };
>
> controller.setProtocolChainInstanceHandler(pciHandler);
>
> if (protocol.equals(ConnectorProtocol.TCP))
> {
> TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
> tcpHandler.setPort(port);
> controller.addSelectorHandler(tcpHandler);
> }
> else if (protocol.equals(ConnectorProtocol.UDP))
> {
> UDPSelectorHandler udpHandler = new UDPSelectorHandler();
> udpHandler.setPort(port);
> controller.addSelectorHandler(udpHandler);
> }
>
> ExecutorService threadPool = new PipelineThreadPool();
> controller.setThreadPool(threadPool);
>
> try
> {
> controller.start();
> }
> catch (IOException e)
> {
> e.printStackTrace();
> }
> }
> }
>
> /* SampleProtocolFilter */
>
> package atlantes.connector.test;
>
>
> import com.sun.grizzly.ProtocolParser;
> import com.sun.grizzly.filter.ParserProtocolFilter;
>
> public class SampleProtocolFilter extends ParserProtocolFilter {
>
> @SuppressWarnings("unchecked")
> @Override
> public ProtocolParser newProtocolParser() {
> //return new SolidicaProtocolParser();
> return new SampleProtocolParser();
>
> }
> }
>
> /* SampleProtocolParser */
> package atlantes.connector.test;
>
> import com.sun.grizzly.ProtocolParser;
> import com.sun.grizzly.util.ByteBufferFactory;
> import com.sun.grizzly.util.WorkerThread;
> import java.nio.ByteBuffer;
> import java.nio.charset.CharacterCodingException;
> import java.nio.charset.Charset;
> import java.nio.charset.CharsetDecoder;
> import java.util.logging.Logger;
>
> public class SampleProtocolParser implements ProtocolParser
> {
> protected CharsetDecoder f_asciiDecoder = Charset.forName("ISO-8859-1").newDecoder();
>
> Logger syslog = Logger.getLogger(SampleProtocolParser.class.getName());
>
> private ByteBuffer byteBuffer;
> private boolean isExpectingMoreData, hasMoreBytesToParse, doCompact = true;
> private String completeMessage;
> private boolean logIt = true;
>
> public SampleProtocolParser()
> {};
>
> public boolean isExpectingMoreData()
> {
> return isExpectingMoreData;
> }
> public boolean hasMoreBytesToParse()
> {
> return hasMoreBytesToParse;
> }
>
> public String getNextMessage()
> {
> String ip = completeMessage;
> completeMessage = null;
> return ip;
> }
>
> public boolean hasNextMessage()
> {
> byteBuffer.mark();
>
> completeMessage = extractMessage(byteBuffer);
>
> if (completeMessage != null)
> {
> isExpectingMoreData = false;
> // releaseBuffer() is only called with hasMoreBytesToParse == false
> // which means compact() would never be called!
> hasMoreBytesToParse = byteBuffer.hasRemaining();
> if (hasMoreBytesToParse)
> {
> doCompact = true;
> }
> else
> {
> byteBuffer.clear();
> doCompact = false;
> }
>
> return true;
> }
> else
> {
> if (logIt)
> syslog.info(((WorkerThread)Thread.currentThread()).getName() + " before expand: buffer " + byteBuffer.toString() + " isExpectingMoreData " + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>
> ByteBuffer newBuffer = ByteBufferFactory.allocateView(byteBuffer.capacity() * 2, false);
> byteBuffer.reset();
> newBuffer.put(byteBuffer);
> ((WorkerThread)Thread.currentThread()).setByteBuffer(byteBuffer = newBuffer);
>
> // If hasMoreBytesToParse is false, we do not get re-invoked!!!
> //hasMoreBytesToParse = false;
> hasMoreBytesToParse = true;
> isExpectingMoreData = true;
> doCompact = false;
>
> syslog.info(((WorkerThread)Thread.currentThread()).getName() + " after expand: buffer " + byteBuffer.toString() + " isExpectingMoreData " + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>
> }
>
> return !isExpectingMoreData;
> } // hasNextMessage
>
> public void startBuffer(ByteBuffer bb)
> {
> byteBuffer = bb;
> byteBuffer.flip();
> } // startBuffer
>
> public boolean releaseBuffer()
> {
> if (doCompact)
> {
> byteBuffer.compact();
> }
> byteBuffer = null;
>
> return false;
> } // releaseBuffer
>
> private String extractMessage(ByteBuffer buffer)
> {
> final String startOfMessage = "Once upon a time";
> final String endOfMessage = "The End.";
> // decode the buffer
> // Make a copy and convert it to a string to perform our check to see if there is a message present
> String msg = null;
> String returnMsg = null;
> try
> {
> ByteBuffer tmp = buffer.duplicate();
> msg = f_asciiDecoder.decode(tmp).toString();
> }
> catch (CharacterCodingException e)
> {
> e.printStackTrace();
> return null; // nothing could be decoded, so there is no message to search
> }
>
> // Check to see if we have a message terminator
> int startIndex = msg.indexOf(startOfMessage);
> int endIndex = msg.indexOf(endOfMessage);
> if (startIndex > -1 && endIndex > -1) // msg found
> {
> if (logIt)
> syslog.info(((WorkerThread)Thread.currentThread()).getName() + " before extract: buffer " + buffer.toString() + " isExpectingMoreData " + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
>
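> // Extract the bytes from the buffer. get(dst, offset, length) takes an
> // offset into dst, not into the buffer, so skip to the message start first.
> endIndex += endOfMessage.length();
> byte[] bytes = new byte[endIndex - startIndex];
> buffer.position(buffer.position() + startIndex);
> buffer.get(bytes, 0, bytes.length);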
>
> returnMsg = new String(bytes);
>
> //buffer.compact();
>
> if (logIt)
> syslog.info(((WorkerThread)Thread.currentThread()).getName() + " after extract: buffer " + buffer.toString() + " isExpectingMoreData " + isExpectingMoreData + " hasMoreBytesToParse " + hasMoreBytesToParse);
> }
>
> return returnMsg;
> }
> }
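A side note on extractMessage(): ByteBuffer.get(byte[] dst, int offset, int length) interprets offset as an index into dst and length as a byte count, so passing startIndex and endIndex only works when the message starts at index 0. A minimal sketch of delimiter-based extraction with plain java.nio follows. The class and method names are hypothetical, and it assumes a single-byte charset (ISO-8859-1, as in the parser above) so that character indices equal byte offsets.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

// Illustration only: pulls one delimited message out of a ByteBuffer.
final class ExtractSketch {

    private static final Charset LATIN1 = Charset.forName("ISO-8859-1");

    // Returns the text from 'start' through 'end' inclusive, or null if either is missing.
    // On success the buffer position is left just after the end marker.
    static String extract(ByteBuffer buffer, String start, String end) {
        // Decode a duplicate so the search does not move the original position.
        String text = LATIN1.decode(buffer.duplicate()).toString();
        int s = text.indexOf(start);
        int e = (s < 0) ? -1 : text.indexOf(end, s + start.length());
        if (s < 0 || e < 0) {
            return null; // incomplete message: leave the buffer untouched
        }
        e += end.length();
        byte[] bytes = new byte[e - s];
        buffer.position(buffer.position() + s); // skip anything before the start marker
        buffer.get(bytes, 0, bytes.length);     // offset 0 into dst, length = message size
        return new String(bytes, LATIN1);
    }

    public static void main(String[] args) {
        byte[] raw = "xxOnce upon a time, hello The End.yy".getBytes(LATIN1);
        ByteBuffer buffer = ByteBuffer.wrap(raw);
        System.out.println(extract(buffer, "Once upon a time", "The End."));
        System.out.println(buffer.remaining()); // 2 bytes ("yy") are left for the next message
    }
}
```

Searching for the end marker only after the start marker also avoids matching a stray terminator that belongs to an earlier, already consumed message.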
>
> /* SampleMessageFilter */
> package atlantes.connector.test;
>
> import java.io.IOException;
>
> import com.sun.grizzly.Context;
> import com.sun.grizzly.ProtocolFilter;
> import com.sun.grizzly.ProtocolParser;
>
> public class SampleMessageFilter implements ProtocolFilter
> {
> public boolean execute(Context context)
> {
> String message = (String) context.removeAttribute(ProtocolParser.MESSAGE);
>
> System.out.println(message);
>
> return false;
> }
>
> public boolean postExecute(Context context) throws IOException
> {
> return true;
> }
> }
>
> /* ClientSampleMain */
> package atlantes.connector.test;
>
> import java.io.*;
> import java.net.*;
>
> public class ClientSampleMain
> {
>
> public static void main(String[] args) throws IOException
> {
>
> int listenPort = atlantes.connector.test.SampleMain.defaultPort;
>
> StringBuffer sb = new StringBuffer();
>
> // Create the message which is larger than 8K
>
> sb.append("Once upon a time,");
>
> for (int i = 0; i < 1024; ++i)
> {
> sb.append("0123456789");
> }
>
> sb.append("The End.");
>
> // Connect to the server and send the message
> try
> {
> Socket socket = new Socket("localhost", listenPort);
> if (socket == null)
> {
> System.out.println("Unable to open socket");
> return;
> }
>
> PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
> out.print(sb.toString());
>
> out.close();
> socket.close();
> socket = null;
> }
> catch (IllegalArgumentException ex)
> {
> ex.printStackTrace();
> }
>
> return;
> }
> }
>
>
>
> ------------------------------------------------------------------------
> View this message in context: ProtocolParser Issue with Reallocation of
> ByteBuffer
> <http://www.nabble.com/ProtocolParser-Issue-with-Reallocation-of-ByteBuffer-tp23416360p23416360.html>
> Sent from the Grizzly - Development mailing list archive
> <http://www.nabble.com/Grizzly---Development-f23248.html> at Nabble.com.