dev@grizzly.java.net

Re: [Proposal - vote] Adding a new ThreadAttachment for partial read/state implementation

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Tue, 26 Jun 2007 18:04:42 -0400

Hi,

I've committed the implementation and included Charlie's ProtocolParser
idea :-) The svn commit looks like:

> Add support for partial read. This commit adds:
>
> + The new ThreadAttachment classes described by the proposal on dev_at_grizzly
> + A new interface called ProtocolParser that can be used to find out if all the bytes
> are available (and use the ThreadAttachment for persisting the transaction state between SelectionKey registrations on the main Selector).
> + Deprecate the StreamAlgorithm interface in favor of the new ProtocolParser
> + Move the rcm module to use the new ProtocolParser interface
> + Added a new abstract class called ParserProtocolFilter, which uses a ProtocolParser to decide if all bytes have been read (see ProtocolParserTest for an example).
> + Added a new set of APIs to SelectorHandler to allow configuring a SelectionKeyHandler per SelectorHandler (the default remains the same).
>
> Please review :-)

I've added a unit test so you can see how it works. This makes protocol
implementation quite simple :-)
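
As a rough illustration of the "are all the bytes available?" question that a ProtocolParser is meant to answer, here is a minimal sketch for a hypothetical length-prefixed protocol (a 4-byte big-endian length followed by the payload). The names LengthPrefixedCheck and isComplete are invented for this example and are not the ProtocolParser interface from the commit; ProtocolParserTest remains the reference for the real usage.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Grizzly: decides whether a buffer that is
// still in "write mode" (position == number of bytes read so far) already
// holds one complete length-prefixed message.
final class LengthPrefixedCheck {

    static final int HEADER_SIZE = 4; // assumed 4-byte big-endian length prefix

    static boolean isComplete(ByteBuffer buffer) {
        int available = buffer.position();
        if (available < HEADER_SIZE) {
            return false; // the length prefix itself is still partial
        }
        // Absolute get: leaves position/limit of the shared buffer untouched.
        int payloadLength = buffer.getInt(0);
        return available - HEADER_SIZE >= payloadLength;
    }

    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(64);
        bb.putInt(5).put(new byte[] {'h', 'e'});   // first, partial read
        System.out.println(isComplete(bb));         // message not complete yet
        bb.put(new byte[] {'l', 'l', 'o'});         // remaining bytes arrive
        System.out.println(isComplete(bb));         // message is now complete
    }
}
```

When a check like this reports an incomplete message, the filter would keep the buffer (via the ThreadAttachment) and wait for the next read event instead of parsing.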

Thanks

-- Jeanfrancois



Jeanfrancois Arcand wrote:
> Hi Alexey,
>
> Oleksiy Stashok wrote:
>> Hello Jeanfrancois,
>>
>> I like this idea.
>>
>> Just small comment. In code
>>
>>  1 public ThreadAttachment detach(boolean copyState) {
>>  2     try{
>>  3         threadAttachment.setByteBuffer(byteBuffer);
>>  4         threadAttachment.setSSLEngine(sslEngine);
>>  5         threadAttachment.setInputBB(inputBB);
>>  6         threadAttachment.setOutputBB(outputBB);
>>  7
>>  8         return threadAttachment;
>>  9     } finally {
>> 10         // We cannot cache/re-use this object as it might be referenced
>> 11         // by more than one thread.
>> 12         if (copyState){
>> 13             // Re-create a new ByteBuffer
>> 14             byteBuffer = ByteBufferFactory.allocateView(8192,false);
>> 15             threadAttachment = new ThreadAttachment();
>> 16         }
>> 17         threadAttachment.setThreadId(getName() + "-" + getId());
>> 18     }
>> 19 }
>>
>> It probably makes sense to avoid code like that in lines 8 and 15:
>> on one hand you're returning threadAttachment, on the other you're
>> reassigning it in the finally block. It looks confusing, at least to me :))
>
> The goal here is to return the 'old' threadAttachment and then re-assign
> a new one. I was struggling to come up with a clear solution.
> Probably I should use a temporary variable like:
>
> ThreadAttachment oldTA = threadAttachment;
> return oldTA;
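
Spelled out, the temporary-variable approach could look like the sketch below. It is only an illustration: SketchWorker and SketchAttachment are simplified stand-ins invented here (they carry just the ByteBuffer), and ByteBuffer.allocate is used in place of ByteBufferFactory.allocateView so the example is self-contained. It also assumes the setters never throw, which is the only case where the original try/finally would behave differently.

```java
import java.nio.ByteBuffer;

// Simplified stand-in for ThreadAttachment (hypothetical, for illustration).
final class SketchAttachment {
    ByteBuffer byteBuffer;
    String threadId;
}

// Simplified stand-in for WorkerThreadImpl (hypothetical, not a real Thread).
final class SketchWorker {
    private ByteBuffer byteBuffer = ByteBuffer.allocate(8192);
    private SketchAttachment threadAttachment = new SketchAttachment();
    private final String name;

    SketchWorker(String name) {
        this.name = name;
    }

    ByteBuffer getByteBuffer() {
        return byteBuffer;
    }

    SketchAttachment detach(boolean copyState) {
        // Capture the state that is handed out first...
        SketchAttachment oldTA = threadAttachment;
        oldTA.byteBuffer = byteBuffer;

        if (copyState) {
            // ...then give this worker fresh state, because the returned
            // attachment may now be referenced by another thread.
            byteBuffer = ByteBuffer.allocate(8192);
            threadAttachment = new SketchAttachment();
        }
        // Same as the original finally block: tag the current attachment.
        threadAttachment.threadId = name;
        return oldTA;
    }
}
```

The return value and the re-assignment are now in plain sequential order, so there is no need to reason about what finally does to a field after return has already evaluated it.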
>
>>
>> Good work!
>
> Thanks!
>
> -- Jeanfrancois
>
>>
>> WBR,
>> Alexey.
>>
>> Jeanfrancois Arcand wrote:
>>> Hi,
>>>
>>> I would like to introduce a set of new APIs for handling the case
>>> when a ReadFilter is used and the temporary Selector trick for doing
>>> extra reads cannot be used. The use case is the following:
>>>
>>> + Inside a "ReadFilter", the socketChannel.read(...) reads some bytes
>>> and then starts returning 0 (no more bytes available). In that case,
>>> the SelectionKey must be registered back to its SelectorHandler,
>>> and the state of the transaction needs to be persisted. By state here
>>> I mean the byteBuffer that contains the partial read (bytes) and
>>> maybe some application-specific data.
>>>
>>> To support that use case, I would like to add two new methods to the
>>> WorkerThread interface:
>>>
>>>> /**
>>>>  * Detach the current set of attributes (state) associated with this instance.
>>>>  * Invoking detach(true) will re-create all the ByteBuffers associated with
>>>>  * this thread, hence this method must be called only when required. If
>>>>  * you only need to invoke the object, call detach(false) instead but make
>>>>  * sure you aren't caching or re-using the ThreadAttachment with another
>>>>  * thread.
>>>>  * @param copyState true to copy the attributes into the ThreadAttachment and
>>>>  * re-create them
>>>>  * @return the ThreadAttachment holding the detached state
>>>>  */
>>>> public ThreadAttachment detach(boolean copyState);
>>>>
>>>>
>>>> /**
>>>>  * Attach the ThreadAttachment to this instance. This will configure this
>>>>  * Thread's attributes like ByteBuffer, SSLEngine, etc.
>>>>  * @param threadAttachment the attachment.
>>>>  */
>>>> public void attach(ThreadAttachment threadAttachment);
>>>
>>> Hence when an application needs more bytes and needs to register back
>>> to the SelectorHandler:
>>>
>>>
>>> public class MyReadFilter{
>>>
>>>     public boolean execute(Context ctx){
>>>         ....
>>>         WorkerThread wt = ((WorkerThread)Thread.currentThread());
>>>         ByteBuffer bb = wt.getByteBuffer();
>>>         int length = socketChannel.read(bb);
>>>         ...
>>>         if (length == 0){
>>>             // No more bytes for now: park the state on the SelectionKey
>>>             ThreadAttachment ta = wt.detach(true);
>>>             ctx.getSelectionKey().attach(ta);
>>>             ....
>>>             ctx.setKeyRegistrationState(
>>>                 Context.KeyRegistrationState.REGISTER);
>>>             ...
>>>         }
>>>     }
>>>
>>> }
>>>
>>> Internally, the SelectorHandler will do, once bytes are available:
>>>
>>> WorkerThread wt = ((WorkerThread)Thread.currentThread());
>>> wt.attach((ThreadAttachment)selectionKey.attachment());
>>>
>>> and then invoke the Filter again. Calling wt.attach() will properly
>>> configure the Thread attributes so when MyReadFilter.execute is
>>> called, the previously read bytes will already be available inside
>>> the ByteBuffer.
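
The detach/attach round trip described above can be simulated without any sockets. The sketch below is only an illustration under simplifying assumptions: DemoWorker and DemoAttachment are invented stand-ins that carry just the ByteBuffer, a plain Object variable plays the role of the SelectionKey attachment, and the two put(...) calls stand for two successive socketChannel.read(...) calls.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in for ThreadAttachment: carries only the partially filled buffer.
final class DemoAttachment {
    ByteBuffer byteBuffer;
}

// Stand-in for WorkerThread: not a real Thread, just the attach/detach state.
final class DemoWorker {
    private ByteBuffer byteBuffer = ByteBuffer.allocate(64);

    ByteBuffer getByteBuffer() {
        return byteBuffer;
    }

    DemoAttachment detach() {                 // behaves like detach(true)
        DemoAttachment ta = new DemoAttachment();
        ta.byteBuffer = byteBuffer;
        byteBuffer = ByteBuffer.allocate(64); // the worker gets a fresh buffer
        return ta;
    }

    void attach(DemoAttachment ta) {
        byteBuffer = ta.byteBuffer;           // previously read bytes come back
    }
}

final class PartialReadDemo {
    static String run() {
        DemoWorker first = new DemoWorker();
        first.getByteBuffer().put("GET /ind".getBytes(StandardCharsets.US_ASCII));

        // read() returned 0: park the state where selectionKey.attach(ta) would.
        Object keyAttachment = first.detach();

        // Later, possibly on another worker, once the key is readable again.
        DemoWorker second = new DemoWorker();
        second.attach((DemoAttachment) keyAttachment);
        ByteBuffer bb = second.getByteBuffer();
        bb.put("ex.html".getBytes(StandardCharsets.US_ASCII));

        bb.flip();
        return StandardCharsets.US_ASCII.decode(bb).toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "GET /index.html"
    }
}
```

The point of re-creating the buffer on detach is visible here: the first worker can go on serving other connections with its fresh buffer while the parked bytes wait for the rest of the request.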
>>>
>>> I will apply the solution to the SSLReadFilter as well, which
>>> currently uses the SSLSession to store the state of the transaction.
>>> I'm attaching the diff of the changes.
>>>
>>> Let me know what you think.
>>>
>>> Thanks
>>>
>>> -- Jeanfrancois
>>>
>>>
>>>
>>>
>>>
>>>
>>> ------------------------------------------------------------------------
>>>
>>> Index: WorkerThread.java
>>> ===================================================================
>>> --- WorkerThread.java (revision 308)
>>> +++ WorkerThread.java (working copy)
>>> @@ -25,7 +25,7 @@
>>> import java.nio.ByteBuffer;
>>>
>>> /**
>>> - * Simple interface to allow the addition of <code>Thread</code> attributes.
>>> + * Simple interface to allow the addition of <code>Thread</code> attributes.
>>> *
>>> * @author Jean-Francois Arcand
>>> */
>>> @@ -45,4 +45,25 @@
>>> */
>>> public ByteBuffer getByteBuffer();
>>> +
>>> + /**
>>> + * Detach the current set of attributes (state) associated with this instance.
>>> + * Invoking detach(true) will re-create all the ByteBuffer associated with
>>> + * this thread, hence this method must be called only when required. If
>>> + * you only need to invoke the object, call detach(false) instead but make
>>> + * sure you aren't caching or re-using the ThreadAttachment with another
>>> + * thread.
>>> + * @param true to copy the attributes into the ThreadAttachment and re-create
>>> + * them
>>> + * @return a new ThreadAttachment
>>> + */
>>> + public ThreadAttachment detach(boolean copyState);
>>> +
>>> +
>>> + /**
>>> + * Attach the ThreadAttachment to this instance. This will configure this
>>> + * Thread attributes like ByteBuffer, SSLEngine, etc.
>>> + * @param ThreadAttachment the attachment.
>>> + */
>>> + public void attach(ThreadAttachment threadAttachment);
>>> }
>>> Index: WorkerThreadImpl.java
>>> ===================================================================
>>> --- WorkerThreadImpl.java (revision 308)
>>> +++ WorkerThreadImpl.java (working copy)
>>> @@ -70,22 +70,28 @@
>>> /**
>>> * The encrypted ByteBuffer used for handshaking and reading request bytes.
>>> */
>>> - private ByteBuffer inputBB;
>>> + protected ByteBuffer inputBB;
>>>
>>>
>>> /**
>>> * The encrypted ByteBuffer used for handshaking and writing response bytes.
>>> */
>>> - private ByteBuffer outputBB;
>>> + protected ByteBuffer outputBB;
>>>
>>>
>>> /**
>>> * The <code>SSLEngine</code> used to manage the SSL over NIO request.
>>> */
>>> - private SSLEngine sslEngine;
>>> -
>>> + protected SSLEngine sslEngine;
>>> + /**
>>> + * The state/attributes on this WorkerThread.
>>> + */
>>> + private ThreadAttachment threadAttachment = new ThreadAttachment();
>>> +
>>> +
>>> + /**
>>> * Create a Thread that will synchronizes/block on
>>> * <code>Pipeline</code> instance.
>>> * @param threadGroup <code>ThreadGroup</code>
>>> @@ -233,5 +239,53 @@
>>> public void setSSLEngine(SSLEngine sslEngine) {
>>> this.sslEngine = sslEngine;
>>> }
>>> +
>>> +
>>> + /**
>>> + * Detach the current set of attributes (state) associated with this instance.
>>> + * Invoking detach(true) will re-create all the ByteBuffer associated with
>>> + * this thread, hence this method must be called only when required. If
>>> + * you only need to invoke the object, call detach(false) instead but make
>>> + * sure you aren't caching or re-using the ThreadAttachment with another
>>> + * thread.
>>> + * @param true to copy the attributes into the ThreadAttachment and re-create
>>> + * them.
>>> + * @return a new ThreadAttachment
>>> + */
>>> + public ThreadAttachment detach(boolean copyState) {
>>> + try{
>>> + threadAttachment.setByteBuffer(byteBuffer);
>>> + threadAttachment.setSSLEngine(sslEngine);
>>> + threadAttachment.setInputBB(inputBB);
>>> + threadAttachment.setOutputBB(outputBB);
>>> +
>>> + return threadAttachment;
>>> + } finally {
>>> + // We cannot cache/re-use this object as it might be referenced
>>> + // by more than one thread.
>>> + if (copyState){
>>> + // Re-create a new ByteBuffer
>>> + byteBuffer = ByteBufferFactory.allocateView(8192,false);
>>> + threadAttachment = new ThreadAttachment();
>>> + }
>>> + threadAttachment.setThreadId(getName() + "-" + getId());
>>> + }
>>> + }
>>> +
>>> +
>>> + /**
>>> + * Attach the ThreadAttachment to this instance. This will configure this
>>> + * Thread attributes like ByteBuffer, SSLEngine, etc.
>>> + * @param ThreadAttachment the attachment.
>>> + */
>>> + public void attach(ThreadAttachment threadAttachment) {
>>> + byteBuffer = threadAttachment.getByteBuffer();
>>> + sslEngine = threadAttachment.getSSLEngine();
>>> + inputBB = threadAttachment.getInputBB();
>>> + outputBB = threadAttachment.getOutputBB();
>>> +
>>> + this.threadAttachment = threadAttachment;
>>> + threadAttachment.setThreadId(getName() + "-" + getId());
>>> + }
>>> }
>>>
>>> /*
>>>  * The contents of this file are subject to the terms
>>>  * of the Common Development and Distribution License
>>>  * (the License). You may not use this file except in
>>>  * compliance with the License.
>>>  *
>>>  * You can obtain a copy of the license at
>>>  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>>>  * glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>  * See the License for the specific language governing
>>>  * permissions and limitations under the License.
>>>  *
>>>  * When distributing Covered Code, include this CDDL
>>>  * Header Notice in each file and include the License file
>>>  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>  * If applicable, add the following below the CDDL Header,
>>>  * with the fields enclosed by brackets [] replaced by
>>>  * you own identifying information:
>>>  * "Portions Copyrighted [year] [name of copyright owner]"
>>>  *
>>>  * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>>>  */
>>> package com.sun.grizzly.util;
>>>
>>> import java.nio.ByteBuffer;
>>> import java.util.WeakHashMap;
>>> import javax.net.ssl.SSLEngine;
>>>
>>> /**
>>>  * This object represents the state of a <code>WorkerThread</code>. This
>>>  * includes the ByteBuffer bound to the WorkerThread, application data, etc.
>>>  * @author Jeanfrancois
>>>  */
>>> public class ThreadAttachment {
>>>
>>>     private long timeout;
>>>     private String threadId;
>>>     private WeakHashMap<String,Object> map;
>>>     private ByteBuffer byteBuffer;
>>>
>>>     /**
>>>      * The encrypted ByteBuffer used for handshaking and reading request bytes.
>>>      */
>>>     private ByteBuffer inputBB;
>>>
>>>
>>>     /**
>>>      * The encrypted ByteBuffer used for handshaking and writing response bytes.
>>>      */
>>>     private ByteBuffer outputBB;
>>>
>>>
>>>     /**
>>>      * The <code>SSLEngine</code> used to manage the SSL over NIO request.
>>>      */
>>>     private SSLEngine sslEngine;
>>>
>>>     public ThreadAttachment(){
>>>         map = new WeakHashMap<String,Object>();
>>>     }
>>>
>>>     public void setAttribute(String key, Object value){
>>>         map.put(key,value);
>>>     }
>>>
>>>     public Object getAttribute(String key){
>>>         return map.get(key);
>>>     }
>>>
>>>     public Object removeAttribute(String key){
>>>         return map.remove(key);
>>>     }
>>>
>>>     /**
>>>      * Set the <code>ByteBuffer</code> shared by this thread.
>>>      */
>>>     public void setByteBuffer(ByteBuffer byteBuffer){
>>>         this.byteBuffer = byteBuffer;
>>>     }
>>>
>>>     /**
>>>      * Return the <code>ByteBuffer</code> shared by this thread.
>>>      */
>>>     public ByteBuffer getByteBuffer(){
>>>         return byteBuffer;
>>>     }
>>>
>>>     /**
>>>      * Return the encrypted <code>ByteBuffer</code> used to handle request.
>>>      * @return <code>ByteBuffer</code>
>>>      */
>>>     public ByteBuffer getInputBB(){
>>>         return inputBB;
>>>     }
>>>
>>>     /**
>>>      * Set the encrypted <code>ByteBuffer</code> used to handle request.
>>>      * @param inputBB <code>ByteBuffer</code>
>>>      */
>>>     public void setInputBB(ByteBuffer inputBB){
>>>         this.inputBB = inputBB;
>>>     }
>>>
>>>     /**
>>>      * Return the encrypted <code>ByteBuffer</code> used to handle response.
>>>      * @return <code>ByteBuffer</code>
>>>      */
>>>     public ByteBuffer getOutputBB(){
>>>         return outputBB;
>>>     }
>>>
>>>     /**
>>>      * Set the encrypted <code>ByteBuffer</code> used to handle response.
>>>      * @param outputBB <code>ByteBuffer</code>
>>>      */
>>>     public void setOutputBB(ByteBuffer outputBB){
>>>         this.outputBB = outputBB;
>>>     }
>>>
>>>     /**
>>>      * Return the <code>SSLEngine</code>.
>>>      * @return <code>SSLEngine</code>
>>>      */
>>>     public SSLEngine getSSLEngine() {
>>>         return sslEngine;
>>>     }
>>>
>>>     /**
>>>      * Set the <code>SSLEngine</code>.
>>>      * @param sslEngine <code>SSLEngine</code>
>>>      */
>>>     public void setSSLEngine(SSLEngine sslEngine) {
>>>         this.sslEngine = sslEngine;
>>>     }
>>>
>>>     public String getThreadId() {
>>>         return threadId;
>>>     }
>>>
>>>     public void setThreadId(String threadId) {
>>>         this.threadId = threadId;
>>>     }
>>>
>>>     public void setTimeout(long timeout){
>>>         this.timeout = timeout;
>>>     }
>>> }
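
For the "application specific data" part of the state, the attribute methods could be used along the lines of the sketch below. AttributeSketch is a cut-down, hypothetical copy of just the attribute methods, and the key name expectedLength is invented for the example. One thing worth keeping in mind with this design: a WeakHashMap only keeps an entry while its key is strongly reachable somewhere else, so the sketch assumes keys are String constants rather than strings built at runtime.

```java
import java.util.Map;
import java.util.WeakHashMap;

// Hypothetical, cut-down copy of the attribute part of ThreadAttachment.
final class AttributeSketch {

    // Compile-time String constants are interned and stay strongly reachable,
    // so an entry keyed by this constant is not dropped by the WeakHashMap.
    static final String EXPECTED_LENGTH = "expectedLength";

    private final Map<String, Object> map = new WeakHashMap<String, Object>();

    void setAttribute(String key, Object value) {
        map.put(key, value);
    }

    Object getAttribute(String key) {
        return map.get(key);
    }

    Object removeAttribute(String key) {
        return map.remove(key);
    }

    public static void main(String[] args) {
        AttributeSketch ta = new AttributeSketch();
        // First read: the header said how many bytes the message will have.
        ta.setAttribute(EXPECTED_LENGTH, 1024);
        // Next read, after attach(): pick the value up again.
        int expected = (Integer) ta.getAttribute(EXPECTED_LENGTH);
        System.out.println(expected);
        // Message complete: clean up.
        ta.removeAttribute(EXPECTED_LENGTH);
    }
}
```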
>>>
>>>
>>> ------------------------------------------------------------------------
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>>