dev@grizzly.java.net

ProtocolParser Issue with Reallocation of ByteBuffer

From: Parker Lord <plord_at_seecontrol.com>
Date: Wed, 6 May 2009 14:57:50 -0700 (PDT)

I have been struggling with an issue for a few days now and am at a loss as
to what is wrong.
I am trying to build a ProtocolParser to parse XML messages which could be
larger than 8K. I have read all the relevant postings on the forum here and
have tried two different approaches, both of which exhibit the same
behavior.
I upgraded to 1.9.15 today as well, but the problem persists.

My issue is that everything works when I run in the debugger and have a
breakpoint set in the code.
If I disable breakpoints, I get this error:



May 6, 2009 2:39:29 PM atlantes.connector.test.SampleProtocolParser hasNextMessage
INFO: GrizzlyWorker-3 before expand: buffer java.nio.HeapByteBuffer[pos=0 lim=8192 cap=8192] isExpectingMoreData false hasMoreBytesToParse false
May 6, 2009 2:39:29 PM atlantes.connector.test.SampleProtocolParser hasNextMessage
INFO: GrizzlyWorker-3 after expand: buffer java.nio.HeapByteBuffer[pos=8192 lim=16384 cap=16384] isExpectingMoreData true hasMoreBytesToParse true
May 6, 2009 2:39:29 PM com.sun.grizzly.DefaultProtocolChain executeProtocolFilter
SEVERE: ProtocolChain exception
java.lang.IllegalStateException: ByteBuffer is full: java.nio.HeapByteBuffer[pos=0 lim=0 cap=16384]
        at com.sun.grizzly.filter.ReadFilter.execute(ReadFilter.java:143)
        at com.sun.grizzly.filter.ReadFilter.execute(ReadFilter.java:103)
        at com.sun.grizzly.filter.ParserProtocolFilter.superExecute(ParserProtocolFilter.java:207)
        at com.sun.grizzly.filter.ParserProtocolFilter.execute(ParserProtocolFilter.java:131)
        at com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:136)
        at com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:103)
        at com.sun.grizzly.DefaultProtocolChain.execute(DefaultProtocolChain.java:89)
        at com.sun.grizzly.ProtocolChainContextTask.doCall(ProtocolChainContextTask.java:53)
        at com.sun.grizzly.SelectionKeyContextTask.call(SelectionKeyContextTask.java:57)
        at com.sun.grizzly.ContextTask.run(ContextTask.java:69)
        at com.sun.grizzly.util.FixedThreadPool$BasicWorker.dowork(FixedThreadPool.java:335)
        at com.sun.grizzly.util.FixedThreadPool$BasicWorker.run(FixedThreadPool.java:320)
        at java.lang.Thread.run(Thread.java:595)



When I have a breakpoint set (or in a few cases when it works), the output
looks like this:


May 6, 2009 2:38:41 PM atlantes.connector.test.SampleProtocolParser hasNextMessage
INFO: GrizzlyWorker-2 before expand: buffer java.nio.HeapByteBuffer[pos=0 lim=8192 cap=8192] isExpectingMoreData false hasMoreBytesToParse false
May 6, 2009 2:38:41 PM atlantes.connector.test.SampleProtocolParser hasNextMessage
INFO: GrizzlyWorker-2 after expand: buffer java.nio.HeapByteBuffer[pos=8192 lim=16384 cap=16384] isExpectingMoreData true hasMoreBytesToParse true
May 6, 2009 2:38:42 PM atlantes.connector.test.SampleProtocolParser extractMessage
INFO: GrizzlyWorker-2 before extract: buffer java.nio.HeapByteBuffer[pos=0 lim=10265 cap=16384] isExpectingMoreData true hasMoreBytesToParse true
May 6, 2009 2:38:42 PM atlantes.connector.test.SampleProtocolParser extractMessage
INFO: GrizzlyWorker-2 after extract: buffer java.nio.HeapByteBuffer[pos=10265 lim=10265 cap=16384] isExpectingMoreData true hasMoreBytesToParse true


It seems that when hasNextMessage() returns after setting a larger buffer
on the current thread, the next time that buffer is retrieved from the
thread and checked, it has somehow become full (pos=0 lim=0).

Some sort of race condition or thread-switching issue seems to be going on,
but I can't test it, since the code works whenever I debug it.
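
For reference, the buffer states in the logs above can be reproduced with
plain java.nio. The sketch below is an illustration under assumptions, not a
diagnosis: it uses ByteBuffer.allocate() instead of
ByteBufferFactory.allocateView(), and the class and method names
(BufferStateDemo, expand, state) are made up for the example. It shows that
the expansion leaves the new buffer at pos=8192 lim=16384, and that flipping
such a buffer twice in a row yields pos=0 lim=0, the state reported in the
exception. Whether a double flip is what actually happens inside the filter
chain is not confirmed here.

```java
import java.nio.ByteBuffer;

// Illustration only: plain java.nio, no Grizzly classes involved.
final class BufferStateDemo {

    // Copies the readable bytes of a flipped buffer into a buffer twice the size.
    static ByteBuffer expand(ByteBuffer readable) {
        ByteBuffer bigger = ByteBuffer.allocate(readable.capacity() * 2);
        bigger.put(readable);   // position advances by the number of bytes copied
        return bigger;          // left in "write mode", ready to receive more data
    }

    // Formats the three fields that appear in the log lines.
    static String state(ByteBuffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer full = ByteBuffer.allocate(8192);
        full.put(new byte[8192]);             // simulate a completely filled read
        full.flip();
        System.out.println(state(full));      // pos=0 lim=8192 cap=8192

        ByteBuffer bigger = expand(full);
        System.out.println(state(bigger));    // pos=8192 lim=16384 cap=16384

        bigger.flip();                        // first flip: readable view of 8192 bytes
        System.out.println(state(bigger));    // pos=0 lim=8192 cap=16384
        bigger.flip();                        // second flip: limit becomes old position (0)
        System.out.println(state(bigger));    // pos=0 lim=0 cap=16384
    }
}
```

A buffer at pos=0 lim=0 has no remaining space, so a read into it cannot
make progress, which would be consistent with the "ByteBuffer is full"
message.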

Another problem I noticed, and I am not sure if it is related: in the
examples I have seen here, hasMoreBytesToParse is set to false when the
buffer is reallocated. If I set it to false, hasNextMessage does not get
called again. You can see this clearly in the ParserProtocolFilter
postExecute() method:


    public boolean postExecute(Context context) throws IOException {
        ProtocolParser parser =
                (ProtocolParser) context.getAttribute(ProtocolParser.PARSER);

        if (parser == null) {
            return true;
        }

        if (parser.hasMoreBytesToParse()) {
            // Need to say that we read successfully since bytes are left
            context.setAttribute(ProtocolChain.PROTOCOL_CHAIN_POST_INSTRUCTION,
                    ProtocolChainInstruction.REINVOKE);
            return true;
        }


and then in DefaultProtocolChain postExecuteProtocolFilter() :

        ProtocolChainInstruction postInstruction =
                (ProtocolChainInstruction) ctx.removeAttribute(
                PROTOCOL_CHAIN_POST_INSTRUCTION);
        
        if (postInstruction != null &&
                postInstruction == ProtocolChainInstruction.REINVOKE) {
            reinvokeChain = true;
        } else if (continousExecution
            && currentPosition == protocolFilters.size() -1
            && (Boolean)ctx.removeAttribute(ProtocolFilter.SUCCESSFUL_READ)
                == Boolean.TRUE) {
            reinvokeChain = true;
        }

        return reinvokeChain;



The only chance to get reinvoked is to have hasMoreBytesToParse be set to
true, not false.
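
The decision in the two snippets above can be modelled in isolation. This is
a simplified sketch, not Grizzly code: ReinvokeModel, its Instruction enum
and the boolean parameters are hypothetical stand-ins for the context
attributes and fields in the quoted source. It shows the REINVOKE branch
that hasMoreBytesToParse drives, plus the second branch from the quoted
code, which depends on continuous execution, the filter being last in the
chain, and the SUCCESSFUL_READ attribute. Whether that second branch can
fire in this setup is not established by the quoted code alone.

```java
// Simplified model of the reinvoke decision quoted above; not Grizzly code.
final class ReinvokeModel {

    // Hypothetical stand-in for the post-instruction stored in the context.
    enum Instruction { CONTINUE, REINVOKE }

    static boolean shouldReinvoke(Instruction postInstruction,
                                  boolean continuousExecution,
                                  boolean isLastFilter,
                                  boolean successfulRead) {
        // Branch 1: the parser filter asked for a reinvoke
        // (set when hasMoreBytesToParse() returned true).
        if (postInstruction == Instruction.REINVOKE) {
            return true;
        }
        // Branch 2: continuous execution, last filter, and a successful read.
        return continuousExecution && isLastFilter && successfulRead;
    }

    public static void main(String[] args) {
        System.out.println(shouldReinvoke(Instruction.REINVOKE, false, false, false)); // true
        System.out.println(shouldReinvoke(null, true, true, true));                    // true
        System.out.println(shouldReinvoke(null, true, true, false));                   // false
    }
}
```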

Any help would be appreciated.
Here is all the source code:



/* Main Class */
package atlantes.connector.test;

public class SampleMain
{
  public enum ConnectorProtocol { TCP, UDP };
  public static int defaultPort = 50100;
  public static ConnectorProtocol defaultProtocol = ConnectorProtocol.TCP;
  
  public static void main (String[] args) throws Exception
  {
    ConnectorProtocol protocol;
    int port;
    
    protocol = defaultProtocol;
    port = defaultPort;

    new SampleListener ().service(protocol, port);
  }
}

/* Sample Listener */
package atlantes.connector.test;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

import atlantes.connector.test.SampleMain.ConnectorProtocol;

import com.sun.grizzly.*;
import com.sun.grizzly.util.PipelineThreadPool;

public class SampleListener
{
  public void service(ConnectorProtocol protocol, int port)
  {
    final Controller controller = new Controller();
    
    final SampleProtocolFilter parser = new SampleProtocolFilter();
    final SampleMessageFilter filter = new SampleMessageFilter();

    final ProtocolChain protocolChain = new DefaultProtocolChain();
    protocolChain.addFilter(parser);
    protocolChain.addFilter(filter);
    ((DefaultProtocolChain)protocolChain).setContinuousExecution(true);

    ProtocolChainInstanceHandler pciHandler = new
DefaultProtocolChainInstanceHandler()
    {
      public ProtocolChain poll()
      {
        return protocolChain;
      }

      public boolean offer(ProtocolChain protocolChain)
      {
        return false;
      }
    };
    
    controller.setProtocolChainInstanceHandler(pciHandler);

    if (protocol.equals(ConnectorProtocol.TCP))
    {
      TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
      tcpHandler.setPort(port);
      controller.addSelectorHandler(tcpHandler);
    }
    else if (protocol.equals(ConnectorProtocol.UDP))
    {
      UDPSelectorHandler udpHandler = new UDPSelectorHandler();
      udpHandler.setPort(port);
      controller.addSelectorHandler(udpHandler);
    }

    ExecutorService threadPool = new PipelineThreadPool();
    controller.setThreadPool(threadPool);
    
    try
    {
      controller.start();
    }
    catch (IOException e)
    {
      e.printStackTrace();
    }
  }
}

/* SampleProtocolFilter */

package atlantes.connector.test;


import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.filter.ParserProtocolFilter;

public class SampleProtocolFilter extends ParserProtocolFilter {

  @SuppressWarnings("unchecked")
  @Override
  public ProtocolParser newProtocolParser() {
    //return new SolidicaProtocolParser();
    return new SampleProtocolParser();
  }
}

/* SampleProtocolParser */
package atlantes.connector.test;

import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.util.ByteBufferFactory;
import com.sun.grizzly.util.WorkerThread;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.logging.Logger;

public class SampleProtocolParser implements ProtocolParser
{
  protected CharsetDecoder f_asciiDecoder =
Charset.forName("ISO-8859-1").newDecoder();

  Logger syslog = Logger.getLogger(SampleProtocolParser.class.getName());

  private ByteBuffer byteBuffer;
  private boolean isExpectingMoreData, hasMoreBytesToParse, doCompact =
true;
  private String completeMessage;
  private boolean logIt = true;

  public SampleProtocolParser()
  {};

  public boolean isExpectingMoreData()
  {
    return isExpectingMoreData;
  }
  public boolean hasMoreBytesToParse()
  {
    return hasMoreBytesToParse;
  }

  public String getNextMessage()
  {
    String ip = completeMessage;
    completeMessage = null;
    return ip;
  }

  public boolean hasNextMessage()
  {
    byteBuffer.mark();

    completeMessage = extractMessage(byteBuffer);

    if (completeMessage != null)
    {
      isExpectingMoreData = false;
      // releaseBuffer() is only called with hasMoreBytesToParse == false
      // which means compact() would never be called!
      hasMoreBytesToParse = byteBuffer.hasRemaining();
      if (hasMoreBytesToParse)
      {
        doCompact = true;
      }
      else
      {
        byteBuffer.clear();
        doCompact = false;
      }

      return true;
    }
    else
    {
      if (logIt)
        syslog.info(((WorkerThread)Thread.currentThread()).getName()
            + " before expand: buffer " + byteBuffer.toString()
            + " isExpectingMoreData " + isExpectingMoreData
            + " hasMoreBytesToParse " + hasMoreBytesToParse);

      ByteBuffer newBuffer =
          ByteBufferFactory.allocateView(byteBuffer.capacity() * 2, false);
      byteBuffer.reset();
      newBuffer.put(byteBuffer);
      byteBuffer = newBuffer;
      ((WorkerThread)Thread.currentThread()).setByteBuffer(byteBuffer);

      // If hasMoreBytesToParse is false, we do not get re-invoked!!!
      //hasMoreBytesToParse = false;
      hasMoreBytesToParse = true;
      isExpectingMoreData = true;
      doCompact = false;

      syslog.info(((WorkerThread)Thread.currentThread()).getName()
          + " after expand: buffer " + byteBuffer.toString()
          + " isExpectingMoreData " + isExpectingMoreData
          + " hasMoreBytesToParse " + hasMoreBytesToParse);

    }

    return !isExpectingMoreData;
  } // hasNextMessage

  public void startBuffer(ByteBuffer bb)
  {
    byteBuffer = bb;
    byteBuffer.flip();
  } // startBuffer

  public boolean releaseBuffer()
  {
    if (doCompact)
    {
      byteBuffer.compact();
    }
    byteBuffer = null;

    return false;
  } // releaseBuffer

  private String extractMessage(ByteBuffer buffer)
  {
    final String startOfMessage = "Once upon a time";
    final String endOfMessage = "The End.";
    // Decode the buffer: make a copy and convert it to a string so we can
    // check whether a message is present.
    String msg = null;
    String returnMsg = null;
    try
    {
      ByteBuffer tmp = buffer.duplicate();
      msg = f_asciiDecoder.decode(tmp).toString();
    }
    catch (CharacterCodingException e)
    {
      e.printStackTrace();
      // msg is still null here; return early to avoid a NullPointerException
      return null;
    }

    // Check to see if we have a message terminator
    int startIndex = msg.indexOf(startOfMessage);
    int endIndex = msg.indexOf(endOfMessage);
    if (startIndex > -1 && endIndex > -1) // msg found
    {
      if (logIt)
        syslog.info(((WorkerThread)Thread.currentThread()).getName()
            + " before extract: buffer " + buffer.toString()
            + " isExpectingMoreData " + isExpectingMoreData
            + " hasMoreBytesToParse " + hasMoreBytesToParse);

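      // Extract the bytes from the buffer. ISO-8859-1 maps one byte to one
      // char, so the string indexes are also byte offsets from the position.
      endIndex += endOfMessage.length();
      byte[] bytes = new byte[endIndex - startIndex];
      // Skip anything before the start marker, then fill the array from
      // index 0. (The offset/length arguments of get() refer to the
      // destination array, not to the buffer.)
      buffer.position(buffer.position() + startIndex);
      buffer.get(bytes, 0, bytes.length);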

      returnMsg = new String(bytes);
      
      //buffer.compact();
      
      if (logIt)
        syslog.info(((WorkerThread)Thread.currentThread()).getName()
            + " after extract: buffer " + buffer.toString()
            + " isExpectingMoreData " + isExpectingMoreData
            + " hasMoreBytesToParse " + hasMoreBytesToParse);
    }

    return returnMsg;
  }
}

/* SampleMessageFilter */
package atlantes.connector.test;

import java.io.IOException;

import com.sun.grizzly.Context;
import com.sun.grizzly.ProtocolFilter;
import com.sun.grizzly.ProtocolParser;

public class SampleMessageFilter implements ProtocolFilter
{
  public boolean execute(Context context)
  {
    String message = (String)
context.removeAttribute(ProtocolParser.MESSAGE);

    System.out.println(message);

    return false;
  }

  public boolean postExecute(Context context) throws IOException
  {
    return true;
  }
}

/* ClientSampleMain */
package atlantes.connector.test;

import java.io.*;
import java.net.*;

public class ClientSampleMain
{

  public static void main(String[] args) throws IOException
  {

    int listenPort = atlantes.connector.test.SampleMain.defaultPort;

    StringBuffer sb = new StringBuffer();

    // Create the message which is larger than 8K

    sb.append("Once upon a time,");

    for (int i = 0; i < 1024; ++i)
    {
      sb.append("0123456789");
    }

    sb.append("The End.");

    // Open a socket to the listener and send the whole message
    try
    {
      Socket socket = new Socket("localhost", listenPort);
      if (socket == null)
      {
        System.out.println("Unable to open socket");
        return;
      }

      PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
      out.print(sb.toString());

      out.close();
      socket.close();
      socket = null;
    }
    catch (IllegalArgumentException ex)
    {
      ex.printStackTrace();
    }

    return;
  }
}






