dev@grizzly.java.net

Re: ByteBufferRead & Writer [was (Re: [Issue 84] Need leak-free bytebuffer management in Grizzly)]

From: Ken Cavanaugh <Ken.Cavanaugh_at_Sun.COM>
Date: Thu, 22 May 2008 18:02:43 -0700

Jeanfrancois Arcand wrote:
>
>
> Ken Cavanaugh wrote:
>> jfarcand_at_dev.java.net wrote:
>>> https://grizzly.dev.java.net/issues/show_bug.cgi?id=84
>>>
>>>
>>>
>>> User jfarcand changed the following:
>>>
>>> What |Old value |New value
>>> ================================================================================
>>>
>>> Assigned to|issues_at_grizzly |oleksiys
>>> --------------------------------------------------------------------------------
>>>
>>> Target milestone|milestone 1 |1.8.0
>>> --------------------------------------------------------------------------------
>>>
>>>
>>>
>>>
>>>
>>> ------- Additional comments from jfarcand_at_dev.java.net Thu May 22
>>> 16:39:59 +0000 2008 -------
>>> Let's see if we can have something for 1.8.0
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: issues-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: issues-help_at_grizzly.dev.java.net
>>>
>>>
>> I thought this is an interesting bug (or perhaps RFE?). I've been
>> doing some work for parallel marshaling
>> that may be relevant here.
>>
>> Part of the infrastructure behind my project is two classes:
>> ByteBufferWriter and
>> ByteBufferReader. Ignoring many details, ByteBufferWriter looks like:
>>
>> public class ByteBufferWriter {
>> public interface BufferHandler {
>> /** Dispose of the current ByteBuffer and return a new one.
>> */
>> EmergeBuffer overflow( EmergeBuffer current ) ;
>>
>> /** Dispose of current buffer. Called
>> * when closing a ByteBufferWriter.
>> */
>> void close( EmergeBuffer current ) ;
>> }
>>
>> ByteBufferWriter( BufferHandler handler, EmergeBuffer buffer ) {
>> ... }
>> // The rest of the API has void putXXX( XXX data ) calls
>> for the various
>> // primitive data types, plus putXXXArray( XXX[] data ) for
>> arrays of primitives.
>> // I also have a VarOctet type that we can ignore for now.
>> ...
>> }
>>
>>
>> The EmergeBuffer is a wrapper around a byte[]-backed indirect
>> ByteBuffer (I have no intention
>> of using direct ByteBuffers, because Charlie's work should make them
>> irrelevant at some point).
>> Its interface is:
>>
>> public class EmergeBuffer {
>> /** Allocate a buffer that has room for dataSize data, and also
>> * may prepend up to headerSize bytes before the start of the data.
>> */
>> public EmergeBuffer( int headerSize, int dataSize ) { ... }
>>
>> public ByteBuffer buffer() { ... }
>>
>> /** Reset the buffer for reading. Moves ByteBuffer position back
>> to 0.
>> */
>> public ByteBuffer reset() { ... }
>>
>> /** Prepend data from header.position() to header.limit() to the
>> * current buffer. This will change the object instance returned
>> by buffer()!
>> * @throws IllegalArgumentException if header.remaining() >
>> headerSize.
>> */
>> public ByteBuffer prepend( ByteBuffer header ) { ... }
>> }
>>
>> EmergeBuffer is implemented in such a way that the maximum amount of
>> data
>> copied is headerSize (it uses two internal ByteBuffers, one that
>> hides the space
>> reserved for the header, and a second one that slices the first after
>> the header,
>> so the position 0 of the second (which is returned by the buffer()
>> call) is
>> position headerSize of the first.
>>
>> I created EmergeBuffer to solve an annoying problem in the
>> ByteBufferReader:
>> dealing with split primitive types. This occurs when writing
>> primitives to a
>> stream, but the underlying transport delivers the data at arbitrary
>> boundaries,
>> possibly in the middle of a primitve. Since the maximum primitive
>> size is 8 bytes,
>> that is the most data that is every copied on a read.
>>
>> Getting back to the ByteBufferWriter, it's operation is quite simple:
>> whenever a
>> write method is called, a check is first made to see if enough space
>> is left in the
>> current buffer (in the case of an array, all the data that fits is
>> written to the
>> current buffer). When no more data can be written, but we still have
>> data to
>> write, the overflow method on the handler is called, allowing the
>> handler to
>> do whatever it needs to do with the buffer. This could include:
>>
>> * write the data to a file
>> * internally queue it in a pipe of some sort
>> * write the data to a communication channel of some sort
>
> I like that one. MINA has a similar mechanism except it keep forever
> growing the buffer internally, which I don't like and cause OOM quite
> easily.
>
>
>
>>
>> This allows hiding all of the ByteBuffer management behind a stream API
>> implemented on top of the ByteBufferWriter.
>> I should probably define an interface that includes just the write
>> operations
>> in this case. java.io.DataOutput is somewhat similar, but includes
>> higher-level
>> details like writeUTF and lacks methods for arrays of primitives
>> (which are important
>> for efficiency reasons). org.omg.CORBA.DataOutputStream includes
>> arrays of primitives,
>> but because it's part of CORBA, it includes CORBA types not relevant
>> to other applications,
>> and has IDL-style method names
>>
>> ByteBufferReader looks like:
>>
>> public class ByteBufferReader {
>> /** Create a new ByteBufferReader.
>> * @param timeout Time in milliseconds to wait for more data.
>> * @throws TimeoutException when a read operation times out
>> waiting for more data.
>> */
>> public ByteBufferReader( long timeout ) { ... }
>
> Can you elaborate on the timeout here? Under which condition the
> timeout exception will happens? Should ByteBufferWriter has a timeout
> as well?
If one of the read operations blocks on the internal ensure method (which is
blocked on a concurrent Queue poll( timeout ) method) for more than timeout
msec, the exception occurs. This is not strictly necessary, but the
poll() method
on LinkedBlockingQueue returns immediately if the queue is empty, while
poll( timeout ) blocks until more data is available, or a timeout occurs.
At this point, I'm just choosing the most natural implementation for my
needs.
>
>
>>
>> /** Allocate a buffer that reserves a large enough header to
>> handle the largest
>> * primitive type.
>> */ public static EmergeBuffer allocate( int size ) { ... }
>>
>> /** Add more data for the reader thread to read.
>> */
>> public void dataReceived( EmergeBuffer bb ) { ... }
>>
>> // The rest of the API contains getXXX( XXX arg ) and
>> getXXXArray( XXX[] arg )
>> // methods similar to the write case. Note that we assume that
>> the length of an
>> // array is known, so that the correct array size can be
>> allocated before the
>> // getXXXArray call. For example, many protocols may marshal the
>> array length
>> // in some form before the array contents. }
>>
>>
>> A ByteBufferReader could be attached to a connection, so that buffers
>> are appended
>> to the ByteBufferReader as they are received.
>
> This looks quite useful for a non blocking http parser.....actually
> for any parser. The ProtocolParser interface would be a good candidate
> for using this stuff.
>
> As in the writer case,
>> ByteBufferReader
>> should implement an appropriate API.
>>
>> One thing missing from the ByteBufferReader API is a handler with a
>> dispose( EmergeBuffer ) method
>> for handling buffers that we have finished using. This doesn't
>> matter much for my
>> current implementation, but would be important if the underlying
>> buffer management
>> is using direct ByteBuffers.
>>
>> Currently these classes are fairly well tested, in the case where a
>> writer writes to
>> a pipe from which a reader reads synchronously. There are some
>> places that need
>> to be addressed in the code to make it properly MT safe.
>
> Wow. Why not committing them into the workspace? I suspect we can use
> those in a couple of place (for sure we can have a special ReadFilter
> that use it).
I'll look into adding these to grizzly. I also took a look at some
simple changes today
to support multi-threaded use of these classes in a producer/consumer model.

Do we have support for TestNG unit tests in
Grizzly? My current test is part of my parallel marshaling suite, but
it's completely
standalone, so it can easily be used elsewhere.

Let's have more discussion about how to build on this in order to put
together
good ByteBuffer management, and hide it behind a stream API.

I envision something like this:

interface ByteBufferReader
interface ByteBufferWriter
class ByteBufferReaderDelagateImpl implements ByteBufferReader
    (just delegates to an instance of ByteBufferReader)
class ByteBufferReaderQueueImpl implements ByteBuffer Reader
    (that is what is currently called ByteBufferReader)
class ByteBufferWriterDelegateImpl implements ByteBufferWriter
    (as in the reader case)
class ByteBufferWriterHandlerImpl implements ByteBufferWriter
    (the current ByteBufferWriter)

Then, we look at something like

class ByteBufferReaderStreamImpl
class ByteBufferWriterStreamImpl

which hide the details of channels and ByteBuffers completely,
building on the classes above. But we still need to decide what to
do for ByteBuffer management, which is probably a good area to get
some feedback from Charlie and you. Basically the stream impls just
delegate to an appropriately initialized Queue/Handler impl
and deal with buffer management internally, putting all of the buffer
management bits in one easy-to-debug place.
>
> +1 for adding them, most probably under
> grizzly/modules/grizzly/src/main/java/com/sun/grizzly/util or
> something like com.sun.grizzly.bb....
>
> Thanks!
>
Ken.