dev@grizzly.java.net

Re: [Proposal - vote] Adding a new ThreadAttachment fro partial read/state implementation

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Wed, 27 Jun 2007 13:00:02 -0400

Salut,

charlie hunt wrote:
> I'm thinking we might consider in addition to voting putting this
> proposal on the agenda for the next Project Grizzly meeting.
>
> Unfortunately, we won't have the next Project Grizzly meeting until July
> 11th since next week is July 4th and the USA based members will out of
> the office as a result of the holiday. So, that means we wouldn't be
> able to talk about in a Project Grizzly meeting until July 11th.
>
> Will it impede our progress if we wait til July 11th for a more formal
> review and final vote ?

I would think we should discuss using this email thread. I would like to
do a release soon with all the bug fixes and a couple of cool feature
like this one. So let's keep the discussion by emails.

What do you think?

-- Jeanfrancois


>
> charlie ...
>
> Jeanfrancois Arcand wrote:
>> Hi,
>>
>> I've committed the implementation and included Charlie's
>> ProtocolParser idea :-) The svn commit looks like:
>>
>>> Add support for partial read‥ This commit add:
>>>
>>> + The new ThreadAttachment classes described by the proposal on
>>> dev_at_grizzly
>>> + A new interface called ProtocolParser that can be used to find if
>>> all the bytes
>>> are available (and use the ThreadAttachment for persisting the
>>> transaction state between the SelectionKey registration on the main
>>> Selector.
>>> + Deprecate StreamAlgorithm interface in favor of the new ProtocolParser
>>> + Move the rcm module to use the new ProtocolParser interface
>>> + Added a new abstract classes called ParserProtocolFilter which use
>>> a ProtocolParser to decide if all bytes has been read (see
>>> ProtocolParserTest for an example).
>>> + Added new set of API to SelectorHandler to allow configuring a
>>> SelectionKeyHandler per SelectorHandler (the default remains the same).
>>>
>>> Please review :-)
>>
>> I've added a unit test so you can see how it works. This makes
>> protocol implementation quite simple :-)
>>
>> Thanks
>>
>> -- Jeanfrancois
>>
>>
>>
>> Jeanfrancois Arcand wrote:
>>> Hi Alexey,
>>>
>>> Oleksiy Stashok wrote:
>>>> Hello Jeanfrancois,
>>>>
>>>> I like this idea.
>>>>
>>>> Just small comment. In code
>>>>
>>>> 1 public ThreadAttachment detach(boolean copyState) {
>>>> 2 try{
>>>> 3 threadAttachment.setByteBuffer(byteBuffer);
>>>> 4 threadAttachment.setSSLEngine(sslEngine);
>>>> 5 threadAttachment.setInputBB(inputBB);
>>>> 6 threadAttachment.setOutputBB(outputBB);
>>>> 7 8 return threadAttachment;
>>>> 9 } finally {
>>>> 10 // We cannot cache/re-use this object as it might be
>>>> referenced
>>>> 11 // by more than one thread. 12 if
>>>> (copyState){
>>>> 13 // Re-create a new ByteBuffer
>>>> 14 byteBuffer =
>>>> ByteBufferFactory.allocateView(8192,false);
>>>> 15 threadAttachment = new ThreadAttachment();
>>>> 16 }
>>>> 17 threadAttachment.setThreadId(getName() + "-" +
>>>> getId()); 18 }
>>>> 19 }
>>>>
>>>> probably it makes sense to avoid such code like in lines 8 and 15...
>>>> from one side you're returning threadAttachment, from other
>>>> reassigning it in finally block. I looks confusing, at least to me :))
>>>
>>> The goal here is to return the 'old' threadAttachment and then
>>> re-assign an new one. I was struggling trying to come with a clear
>>> solution. Probably I should use a temporary attribute like:
>>>
>>> ThreadAttachment oldTA = threadAttachment;
>>> return oldTA;
>>>
>>>>
>>>> Good work!
>>>
>>> Thanks!
>>>
>>> -- Jeanfrancois
>>>
>>>>
>>>> WBR,
>>>> Alexey.
>>>>
>>>> Jeanfrancois Arcand wrote:
>>>>> Hi,
>>>>>
>>>>> I would like to introduce a set of new APIs for handling the case
>>>>> when an ReadFilter is used and the temporary Selector trick for
>>>>> doing extra reads cannot be used. The use case is the following:
>>>>>
>>>>> + Inside a "ReadFilter", the socketChannel.read(...) reads some
>>>>> bytes and then starts returning 0 (no more bytes available). In
>>>>> that case, the SelectionKey must be registered back to its
>>>>> SelectorHandler, and the state of the transaction needs to be
>>>>> persisted. By state here I means the byteBuffer that contains the
>>>>> partial read (bytes) and maybe some application specific data.
>>>>>
>>>>> To support that use case, I would like to add two new method to the
>>>>> WorkerThread interface:
>>>>>
>>>>>> 49 /**
>>>>>> 50 * Detach the current set of attributes (state)
>>>>>> associated with this instance.
>>>>>> 51 * Invoking detach(true) will re-create all the
>>>>>> ByteBuffer associated with
>>>>>> 52 * this thread, hence this method must be called only
>>>>>> when required. If
>>>>>> 53 * you only need to invoke the object, call
>>>>>> detach(false) instead but make
>>>>>> 54 * sure you aren't caching or re-using the
>>>>>> ThreadAttachment with another
>>>>>> 55 * thread.
>>>>>> 56 * @param true to copy the attributes into the
>>>>>> ThreadAttachment and re-create
>>>>>> 57 * them
>>>>>> 58 * @return a new ThreadAttachment
>>>>>> 59 */
>>>>>> 60 public ThreadAttachment detach(boolean copyState);
>>>>>> 61
>>>>>> 62
>>>>>> 63 /**
>>>>>> 64 * Attach the ThreadAttachment to this instance. This
>>>>>> will configure this
>>>>>> 65 * Thread attributes like ByteBuffer, SSLEngine, etc.
>>>>>> 66 * @param ThreadAttachment the attachment.
>>>>>> 67 */
>>>>>> 68 public void attach(ThreadAttachment threadAttachment);
>>>>>
>>>>> Hence when an application needs more bytes and need to register
>>>>> back to the SelectorHandler:
>>>>>
>>>>>
>>>>> public class MyReadFilter{
>>>>>
>>>>> public boolean execute(Context ctx){
>>>>> ....
>>>>> WorkerThread wt = ((WorkerThread)Thread.currentThread());
>>>>> ByteBuffer bb = wt.getByteBuffer();
>>>>> int length = socketChannel.read(bb);
>>>>> ...
>>>>> if (length == 0){
>>>>>
>>>>> ThreadAttachment ta = wt.detach(true);
>>>>> ctx.getSelectionKey().attach(ta);
>>>>> ....
>>>>> ctx.setKeyRegistrationState(
>>>>> Context.KeyRegistrationState.REGISTER);
>>>>> ...
>>>>> }
>>>>>
>>>>> }
>>>>>
>>>>> Internally, the SelectorHandler will do, once bytes are available:
>>>>>
>>>>> WorkerThread wt = ((WorkerThread)Thread.currentThread());
>>>>> wt.attach((ThreadAttachment)selectionKey.attachment());
>>>>>
>>>>> and then invoke the Filter Again. Calling wt.attach() will properly
>>>>> configure the Thread attribute so when MyReadFilter.execute is
>>>>> called, the previously read bytes will be already available inside
>>>>> the ByteBuffer.
>>>>>
>>>>> I will apply the solution to the SSLReadFilter as well, which
>>>>> currentlt use the SSLSession to store the state of the transaction.
>>>>> I'm attaching the diff of the changes.
>>>>>
>>>>> Let me know what you think.
>>>>>
>>>>> Thanks
>>>>>
>>>>> -- Jeanfrancois
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ------------------------------------------------------------------------
>>>>>
>>>>>
>>>>> Index: WorkerThread.java
>>>>> ===================================================================
>>>>> --- WorkerThread.java (revision 308)
>>>>> +++ WorkerThread.java (working copy)
>>>>> @@ -25,7 +25,7 @@
>>>>> import java.nio.ByteBuffer;
>>>>>
>>>>> /**
>>>>> - * Simple interface to allow the addition of <code>Thread</code>
>>>>> attributes.
>>>>> + * Simple interface to allow the addition of <code>Thread</code>
>>>>> attributes.
>>>>> *
>>>>> * @author Jean-Francois Arcand
>>>>> */
>>>>> @@ -45,4 +45,25 @@
>>>>> */
>>>>> public ByteBuffer getByteBuffer();
>>>>> + + /**
>>>>> + * Detach the current set of attributes (state) associated
>>>>> with this instance.
>>>>> + * Invoking detach(true) will re-create all the ByteBuffer
>>>>> associated with
>>>>> + * this thread, hence this method must be called only when
>>>>> required. If
>>>>> + * you only need to invoke the object, call detach(false)
>>>>> instead but make + * sure you aren't caching or re-using the
>>>>> ThreadAttachment with another
>>>>> + * thread.
>>>>> + * @param true to copy the attributes into the
>>>>> ThreadAttachment and re-create
>>>>> + * them
>>>>> + * @return a new ThreadAttachment
>>>>> + */
>>>>> + public ThreadAttachment detach(boolean copyState);
>>>>> + + + /**
>>>>> + * Attach the ThreadAttachment to this instance. This will
>>>>> configure this
>>>>> + * Thread attributes like ByteBuffer, SSLEngine, etc.
>>>>> + * @param ThreadAttachment the attachment.
>>>>> + */
>>>>> + public void attach(ThreadAttachment threadAttachment);
>>>>> }
>>>>> Index: WorkerThreadImpl.java
>>>>> ===================================================================
>>>>> --- WorkerThreadImpl.java (revision 308)
>>>>> +++ WorkerThreadImpl.java (working copy)
>>>>> @@ -70,22 +70,28 @@
>>>>> /**
>>>>> * The encrypted ByteBuffer used for handshaking and reading
>>>>> request bytes.
>>>>> */
>>>>> - private ByteBuffer inputBB;
>>>>> + protected ByteBuffer inputBB;
>>>>>
>>>>>
>>>>> /**
>>>>> * The encrypted ByteBuffer used for handshaking and writing
>>>>> response bytes.
>>>>> */
>>>>> - private ByteBuffer outputBB;
>>>>> + protected ByteBuffer outputBB;
>>>>>
>>>>>
>>>>> /**
>>>>> * The <code>SSLEngine</code> used to manage the SSL over NIO
>>>>> request.
>>>>> */
>>>>> - private SSLEngine sslEngine;
>>>>> - + protected SSLEngine sslEngine;
>>>>> + /**
>>>>> + * The state/attributes on this WorkerThread.
>>>>> + */
>>>>> + private ThreadAttachment threadAttachment = new
>>>>> ThreadAttachment();
>>>>> + + + /**
>>>>> * Create a Thread that will synchronizes/block on
>>>>> * <code>Pipeline</code> instance.
>>>>> * @param threadGroup <code>ThreadGroup</code>
>>>>> @@ -233,5 +239,53 @@
>>>>> public void setSSLEngine(SSLEngine sslEngine) {
>>>>> this.sslEngine = sslEngine;
>>>>> }
>>>>> +
>>>>> + + /**
>>>>> + * Detach the current set of attributes (state) associated
>>>>> with this instance.
>>>>> + * Invoking detach(true) will re-create all the ByteBuffer
>>>>> associated with
>>>>> + * this thread, hence this method must be called only when
>>>>> required. If
>>>>> + * you only need to invoke the object, call detach(false)
>>>>> instead but make + * sure you aren't caching or re-using the
>>>>> ThreadAttachment with another
>>>>> + * thread.
>>>>> + * @param true to copy the attributes into the
>>>>> ThreadAttachment and re-create
>>>>> + * them.
>>>>> + * @return a new ThreadAttachment
>>>>> + */
>>>>> + public ThreadAttachment detach(boolean copyState) {
>>>>> + try{
>>>>> + threadAttachment.setByteBuffer(byteBuffer);
>>>>> + threadAttachment.setSSLEngine(sslEngine);
>>>>> + threadAttachment.setInputBB(inputBB);
>>>>> + threadAttachment.setOutputBB(outputBB);
>>>>> + + return threadAttachment;
>>>>> + } finally {
>>>>> + // We cannot cache/re-use this object as it might be
>>>>> referenced
>>>>> + // by more than one thread. + if
>>>>> (copyState){
>>>>> + // Re-create a new ByteBuffer
>>>>> + byteBuffer =
>>>>> ByteBufferFactory.allocateView(8192,false);
>>>>> + threadAttachment = new ThreadAttachment();
>>>>> + }
>>>>> + threadAttachment.setThreadId(getName() + "-" +
>>>>> getId()); + }
>>>>> + }
>>>>> +
>>>>> + + /**
>>>>> + * Attach the ThreadAttachment to this instance. This will
>>>>> configure this
>>>>> + * Thread attributes like ByteBuffer, SSLEngine, etc.
>>>>> + * @param ThreadAttachment the attachment.
>>>>> + */
>>>>> + public void attach(ThreadAttachment threadAttachment) {
>>>>> + byteBuffer = threadAttachment.getByteBuffer();
>>>>> + sslEngine = threadAttachment.getSSLEngine();
>>>>> + inputBB = threadAttachment.getInputBB();
>>>>> + outputBB = threadAttachment.getOutputBB();
>>>>> + + this.threadAttachment = threadAttachment;
>>>>> + threadAttachment.setThreadId(getName() + "-" + getId());
>>>>> + }
>>>>> }
>>>>>
>>>>> /*
>>>>> * The contents of this file are subject to the terms * of the
>>>>> Common Development and Distribution License * (the License). You
>>>>> may not use this file except in
>>>>> * compliance with the License.
>>>>> * * You can obtain a copy of the license at *
>>>>> https://glassfish.dev.java.net/public/CDDLv1.0.html or
>>>>> * glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>> * See the License for the specific language governing *
>>>>> permissions and limitations under the License.
>>>>> * * When distributing Covered Code, include this CDDL * Header
>>>>> Notice in each file and include the License file * at
>>>>> glassfish/bootstrap/legal/CDDLv1.0.txt. * If applicable, add the
>>>>> following below the CDDL Header, * with the fields enclosed by
>>>>> brackets [] replaced by
>>>>> * you own identifying information: * "Portions Copyrighted [year]
>>>>> [name of copyright owner]"
>>>>> * * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>>>>> */
>>>>> package com.sun.grizzly.util;
>>>>>
>>>>> import java.nio.ByteBuffer;
>>>>> import java.util.WeakHashMap;
>>>>> import javax.net.ssl.SSLEngine;
>>>>>
>>>>> /**
>>>>> * This object represent the state of a <code>WorkerThread</code>.
>>>>> This include
>>>>> * the ByteBuffer binded to the WorkerThread, application data etc.
>>>>> * @author Jeanfrancois
>>>>> */
>>>>> public class ThreadAttachment {
>>>>> private long timeout;
>>>>> private String threadId;
>>>>> private WeakHashMap<String,Object> map;
>>>>> private ByteBuffer byteBuffer;
>>>>> /**
>>>>> * The encrypted ByteBuffer used for handshaking and reading
>>>>> request bytes.
>>>>> */
>>>>> private ByteBuffer inputBB;
>>>>>
>>>>>
>>>>> /**
>>>>> * The encrypted ByteBuffer used for handshaking and writing
>>>>> response bytes.
>>>>> */
>>>>> private ByteBuffer outputBB;
>>>>>
>>>>>
>>>>> /**
>>>>> * The <code>SSLEngine</code> used to manage the SSL over NIO
>>>>> request.
>>>>> */
>>>>> private SSLEngine sslEngine;
>>>>>
>>>>> public ThreadAttachment(){
>>>>> map = new WeakHashMap<String,Object>();
>>>>> }
>>>>>
>>>>> public void setAttribute(String key, Object value){
>>>>> map.put(key,value);
>>>>> }
>>>>>
>>>>> public Object getAttribute(String key){
>>>>> return map.get(key);
>>>>> }
>>>>> public Object removeAttribute(String key){
>>>>> return map.remove(key);
>>>>> }
>>>>> /**
>>>>> * Set the <code>ByteBuffer</code> shared this thread
>>>>> */
>>>>> public void setByteBuffer(ByteBuffer byteBuffer){
>>>>> this.byteBuffer = byteBuffer;
>>>>> }
>>>>> /**
>>>>> * Return the <code>ByteBuffer</code> shared this thread
>>>>> */
>>>>> public ByteBuffer getByteBuffer(){
>>>>> return byteBuffer;
>>>>> }
>>>>>
>>>>> /**
>>>>> * Return the encrypted <code>ByteBuffer</code> used to handle
>>>>> request.
>>>>> * @return <code>ByteBuffer</code>
>>>>> */
>>>>> public ByteBuffer getInputBB(){
>>>>> return inputBB;
>>>>> }
>>>>> /**
>>>>> * Set the encrypted <code>ByteBuffer</code> used to handle
>>>>> request.
>>>>> * @param inputBB <code>ByteBuffer</code>
>>>>> */ public void setInputBB(ByteBuffer inputBB){
>>>>> this.inputBB = inputBB;
>>>>> }
>>>>>
>>>>> /**
>>>>> * Return the encrypted <code>ByteBuffer</code> used to handle
>>>>> response.
>>>>> * @return <code>ByteBuffer</code>
>>>>> */ public ByteBuffer getOutputBB(){
>>>>> return outputBB;
>>>>> }
>>>>> /**
>>>>> * Set the encrypted <code>ByteBuffer</code> used to handle
>>>>> response.
>>>>> * @param outputBB <code>ByteBuffer</code>
>>>>> */ public void setOutputBB(ByteBuffer outputBB){
>>>>> this.outputBB = outputBB;
>>>>> }
>>>>> /**
>>>>> * Set the <code>SSLEngine</code>.
>>>>> * @return <code>SSLEngine</code>
>>>>> */
>>>>> public SSLEngine getSSLEngine() {
>>>>> return sslEngine;
>>>>> }
>>>>>
>>>>> /**
>>>>> * Get the <code>SSLEngine</code>.
>>>>> * @param sslEngine <code>SSLEngine</code>
>>>>> */
>>>>> public void setSSLEngine(SSLEngine sslEngine) {
>>>>> this.sslEngine = sslEngine;
>>>>> } public String getThreadId() {
>>>>> return threadId;
>>>>> }
>>>>>
>>>>> public void setThreadId(String threadId) {
>>>>> this.threadId = threadId;
>>>>> }
>>>>>
>>>>> public void setTimeout(long timeout){
>>>>> this.timeout = timeout;
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>> ------------------------------------------------------------------------
>>>>>
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>>>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>
>
>