Coverage Report - com.sun.grizzly.ThreadPoolExecutorServicePipeline
 
Classes in this File Line Coverage Branch Coverage Complexity
ThreadPoolExecutorServicePipeline
0 %
0/60
0 %
0/4
0
ThreadPoolExecutorServicePipeline$1
0 %
0/3
N/A
0
ThreadPoolExecutorServicePipeline$WorkerThreadFactory
0 %
0/7
N/A
0
 
 1  
 /*
 2  
  *
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  *
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  *
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  *
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  *
 24  
  * Contributor(s):
 25  
  *
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 
 39  
 package com.sun.grizzly;
 40  
 
 41  
 import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType;
 42  
 import com.sun.grizzly.util.WorkerThreadImpl;
 43  
 import java.util.concurrent.Callable;
 44  
 import java.util.concurrent.LinkedBlockingQueue;
 45  
 import java.util.concurrent.ThreadFactory;
 46  
 import java.util.concurrent.ThreadPoolExecutor;
 47  
 import java.util.concurrent.TimeUnit;
 48  
 import java.util.concurrent.atomic.AtomicInteger;
 49  
 
 50  
 
 51  
 /**
 52  
  * {@link Pipeline} implementation, based on {@link ThreadPoolExecutor}
 53  
  *
 54  
  * @author Alexey Stashok
 55  
  */
 56  0
 public class ThreadPoolExecutorServicePipeline extends ThreadPoolExecutor
 57  
         implements Pipeline<Callable> {
 58  
     // Min number of worker threads in a pool
 59  0
     private static int DEFAULT_MIN_THREAD_COUNT = 5;
 60  
 
 61  
     // Max number of worker threads in a pool
 62  0
     private static int DEFAULT_MAX_THREAD_COUNT = 20;
 63  
 
 64  
     // Max number of tasks thread pool can enqueue
 65  0
     private static int DEFAULT_MAX_TASKS_QUEUED = Integer.MAX_VALUE;
 66  
 
 67  
     // Timeout, after which idle thread will be stopped and excluded from pool
 68  0
     private static int DEFAULT_IDLE_THREAD_KEEPALIVE_TIMEOUT = 30000;
 69  
 
 70  
     private String name;
 71  
 
 72  
     /**
 73  
      * The port used.
 74  
      */
 75  
     protected int port;
 76  
 
 77  
     private int maxTasksCount;
 78  
 
 79  0
     private AtomicInteger workerThreadCounter = new AtomicInteger();
 80  
 
 81  
     /**
 82  
      * The Thread Priority
 83  
      */
 84  
     protected int priority;
 85  
 
 86  
     /**
 87  
      * The initial ByteBuffer size for newly created WorkerThread instances
 88  
      */
 89  0
     protected int initialByteBufferSize = 8192;
 90  
 
 91  
     /**
 92  
      * The {@link ByteBufferType}
 93  
      */
 94  0
     private ByteBufferType byteBufferType = ByteBufferType.HEAP_VIEW;
 95  
 
 96  
 
 97  
     public ThreadPoolExecutorServicePipeline() {
 98  0
         this(DEFAULT_MIN_THREAD_COUNT, DEFAULT_MAX_THREAD_COUNT, 
 99  
                 DEFAULT_MAX_TASKS_QUEUED, DEFAULT_IDLE_THREAD_KEEPALIVE_TIMEOUT,
 100  
                 TimeUnit.MILLISECONDS);
 101  0
     }
 102  
 
 103  
     public ThreadPoolExecutorServicePipeline(int minThreads,
 104  
             int maxThreads, int maxTasksCount, long keepAliveTime,
 105  
             TimeUnit unit) {
 106  0
         this(minThreads, maxThreads, maxTasksCount, keepAliveTime, unit, 
 107  
                 "Grizzly", 8080, Thread.NORM_PRIORITY);
 108  0
     }
 109  
 
 110  
     public ThreadPoolExecutorServicePipeline(int minThreads,
 111  
             int maxThreads, int maxTasksCount, long keepAliveTime,
 112  
             TimeUnit unit, String name, int port, int priority) {
 113  0
         super(minThreads, maxThreads, keepAliveTime, unit,
 114  
                 new LinkedBlockingQueue<Runnable>(maxTasksCount));
 115  0
         setThreadFactory(new WorkerThreadFactory(this));
 116  0
         this.maxTasksCount = maxTasksCount;
 117  0
         this.name = name;
 118  0
         this.port = port;
 119  0
         this.priority = priority;
 120  0
     }
 121  
 
 122  
     /**
 123  
      * Set the name of this {@link Pipeline}
 124  
      * @param name Pipeline name to use
 125  
      */
 126  
     public synchronized void setName(String name){
 127  0
         this.name = name;
 128  0
     }
 129  
 
 130  
 
 131  
     /**
 132  
      * Return the name of this {@link Pipeline}
 133  
      * @return the name of this {@link Pipeline}
 134  
      */
 135  
     public synchronized String getName(){
 136  0
         return name+port;
 137  
     }
 138  
 
 139  
 
 140  
     /**
 141  
      * Set the port used by this {@link Pipeline}
 142  
      * @param port the port used by this {@link Pipeline}
 143  
      */
 144  
     public synchronized void setPort(int port){
 145  0
         this.port = port;
 146  0
     }
 147  
 
 148  
     /**
 149  
      * {@inheritDoc}
 150  
      */
 151  
     public void execute(Callable task) throws PipelineFullException {
 152  0
         submit(task);
 153  0
     }
 154  
 
 155  
     /**
 156  
      * {@inheritDoc}
 157  
      */
 158  
     public Callable waitForIoTask() {
 159  0
         Callable callable = null;
 160  
         try {
 161  0
             final Runnable r = this.getQueue().take();
 162  0
             if (r != null) {
 163  0
                 callable = new Callable() {
 164  
                     public Object call() throws Exception {
 165  0
                         r.run();
 166  0
                         return null;
 167  
                     }
 168  
                 };
 169  
             }
 170  0
         } catch (InterruptedException e) {
 171  0
         }
 172  
 
 173  0
         return callable;
 174  
     }
 175  
 
 176  
     /**
 177  
      * {@inheritDoc}
 178  
      */
 179  
     public int getWaitingThread() {
 180  0
         int waitingThreads = getPoolSize() - getActiveCount();
 181  0
         if (waitingThreads < 0) waitingThreads = 0;
 182  
 
 183  0
         return waitingThreads;
 184  
     }
 185  
 
 186  
     /**
 187  
      * {@inheritDoc}
 188  
      */
 189  
     public int getMaxThreads() {
 190  0
         return getMaximumPoolSize();
 191  
     }
 192  
 
 193  
     /**
 194  
      * {@inheritDoc}
 195  
      */
 196  
     public void setMaxThreads(int maxThread) {
 197  0
         setMaximumPoolSize(maxThread);
 198  0
     }
 199  
 
 200  
     /**
 201  
      * {@inheritDoc}
 202  
      */
 203  
     public int getCurrentThreadCount() {
 204  0
         return getPoolSize();
 205  
     }
 206  
 
 207  
     /**
 208  
      * {@inheritDoc}
 209  
      */
 210  
     public int getCurrentThreadsBusy() {
 211  0
         return getActiveCount();
 212  
     }
 213  
 
 214  
     /**
 215  
      * {@inheritDoc}
 216  
      */
 217  
     public void initPipeline() {
 218  0
     }
 219  
 
 220  
     /**
 221  
      * {@inheritDoc}
 222  
      */
 223  
     public void startPipeline() {
 224  0
         this.prestartCoreThread();
 225  0
     }
 226  
 
 227  
     /**
 228  
      * {@inheritDoc}
 229  
      */
 230  
     public void stopPipeline() {
 231  0
         shutdownNow();
 232  0
     }
 233  
 
 234  
     /**
 235  
      * Set the thread priority of the {@link Pipeline}
 236  
      * @param priority thread priority to use
 237  
      */
 238  
     public synchronized void setPriority(int priority){
 239  0
         this.priority = priority;
 240  0
     }
 241  
 
 242  
     /**
 243  
      * {@inheritDoc}
 244  
      */
 245  
     public void setMinThreads(int minThread) {
 246  0
         setCorePoolSize(minThread);
 247  0
     }
 248  
 
 249  
     /**
 250  
      * Get the maximum pending connections this {@link Pipeline}
 251  
      * can handle.
 252  
      * @return maximum queue size (in bytes) this Pipeline is using
 253  
      */
 254  
     public synchronized int getMaxQueueSize(){
 255  0
         return maxTasksCount;
 256  
     }
 257  
 
 258  
     /**
 259  
      * Is not supported for the <tt>ThreadPoolExecutorServicePipeline<tt>.
 260  
      * The value, passed to the cosntructor could not be changed at the runtime.
 261  
      * 
 262  
      * @param maxQueue
 263  
      */
 264  
     public void setMaxQueueSize(int maxQueue) {
 265  0
         throw new UnsupportedOperationException("Value could not be changed!");
 266  
     }
 267  
 
 268  
     /**
 269  
      * Is not supported for the <tt>ThreadPoolExecutorServicePipeline<tt>.
 270  
      * @param threadIncrements
 271  
      */
 272  
     public void setThreadsIncrement(int threadIncrements) {
 273  0
         throw new UnsupportedOperationException("Not supported.");
 274  
     }
 275  
 
 276  
     /**
 277  
      * {@inheritDoc}
 278  
      */
 279  
     public int size() {
 280  0
         return getQueue().size();
 281  
     }
 282  
 
 283  
 
 284  
     /**
 285  
      * The {@link ByteBufferTypel} used to create the {@link ByteBuffer}
 286  
      * associated with {@link WorkerThreadImpl}s created by this instance.
 287  
      * @return The {@link ByteBufferTypel} used to create the {@link ByteBuffer}
 288  
      * associated with {@link WorkerThreadImpl}s created by this instance.
 289  
      */
 290  
     public ByteBufferType getByteBufferType() {
 291  0
         return byteBufferType;
 292  
     }
 293  
 
 294  
 
 295  
     /**
 296  
      * Set the {@link ByteBufferTypel} to use when creating the
 297  
      * {@link ByteBuffer} associated with {@link WorkerThreadImpl}s
 298  
      * created by this instance.
 299  
      * @param byteBufferType The ByteBuffer type.
 300  
      */
 301  
     public void setByteBufferType(ByteBufferType byteBufferType) {
 302  0
         this.byteBufferType = byteBufferType;
 303  0
     }
 304  
 
 305  
 
 306  
     /**
 307  
      * Get the initial WorkerThreadImpl {@link ByteBuffer} size
 308  
      * @return initial WorkerThreadImpl <code>ByteBuffwaitingThreadser</code> size
 309  
      */
 310  
     public synchronized int getInitialByteBufferSize(){
 311  0
         return initialByteBufferSize;
 312  
     }
 313  
 
 314  
     /**
 315  
      * Set the initial WorkerThreadImpl {@link ByteBuffer} size
 316  
      * @param size initial WorkerThreadImpl {@link ByteBuffer} size
 317  
      */
 318  
     public synchronized void setInitialByteBufferSize(int size){
 319  0
         initialByteBufferSize = size;
 320  0
     }
 321  
 
 322  
     protected static class WorkerThreadFactory implements ThreadFactory {
 323  
         private ThreadPoolExecutorServicePipeline threadPool;
 324  
 
 325  
         public WorkerThreadFactory(
 326  0
                 ThreadPoolExecutorServicePipeline threadPool) {
 327  0
             this.threadPool = threadPool;
 328  0
         }
 329  
 
 330  
         public Thread newThread(Runnable r) {
 331  0
             WorkerThreadImpl workerThread = new WorkerThreadImpl(threadPool,
 332  
                     threadPool.name + "WorkerThread-"  + threadPool.port + "-" +
 333  
                     threadPool.workerThreadCounter.getAndIncrement(), r,
 334  
                     threadPool.initialByteBufferSize);
 335  0
             workerThread.setByteBufferType(threadPool.byteBufferType);
 336  0
             workerThread.setPriority(threadPool.priority);
 337  
 
 338  0
             return workerThread;
 339  
         }
 340  
     }
 341  
 }