Coverage Report - com.sun.grizzly.DefaultPipeline
 
Classes in this File Line Coverage Branch Coverage Complexity
DefaultPipeline
50 %
64/129
55 %
23/42
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 package com.sun.grizzly;
 39  
 
 40  
 import com.sun.grizzly.util.WorkerThreadImpl;
 41  
 
 42  
 import java.nio.channels.SelectionKey;
 43  
 import java.util.LinkedList;
 44  
 import java.util.concurrent.Callable;
 45  
 import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType;
 46  
 import java.util.logging.Level;
 47  
 
 48  
 /**
 49  
  * Simple Thread Pool based on the wait/notify/synchronized mechanism.
 50  
  *
 51  
  * @author Jean-Francois Arcand
 52  
  */
 53  518412
 public class DefaultPipeline extends LinkedList<Callable>
 54  
         implements Pipeline<Callable>{
 55  
     
 56  
     
 57  
     /**
 58  
      * The number of thread waiting for works.
 59  
      */
 60  117
     protected int waitingThreads = 0;
 61  
     
 62  
     
 63  
     /**
 64  
      * The maximum number of Thread
 65  
      */
 66  117
     protected int maxThreads = 20;
 67  
     
 68  
     
 69  
     /**
 70  
      * The minimum numbers of {@link WorkerThreadImpl}
 71  
      */
 72  117
     protected int minThreads = 5;
 73  
     
 74  
     
 75  
     /**
 76  
      * The minimum numbers of spare {@link WorkerThreadImpl}
 77  
      */
 78  117
     protected int minSpareThreads = 2;
 79  
     
 80  
     
 81  
     /**
 82  
      * The port used.
 83  
      */
 84  117
     protected int port = 8080;
 85  
     
 86  
     
 87  
     /**
 88  
      * The number of {@link WorkerThreadImpl}
 89  
      */
 90  117
     protected int threadCount = 0;
 91  
     
 92  
     
 93  
     /**
 94  
      * The name of this Pipeline
 95  
      */
 96  117
     protected String name = "Grizzly";
 97  
     
 98  
     
 99  
     /**
 100  
      * The Thread Priority
 101  
      */
 102  117
     protected int priority = Thread.NORM_PRIORITY;
 103  
     
 104  
     
 105  
     /**
 106  
      * Has the pipeline already started
 107  
      */
 108  117
     protected boolean isStarted = false;
 109  
     
 110  
     
 111  
     /**
 112  
      * {@link WorkerThreadImpl} amanged by this pipeline.
 113  
      */
 114  
     protected transient WorkerThreadImpl[] workerThreads;
 115  
     
 116  
     
 117  
     /**
 118  
      * Maximum pending connection before refusing requests.
 119  
      */
 120  117
     protected int maxQueueSizeInBytes = -1;
 121  
     
 122  
     
 123  
     /**
 124  
      * The increment number used when adding new thread.
 125  
      */
 126  117
     protected int threadsIncrement = 1;
 127  
     
 128  
     /**
 129  
      * The initial ByteBuffer size for newly created WorkerThread instances
 130  
      */
 131  117
     protected int initialByteBufferSize = 8192;
 132  
     
 133  
     
 134  
     /**
 135  
      * The {@link ByteBufferType}
 136  
      */
 137  117
     private ByteBufferType byteBufferType = ByteBufferType.HEAP_VIEW;
 138  
     
 139  
     
 140  
     // ------------------------------------------------------- Constructor -----/
 141  
     
 142  
     public DefaultPipeline(){
 143  117
         super();
 144  117
     }
 145  
     
 146  
     public DefaultPipeline(int maxThreads, int minThreads, String name,
 147  0
             int port, int priority){
 148  
         
 149  0
         this.maxThreads = maxThreads;
 150  0
         this.port = port;
 151  0
         this.name = name;
 152  0
         this.minThreads = minThreads;
 153  0
         this.priority = priority;
 154  
         
 155  0
         if ( minThreads < minSpareThreads )
 156  0
             minSpareThreads = minThreads;
 157  
         
 158  0
     }
 159  
     
 160  
     
 161  
     public DefaultPipeline(int maxThreads, int minThreads, String name,
 162  
             int port){        
 163  0
         this(maxThreads,minThreads,name,port,Thread.NORM_PRIORITY);
 164  0
     }
 165  
     
 166  
     
 167  
     // ------------------------------------------------ Lifecycle ------------/
 168  
     
 169  
     
 170  
     /**
 171  
      * Init the {@link Pipeline} by initializing the required
 172  
      * {@link WorkerThreadImpl}. Default value is {@link #maxThreads}
 173  
      */
 174  
     public synchronized void initPipeline(){
 175  
         
 176  99
         if (isStarted){
 177  30
             return;
 178  
         }
 179  
         
 180  69
         if (minThreads > maxThreads) {
 181  0
             minThreads = maxThreads;
 182  
         }
 183  
         
 184  69
         workerThreads = new WorkerThreadImpl[maxThreads];
 185  69
         increaseWorkerThread(minThreads, false);
 186  69
     }
 187  
     
 188  
     
 189  
     /**
 190  
      * Start the {@link Pipeline} and all associated
 191  
      * {@link WorkerThreadImpl}
 192  
      */
 193  
     public synchronized void startPipeline(){
 194  99
         if (!isStarted) {
 195  414
             for (int i=0; i < minThreads; i++){
 196  345
                 workerThreads[i].start();
 197  
             }
 198  69
             isStarted = true;
 199  
         }
 200  99
     }
 201  
     
 202  
     
 203  
     /**
 204  
      * Stop the {@link Pipeline} and all associated
 205  
      * {@link WorkerThreadImpl}
 206  
      */
 207  
     public synchronized void stopPipeline(){
 208  117
         if (isStarted) {            
 209  69
             isStarted = false;
 210  421
             for (int i=0; i < threadCount; i++){
 211  352
                 workerThreads[i].terminate();
 212  
             }
 213  
         }
 214  117
         threadCount = 0;
 215  117
         waitingThreads = 0;
 216  117
         notifyAll();
 217  117
     }
 218  
     
 219  
     
 220  
     /**
 221  
      * Create new {@link WorkerThreadImpl}. This method must be invoked
 222  
      * from a synchronized block.
 223  
      * @param increment - how many additional {@link WorkerThreadImpl}
 224  
      * objects to add
 225  
      * @param startThread - should newly added {@link WorkerThreadImpl}
 226  
      * objects be started after creation?
 227  
      */
 228  
     protected void increaseWorkerThread(int increment, boolean startThread){
 229  
         WorkerThreadImpl workerThread;
 230  76
         int currentCount = threadCount;
 231  76
         int increaseCount = threadCount + increment;
 232  428
         for (int i=currentCount; i < increaseCount; i++){
 233  352
             workerThread = createWorkerThread(
 234  
                     name + "WorkerThread-"  + port + "-" + i,
 235  
                     initialByteBufferSize);
 236  352
             workerThread.setByteBufferType(byteBufferType);
 237  352
             workerThread.setPriority(priority);
 238  
             
 239  352
             if (startThread)
 240  7
                 workerThread.start();
 241  
             
 242  352
             workerThreads[i] = workerThread;
 243  352
             threadCount++;
 244  
         }
 245  76
     }
 246  
     
 247  
     
 248  
     /**
 249  
      * Interrupt the {@link Thread} using it thread id
 250  
      * @param threadID - id of {@link Thread} to interrupt
 251  
      * @return boolean, was Thread interrupted successfully ?
 252  
      */
 253  
     @SuppressWarnings("empty-statement")
 254  
     public synchronized boolean interruptThread(long threadID){
 255  0
         ThreadGroup threadGroup = workerThreads[0].getThreadGroup();
 256  0
         Thread[] threads = new Thread[threadGroup.activeCount()];
 257  0
         threadGroup.enumerate(threads);
 258  
         
 259  0
         for (Thread thread: threads){
 260  0
             if ( thread != null && thread.getId() == threadID ){
 261  0
                 if ( Thread.State.RUNNABLE != thread.getState()){
 262  
                     try{
 263  0
                         thread.interrupt();
 264  0
                         return true;
 265  0
                     } catch (Throwable t){
 266  0
                         Controller.logger().log(Level.FINE,"interruptThread",t);
 267  
                     }
 268  
                 }
 269  
             }
 270  
         }
 271  0
         return false;
 272  
     }
 273  
     
 274  
     
 275  
     // ---------------------------------------------------- Queue ------------//
 276  
     
 277  
     
 278  
     /**
 279  
      * Add a {@link Callable} to the execution queue. If there is thread available,
 280  
      * the Callable will be automatically executed. If there is no thread available,
 281  
      * the Callable will be queued until a thread becomes free. If the queue
 282  
      * reach it's maximum (see {@link #getMaxQueueSize}), the Callable will 
 283  
      * be rejected. 
 284  
      * 
 285  
      * If the Callable is an instance of {@link ContextTask}, the associated
 286  
      * {@link SelectionKey} will be cancelled and the {@link Context} 
 287  
      * recycled. Finally a {@link PipelineFullException} will be propagated.
 288  
      * 
 289  
      * @param callable a {@link Callable} to add to this Pipeline
 290  
      * @throws {@link PipelineFullException} if Pipeline is full
 291  
      */
 292  
     public synchronized void execute(Callable callable) throws PipelineFullException {
 293  161370
         int queueSize =  size();
 294  161370
         if (maxQueueSizeInBytes != -1 && maxQueueSizeInBytes <= queueSize){
 295  0
             Controller.logger().log(Level.WARNING, "All threads are busy and" +
 296  
                     " the waiting queue is full: " + maxQueueSizeInBytes 
 297  
                     + ". Increase the " +
 298  
                     "DefaultPipeline.setMaxQueueSize to remove this message.");
 299  0
             if (callable instanceof ContextTask){
 300  0
                 ContextTask ct = (ContextTask) callable;
 301  0
                 SelectionKey key = ct.getContext().getSelectionKey();
 302  
                 // Cancel the key.
 303  0
                 ct.getContext().getSelectorHandler().getSelectionKeyHandler().cancel(key);
 304  0
                 ct.recycle();
 305  
             }
 306  0
             throw new PipelineFullException("Processing Queue is full");
 307  
         }
 308  
         
 309  161370
         addLast(callable);
 310  161370
         notify();
 311  
         
 312  
         // Create worker threads if we know we will run out of them
 313  161370
         if (threadCount < maxThreads && waitingThreads < (queueSize + 1)){
 314  7
             int left = maxThreads - threadCount;
 315  7
             if (threadsIncrement > left){
 316  0
                 threadsIncrement = left;
 317  
             }
 318  7
             increaseWorkerThread(threadsIncrement,true);
 319  
         }
 320  161370
     }
 321  
     
 322  
     
 323  
     /**
 324  
      * Return a {@link Callable} object available in the pipeline.
 325  
      * All Threads will synchronize on that method
 326  
      * @return {@link Callable}
 327  
      */
 328  
     public synchronized Callable waitForIoTask() {
 329  357042
         if (size() - waitingThreads <= 0) {
 330  
             try {
 331  161708
                 waitingThreads++;
 332  161708
                 wait();
 333  0
             }  catch (InterruptedException e)  {
 334  0
                 Thread.currentThread().interrupt();
 335  161707
             }
 336  161707
             waitingThreads--;
 337  
         }
 338  357041
         return poll();
 339  
     }
 340  
     
 341  
     
 342  
     /**
 343  
      * Invoked when the {@link SelectionKeyHandler} is about to expire a SelectionKey.
 344  
      * @param key - A {@link SelectionKey} to expire
 345  
      * @return true if the {@link SelectionKeyHandler} should expire the 
 346  
      *         {@link SelectionKey}, false if not.
 347  
      */
 348  
     public boolean expireKey(SelectionKey key){
 349  0
         return true;
 350  
     }
 351  
     
 352  
     
 353  
     /**
 354  
      * Return <tt>true</tt> if the size of this {@link LinkedList}
 355  
      * minus the current waiting threads is lower than zero.
 356  
      */
 357  
     @Override
 358  
     public boolean isEmpty() {
 359  0
         return  (size() - getWaitingThread() <= 0);
 360  
     }
 361  
     
 362  
     
 363  
     // --------------------------------------------------Properties ----------//
 364  
     
 365  
     
 366  
     /**
 367  
      * Return the number of waiting threads.
 368  
      * @return number of waiting threads
 369  
      */
 370  
     public synchronized int getWaitingThread(){
 371  0
         return waitingThreads;
 372  
     }
 373  
     
 374  
     
 375  
     /**
 376  
      * Set the number of threads used by this pipeline.
 377  
      * @param maxThreads maximum number of threads to use
 378  
      */
 379  
     public synchronized void setMaxThreads(int maxThreads){
 380  1
         this.maxThreads = maxThreads;
 381  1
     }
 382  
     
 383  
     
 384  
     /**
 385  
      * Return the number of threads used by this pipeline.
 386  
      * @return maximum number of threads
 387  
      */
 388  
     public synchronized int getMaxThreads(){
 389  0
         return maxThreads;
 390  
     }
 391  
     
 392  
     /**
 393  
      * Return current thread count
 394  
      * @return current thread count
 395  
      */
 396  
     public synchronized int getCurrentThreadCount() {
 397  0
         return threadCount;
 398  
     }
 399  
     
 400  
     
 401  
     /**
 402  
      * Return the curent number of threads that are currently processing
 403  
      * a task.
 404  
      * @return current busy thread count
 405  
      */
 406  
     public synchronized int getCurrentThreadsBusy(){
 407  0
         return (threadCount - waitingThreads);
 408  
     }
 409  
     
 410  
     
 411  
     /**
 412  
      * Return the maximum spare thread.
 413  
      * @return maximum spare thread count
 414  
      */
 415  
     public synchronized int getMaxSpareThreads() {
 416  0
         return maxThreads;
 417  
     }
 418  
     
 419  
     
 420  
     /**
 421  
      * Return the minimum spare thread.
 422  
      * @return minimum spare thread count
 423  
      */
 424  
     public synchronized int getMinSpareThreads() {
 425  0
         return minSpareThreads;
 426  
     }
 427  
     
 428  
     
 429  
     /**
 430  
      * Set the minimum spare thread this {@link Pipeline} can handle.
 431  
      * @param minSpareThreads minimum number of spare threads to handle
 432  
      */
 433  
     public synchronized void setMinSpareThreads(int minSpareThreads) {
 434  0
         this.minSpareThreads = minSpareThreads;
 435  0
     }
 436  
     
 437  
     
 438  
     /**
 439  
      * Set the thread priority of the {@link Pipeline}
 440  
      * @param priority thread priority to use
 441  
      */
 442  
     public synchronized void setPriority(int priority){
 443  0
         this.priority = priority;
 444  0
     }
 445  
     
 446  
     
 447  
     /**
 448  
      * Set the name of this {@link Pipeline}
 449  
      * @param name Pipeline name to use
 450  
      */
 451  
     public synchronized void setName(String name){
 452  0
         this.name = name;
 453  0
     }
 454  
     
 455  
     
 456  
     /**
 457  
      * Return the name of this {@link Pipeline}
 458  
      * @return the name of this {@link Pipeline}
 459  
      */
 460  
     public synchronized String getName(){
 461  0
         return name+port;
 462  
     }
 463  
     
 464  
     
 465  
     /**
 466  
      * Set the port used by this {@link Pipeline}
 467  
      * @param port the port used by this {@link Pipeline}
 468  
      */
 469  
     public synchronized void setPort(int port){
 470  0
         this.port = port;
 471  0
     }
 472  
     
 473  
     
 474  
     /**
 475  
      * Set the minimum thread this {@link Pipeline} will creates
 476  
      * when initializing.
 477  
      * @param minThreads the minimum number of threads.
 478  
      */
 479  
     public synchronized void setMinThreads(int minThreads){
 480  0
         this.minThreads = minThreads;
 481  0
     }
 482  
     
 483  
     
 484  
     @Override
 485  
     public String toString(){
 486  0
         return "name: " + name + " maxThreads: " + maxThreads
 487  
                 + " type: " + this.getClass().getName();
 488  
     }
 489  
     
 490  
     
 491  
     /**
 492  
      * Set the number the {@link Pipeline} will use when increasing the
 493  
      * thread pool
 494  
      * @param threadsIncrement amount to increase thread pool by
 495  
      */
 496  
     public synchronized void setThreadsIncrement(int threadsIncrement){
 497  0
         this.threadsIncrement = threadsIncrement;
 498  0
     }
 499  
     
 500  
     
 501  
     /**
 502  
      * The number of {@link Task} currently queued
 503  
      * @return number of queued connections
 504  
      */
 505  
     public synchronized int getTaskQueuedCount(){
 506  0
         return size();
 507  
     }
 508  
     
 509  
     
 510  
     /**
 511  
      * Set the maximum pending connection this {@link Pipeline}
 512  
      * can handle.
 513  
      * @param maxQueueSizeInBytesCount maximum queue size (in bytes) this
 514  
      * Pipeline should use
 515  
      */
 516  
     public synchronized void setMaxQueueSize(int maxQueueSizeInBytesCount){
 517  0
         this.maxQueueSizeInBytes = maxQueueSizeInBytesCount;
 518  0
     }
 519  
     
 520  
     
 521  
     /**
 522  
      * Get the maximum pending connections this {@link Pipeline}
 523  
      * can handle.
 524  
      * @return maximum queue size (in bytes) this Pipeline is using
 525  
      */
 526  
     public synchronized int getMaxQueueSize(){
 527  0
         return maxQueueSizeInBytes;
 528  
     }
 529  
     
 530  
     /**
 531  
      * Get the initial WorkerThreadImpl {@link ByteBuffer} size
 532  
      * @return initial WorkerThreadImpl <code>ByteBuffwaitingThreadser</code> size
 533  
      */
 534  
     public synchronized int getInitialByteBufferSize(){
 535  0
         return initialByteBufferSize;
 536  
     }
 537  
     
 538  
     /**
 539  
      * Set the initial WorkerThreadImpl {@link ByteBuffer} size
 540  
      * @param size initial WorkerThreadImpl {@link ByteBuffer} size
 541  
      */
 542  
     public synchronized void setInitialByteBufferSize(int size){
 543  0
         initialByteBufferSize = size;
 544  0
     }
 545  
     
 546  
     
 547  
     
 548  
     /**
 549  
      * The {@link ByteBufferTypel} used to create the {@link ByteBuffer}
 550  
      * associated with {@link WorkerThreadImpl}s created by this instance.
 551  
      * @return The {@link ByteBufferTypel} used to create the {@link ByteBuffer}
 552  
      * associated with {@link WorkerThreadImpl}s created by this instance.
 553  
      */
 554  
     public ByteBufferType getByteBufferType() {
 555  0
         return byteBufferType;
 556  
     }
 557  
 
 558  
     
 559  
     /**
 560  
      * Set the {@link ByteBufferTypel} to use when creating the
 561  
      * {@link ByteBuffer} associated with {@link WorkerThreadImpl}s 
 562  
      * created by this instance.
 563  
      * @param byteBufferType The ByteBuffer type.
 564  
      */
 565  
     public void setByteBufferType(ByteBufferType byteBufferType) {
 566  0
         this.byteBufferType = byteBufferType;
 567  0
     }
 568  
 
 569  
 
 570  
     /**
 571  
      * Creates an instance of WorkerThreadImpl.
 572  
      *
 573  
      * @param name {@link String}
 574  
      * @param initialByteBufferSize initial {@link ByteBuffer} size
 575  
      */
 576  
     protected WorkerThreadImpl createWorkerThread(String name,
 577  
                     int initialByteBufferSize) {
 578  
 
 579  352
         return new WorkerThreadImpl(this, name, initialByteBufferSize);
 580  
     }
 581  
 }