dev@grizzly.java.net

Re: [Proposal - vote] Adding a new ThreadAttachment for partial read/state implementation

From: charlie hunt <charlie.hunt_at_sun.com>
Date: Wed, 27 Jun 2007 11:53:03 -0500

In addition to voting, I think we should consider putting this
proposal on the agenda for the next Project Grizzly meeting.

Unfortunately, we won't have the next Project Grizzly meeting until July
11th, since next week is July 4th and the USA-based members will be out of
the office for the holiday. So we wouldn't be able to talk about it in a
Project Grizzly meeting until then.

Will it impede our progress if we wait until July 11th for a more formal
review and final vote?

charlie ...

Jeanfrancois Arcand wrote:
> Hi,
>
> I've committed the implementation and included Charlie's
> ProtocolParser idea :-) The svn commit looks like:
>
>> Add support for partial read. This commit adds:
>>
>> + The new ThreadAttachment classes described by the proposal on
>>   dev_at_grizzly
>> + A new interface called ProtocolParser that can be used to find if
>>   all the bytes are available (and uses the ThreadAttachment for
>>   persisting the transaction state between the SelectionKey
>>   registrations on the main Selector).
>> + Deprecate StreamAlgorithm interface in favor of the new ProtocolParser
>> + Move the rcm module to use the new ProtocolParser interface
>> + Added a new abstract class called ParserProtocolFilter which uses
>>   a ProtocolParser to decide if all bytes have been read (see
>>   ProtocolParserTest for an example).
>> + Added a new set of APIs to SelectorHandler to allow configuring a
>>   SelectionKeyHandler per SelectorHandler (the default remains the same).
>>
>> Please review :-)
>
> I've added a unit test so you can see how it works. This makes
> protocol implementation quite simple :-)
>
> Thanks
>
> -- Jeanfrancois
>
>
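The ProtocolParser item in the commit message is about deciding whether all the bytes of a message have arrived before processing continues. A minimal sketch of that kind of check, assuming a made-up wire format (a 4-byte big-endian length followed by the payload). LengthPrefixedCheck and hasCompleteMessage are illustrative names only and are not the actual ProtocolParser interface:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Grizzly: decides whether a buffer that is
// still being filled (write mode) already holds one complete message.
// Assumed wire format: 4-byte big-endian length, then that many payload bytes.
final class LengthPrefixedCheck {

    static final int HEADER_SIZE = 4;

    // The buffer is inspected through a duplicate, so its own position and
    // limit are left untouched for the next socketChannel.read(...).
    static boolean hasCompleteMessage(ByteBuffer filling) {
        ByteBuffer view = filling.duplicate();
        view.flip();                       // switch the copy to read mode
        if (view.remaining() < HEADER_SIZE) {
            return false;                  // length prefix not fully read yet
        }
        int payloadLength = view.getInt();
        return payloadLength >= 0 && view.remaining() >= payloadLength;
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer bb = ByteBuffer.allocate(64);

        bb.putInt(payload.length).put(payload, 0, 2);   // header plus 2 of 5 bytes
        System.out.println(hasCompleteMessage(bb));     // prints false

        bb.put(payload, 2, payload.length - 2);         // the rest arrives
        System.out.println(hasCompleteMessage(bb));     // prints true
    }
}
```

When the check returns false, the partially filled buffer is exactly the state that would have to be kept until the next read.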
>
> Jeanfrancois Arcand wrote:
>> Hi Alexey,
>>
>> Oleksiy Stashok wrote:
>>> Hello Jeanfrancois,
>>>
>>> I like this idea.
>>>
>>> Just a small comment. In the code
>>>
>>>  1 public ThreadAttachment detach(boolean copyState) {
>>>  2     try{
>>>  3         threadAttachment.setByteBuffer(byteBuffer);
>>>  4         threadAttachment.setSSLEngine(sslEngine);
>>>  5         threadAttachment.setInputBB(inputBB);
>>>  6         threadAttachment.setOutputBB(outputBB);
>>>  7
>>>  8         return threadAttachment;
>>>  9     } finally {
>>> 10         // We cannot cache/re-use this object as it might be referenced
>>> 11         // by more than one thread.
>>> 12         if (copyState){
>>> 13             // Re-create a new ByteBuffer
>>> 14             byteBuffer = ByteBufferFactory.allocateView(8192,false);
>>> 15             threadAttachment = new ThreadAttachment();
>>> 16         }
>>> 17         threadAttachment.setThreadId(getName() + "-" + getId());
>>> 18     }
>>> 19 }
>>>
>>> it probably makes sense to avoid code like lines 8 and 15...
>>> On one hand you're returning threadAttachment, on the other you're
>>> reassigning it in the finally block. It looks confusing, at least to me :))
>>
>> The goal here is to return the 'old' threadAttachment and then
>> re-assign a new one. I was struggling to come up with a clear
>> solution. Probably I should use a temporary variable like:
>>
>> ThreadAttachment oldTA = threadAttachment;
>> return oldTA;
>>
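The temporary-variable idea above could look roughly like the following. This is only a sketch under assumptions: Worker and Attachment are simplified stand-ins for WorkerThreadImpl and ThreadAttachment, only the ByteBuffer is tracked, and ByteBuffer.allocate is used in place of ByteBufferFactory.allocateView so that the example runs on a plain JDK.

```java
import java.nio.ByteBuffer;

// Simplified stand-in for WorkerThreadImpl (hypothetical, not Grizzly code).
final class Worker {

    // Simplified stand-in for ThreadAttachment: only the ByteBuffer is kept.
    static final class Attachment {
        ByteBuffer byteBuffer;
    }

    private ByteBuffer byteBuffer = ByteBuffer.allocate(8192);
    private Attachment attachment = new Attachment();

    ByteBuffer getByteBuffer() {
        return byteBuffer;
    }

    // Copy the state into the current attachment, remember it in a local,
    // and only then hand the worker fresh state. No try/finally is needed.
    Attachment detach(boolean copyState) {
        Attachment detached = attachment;
        detached.byteBuffer = byteBuffer;
        if (copyState) {
            // The detached buffer may now be used elsewhere, so start over.
            byteBuffer = ByteBuffer.allocate(8192);
            attachment = new Attachment();
        }
        return detached;
    }

    // Restore previously detached state onto this worker.
    void attach(Attachment restored) {
        byteBuffer = restored.byteBuffer;
        attachment = restored;
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.getByteBuffer().put((byte) 42);   // pretend this was a partial read

        Attachment saved = worker.detach(true);
        System.out.println(saved.byteBuffer.position());        // prints 1
        System.out.println(worker.getByteBuffer().position());  // prints 0

        worker.attach(saved);
        System.out.println(worker.getByteBuffer() == saved.byteBuffer); // prints true
    }
}
```

With the local variable, the returned object and the reassigned field are visibly two different things, which addresses the confusion about the return inside try and the reassignment inside finally.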
>>>
>>> Good work!
>>
>> Thanks!
>>
>> -- Jeanfrancois
>>
>>>
>>> WBR,
>>> Alexey.
>>>
>>> Jeanfrancois Arcand wrote:
>>>> Hi,
>>>>
>>>> I would like to introduce a set of new APIs for handling the case
>>>> when a ReadFilter is used and the temporary Selector trick for
>>>> doing extra reads cannot be used. The use case is the following:
>>>>
>>>> + Inside a "ReadFilter", the socketChannel.read(...) reads some
>>>> bytes and then starts returning 0 (no more bytes available). In
>>>> that case, the SelectionKey must be registered back to its
>>>> SelectorHandler, and the state of the transaction needs to be
>>>> persisted. By state here I mean the byteBuffer that contains the
>>>> partial read (bytes) and maybe some application-specific data.
>>>>
>>>> To support that use case, I would like to add two new methods to the
>>>> WorkerThread interface:
>>>>
>>>>> /**
>>>>>  * Detach the current set of attributes (state) associated with
>>>>>  * this instance. Invoking detach(true) will re-create all the
>>>>>  * ByteBuffers associated with this thread, hence this method must
>>>>>  * be called only when required. If you only need to invoke the
>>>>>  * object, call detach(false) instead but make sure you aren't
>>>>>  * caching or re-using the ThreadAttachment with another thread.
>>>>>  * @param copyState true to copy the attributes into the
>>>>>  *        ThreadAttachment and re-create them
>>>>>  * @return a new ThreadAttachment
>>>>>  */
>>>>> public ThreadAttachment detach(boolean copyState);
>>>>>
>>>>>
>>>>> /**
>>>>>  * Attach the ThreadAttachment to this instance. This will
>>>>>  * configure this Thread's attributes like ByteBuffer, SSLEngine, etc.
>>>>>  * @param threadAttachment the attachment.
>>>>>  */
>>>>> public void attach(ThreadAttachment threadAttachment);
>>>>
>>>> Hence, when an application needs more bytes and needs to register
>>>> back to the SelectorHandler:
>>>>
>>>>
>>>> public class MyReadFilter{
>>>>
>>>>     public boolean execute(Context ctx){
>>>>         ....
>>>>         WorkerThread wt = ((WorkerThread)Thread.currentThread());
>>>>         ByteBuffer bb = wt.getByteBuffer();
>>>>         int length = socketChannel.read(bb);
>>>>         ...
>>>>         if (length == 0){
>>>>             // No more bytes for now: save the thread state on the key
>>>>             ThreadAttachment ta = wt.detach(true);
>>>>             ctx.getSelectionKey().attach(ta);
>>>>             ....
>>>>             // Ask for the key to be registered back on its Selector
>>>>             ctx.setKeyRegistrationState(
>>>>                     Context.KeyRegistrationState.REGISTER);
>>>>             ...
>>>>         }
>>>>
>>>>     }
>>>> }
>>>>
>>>> Internally, the SelectorHandler will do, once bytes are available:
>>>>
>>>> WorkerThread wt = ((WorkerThread)Thread.currentThread());
>>>> wt.attach((ThreadAttachment)selectionKey.attachment());
>>>>
>>>> and then invoke the Filter again. Calling wt.attach() will properly
>>>> configure the Thread attributes so when MyReadFilter.execute is
>>>> called, the previously read bytes will already be available inside
>>>> the ByteBuffer.
>>>>
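The flow described above (a read comes up short, the state is parked on the SelectionKey, and reading resumes once the Selector reports more bytes) can be tried out with plain JDK NIO. The sketch below makes several assumptions: a java.nio.channels.Pipe plays the role of the socket, everything runs on one thread, and the ByteBuffer itself is attached to the key where the proposal would attach a ThreadAttachment. PartialReadDemo and readInTwoPieces are made-up names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Sketch only: a Pipe stands in for the socket, and the ByteBuffer is the
// whole "transaction state" that gets attached to the SelectionKey.
final class PartialReadDemo {

    // Sends a message in two pieces (both assumed non-empty) and reads it
    // back, keeping the partially filled buffer on the SelectionKey.
    static String readInTwoPieces(String first, String second) {
        byte[] a = first.getBytes(StandardCharsets.US_ASCII);
        byte[] b = second.getBytes(StandardCharsets.US_ASCII);
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);

                // Only the first piece is available at this point.
                sink.write(ByteBuffer.wrap(a));
                ByteBuffer bb = ByteBuffer.allocate(a.length + b.length);
                selector.select();
                selector.selectedKeys().clear();
                source.read(bb);

                // Message incomplete: persist the partial read on the key.
                key.attach(bb);

                // The rest arrives later; resume from the attached state.
                sink.write(ByteBuffer.wrap(b));
                ByteBuffer resumed = (ByteBuffer) key.attachment();
                while (resumed.hasRemaining()) {
                    selector.select();
                    selector.selectedKeys().clear();
                    source.read(resumed);
                }
                resumed.flip();
                return StandardCharsets.US_ASCII.decode(resumed).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readInTwoPieces("hello ", "world")); // prints hello world
    }
}
```

In the proposal the object stored on the key also carries the SSLEngine and the encrypted buffers, and a different worker thread may pick the key up after the wake-up, which is why the state has to travel with the key rather than stay on the thread.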
>>>> I will apply the solution to the SSLReadFilter as well, which
>>>> currently uses the SSLSession to store the state of the transaction.
>>>> I'm attaching the diff of the changes.
>>>>
>>>> Let me know what you think.
>>>>
>>>> Thanks
>>>>
>>>> -- Jeanfrancois
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> ------------------------------------------------------------------------
>>>>
>>>>
>>>> Index: WorkerThread.java
>>>> ===================================================================
>>>> --- WorkerThread.java (revision 308)
>>>> +++ WorkerThread.java (working copy)
>>>> @@ -25,7 +25,7 @@
>>>>  import java.nio.ByteBuffer;
>>>>
>>>>  /**
>>>> - * Simple interface to allow the addition of <code>Thread</code> attributes.
>>>> + * Simple interface to allow the addition of <code>Thread</code> attributes.
>>>>   *
>>>>   * @author Jean-Francois Arcand
>>>>   */
>>>> @@ -45,4 +45,25 @@
>>>>       */
>>>>      public ByteBuffer getByteBuffer();
>>>> +
>>>> +    /**
>>>> +     * Detach the current set of attributes (state) associated with this instance.
>>>> +     * Invoking detach(true) will re-create all the ByteBuffer associated with
>>>> +     * this thread, hence this method must be called only when required. If
>>>> +     * you only need to invoke the object, call detach(false) instead but make
>>>> +     * sure you aren't caching or re-using the ThreadAttachment with another
>>>> +     * thread.
>>>> +     * @param true to copy the attributes into the ThreadAttachment and re-create
>>>> +     * them
>>>> +     * @return a new ThreadAttachment
>>>> +     */
>>>> +    public ThreadAttachment detach(boolean copyState);
>>>> +
>>>> +
>>>> +    /**
>>>> +     * Attach the ThreadAttachment to this instance. This will configure this
>>>> +     * Thread attributes like ByteBuffer, SSLEngine, etc.
>>>> +     * @param ThreadAttachment the attachment.
>>>> +     */
>>>> +    public void attach(ThreadAttachment threadAttachment);
>>>>  }
>>>> Index: WorkerThreadImpl.java
>>>> ===================================================================
>>>> --- WorkerThreadImpl.java (revision 308)
>>>> +++ WorkerThreadImpl.java (working copy)
>>>> @@ -70,22 +70,28 @@
>>>>      /**
>>>>       * The encrypted ByteBuffer used for handshaking and reading request bytes.
>>>>       */
>>>> -    private ByteBuffer inputBB;
>>>> +    protected ByteBuffer inputBB;
>>>>
>>>>
>>>>      /**
>>>>       * The encrypted ByteBuffer used for handshaking and writing response bytes.
>>>>       */
>>>> -    private ByteBuffer outputBB;
>>>> +    protected ByteBuffer outputBB;
>>>>
>>>>
>>>>      /**
>>>>       * The <code>SSLEngine</code> used to manage the SSL over NIO request.
>>>>       */
>>>> -    private SSLEngine sslEngine;
>>>> -
>>>> +    protected SSLEngine sslEngine;
>>>> +    /**
>>>> +     * The state/attributes on this WorkerThread.
>>>> +     */
>>>> +    private ThreadAttachment threadAttachment = new ThreadAttachment();
>>>> +
>>>> +
>>>> +    /**
>>>>       * Create a Thread that will synchronizes/block on
>>>>       * <code>Pipeline</code> instance.
>>>>       * @param threadGroup <code>ThreadGroup</code>
>>>> @@ -233,5 +239,53 @@
>>>>      public void setSSLEngine(SSLEngine sslEngine) {
>>>>          this.sslEngine = sslEngine;
>>>>      }
>>>> +
>>>> +
>>>> +    /**
>>>> +     * Detach the current set of attributes (state) associated with this instance.
>>>> +     * Invoking detach(true) will re-create all the ByteBuffer associated with
>>>> +     * this thread, hence this method must be called only when required. If
>>>> +     * you only need to invoke the object, call detach(false) instead but make
>>>> +     * sure you aren't caching or re-using the ThreadAttachment with another
>>>> +     * thread.
>>>> +     * @param true to copy the attributes into the ThreadAttachment and re-create
>>>> +     * them.
>>>> +     * @return a new ThreadAttachment
>>>> +     */
>>>> +    public ThreadAttachment detach(boolean copyState) {
>>>> +        try{
>>>> +            threadAttachment.setByteBuffer(byteBuffer);
>>>> +            threadAttachment.setSSLEngine(sslEngine);
>>>> +            threadAttachment.setInputBB(inputBB);
>>>> +            threadAttachment.setOutputBB(outputBB);
>>>> +
>>>> +            return threadAttachment;
>>>> +        } finally {
>>>> +            // We cannot cache/re-use this object as it might be referenced
>>>> +            // by more than one thread.
>>>> +            if (copyState){
>>>> +                // Re-create a new ByteBuffer
>>>> +                byteBuffer = ByteBufferFactory.allocateView(8192,false);
>>>> +                threadAttachment = new ThreadAttachment();
>>>> +            }
>>>> +            threadAttachment.setThreadId(getName() + "-" + getId());
>>>> +        }
>>>> +    }
>>>> +
>>>> +
>>>> +    /**
>>>> +     * Attach the ThreadAttachment to this instance. This will configure this
>>>> +     * Thread attributes like ByteBuffer, SSLEngine, etc.
>>>> +     * @param ThreadAttachment the attachment.
>>>> +     */
>>>> +    public void attach(ThreadAttachment threadAttachment) {
>>>> +        byteBuffer = threadAttachment.getByteBuffer();
>>>> +        sslEngine = threadAttachment.getSSLEngine();
>>>> +        inputBB = threadAttachment.getInputBB();
>>>> +        outputBB = threadAttachment.getOutputBB();
>>>> +
>>>> +        this.threadAttachment = threadAttachment;
>>>> +        threadAttachment.setThreadId(getName() + "-" + getId());
>>>> +    }
>>>>  }
>>>>
>>>> /*
>>>>  * The contents of this file are subject to the terms
>>>>  * of the Common Development and Distribution License
>>>>  * (the License). You may not use this file except in
>>>>  * compliance with the License.
>>>>  *
>>>>  * You can obtain a copy of the license at
>>>>  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>>>>  * glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>  * See the License for the specific language governing
>>>>  * permissions and limitations under the License.
>>>>  *
>>>>  * When distributing Covered Code, include this CDDL
>>>>  * Header Notice in each file and include the License file
>>>>  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>>>>  * If applicable, add the following below the CDDL Header,
>>>>  * with the fields enclosed by brackets [] replaced by
>>>>  * you own identifying information:
>>>>  * "Portions Copyrighted [year] [name of copyright owner]"
>>>>  *
>>>>  * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>>>>  */
>>>> package com.sun.grizzly.util;
>>>>
>>>> import java.nio.ByteBuffer;
>>>> import java.util.WeakHashMap;
>>>> import javax.net.ssl.SSLEngine;
>>>>
>>>> /**
>>>>  * This object represents the state of a <code>WorkerThread</code>.
>>>>  * This includes the ByteBuffer bound to the WorkerThread,
>>>>  * application data, etc.
>>>>  * @author Jeanfrancois
>>>>  */
>>>> public class ThreadAttachment {
>>>>     private long timeout;
>>>>     private String threadId;
>>>>     private WeakHashMap<String,Object> map;
>>>>     private ByteBuffer byteBuffer;
>>>>
>>>>     /**
>>>>      * The encrypted ByteBuffer used for handshaking and reading
>>>>      * request bytes.
>>>>      */
>>>>     private ByteBuffer inputBB;
>>>>
>>>>
>>>>     /**
>>>>      * The encrypted ByteBuffer used for handshaking and writing
>>>>      * response bytes.
>>>>      */
>>>>     private ByteBuffer outputBB;
>>>>
>>>>
>>>>     /**
>>>>      * The <code>SSLEngine</code> used to manage the SSL over NIO
>>>>      * request.
>>>>      */
>>>>     private SSLEngine sslEngine;
>>>>
>>>>     public ThreadAttachment(){
>>>>         map = new WeakHashMap<String,Object>();
>>>>     }
>>>>
>>>>     public void setAttribute(String key, Object value){
>>>>         map.put(key,value);
>>>>     }
>>>>
>>>>     public Object getAttribute(String key){
>>>>         return map.get(key);
>>>>     }
>>>>
>>>>     public Object removeAttribute(String key){
>>>>         return map.remove(key);
>>>>     }
>>>>
>>>>     /**
>>>>      * Set the <code>ByteBuffer</code> shared by this thread
>>>>      */
>>>>     public void setByteBuffer(ByteBuffer byteBuffer){
>>>>         this.byteBuffer = byteBuffer;
>>>>     }
>>>>
>>>>     /**
>>>>      * Return the <code>ByteBuffer</code> shared by this thread
>>>>      */
>>>>     public ByteBuffer getByteBuffer(){
>>>>         return byteBuffer;
>>>>     }
>>>>
>>>>     /**
>>>>      * Return the encrypted <code>ByteBuffer</code> used to handle
>>>>      * request.
>>>>      * @return <code>ByteBuffer</code>
>>>>      */
>>>>     public ByteBuffer getInputBB(){
>>>>         return inputBB;
>>>>     }
>>>>
>>>>     /**
>>>>      * Set the encrypted <code>ByteBuffer</code> used to handle
>>>>      * request.
>>>>      * @param inputBB <code>ByteBuffer</code>
>>>>      */
>>>>     public void setInputBB(ByteBuffer inputBB){
>>>>         this.inputBB = inputBB;
>>>>     }
>>>>
>>>>     /**
>>>>      * Return the encrypted <code>ByteBuffer</code> used to handle
>>>>      * response.
>>>>      * @return <code>ByteBuffer</code>
>>>>      */
>>>>     public ByteBuffer getOutputBB(){
>>>>         return outputBB;
>>>>     }
>>>>
>>>>     /**
>>>>      * Set the encrypted <code>ByteBuffer</code> used to handle
>>>>      * response.
>>>>      * @param outputBB <code>ByteBuffer</code>
>>>>      */
>>>>     public void setOutputBB(ByteBuffer outputBB){
>>>>         this.outputBB = outputBB;
>>>>     }
>>>>
>>>>     /**
>>>>      * Get the <code>SSLEngine</code>.
>>>>      * @return <code>SSLEngine</code>
>>>>      */
>>>>     public SSLEngine getSSLEngine() {
>>>>         return sslEngine;
>>>>     }
>>>>
>>>>     /**
>>>>      * Set the <code>SSLEngine</code>.
>>>>      * @param sslEngine <code>SSLEngine</code>
>>>>      */
>>>>     public void setSSLEngine(SSLEngine sslEngine) {
>>>>         this.sslEngine = sslEngine;
>>>>     }
>>>>
>>>>     public String getThreadId() {
>>>>         return threadId;
>>>>     }
>>>>
>>>>     public void setThreadId(String threadId) {
>>>>         this.threadId = threadId;
>>>>     }
>>>>
>>>>     public void setTimeout(long timeout){
>>>>         this.timeout = timeout;
>>>>     }
>>>> }
>>>>
>>>>
>>>> ------------------------------------------------------------------------
>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>>>
>>>
>


-- 
Charlie Hunt
Java Performance Engineer
630.285.7708 x47708 (Internal)
<http://java.sun.com/docs/performance/>