/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 1997-2008 Sun Microsystems, Inc. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License"). You
* may not use this file except in compliance with the License. You can obtain
* a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
* or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
* Sun designates this particular file as subject to the "Classpath" exception
* as provided by Sun in the GPL Version 2 section of the License file that
* accompanied this code. If applicable, add the following below the License
* Header, with the fields enclosed by brackets [] replaced by your own
* identifying information: "Portions Copyrighted [year]
* [name of copyright owner]"
*
* Contributor(s):
*
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license." If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above. However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/
package com.sun.enterprise.web.connector.grizzly;
import java.nio.channels.SelectionKey;
/**
 * Internal FIFO used by the worker threads to pass information
 * between <code>Task</code> objects.
 *
 * @author Charlie Hunt
 */
public class CircularQueuePipeline extends CircularQueue implements Pipeline {

    /**
     * The number of threads currently blocked in {@link #getTask()}
     * waiting for a <code>Task</code>.
     */
    protected int waitingThreads = 0;

    /**
     * The maximum number of worker threads {@link #addTask(Task)} will
     * grow the pool to.
     */
    protected int maxThreads = 20;

    /**
     * The number of <code>WorkerThreadImpl</code> created by
     * {@link #initPipeline()} and started by {@link #startPipeline()}.
     */
    protected int minThreads = 5;

    /**
     * The minimum number of spare <code>WorkerThreadImpl</code>. Within this
     * class the value is only stored and returned; it does not drive any
     * pool sizing decision here.
     */
    protected int minSpareThreads = 2;

    /**
     * The port used. It is part of the worker thread names and of the
     * value returned by {@link #getName()}.
     */
    protected int port = 8080;

    /**
     * The number of <code>WorkerThreadImpl</code> created so far.
     */
    protected int threadCount = 0;

    /**
     * The name of this <code>Pipeline</code>, used as the prefix of the
     * worker thread names.
     */
    protected String name;

    /**
     * The thread priority applied to every worker thread at creation.
     */
    protected int priority = Thread.NORM_PRIORITY;

    /**
     * Has the pipeline already been started.
     */
    protected boolean isStarted = false;

    /**
     * The <code>WorkerThreadImpl</code> managed by this pipeline. Slots
     * <code>[0, threadCount)</code> are populated.
     */
    protected transient WorkerThreadImpl[] workerThreads;

    /**
     * Maximum number of pending tasks before new ones are refused;
     * <code>-1</code> disables the limit. Despite its name, the value is
     * compared against the queue's <code>size()</code>, not a byte count.
     */
    protected int maxQueueSizeInBytes = -1;

    /**
     * The number of threads added each time the pool is grown.
     */
    protected int threadsIncrement = 1;

    /**
     * The timeout value for a request. It is only stored by this class
     * (see {@link #setThreadsTimeout(int)}); nothing here reads it.
     */
    protected int threadsTimeout = Constants.DEFAULT_TIMEOUT;

    /**
     * The <code>PipelineStatistic</code> object used when gathering
     * statistics; may be <code>null</code>, in which case nothing is gathered.
     */
    protected transient PipelineStatistic pipelineStat;

    // ------------------------------------------------------- Constructor -----/

    /**
     * Create a pipeline using the default field values.
     */
    public CircularQueuePipeline() {
        super();
    }

    /**
     * Create a pipeline with the given sizing, naming and priority.
     * No thread is created until {@link #initPipeline()} is invoked.
     *
     * @param maxThreads the maximum number of worker threads
     * @param minThreads the number of worker threads created at init time
     * @param name the name of this pipeline
     * @param port the port used
     * @param priority the priority given to every worker thread
     */
    public CircularQueuePipeline(int maxThreads, int minThreads, String name,
            int port, int priority) {
        this.maxThreads = maxThreads;
        this.port = port;
        this.name = name;
        this.minThreads = minThreads;
        this.priority = priority;

        // The spare-thread floor can never exceed the minimum pool size.
        if (minThreads < minSpareThreads) {
            minSpareThreads = minThreads;
        }
    }

    /**
     * Same as the five-argument constructor, using
     * {@link Thread#NORM_PRIORITY} as the worker priority.
     */
    public CircularQueuePipeline(int maxThreads, int minThreads, String name,
            int port) {
        this(maxThreads, minThreads, name, port, Thread.NORM_PRIORITY);
    }

    // ------------------------------------------------ Lifecycle ------------/

    /**
     * Init the <code>Pipeline</code> by creating (but not starting)
     * <code>minThreads</code> <code>WorkerThreadImpl</code>. If
     * <code>minThreads</code> is greater than <code>maxThreads</code> it is
     * lowered to <code>maxThreads</code> first.
     */
    public synchronized void initPipeline() {
        if (minThreads > maxThreads) {
            minThreads = maxThreads;
        }
        workerThreads = new WorkerThreadImpl[maxThreads];
        increaseWorkerThread(minThreads, false);
    }

    /**
     * Start the <code>Pipeline</code> and the first <code>minThreads</code>
     * <code>WorkerThreadImpl</code>. Does nothing if already started.
     */
    public synchronized void startPipeline() {
        if (!isStarted) {
            for (int i = 0; i < minThreads; i++) {
                workerThreads[i].start();
            }
            isStarted = true;
        }
    }

    /**
     * Stop the <code>Pipeline</code>: ask every created
     * <code>WorkerThreadImpl</code> to terminate, then wake up all threads
     * blocked in {@link #getTask()}.
     */
    public synchronized void stopPipeline() {
        if (isStarted) {
            for (int i = 0; i < threadCount; i++) {
                workerThreads[i].terminate();
            }
            isStarted = false;
        }
        // Release every waiter, even when the pipeline was not started.
        notifyAll();
    }

    /**
     * Create new <code>WorkerThreadImpl</code>. This method must be invoked
     * from a synchronized block.
     *
     * @param increment the number of threads to create; a non-positive
     *        value creates nothing
     * @param startThread <code>true</code> to start each thread as soon as
     *        it is created
     */
    protected void increaseWorkerThread(int increment, boolean startThread) {
        int currentCount = threadCount;
        int increaseCount = threadCount + increment;

        // The array was sized from maxThreads at init time. If the limit was
        // raised afterwards, grow the array *before* creating any thread so a
        // started worker can never end up untracked.
        if (increaseCount > workerThreads.length) {
            WorkerThreadImpl[] grown = new WorkerThreadImpl[increaseCount];
            System.arraycopy(workerThreads, 0, grown, 0, currentCount);
            workerThreads = grown;
        }

        for (int i = currentCount; i < increaseCount; i++) {
            WorkerThreadImpl workerThread = new WorkerThreadImpl(this,
                    name + "WorkerThread-" + port + "-" + i);
            workerThread.setPriority(priority);
            if (startThread) {
                workerThread.start();
            }
            workerThreads[i] = workerThread;
            threadCount++;
        }
    }

    /**
     * Interrupt the <code>Thread</code> having the given thread id, provided
     * it is not currently <code>RUNNABLE</code>. The thread is looked up in
     * the thread group of the first worker thread.
     *
     * @param threadID the id of the thread to interrupt
     * @return <code>true</code> if a matching thread was interrupted,
     *         <code>false</code> otherwise (including when the pipeline has
     *         no worker thread yet)
     */
    public synchronized boolean interruptThread(long threadID) {
        // Nothing to look up before initPipeline() has populated the pool.
        if (workerThreads == null || workerThreads.length == 0
                || workerThreads[0] == null) {
            return false;
        }

        ThreadGroup threadGroup = workerThreads[0].getThreadGroup();
        // NOTE(review): assuming WorkerThreadImpl is a java.lang.Thread,
        // getThreadGroup() returns null once that thread has terminated.
        if (threadGroup == null) {
            return false;
        }

        Thread[] threads = new Thread[threadGroup.activeCount()];
        threadGroup.enumerate(threads);
        for (Thread thread : threads) {
            if (thread != null && thread.getId() == threadID
                    && Thread.State.RUNNABLE != thread.getState()) {
                try {
                    thread.interrupt();
                    return true;
                } catch (SecurityException ignored) {
                    // Best effort: we are not allowed to interrupt that
                    // thread, keep scanning and eventually report false.
                }
            }
        }
        return false;
    }

    // ---------------------------------------------------- Queue ------------//

    /**
     * Add a <code>Task</code> to this pipeline and wake up one waiting
     * thread. When a queue limit is configured and already reached, the task
     * is cancelled with a 503 response and handed back to its
     * <code>SelectorThread</code> instead. The pool is grown by up to
     * <code>threadsIncrement</code> threads when there are fewer waiting
     * threads than queued tasks and <code>maxThreads</code> is not reached.
     *
     * @param task the task to queue
     */
    public synchronized void addTask(Task task) {
        int queueSize = size();
        if (maxQueueSizeInBytes != -1 && maxQueueSizeInBytes <= queueSize) {
            task.cancelTask("Maximum Connections Reached: "
                    + maxQueueSizeInBytes + " -- Retry later",
                    "HTTP/1.1 503 Service Unavailable");
            task.getSelectorThread().returnTask(task);
            return;
        }

        addLast(task);
        notify();

        // Create worker threads if we know we will run out of them.
        if (threadCount < maxThreads && waitingThreads < (queueSize + 1)) {
            // Clamp locally: the configured increment must survive this call.
            int increment = Math.min(threadsIncrement, maxThreads - threadCount);
            increaseWorkerThread(increment, true);
        }
    }

    /**
     * Return a <code>Task</code> object available in the pipeline. All
     * threads synchronize on this method. When no task is available for the
     * caller, it waits once until notified or interrupted; on interruption
     * the interrupt flag is restored.
     *
     * @return the result of <code>poll()</code> on the underlying queue
     */
    public synchronized Task getTask() {
        // NOTE(review): a single wait() (not a loop) is kept on purpose so
        // that stopPipeline()'s notifyAll() releases the waiters; callers
        // presumably cope with poll() finding the queue empty -- confirm.
        if (size() - waitingThreads <= 0) {
            try {
                waitingThreads++;
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            waitingThreads--;
        }

        if (pipelineStat != null) {
            pipelineStat.gather(size());
        }
        return poll();
    }

    /**
     * Invoked when the SelectorThread is about to expire a SelectionKey.
     *
     * @return true if the SelectorThread should expire the SelectionKey,
     *         false if not. This implementation always returns true.
     */
    public boolean expireKey(SelectionKey key) {
        return true;
    }

    /**
     * Return <code>true</code> if the size of this <code>CircularQueue</code>
     * minus the current waiting threads is lower than or equal to zero.
     * Synchronized like the other accessors of that state.
     */
    public synchronized boolean isEmpty() {
        return (size() - waitingThreads <= 0);
    }

    // --------------------------------------------------Properties ----------//

    /**
     * Return the number of waiting threads.
     */
    public synchronized int getWaitingThread() {
        return waitingThreads;
    }

    /**
     * Set the maximum number of threads used by this pipeline.
     */
    public synchronized void setMaxThreads(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    /**
     * Return the maximum number of threads used by this pipeline.
     */
    public synchronized int getMaxThreads() {
        return maxThreads;
    }

    /**
     * Return the number of worker threads created so far.
     */
    public synchronized int getCurrentThreadCount() {
        return threadCount;
    }

    /**
     * Return the current number of threads that are currently processing
     * a task, computed as created threads minus waiting threads.
     */
    public synchronized int getCurrentThreadsBusy() {
        return (threadCount - waitingThreads);
    }

    /**
     * Return the maximum spare thread. This pipeline keeps no separate
     * setting for it and returns <code>maxThreads</code>.
     */
    public synchronized int getMaxSpareThreads() {
        return maxThreads;
    }

    /**
     * Return the minimum spare thread.
     */
    public synchronized int getMinSpareThreads() {
        return minSpareThreads;
    }

    /**
     * Set the minimum spare thread this <code>Pipeline</code> can handle.
     */
    public synchronized void setMinSpareThreads(int minSpareThreads) {
        this.minSpareThreads = minSpareThreads;
    }

    /**
     * Set the thread priority of the <code>Pipeline</code>. It is applied
     * to worker threads created after this call.
     */
    public synchronized void setPriority(int priority) {
        this.priority = priority;
    }

    /**
     * Set the name of this <code>Pipeline</code>.
     */
    public synchronized void setName(String name) {
        this.name = name;
    }

    /**
     * Return the name of this <code>Pipeline</code>.
     *
     * @return the configured name immediately followed by the port
     */
    public synchronized String getName() {
        return name + port;
    }

    /**
     * Set the port used by this <code>Pipeline</code>.
     *
     * @param port the port used by this <code>Pipeline</code>
     */
    public synchronized void setPort(int port) {
        this.port = port;
    }

    /**
     * Set the minimum number of threads this <code>Pipeline</code> will
     * create when initializing.
     *
     * @param minThreads the minimum number of threads.
     */
    public synchronized void setMinThreads(int minThreads) {
        this.minThreads = minThreads;
    }

    /**
     * Return a short description: name, maximum threads and concrete type.
     */
    @Override
    public String toString() {
        return "name: " + name + " maxThreads: " + maxThreads
                + " type: " + this.getClass().getName();
    }

    /**
     * Set the number the <code>Pipeline</code> will use when increasing the
     * thread pool.
     */
    public synchronized void setThreadsIncrement(int threadsIncrement) {
        this.threadsIncrement = threadsIncrement;
    }

    /**
     * Set the timeout value a thread will use to time out the request.
     */
    public synchronized void setThreadsTimeout(int threadsTimeout) {
        this.threadsTimeout = threadsTimeout;
    }

    /**
     * The number of <code>Task</code> currently queued.
     *
     * @return number of queued connections
     */
    public synchronized int getTaskQueuedCount() {
        return size();
    }

    /**
     * Set the maximum pending connection this <code>Pipeline</code>
     * can handle; <code>-1</code> means no limit.
     */
    public synchronized void setQueueSizeInBytes(int maxQueueSizeInBytesCount) {
        this.maxQueueSizeInBytes = maxQueueSizeInBytesCount;
    }

    /**
     * Get the maximum pending connection this <code>Pipeline</code>
     * can handle; <code>-1</code> means no limit.
     */
    public synchronized int getQueueSizeInBytes() {
        return maxQueueSizeInBytes;
    }

    /**
     * Set the <code>PipelineStatistic</code> object used
     * to gather statistics.
     */
    public void setPipelineStatistic(PipelineStatistic pipelineStatistic) {
        this.pipelineStat = pipelineStatistic;
    }

    /**
     * Return the <code>PipelineStatistic</code> object used
     * to gather statistics.
     */
    public PipelineStatistic getPipelineStatistic() {
        return pipelineStat;
    }
}