Coverage Report - com.sun.grizzly.DefaultPipeline
 
Classes in this File Line Coverage Branch Coverage Complexity
DefaultPipeline
52 %
63/122
57 %
23/40
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 package com.sun.grizzly;
 39  
 
 40  
 import com.sun.grizzly.util.WorkerThreadImpl;
 41  
 
 42  
 import java.nio.channels.SelectionKey;
 43  
 import java.util.LinkedList;
 44  
 import java.util.concurrent.Callable;
 45  
 import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType;
 46  
 import java.util.logging.Level;
 47  
 
 48  
 /**
 49  
  * Simple Thread Pool based on the wait/notify/synchronized mechanism.
 50  
  *
 51  
  * @author Jean-Francois Arcand
 52  
  */
 53  545067
 public class DefaultPipeline extends LinkedList<Callable>
 54  
         implements Pipeline<Callable>{
 55  
     
 56  
     
 57  
     /**
 58  
      * The number of threads waiting for work.
 59  
      */
 60  117
     protected int waitingThreads = 0;
 61  
     
 62  
     
 63  
     /**
 64  
      * The maximum number of threads.
 65  
      */
 66  117
     protected int maxThreads = 20;
 67  
     
 68  
     
 69  
     /**
 70  
      * The minimum number of {@link WorkerThreadImpl}
 71  
      */
 72  117
     protected int minThreads = 5;
 73  
     
 74  
     
 75  
     /**
 76  
      * The minimum number of spare {@link WorkerThreadImpl}
 77  
      */
 78  117
     protected int minSpareThreads = 2;
 79  
     
 80  
     
 81  
     /**
 82  
      * The port used.
 83  
      */
 84  117
     protected int port = 8080;
 85  
     
 86  
     
 87  
     /**
 88  
      * The number of {@link WorkerThreadImpl}
 89  
      */
 90  117
     protected int threadCount = 0;
 91  
     
 92  
     
 93  
     /**
 94  
      * The name of this Pipeline
 95  
      */
 96  117
     protected String name = "Grizzly";
 97  
     
 98  
     
 99  
     /**
 100  
      * The Thread Priority
 101  
      */
 102  117
     protected int priority = Thread.NORM_PRIORITY;
 103  
     
 104  
     
 105  
     /**
 106  
      * Has the pipeline already started
 107  
      */
 108  117
     protected boolean isStarted = false;
 109  
     
 110  
     
 111  
     /**
 112  
      * {@link WorkerThreadImpl} managed by this pipeline.
 113  
      */
 114  
     protected transient WorkerThreadImpl[] workerThreads;
 115  
     
 116  
     
 117  
     /**
 118  
      * Maximum pending connection before refusing requests.
 119  
      */
 120  117
     protected int maxQueueSizeInBytes = -1;
 121  
     
 122  
     
 123  
     /**
 124  
      * The increment number used when adding new thread.
 125  
      */
 126  117
     protected int threadsIncrement = 1;
 127  
     
 128  
     /**
 129  
      * The initial ByteBuffer size for newly created WorkerThread instances
 130  
      */
 131  117
     protected int initialByteBufferSize = 8192;
 132  
     
 133  
     
 134  
     /**
 135  
      * The {@link ByteBufferType}
 136  
      */
 137  117
     private ByteBufferType byteBufferType = ByteBufferType.HEAP_VIEW;
 138  
     
 139  
     
 140  
     // ------------------------------------------------------- Constructor -----/
 141  
     
 142  
     public DefaultPipeline(){
 143  117
         super();
 144  117
     }
 145  
     
 146  
     public DefaultPipeline(int maxThreads, int minThreads, String name,
 147  0
             int port, int priority){
 148  
         
 149  0
         this.maxThreads = maxThreads;
 150  0
         this.port = port;
 151  0
         this.name = name;
 152  0
         this.minThreads = minThreads;
 153  0
         this.priority = priority;
 154  
         
 155  0
         if ( minThreads < minSpareThreads )
 156  0
             minSpareThreads = minThreads;
 157  
         
 158  0
     }
 159  
     
 160  
     
 161  
     public DefaultPipeline(int maxThreads, int minThreads, String name,
 162  
             int port){        
 163  0
         this(maxThreads,minThreads,name,port,Thread.NORM_PRIORITY);
 164  0
     }
 165  
     
 166  
     
 167  
     // ------------------------------------------------ Lifecycle ------------/
 168  
     
 169  
     
 170  
     /**
 171  
      * Init the {@link Pipeline} by initializing the required
 172  
      * {@link WorkerThreadImpl}. Default value is {@link #maxThreads}
 173  
      */
 174  
     public synchronized void initPipeline(){
 175  
         
 176  99
         if (isStarted){
 177  30
             return;
 178  
         }
 179  
         
 180  69
         if (minThreads > maxThreads) {
 181  0
             minThreads = maxThreads;
 182  
         }
 183  
         
 184  69
         workerThreads = new WorkerThreadImpl[maxThreads];
 185  69
         increaseWorkerThread(minThreads, false);
 186  69
     }
 187  
     
 188  
     
 189  
     /**
 190  
      * Start the {@link Pipeline} and all associated
 191  
      * {@link WorkerThreadImpl}
 192  
      */
 193  
     public synchronized void startPipeline(){
 194  99
         if (!isStarted) {
 195  414
             for (int i=0; i < minThreads; i++){
 196  345
                 workerThreads[i].start();
 197  
             }
 198  69
             isStarted = true;
 199  
         }
 200  99
     }
 201  
     
 202  
     
 203  
     /**
 204  
      * Stop the {@link Pipeline} and all associated
 205  
      * {@link WorkerThreadImpl}
 206  
      */
 207  
     public synchronized void stopPipeline(){
 208  117
         if (isStarted) {            
 209  69
             isStarted = false;
 210  422
             for (int i=0; i < threadCount; i++){
 211  353
                 workerThreads[i].terminate();
 212  
             }
 213  
         }
 214  117
         threadCount = 0;
 215  117
         waitingThreads = 0;
 216  117
         notifyAll();
 217  117
     }
 218  
     
 219  
     
 220  
     /**
 221  
      * Create new {@link WorkerThreadImpl}. This method must be invoked
 222  
      * from a synchronized block.
 223  
      * @param increment - how many additional {@link WorkerThreadImpl}
 224  
      * objects to add
 225  
      * @param startThread - should newly added {@link WorkerThreadImpl}
 226  
      * objects be started after creation?
 227  
      */
 228  
     protected void increaseWorkerThread(int increment, boolean startThread){
 229  
         WorkerThreadImpl workerThread;
 230  77
         int currentCount = threadCount;
 231  77
         int increaseCount = threadCount + increment;
 232  430
         for (int i=currentCount; i < increaseCount; i++){
 233  353
             workerThread = new WorkerThreadImpl(this,
 234  
                     name + "WorkerThread-"  + port + "-" + i, initialByteBufferSize);
 235  353
             workerThread.setByteBufferType(byteBufferType);
 236  353
             workerThread.setPriority(priority);
 237  
             
 238  353
             if (startThread)
 239  8
                 workerThread.start();
 240  
             
 241  353
             workerThreads[i] = workerThread;
 242  353
             threadCount++;
 243  
         }
 244  77
     }
 245  
     
 246  
     
 247  
     /**
 248  
      * Interrupt the {@link Thread} using its thread id
 249  
      * @param threadID - id of {@link Thread} to interrupt
 250  
      * @return boolean, was Thread interrupted successfully ?
 251  
      */
 252  
     @SuppressWarnings("empty-statement")
 253  
     public synchronized boolean interruptThread(long threadID){
 254  0
         ThreadGroup threadGroup = workerThreads[0].getThreadGroup();
 255  0
         Thread[] threads = new Thread[threadGroup.activeCount()];
 256  0
         threadGroup.enumerate(threads);
 257  
         
 258  0
         for (Thread thread: threads){
 259  0
             if ( thread != null && thread.getId() == threadID ){
 260  0
                 if ( Thread.State.RUNNABLE != thread.getState()){
 261  
                     try{
 262  0
                         thread.interrupt();
 263  0
                         return true;
 264  0
                     } catch (Throwable t){
 265  0
                         Controller.logger().log(Level.FINE,"interruptThread",t);
 266  
                     }
 267  
                 }
 268  
             }
 269  
         }
 270  0
         return false;
 271  
     }
 272  
     
 273  
     
 274  
     // ---------------------------------------------------- Queue ------------//
 275  
     
 276  
     
 277  
     /**
 278  
      * Add an object to this pipeline
 279  
      * @param callable a {@link Callable} to add to this Pipeline
 280  
      * @throws PipelineFullException if Pipeline is full
 281  
      */
 282  
     public synchronized void execute(Callable callable) throws PipelineFullException {
 283  156189
         int queueSize =  size();
 284  156189
         if (maxQueueSizeInBytes != -1 && maxQueueSizeInBytes < queueSize){
 285  0
             throw new PipelineFullException("Queue is full");
 286  
         }
 287  
         
 288  156189
         addLast(callable);
 289  156189
         notify();
 290  
         
 291  
         // Create worker threads if we know we will run out of them
 292  156189
         if (threadCount < maxThreads && waitingThreads < (queueSize + 1)){
 293  8
             int left = maxThreads - threadCount;
 294  8
             if (threadsIncrement > left){
 295  0
                 threadsIncrement = left;
 296  
             }
 297  8
             increaseWorkerThread(threadsIncrement,true);
 298  
         }
 299  156189
     }
 300  
     
 301  
     
 302  
     /**
 303  
      * Return a {@link Callable} object available in the pipeline.
 304  
      * All Threads will synchronize on that method
 305  
      * @return {@link Callable}
 306  
      */
 307  
     public synchronized Callable waitForIoTask() {
 308  388886
         if (size() - waitingThreads <= 0) {
 309  
             try {
 310  156529
                 waitingThreads++;
 311  156529
                 wait();
 312  0
             }  catch (InterruptedException e)  {
 313  0
                 Thread.currentThread().interrupt();
 314  156528
             }
 315  156528
             waitingThreads--;
 316  
         }
 317  388885
         return poll();
 318  
     }
 319  
     
 320  
     
 321  
     /**
 322  
      * Invoked when the {@link SelectionKeyHandler} is about to expire a SelectionKey.
 323  
      * @param key - A {@link SelectionKey} to expire
 324  
      * @return true if the {@link SelectionKeyHandler} should expire the 
 325  
      *         {@link SelectionKey}, false if not.
 326  
      */
 327  
     public boolean expireKey(SelectionKey key){
 328  0
         return true;
 329  
     }
 330  
     
 331  
     
 332  
     /**
 333  
      * Return <tt>true</tt> if the size of this {@link LinkedList}
 334  
      * minus the current waiting threads is lower than or equal to zero.
 335  
      */
 336  
     @Override
 337  
     public boolean isEmpty() {
 338  0
         return  (size() - getWaitingThread() <= 0);
 339  
     }
 340  
     
 341  
     
 342  
     // --------------------------------------------------Properties ----------//
 343  
     
 344  
     
 345  
     /**
 346  
      * Return the number of waiting threads.
 347  
      * @return number of waiting threads
 348  
      */
 349  
     public synchronized int getWaitingThread(){
 350  0
         return waitingThreads;
 351  
     }
 352  
     
 353  
     
 354  
     /**
 355  
      * Set the number of threads used by this pipeline.
 356  
      * @param maxThreads maximum number of threads to use
 357  
      */
 358  
     public synchronized void setMaxThreads(int maxThreads){
 359  1
         this.maxThreads = maxThreads;
 360  1
     }
 361  
     
 362  
     
 363  
     /**
 364  
      * Return the number of threads used by this pipeline.
 365  
      * @return maximum number of threads
 366  
      */
 367  
     public synchronized int getMaxThreads(){
 368  0
         return maxThreads;
 369  
     }
 370  
     
 371  
     /**
 372  
      * Return current thread count
 373  
      * @return current thread count
 374  
      */
 375  
     public synchronized int getCurrentThreadCount() {
 376  0
         return threadCount;
 377  
     }
 378  
     
 379  
     
 380  
     /**
 381  
      * Return the number of threads that are currently processing
 382  
      * a task.
 383  
      * @return current busy thread count
 384  
      */
 385  
     public synchronized int getCurrentThreadsBusy(){
 386  0
         return (threadCount - waitingThreads);
 387  
     }
 388  
     
 389  
     
 390  
     /**
 391  
      * Return the maximum spare thread.
 392  
      * @return maximum spare thread count
 393  
      */
 394  
     public synchronized int getMaxSpareThreads() {
 395  0
         return maxThreads;
 396  
     }
 397  
     
 398  
     
 399  
     /**
 400  
      * Return the minimum spare thread.
 401  
      * @return minimum spare thread count
 402  
      */
 403  
     public synchronized int getMinSpareThreads() {
 404  0
         return minSpareThreads;
 405  
     }
 406  
     
 407  
     
 408  
     /**
 409  
      * Set the minimum spare thread this {@link Pipeline} can handle.
 410  
      * @param minSpareThreads minimum number of spare threads to handle
 411  
      */
 412  
     public synchronized void setMinSpareThreads(int minSpareThreads) {
 413  0
         this.minSpareThreads = minSpareThreads;
 414  0
     }
 415  
     
 416  
     
 417  
     /**
 418  
      * Set the thread priority of the {@link Pipeline}
 419  
      * @param priority thread priority to use
 420  
      */
 421  
     public synchronized void setPriority(int priority){
 422  0
         this.priority = priority;
 423  0
     }
 424  
     
 425  
     
 426  
     /**
 427  
      * Set the name of this {@link Pipeline}
 428  
      * @param name Pipeline name to use
 429  
      */
 430  
     public synchronized void setName(String name){
 431  0
         this.name = name;
 432  0
     }
 433  
     
 434  
     
 435  
     /**
 436  
      * Return the name of this {@link Pipeline}
 437  
      * @return the name of this {@link Pipeline}
 438  
      */
 439  
     public synchronized String getName(){
 440  0
         return name+port;
 441  
     }
 442  
     
 443  
     
 444  
     /**
 445  
      * Set the port used by this {@link Pipeline}
 446  
      * @param port the port used by this {@link Pipeline}
 447  
      */
 448  
     public synchronized void setPort(int port){
 449  0
         this.port = port;
 450  0
     }
 451  
     
 452  
     
 453  
     /**
 454  
      * Set the minimum number of threads this {@link Pipeline} will create
 455  
      * when initializing.
 456  
      * @param minThreads the minimum number of threads.
 457  
      */
 458  
     public synchronized void setMinThreads(int minThreads){
 459  0
         this.minThreads = minThreads;
 460  0
     }
 461  
     
 462  
     
 463  
     @Override
 464  
     public String toString(){
 465  0
         return "name: " + name + " maxThreads: " + maxThreads
 466  
                 + " type: " + this.getClass().getName();
 467  
     }
 468  
     
 469  
     
 470  
     /**
 471  
      * Set the number the {@link Pipeline} will use when increasing the
 472  
      * thread pool
 473  
      * @param threadsIncrement amount to increase thread pool by
 474  
      */
 475  
     public synchronized void setThreadsIncrement(int threadsIncrement){
 476  0
         this.threadsIncrement = threadsIncrement;
 477  0
     }
 478  
     
 479  
     
 480  
     /**
 481  
      * The number of {@link Task} currently queued
 482  
      * @return number of queued connections
 483  
      */
 484  
     public synchronized int getTaskQueuedCount(){
 485  0
         return size();
 486  
     }
 487  
     
 488  
     
 489  
     /**
 490  
      * Set the maximum pending connection this {@link Pipeline}
 491  
      * can handle.
 492  
      * @param maxQueueSizeInBytesCount maximum queue size (in bytes) this
 493  
      * Pipeline should use
 494  
      */
 495  
     public synchronized void setMaxQueueSize(int maxQueueSizeInBytesCount){
 496  0
         this.maxQueueSizeInBytes = maxQueueSizeInBytesCount;
 497  0
     }
 498  
     
 499  
     
 500  
     /**
 501  
      * Get the maximum pending connections this {@link Pipeline}
 502  
      * can handle.
 503  
      * @return maximum queue size (in bytes) this Pipeline is using
 504  
      */
 505  
     public synchronized int geMaxQueueSize(){
 506  0
         return maxQueueSizeInBytes;
 507  
     }
 508  
     
 509  
     /**
 510  
      * Get the initial WorkerThreadImpl {@link ByteBuffer} size
 511  
      * @return initial WorkerThreadImpl <code>ByteBuffer</code> size
 512  
      */
 513  
     public synchronized int getInitialByteBufferSize(){
 514  0
         return initialByteBufferSize;
 515  
     }
 516  
     
 517  
     /**
 518  
      * Set the initial WorkerThreadImpl {@link ByteBuffer} size
 519  
      * @param size initial WorkerThreadImpl {@link ByteBuffer} size
 520  
      */
 521  
     public synchronized void setInitialByteBufferSize(int size){
 522  0
         initialByteBufferSize = size;
 523  0
     }
 524  
     
 525  
     
 526  
     
 527  
     /**
 528  
      * The {@link ByteBufferType} used to create the {@link ByteBuffer}
 529  
      * associated with {@link WorkerThreadImpl}s created by this instance.
 530  
      * @return The {@link ByteBufferType} used to create the {@link ByteBuffer}
 531  
      * associated with {@link WorkerThreadImpl}s created by this instance.
 532  
      */
 533  
     public ByteBufferType getByteBufferType() {
 534  0
         return byteBufferType;
 535  
     }
 536  
 
 537  
     
 538  
     /**
 539  
      * Set the {@link ByteBufferType} to use when creating the
 540  
      * {@link ByteBuffer} associated with {@link WorkerThreadImpl}s 
 541  
      * created by this instance.
 542  
      * @param byteBufferType The ByteBuffer type.
 543  
      */
 544  
     public void setByteBufferType(ByteBufferType byteBufferType) {
 545  0
         this.byteBufferType = byteBufferType;
 546  0
     }
 547  
 }