Coverage Report - com.sun.grizzly.util.WorkerThreadImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
WorkerThreadImpl
76 %
71/93
89 %
34/38
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 package com.sun.grizzly.util;
 39  
 
 40  
 import com.sun.grizzly.Controller;
 41  
 import com.sun.grizzly.Pipeline;
 42  
 import java.util.concurrent.Callable;
 43  
 import java.util.logging.Level;
 44  
 import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType;
 45  
 import com.sun.grizzly.util.ThreadAttachment.Mode;
 46  
 
 47  
 /**
 48  
  * Simple worker thread used for processing HTTP requests. All threads are
 49  
  * synchronized using a {@link Pipeline} object
 50  
  *
 51  
  * @author Jean-Francois Arcand
 52  
  */
 53  
 public class WorkerThreadImpl extends WorkerThread {
 54  
     
 55  
     private static final int DEFAULT_BYTE_BUFFER_SIZE = 8192;
 56  
     
 57  
     /**
 58  
      * What will be run.
 59  
      */
 60  
     protected Runnable target;
 61  
     
 62  
     
 63  
     /**
 64  
      * The {@link Pipeline} on which this thread synchronize.
 65  
      */
 66  
     protected Pipeline<Callable> pipeline;
 67  
     
 68  
     
 69  
     /**
 70  
      * Looing variable.
 71  
      */
 72  352
     protected volatile boolean execute = true;
 73  
     
 74  
     
 75  
     /**
 76  
      * The <code>ThreadGroup</code> used.
 77  
      */
 78  1
     protected final static ThreadGroup threadGroup = new ThreadGroup("Grizzly");
 79  
     
 80  
 
 81  
     /**
 82  
      * The state/attributes on this WorkerThread.
 83  
      */
 84  
     private ThreadAttachment threadAttachment;
 85  
     
 86  
     
 87  
     /**
 88  
      * The ByteBufferType used when creating the ByteBuffer attached to this object.
 89  
      */
 90  352
     private ByteBufferType byteBufferType = ByteBufferType.HEAP_VIEW;
 91  
     
 92  
     
 93  
     /**
 94  
      * The size of the ByteBuffer attached to this object.
 95  
      */
 96  
     private int initialByteBufferSize;
 97  
     
 98  
     
 99  
     /**
 100  
      * Create a Thread that will synchronizes/block on
 101  
      * {@link Pipeline} instance.
 102  
      * @param threadGroup <code>ThreadGroup</code>
 103  
      * @param runnable <code>Runnable</code>
 104  
      */
 105  
     public WorkerThreadImpl(ThreadGroup threadGroup, Runnable runnable){
 106  0
         this(threadGroup, runnable, DEFAULT_BYTE_BUFFER_SIZE);
 107  0
     }
 108  
     
 109  
     /**
 110  
      * Create a Thread that will synchronizes/block on
 111  
      * {@link Pipeline} instance.
 112  
      * @param threadGroup <code>ThreadGroup</code>
 113  
      * @param runnable <code>Runnable</code>
 114  
      * @param initialByteBufferSize initial {@link ByteBuffer} size
 115  
      */
 116  
     public WorkerThreadImpl(ThreadGroup threadGroup, Runnable runnable, 
 117  
             int initialByteBufferSize){
 118  0
         super(threadGroup, runnable);
 119  0
         setDaemon(true);
 120  0
         target = runnable;
 121  0
         this.initialByteBufferSize = initialByteBufferSize;
 122  0
     }
 123  
     
 124  
     /**
 125  
      * Create a Thread that will synchronizes/block on
 126  
      * {@link Pipeline} instance.
 127  
      * @param pipeline {@link Pipeline}
 128  
      * @param name <code>String</code>
 129  
      */
 130  
     public WorkerThreadImpl(Pipeline<Callable> pipeline, String name){
 131  0
         this(pipeline, name, DEFAULT_BYTE_BUFFER_SIZE);
 132  0
     }
 133  
     
 134  
     /**
 135  
      * Create a Thread that will synchronizes/block on
 136  
      * {@link Pipeline} instance.
 137  
      * @param pipeline {@link Pipeline}
 138  
      * @param name <code>String</code>
 139  
      * @param initialByteBufferSize initial {@link ByteBuffer} size
 140  
      */
 141  
     public WorkerThreadImpl(Pipeline<Callable> pipeline, String name, 
 142  
             int initialByteBufferSize){
 143  352
         super(threadGroup, name);
 144  352
         this.pipeline = pipeline;
 145  352
         setDaemon(true);
 146  352
         this.initialByteBufferSize = initialByteBufferSize;
 147  352
     }
 148  
     
 149  
     /**
 150  
      * Create a Thread that will synchronizes/block on
 151  
      * {@link Pipeline} instance.
 152  
      * @param pipeline {@link Pipeline}
 153  
      * @param name <code>String</code>
 154  
      * @param initialByteBufferSize initial {@link ByteBuffer} size
 155  
      */
 156  
     public WorkerThreadImpl(Pipeline<Callable> pipeline, String name,
 157  
             Runnable runnable, int initialByteBufferSize){
 158  0
         super(threadGroup, runnable, name);
 159  0
         target = runnable;
 160  0
         this.pipeline = pipeline;
 161  0
         setDaemon(true);
 162  0
         this.initialByteBufferSize = initialByteBufferSize;
 163  0
     }
 164  
 
 165  
     /**
 166  
      * Execute a {@link Task}.
 167  
      */
 168  
     @Override
 169  
     public void run(){        
 170  352
         if (byteBuffer == null){
 171  352
             byteBuffer = ByteBufferFactory.allocate(byteBufferType,
 172  
                     initialByteBufferSize);
 173  
         }
 174  
         
 175  352
         if (target != null){
 176  0
             target.run();
 177  0
             return;
 178  
         }
 179  
         
 180  357393
         while (execute) {
 181  
             try{
 182  
                 // Wait for a Task to be added to the pipeline.
 183  357041
                 Callable t = pipeline.waitForIoTask();
 184  357041
                 processTask(t);
 185  357041
                 t = null;
 186  0
             } catch (Throwable t) {
 187  0
                 if (execute) {
 188  0
                     Controller.logger().log(Level.SEVERE,
 189  
                             "WorkerThreadImpl unexpected exception: ",t);
 190  
                 } else {
 191  0
                     Controller.logger().log(Level.FINE,
 192  
                             "WorkerThreadImpl unexpected exception, when WorderThread supposed to be closed: ",t);
 193  
                 }
 194  
             } finally {
 195  357041
                 reset();
 196  357041
             }
 197  
         }
 198  351
     }
 199  
     
 200  
     
 201  
     /**
 202  
      * Stop this thread. If this Thread is performing atask, the task will be
 203  
      * completed.
 204  
      */
 205  
     public void terminate(){
 206  352
         execute = false;
 207  352
     }
 208  
             
 209  
     public ThreadAttachment updateAttachment(int mode) {
 210  159383
         ThreadAttachment currentAttachment = getAttachment();
 211  159383
         currentAttachment.reset();
 212  
 
 213  159383
         if ((mode & Mode.BYTE_BUFFER) != 0) {
 214  12
             currentAttachment.setByteBuffer(byteBuffer);
 215  
         }
 216  
 
 217  159383
         if ((mode & Mode.SSL_ENGINE) != 0) {
 218  159380
             currentAttachment.setSSLEngine(sslEngine);
 219  
         }
 220  
 
 221  159383
         if ((mode & Mode.INPUT_BB) != 0) {
 222  159257
             currentAttachment.setInputBB(inputBB);
 223  
         }
 224  
 
 225  159383
         if ((mode & Mode.OUTPUT_BB) != 0) {
 226  311
             currentAttachment.setOutputBB(outputBB);
 227  
         }
 228  
 
 229  159383
         currentAttachment.setMode(mode);
 230  
         
 231  159383
         return currentAttachment;
 232  
     }
 233  
     
 234  
     public ThreadAttachment getAttachment() {
 235  265568
         if (threadAttachment == null) {
 236  132
             threadAttachment = new ThreadAttachment();
 237  
         }
 238  
 
 239  265568
         return threadAttachment;
 240  
     }
 241  
 
 242  
     public ThreadAttachment detach() {
 243  53093
         ThreadAttachment currentAttachment = getAttachment();
 244  53093
         int mode = currentAttachment.getMode();
 245  53093
         updateAttachment(mode);
 246  
         
 247  
         // Re-create a new ByteBuffer
 248  53093
         if ((mode & Mode.BYTE_BUFFER) != 0) {
 249  11
             byteBuffer = ByteBufferFactory.allocate(byteBufferType,
 250  
                     initialByteBufferSize);
 251  
         }
 252  
         
 253  53093
         if ((mode & Mode.SSL_ENGINE) != 0) {
 254  53091
             sslEngine = null;
 255  
         }
 256  
         
 257  53093
         if ((mode & Mode.INPUT_BB) != 0) {
 258  53091
             inputBB = null;
 259  
         }
 260  
         
 261  53093
         if ((mode & Mode.OUTPUT_BB) != 0) {
 262  110
             outputBB = null;
 263  
         }
 264  
         
 265  
         // Switch to the new ThreadAttachment.
 266  53093
         this.threadAttachment = null;
 267  
         
 268  53093
         currentAttachment.deassociate();
 269  53093
         return currentAttachment;
 270  
     }
 271  
 
 272  
 
 273  
     public void attach(ThreadAttachment threadAttachment) {
 274  53083
         threadAttachment.associate();
 275  53083
         int mode = threadAttachment.getMode();
 276  
         
 277  53083
         if ((mode & Mode.BYTE_BUFFER) != 0) {
 278  4
             byteBuffer = threadAttachment.getByteBuffer();
 279  
         }
 280  
         
 281  53083
         if ((mode & Mode.SSL_ENGINE) != 0) {
 282  53081
             sslEngine = threadAttachment.getSSLEngine();
 283  
         }
 284  
         
 285  53083
         if ((mode & Mode.INPUT_BB) != 0) {
 286  53081
             inputBB = threadAttachment.getInputBB();
 287  
         }
 288  
         
 289  53083
         if ((mode & Mode.OUTPUT_BB) != 0) {
 290  103
             outputBB = threadAttachment.getOutputBB();
 291  
         }
 292  
         
 293  53083
         this.threadAttachment = threadAttachment;   
 294  53083
     }
 295  
 
 296  
     
 297  
     /**
 298  
      * The <code>ByteBufferType</code> used to create the {@link ByteBuffer}
 299  
      * associated with this object.
 300  
      * @return The <code>ByteBufferType</code> used to create the {@link ByteBuffer}
 301  
      * associated with this object.
 302  
      */
 303  
     public ByteBufferType getByteBufferType() {
 304  0
         return byteBufferType;
 305  
     }
 306  
 
 307  
     
 308  
     /**
 309  
      * Set the <code>ByteBufferType</code> to use when creating the
 310  
      * {@link ByteBuffer} associated with this object.
 311  
      * @param byteBufferType The ByteBuffer type.
 312  
      */
 313  
     public void setByteBufferType(ByteBufferType byteBufferType) {
 314  352
         this.byteBufferType = byteBufferType;
 315  352
     }
 316  
 
 317  
 
 318  
     /**
 319  
      * Processes the given task.
 320  
      *
 321  
      * @param t the task to process
 322  
      */
 323  
     protected void processTask(Callable t) throws Exception {
 324  357040
         if (t != null){
 325  161370
             t.call();
 326  
         }
 327  357040
     }
 328  
 
 329  
 
 330  
     @Override
 331  
     protected void reset() {
 332  357040
         if (threadAttachment != null) {
 333  
             /** 
 334  
              * ThreadAttachment was created during prev. processing and wasn't
 335  
              * detached. It could happen due to some error - we need to release
 336  
              * the ThreadAttachment association with the current thread
 337  
              */
 338  122
             threadAttachment.deassociate();
 339  
         }
 340  
         
 341  357041
         threadAttachment = null;
 342  357041
         super.reset();
 343  357040
     }
 344  
 }
 345