Hi,
Thanks Gustav!! Comments inline.
gustav trede wrote:
> Hello,
>
> please give feedback on this CometSelector removal patch that also contains:
>
> Comet now always recycles the processortask.
> one hashmap for the comethandler / comettask storage in cometcontext
> instead of 2.
> successful resume now calls onterminate.
>
> the code is semi prepared for next patch that will move io, key cancel
> socket close and other attachment triggered logic , apt flush etc out
> from the selector thread to workers at postSelect.
>
> --
> regards
> gustav trede
>
>
>
>
> Property changes on: .
> ___________________________________________________________________
> Added: svn:ignore
> + target
> ..merge.diff
>
>
>
> Property changes on: contribs/bundles
> ___________________________________________________________________
> Added: svn:ignore
> + target
>
>
>
> Property changes on: contribs/bundles/grizzly-httpservice-bundle
> ___________________________________________________________________
> Added: svn:ignore
> + target
>
>
> Index: modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java
> ===================================================================
> --- modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java (revision 2805)
> +++ modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java (working copy)
> @@ -44,6 +44,7 @@
> import java.util.Collection;
> import java.util.concurrent.CountDownLatch;
> import com.sun.grizzly.http.SelectorThread;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.IOException;
> import java.util.logging.Level;
>
> @@ -81,7 +82,7 @@
> }
> });
>
> - new Thread() {
> + new WorkerThreadImpl(new Runnable() {
> @Override
> public void run() {
> try {
> @@ -89,7 +90,7 @@
> } catch (Exception ex) {
> }
> }
> - }.start();
> + }).start();
>
> try {
> latch.await();
> Index: modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java
> ===================================================================
> --- modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java (revision 2805)
> +++ modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java (working copy)
> @@ -46,6 +46,7 @@
> import com.sun.grizzly.tcp.http11.GrizzlyAdapter;
> import com.sun.grizzly.tcp.http11.GrizzlyRequest;
> import com.sun.grizzly.tcp.http11.GrizzlyResponse;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.DataInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> @@ -1006,7 +1007,7 @@
> t.printStackTrace();
> }
>
> - new Thread(){
> + new WorkerThreadImpl(new Runnable(){
> @Override
> public void run(){
> try {
> @@ -1022,8 +1023,8 @@
> res.resume();
> }
> }
> - }.start();;
> - }
> + }).start();
> + }
> });
>
> try {
> Index: modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java
> ===================================================================
> --- modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java (revision 2805)
> +++ modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java (working copy)
> @@ -440,7 +440,22 @@
> // being time.
> protected boolean handleKeepAliveBlockingThread = false;
>
> + /**
> + * false prevents the selectionkey from being re registered after async is done.
> + * if false then the socket lifecycle ending needs to be handled by you
> + */
[minor] Can you clarify that comment? What does "be handled by you" mean? :-)
Also capitalize class names properly (SelectionKey, not selectionkey).
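Something along these lines is what I have in mind for the wording. This is only a sketch: ReRegisterDocSketch is a made-up holder class, not the real ProcessorTask, and reading "by you" as "the code that set the flag" is my assumption.

```java
// Wording suggestion only. ReRegisterDocSketch is a hypothetical holder
// class used to show the Javadoc, it is not the real ProcessorTask.
class ReRegisterDocSketch {

    /**
     * When true (the default), the SelectionKey is re-registered with the
     * Selector once asynchronous processing is done. When false, the key is
     * not re-registered, and the code that set this flag is assumed to be
     * responsible for ending the socket lifecycle (close/cancel).
     */
    protected boolean reRegisterSelectionKey = true;

    /**
     * @param reRegisterSelectionKey true to re-register the SelectionKey
     *        with the Selector after async processing, false otherwise
     */
    public void setReRegisterSelectionKey(boolean reRegisterSelectionKey) {
        this.reRegisterSelectionKey = reRegisterSelectionKey;
    }

    /**
     * @return true if the SelectionKey will be re-registered with the
     *         Selector after async processing
     */
    public boolean getReRegisterSelectionKey() {
        return reRegisterSelectionKey;
    }
}
```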
> + protected boolean reRegisterSelectionKey = true;
>
> + /**
> + * true if asyncprotocolfilter should cancel the selectionkey
> + */
> + protected boolean aptCancelKey;
> +
> + /**
> + * used by asyncprotocolfilter
> + */
> + private final TaskEvent<ProcessorTask> event = new TaskEvent<ProcessorTask>(this);
> +
> // ----------------------------------------------------- Constructor ---- //
>
> public ProcessorTask(){
> @@ -523,8 +538,10 @@
> process(inputStream,
> outputStream);
> } catch(Throwable ex){
> - logger.log(Level.FINE,
> + if (logger.isLoggable(Level.FINE)){
> + logger.log(Level.FINE,
> sm.getString("processorTask.errorProcessingRequest"), ex);
> + }
> } finally {
> terminateProcess();
> }
> @@ -658,11 +675,7 @@
> if (response.isSuspended()){
> WorkerThread wt = (WorkerThread)Thread.currentThread();
> wt.getAttachment().setAttribute("suspend",Boolean.TRUE);
> -
> - ((SelectorThreadKeyHandler) selectorHandler.
> - getSelectionKeyHandler()).resetExpiration();
> - key.attach(response.getResponseAttachment());
> -
> + key.attach(response.getResponseAttachment());
Can you explain here why resetExpiration is gone? Just want to make sure
as this is used by the new Servlet 3.0 Async implementation in GlassFish.
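For context, this is my understanding of what resetExpiration() was doing, as a simplified model. Only the "currentTime < nextKeysExpiration" check and the reset to 0 come from the code in the patch; the class name, the scan interval and the way the next scan time is computed are assumptions for illustration.

```java
// Simplified model of the throttled expire() scan. ExpirationScannerSketch
// and scanIntervalMillis are hypothetical names, not Grizzly code.
class ExpirationScannerSketch {
    private final long scanIntervalMillis;
    // Earliest time at which the next full key scan may run.
    private long nextKeysExpiration = 0;

    ExpirationScannerSketch(long scanIntervalMillis) {
        this.scanIntervalMillis = scanIntervalMillis;
    }

    /** Returns true if a scan ran, false if it was skipped by the throttle. */
    boolean expire(long currentTime) {
        if (currentTime < nextKeysExpiration) {
            return false; // too early, skip this pass
        }
        // Assumption: the next scan is scheduled one interval from now.
        nextKeysExpiration = currentTime + scanIntervalMillis;
        return true;
    }

    /** What the removed method did: force the next expire() call to scan. */
    void resetExpiration() {
        nextKeysExpiration = 0;
    }
}
```

If that model is right, then without the reset a freshly suspended response may wait up to one scan interval before its timeout is first checked, which is what I want to confirm is OK for the Servlet 3.0 case.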
> return;
> }
>
> @@ -674,8 +687,10 @@
> adapter.afterService(request,response);
> } catch (Exception ex) {
> error = true;
> - logger.log(Level.FINEST,
> - sm.getString("processorTask.errorFinishingRequest"), ex);
> + if (logger.isLoggable(Level.FINEST)){
> + logger.log(Level.FINEST,
> + sm.getString("processorTask.errorFinishingRequest"), ex);
> + }
> }
>
> // Finish the handling of the request
> @@ -773,7 +788,7 @@
> WorkerThread workerThread = (WorkerThread)Thread.currentThread();
> KeepAliveThreadAttachment k =
> (KeepAliveThreadAttachment) workerThread.getAttachment();
> - k.setActiveThreadTimeout(transactionTimeout);
> + k.setIdleTimeoutDelay(transactionTimeout);
So we are using setIdleTimeoutDelay instead of setActiveThreadTimeout now.
Will it behave the same way? setActiveThreadTimeout was used to
kill/interrupt a WorkerThread that blocks for longer than X time. Since
all unit tests are passing, I assume the mechanism still works?
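To be explicit about the behavior I expect to be preserved, here is a minimal standalone sketch. IdleAttachmentSketch is a made-up stand-in for KeepAliveThreadAttachment/ThreadAttachment; it only mirrors what I can see in the patch (setting the delay grabs the current thread, timedOut() interrupts it and asks for the key to be cancelled).

```java
// Hypothetical stand-in for the attachment classes touched by the patch.
class IdleAttachmentSketch {
    private volatile Thread activeThread;
    private volatile long idleTimeoutDelay = Long.MIN_VALUE; // "not configured"

    /** As in the patch: setting the delay also grabs the calling thread. */
    void setIdleTimeoutDelay(long idleTimeoutDelay) {
        this.idleTimeoutDelay = idleTimeoutDelay;
        this.activeThread = Thread.currentThread();
    }

    long getIdleTimeoutDelay() {
        return idleTimeoutDelay;
    }

    /**
     * Called when the idle limit has been exceeded.
     *
     * @return true if the key should be cancelled
     */
    boolean timedOut() {
        Thread t = activeThread;
        if (t != null) {
            t.interrupt(); // wake up a worker blocked for too long
        }
        return true;
    }
}
```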
>
> inputBuffer.parseHeaders();
>
> @@ -850,20 +865,10 @@
> // control how Grizzly ARP extension handle their asynchronous
> // behavior, we must make sure we are never called twice.
> if (asyncSemaphore.tryAcquire(0, TimeUnit.SECONDS)) {
> - // Nobody is listening, avoid extra operation.
> - if (getTaskListener() == null){
> - return;
> - }
> -
> - TaskEvent<ProcessorTask> event = new TaskEvent<ProcessorTask>();
> - if (error) {
> - event.setStatus(TaskEvent.ERROR);
> - } else {
> - event.setStatus(TaskEvent.COMPLETED);
> - }
> - event.attach(this);
> - getTaskListener().taskEvent(event);
> - event.attach(null);
> + if (getTaskListener() != null){
> + event.setStatus(error?TaskEvent.ERROR:TaskEvent.COMPLETED);
> + getTaskListener().taskEvent(event);
> + }
> }
> } catch (InterruptedException ex) {
> if (logger.isLoggable(Level.WARNING)){
> @@ -929,10 +934,11 @@
> try {
> outputBuffer.commit();
> } catch (IOException ex) {
> - logger.log(Level.FINEST,
> + if (logger.isLoggable(Level.FINEST)){
> + logger.log(Level.FINEST,
> sm.getString("processorTask.nonBlockingError"), ex);
> - // Set error flag
> error = true;
> + }
> }
>
> } else if (actionCode == ActionCode.ACTION_ACK) {
> @@ -951,7 +957,6 @@
> try {
> outputBuffer.sendAck();
> } catch (IOException e) {
> - // Set error flag
> error = true;
> }
> }
> @@ -965,10 +970,11 @@
> try {
> outputBuffer.endRequest();
> } catch (IOException e) {
> - logger.log(Level.FINEST,
> + if (logger.isLoggable(Level.FINEST)){
> + logger.log(Level.FINEST,
> sm.getString("processorTask.nonBlockingError"), e);
> - // Set error flag
> - error = true;
> + error = true;
> + }
> }
> } else if (actionCode == ActionCode.ACTION_RESET) {
>
> @@ -1094,7 +1100,7 @@
> }
> } catch (Exception e) {
> logger.log(Level.WARNING,
> - sm.getString("processorTask.exceptionSSLcert"),e);
> + sm.getString("processorTask.exceptionSSLcert"),e);
> }
> }
> } else if ( actionCode == ActionCode.ACTION_POST_REQUEST ) {
> @@ -1103,18 +1109,15 @@
> try{
> handler.handle(request,Interceptor.RESPONSE_PROCEEDED);
> } catch(IOException ex){
> - logger.log(Level.FINEST,
> - "Handler exception",ex);
> + logger.log(Level.FINEST,"Handler exception",ex);
> }
> }
> } else if ( actionCode == ActionCode.CANCEL_SUSPENDED_RESPONSE ) {
> key.attach(null);
> } else if ( actionCode == ActionCode.RESET_SUSPEND_TIMEOUT ) {
> - if (key.attachment() instanceof Response.ResponseAttachment){
> - Response.ResponseAttachment ra = ((Response.ResponseAttachment)key.attachment());
> - if (ra != null){
> - ra.resetTimeout();
> - }
> + Object attachment = key.attachment();
> + if (attachment instanceof Response.ResponseAttachment){
> + ((Response.ResponseAttachment)attachment).resetTimeout();
> }
> } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH ) {
> if (key != null) {
> @@ -1913,6 +1916,8 @@
> setTaskListener(null);
> socket = null;
> dropConnection = false;
> + reRegisterSelectionKey = true;
> + aptCancelKey = false;
> key = null;
> }
>
> @@ -2301,5 +2306,40 @@
> public void setUseChunking(boolean useChunking) {
> this.useChunking = useChunking;
> }
> +
> + /**
> + * true if SelectionKey should be reregistered with Selector after async is done
> + * default is true.
> + * if false then the socket lifecycle ending needs to be handled by you
[minor] fix comments
> + * @param reRegisterSelectionKey
[minor] add missing javadoc
> + */
> + public void setReRegisterSelectionKey(boolean reRegisterSelectionKey) {
> + this.reRegisterSelectionKey = reRegisterSelectionKey;
> + }
> +
> + /**
> + * true if SelectionKey should be reregistered with Selector after async is done.
> + * default is true.
> + * if false then the socket lifecycle ending needs to be handled by you
> + * @return
[minor] add missing javadoc
> + */
> + public boolean getReRegisterSelectionKey() {
> + return reRegisterSelectionKey;
> + }
> +
> +
> + /**
> + * true if asyncprotocolfilter should cancel the selectionkey
> + */
> + public void setAptCancelKey(boolean aptCancelKey) {
> + this.aptCancelKey = aptCancelKey;
> + }
> +
> + /**
> + * true if asyncprotocolfilter should cancel the selectionkey
> + */
[minor] write AsyncProtocolFilter instead of asyncprotocolfilter
> + public boolean getAptCancelKey() {
> + return aptCancelKey;
> + }
> }
>
> Index: modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java
> ===================================================================
> --- modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java (revision 2805)
> +++ modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java (working copy)
> @@ -39,6 +39,9 @@
> package com.sun.grizzly.http;
>
> import com.sun.grizzly.util.ThreadAttachment;
> +import java.nio.channels.SelectionKey;
> +import java.util.logging.Level;
> +import java.util.logging.Logger;
>
> /**
> * Add keep alive counting mechanism to the {@link ThreadAttachement}.
> @@ -46,8 +49,9 @@
> * @author Jeanfrancois Arcand
> */
> public class KeepAliveThreadAttachment extends ThreadAttachment{
> - private int keepAliveCount = 0;
> -
> + protected final static Logger logger = SelectorThread.logger();
> +
> + private int keepAliveCount;
> /**
> * The stats object used to gather statistics.
> */
> @@ -88,4 +92,22 @@
> keepAliveCount = 0;
> }
>
> + @Override
> + public void release(SelectionKey selectionKey) {
> + super.release(selectionKey);
> + resetKeepAliveCount();
> + }
> +
> +
> + @Override
> + public boolean timedOut(SelectionKey selectionKey) {
> + Thread t = activeThread();
> + if (t != null) {
> + if (logger.isLoggable(Level.WARNING)) {
> + logger.log(Level.WARNING, "Interrupting idle Thread: " + t.getName());
> + }
> + t.interrupt();
> + }
> + return true;
> + }
> }
> Index: modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java
> ===================================================================
> --- modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java (revision 2805)
> +++ modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java (working copy)
> @@ -38,13 +38,11 @@
> package com.sun.grizzly.http;
>
> import com.sun.grizzly.DefaultSelectionKeyHandler;
> -import com.sun.grizzly.tcp.Response;
> -import com.sun.grizzly.tcp.Response.ResponseAttachment;
> +import com.sun.grizzly.SelectionKeyHandler;
> import com.sun.grizzly.util.Copyable;
> import com.sun.grizzly.util.SelectionKeyAttachment;
> import java.nio.channels.SelectionKey;
> import java.util.Iterator;
> -import java.util.logging.Level;
>
> /**
> * Default HTTP {@link SelectionKeyHandler} implementation
> @@ -71,6 +69,19 @@
> }
>
> @Override
> + public void cancel(SelectionKey key) {
> + if (key != null) {
> + if (selectorThread.getThreadPool() instanceof StatsThreadPool) {
> + if (selectorThread.isMonitoringEnabled() &&
> + ((StatsThreadPool) selectorThread.getThreadPool()).getStatistic().decrementOpenConnectionsCount(key.channel())) {
> + selectorThread.getRequestGroupInfo().decreaseCountOpenConnections();
> + }
> + }
> + super.cancel(key);
> + }
> + }
> +
> + @Override
> public void doRegisterKey(SelectionKey key, int ops, long currentTime) {
> Object attachment = key.attachment();
> if (attachment instanceof KeepAliveThreadAttachment) {
> @@ -85,40 +96,12 @@
> }
> key.interestOps(key.interestOps() | ops);
> }
> -
> - @Override
> - public void cancel(SelectionKey key) {
> - if (key == null) {
> - return;
> - }
> - if (selectorThread.getThreadPool() instanceof StatsThreadPool) {
> - if (selectorThread.isMonitoringEnabled() &&
> - ((StatsThreadPool) selectorThread.getThreadPool()).getStatistic().decrementOpenConnectionsCount(key.channel())) {
> - selectorThread.getRequestGroupInfo().decreaseCountOpenConnections();
> - }
> - }
> -
> - Object attachment = key.attachment();
> - if (attachment instanceof KeepAliveThreadAttachment) {
> - KeepAliveThreadAttachment k = (KeepAliveThreadAttachment) attachment;
> - k.resetKeepAliveCount();
> - }
> - super.cancel(key);
> - }
> -
> +
> /**
> - * Reset the expiration time
> - */
> - public void resetExpiration() {
> - nextKeysExpiration = 0;
> - }
> -
> - /**
> * {@inheritDoc}
> */
> @Override
> public void expire(Iterator<SelectionKey> iterator) {
> - //must check for timeout, attachments can have such interest
> final long currentTime = System.currentTimeMillis();
> if (currentTime < nextKeysExpiration) {
> return;
> @@ -129,51 +112,21 @@
> if (!key.isValid()) {
> continue;
> }
> +
> Object attachment = key.attachment();
> if (attachment != null) {
> long expire = getExpirationStamp(attachment);
> - if (expire == SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
> - continue;
> - }
>
> - long idleLimit, activeThreadTimeout;
> - if (attachment instanceof KeepAliveThreadAttachment) {
> - activeThreadTimeout = ((KeepAliveThreadAttachment) attachment).getActiveThreadTimeout();
> + if (expire != SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
> + long idleLimit = getIdleLimit(attachment);
>
> - if (activeThreadTimeout != SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
> - idleLimit = activeThreadTimeout;
> - } else {
> - idleLimit = ((SelectionKeyAttachment) attachment).getIdleTimeoutDelay();
> - if (idleLimit == SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
> - //this is true when attachment class dont have idletimeoutdelay configured.
> - idleLimit = timeout;
> - }
> - }
> - } else {
> - idleLimit = timeout;
> + if (idleLimit != -1 && currentTime - expire >= idleLimit &&
> + (!(attachment instanceof SelectionKeyAttachment) ||
> + ((SelectionKeyAttachment)attachment).timedOut(key))){
> + // selectorHandler.addPendingKeyCancel(key);
Remove the commented code here.
> + cancel(key);
> + }
> }
> - if (idleLimit == -1) {
> - continue;
> - }
> -
> - if (currentTime - expire >= idleLimit) {
> - if (attachment instanceof Response.ResponseAttachment) {
> - ((ResponseAttachment) attachment).timeout();
> - key.attach(null);
> - continue;
> - }
> -
> - if (attachment instanceof KeepAliveThreadAttachment) {
> - KeepAliveThreadAttachment k = (KeepAliveThreadAttachment) attachment;
> - if (k.activeThread() != null) {
> - if (logger.isLoggable(Level.WARNING)) {
> - logger.log(Level.WARNING, "Interrupting idle Thread: " + k.activeThread().getName());
> - }
> - k.activeThread().interrupt();
> - }
> - }
> - cancel(key);
> - }
> }
> }
> }
> @@ -184,14 +137,26 @@
> *
> * @param {@link SelectionKey}
> */
> - private long getExpirationStamp(Object attachment) {
> + protected long getExpirationStamp(Object attachment) {
> if (attachment instanceof Long) {
> return (Long) attachment;
> - } else if (attachment instanceof SelectionKeyAttachment) {
> + }
> + if (attachment instanceof SelectionKeyAttachment) {
> return ((SelectionKeyAttachment) attachment).getTimeout();
> - } else if (attachment instanceof Response.ResponseAttachment) {
> - return ((Response.ResponseAttachment) attachment).getExpirationTime() - timeout;
> }
> return SelectionKeyAttachment.UNLIMITED_TIMEOUT;
> }
> +
> + /**
> + * returns idle limit
[Minor] Javadoc -> Return the idle limit + @return
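Roughly what I mean for the Javadoc, together with my reading of the new expire() condition. This is a standalone sketch, not the real handler: IdleLimitSketch, NO_TIMEOUT and isExpired are made-up names, and the method takes plain longs instead of an attachment so it can be run on its own.

```java
// Illustrative only. The sentinel values match the ones used in the patch.
class IdleLimitSketch {
    static final long UNLIMITED_TIMEOUT = Long.MIN_VALUE; // as in SelectionKeyAttachment
    static final long NO_TIMEOUT = -1; // hypothetical name for "-1 means no timeout"

    /**
     * Return the idle limit to apply to an attachment.
     *
     * @param attachmentDelay the attachment's own idle timeout delay, or
     *        UNLIMITED_TIMEOUT when none is configured
     * @param defaultTimeout the handler-wide default timeout
     * @return the attachment's delay if configured, otherwise the default
     */
    static long getIdleLimit(long attachmentDelay, long defaultTimeout) {
        return attachmentDelay != UNLIMITED_TIMEOUT ? attachmentDelay : defaultTimeout;
    }

    /** True when a key stamped at 'stamp' has been idle for at least idleLimit. */
    static boolean isExpired(long now, long stamp, long idleLimit) {
        return stamp != UNLIMITED_TIMEOUT
                && idleLimit != NO_TIMEOUT
                && now - stamp >= idleLimit;
    }
}
```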
> + */
> + private long getIdleLimit(Object attachment){
> + if (attachment instanceof SelectionKeyAttachment){
> + long idleLimit = ((SelectionKeyAttachment) attachment).getIdleTimeoutDelay();
> + if (idleLimit != SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
> + return idleLimit;
> + }
> + }
> + return timeout;
> + }
> }
> Index: modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java
> ===================================================================
> --- modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java (revision 2805)
> +++ modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java (working copy)
> @@ -110,11 +110,12 @@
> */
> public boolean execute(Context ctx) throws IOException{
> HttpWorkerThread workerThread = ((HttpWorkerThread)Thread.currentThread());
> +
> + SelectionKey key = ctx.getSelectionKey();
> +
> + setSelectionKeyTimeout(key, Long.MAX_VALUE);
>
> - setSelectionKeyTimeout(ctx.getSelectionKey(), Long.MAX_VALUE);
> -
> - StreamAlgorithm streamAlgorithm =
> - workerThread.getStreamAlgorithm();
> + StreamAlgorithm streamAlgorithm = workerThread.getStreamAlgorithm();
> if (streamAlgorithm == null){
> try{
> streamAlgorithm = (StreamAlgorithm)algorithmClass
> @@ -142,8 +143,7 @@
> inputStream = createByteBufferInputStream();
> }
> configureByteBufferInputStream(inputStream, ctx, workerThread);
> -
> - SelectionKey key = ctx.getSelectionKey();
> +
> SocketChannel socketChannel = (SocketChannel) key.channel();
> streamAlgorithm.setChannel(socketChannel);
>
> @@ -160,17 +160,14 @@
> ctx.setKeyRegistrationState(Context.KeyRegistrationState.NONE);
>
> if (streamAlgorithm.parse(byteBuffer)){
> - ProcessorTask processor =
> - selectorThread.getProcessorTask();
> + ProcessorTask processor = selectorThread.getProcessorTask();
> configureProcessorTask(processor, ctx, workerThread,
> - streamAlgorithm.getHandler(), inputStream);
> -
> + streamAlgorithm.getHandler(), inputStream);
> try{
> selectorThread.getAsyncHandler().handle(processor);
> } catch (Throwable ex){
> logger.log(Level.INFO,"Processor exception",ex);
> - ctx.setKeyRegistrationState(
> - Context.KeyRegistrationState.CANCEL);
> + ctx.setKeyRegistrationState(Context.KeyRegistrationState.CANCEL);
> return false;
> }
> }
> @@ -196,18 +193,24 @@
>
> InputReader is = (InputReader) processor.getInputStream();
> is.getByteBuffer().clear();
> - byteBufferStreams.offer(is);
> -
> + byteBufferStreams.offer(is);
> +
> SelectorThread selectorThread = processor.getSelectorThread();
> - if (processor.isKeepAlive() && !processor.isError()){
> - setSelectionKeyTimeout(processor.getSelectionKey(), Long.MIN_VALUE);
> -
> - selectorThread.registerKey(processor.getSelectionKey());
> - } else {
> - selectorThread.cancelKey(processor.getSelectionKey());
> + boolean cancelkey = processor.getAptCancelKey() || processor.isError()
> + || !processor.isKeepAlive();
> + try{
> + if (!cancelkey){
> + if (processor.getReRegisterSelectionKey()){
> + setSelectionKeyTimeout(processor.getSelectionKey(), Long.MIN_VALUE);
> + selectorThread.registerKey(processor.getSelectionKey());
> + }
When getReRegisterSelectionKey() returns false, we do nothing with the
key, right? But we recycle the processor later, so are we missing a case
here?
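To make sure I read the new branches correctly, here is the decision written as a tiny standalone function. KeyOutcomeSketch and Outcome are made-up names, not part of the patch; the boolean logic is copied from the code above.

```java
// Hypothetical helper that only models the branch logic of the patched
// method. The real code calls registerKey/cancelKey instead of returning.
class KeyOutcomeSketch {
    enum Outcome { CANCEL, REREGISTER, LEAVE_ALONE }

    static Outcome decide(boolean aptCancelKey, boolean error,
                          boolean keepAlive, boolean reRegisterSelectionKey) {
        boolean cancelKey = aptCancelKey || error || !keepAlive;
        if (cancelKey) {
            return Outcome.CANCEL;
        }
        // Keep-alive, no error: either hand the key back to the Selector
        // or do nothing at all with it.
        return reRegisterSelectionKey ? Outcome.REREGISTER : Outcome.LEAVE_ALONE;
    }
}
```

LEAVE_ALONE is the case I am asking about: the processor is still recycled in the finally block, but nothing touches the key unless whoever set the flag has taken it over.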
> + }else{
> + selectorThread.cancelKey(processor.getSelectionKey());
> + }
> + }finally{
> + processor.recycle();
> + selectorThread.returnTask(processor);
> }
> - processor.recycle();
> - selectorThread.returnTask(processor);
> }
> }
>
> Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java
> ===================================================================
> --- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java (revision 2805)
> +++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java (working copy)
> @@ -74,13 +74,11 @@
> ((CometHandler)cometHandler).onInterrupt((CometEvent)cometEvent);
> break;
> case CometEvent.NOTIFY:
> - ((CometHandler)cometHandler).onEvent((CometEvent)cometEvent);
> - break;
> case CometEvent.READ:
> - ((CometHandler)cometHandler).onEvent((CometEvent)cometEvent);
> - break;
> case CometEvent.WRITE:
> + synchronized(cometHandler){
> ((CometHandler)cometHandler).onEvent((CometEvent)cometEvent);
> + }
> break;
> case CometEvent.INITIALIZE:
> ((CometHandler)cometHandler).onInitialize((CometEvent)cometEvent);
> @@ -92,7 +90,7 @@
> throw new IllegalStateException();
> }
> } catch (IOException ex){
> - Controller.logger().log(Level.WARNING,"",ex);
> + Controller.logger().log(Level.FINE,"",ex);
> }
> }
> }
> Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java
> ===================================================================
> --- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java (revision 2805)
> +++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java (working copy)
> @@ -38,10 +38,7 @@
>
> package com.sun.enterprise.web.connector.grizzly.comet;
>
> -import com.sun.grizzly.comet.CometTask;
> import java.io.IOException;
> -import java.util.Iterator;
> -import java.util.logging.Level;
>
> /**
> * The main object used by {@link CometHandler}.
> @@ -56,7 +53,7 @@
> * @author Jeanfrancois Arcand
> * @deprecated use {@link CometContext}
> */
> -public class CometContext<E> extends com.sun.grizzly.comet.CometContext<E>{
> +public class CometContext extends com.sun.grizzly.comet.CometContext{
>
> private final CometEvent eventInitialize;
>
> @@ -67,8 +64,8 @@
> */
> public CometContext(String contextPath, int continuationType) {
> super(contextPath, continuationType);
> - this.eventInterrupt = new CometEvent<E>(CometEvent.INTERRUPT,this);
> - this.eventInitialize = new CometEvent<E>(CometEvent.INITIALIZE,this);
> + this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this);
> + this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this);
> }
>
>
> @@ -124,13 +121,8 @@
> * {@inheritDoc}
> */
> @Override
> - public void notify(final E attachment) throws IOException {
> - CometEvent event = new CometEvent<E>(CometEvent.NOTIFY,this);
> - event.attach(attachment);
> - Iterator<com.sun.grizzly.comet.CometHandler> iterator = handlers.keySet().iterator();
> - notificationHandler.setBlockingNotification(blockingNotification);
> - notificationHandler.notify((com.sun.grizzly.comet.CometEvent)event,iterator);
> - resetSuspendIdleTimeout();
> + public void notify(final Object attachment) throws IOException {
> + super.notify(attachment);
> }
>
>
> @@ -146,14 +138,14 @@
> * {@inheritDoc}
> */
> @Override
> - public void notify(final E attachment,final int eventType,final int cometHandlerID)
> + public void notify(final Object attachment,final int eventType,final int cometHandlerID)
> throws IOException{
> CometHandler cometHandler = getCometHandler(cometHandlerID);
>
> if (cometHandler == null){
> throw new IllegalStateException(INVALID_COMET_HANDLER);
> }
> - CometEvent event = new CometEvent<E>(eventType,this);
> + CometEvent event = new CometEvent(eventType,this);
> event.attach(attachment);
>
> notificationHandler.setBlockingNotification(blockingNotification);
> @@ -173,30 +165,5 @@
> protected void initialize(com.sun.grizzly.comet.CometHandler handler) throws IOException {
> ((com.sun.enterprise.web.connector.grizzly.comet.CometHandler)handler).onInitialize(eventInitialize);
> }
> -
> - /**
> - * Interrupt a {@link CometHandler} by invoking {@link CometHandler#onInterrupt}
> - */
> - @Override
> - protected boolean interrupt(CometTask task,boolean removecomethandler, boolean resume) {
> - boolean status = true;
> - try{
> - if (removecomethandler){
> - status = (handlers.remove(task.getCometHandler()) != null);
> - if (status && resume){
> - ((com.sun.enterprise.web.connector.grizzly.comet.CometHandler)
> - task.getCometHandler()).onInterrupt(eventInterrupt);
> - }else{
> - logger.finer(ALREADY_REMOVED);
> - }
> - }
> - } catch (Throwable ex){
> - status = false;
> - logger.log(Level.FINE,"Unable to interrupt",ex);
> - }finally{
> - activeTasks.remove(task);
> - return status;
> - }
> - }
> }
> -
> +
> Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java
> ===================================================================
> --- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java (revision 2805)
> +++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java (working copy)
> @@ -87,36 +87,10 @@
> */
> @Override
> public CometContext register(String topic, int type){
> - // Double checked locking used used to prevent the otherwise static/global
> - // locking, cause example code does heavy usage of register calls
> - // for existing topics from http get calls etc.
> - CometContext cometContext = (CometContext)activeContexts.get(topic);
> - if (cometContext == null){
> - synchronized(activeContexts){
> - cometContext = (CometContext)activeContexts.get(topic);
> - if (cometContext == null){
> - cometContext = (CometContext)cometContextCache.poll();
> - if (cometContext != null)
> - cometContext.setTopic(topic);
> - if (cometContext == null){
> - cometContext = new CometContext(topic, type);
> - NotificationHandler notificationHandler
> - = new DefaultNotificationHandler();
> - cometContext.setNotificationHandler(notificationHandler);
> - if (notificationHandler != null && (notificationHandler
> - instanceof DefaultNotificationHandler)){
> - ((DefaultNotificationHandler)notificationHandler)
> - .setThreadPool(threadPool);
> - }
> - }
> - activeContexts.put(topic,cometContext);
> - }
> - }
> - }
> - return cometContext;
> + return (CometContext) super.register(topic, type);
> }
> +
>
> -
> /**
> * {@inheritDoc}
> */
> Index: modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java
> ===================================================================
> --- modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java (revision 2805)
> +++ modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java (working copy)
> @@ -57,11 +57,13 @@
> import com.sun.grizzly.tcp.http11.InternalOutputBuffer;
> import com.sun.grizzly.tcp.http11.filters.VoidOutputFilter;
> import com.sun.grizzly.util.LoggerUtils;
> +import com.sun.grizzly.util.SelectionKeyAttachment;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import com.sun.grizzly.util.buf.ByteChunk;
> import com.sun.grizzly.util.http.MimeHeaders;
> import java.io.IOException;
> +import java.nio.channels.SelectionKey;
> import java.util.Locale;
> -
> import java.nio.channels.SocketChannel;
> import java.util.concurrent.Semaphore;
> import java.util.logging.Level;
> @@ -892,21 +894,20 @@
> }
>
>
> - public static class ResponseAttachment<A>{
> -
> - private A attachment;
> + public static class ResponseAttachment<A> extends SelectionKeyAttachment
> + implements Runnable {
> +
> private CompletionHandler<? super A> completionHandler;
> - private Long timeout;
> - private Long expiration;
> - private Response response;
> + private final A attachment;
> + private final long idletimeoutdelay;
> + private final Response response;
>
> - public ResponseAttachment(Long timeout,A attachment,
> + public ResponseAttachment(long idletimeoutdelay,A attachment,
> CompletionHandler<? super A> completionHandler, Response response){
> - this.timeout = timeout;
> + this.idletimeoutdelay = idletimeoutdelay;
> this.attachment = attachment;
> this.completionHandler = completionHandler;
> - this.response = response;
> -
> + this.response = response;
> resetTimeout();
> }
>
> @@ -914,21 +915,20 @@
> return attachment;
> }
>
> -
> public CompletionHandler<? super A> getCompletionHandler() {
> return completionHandler;
> }
>
> public void resetTimeout(){
> - expiration = System.currentTimeMillis() + timeout;
> + timeout = System.currentTimeMillis();
> }
> -
> -
> - public Long getExpirationTime() {
> - return expiration;
> +
> + @Override
> + public long getIdleTimeoutDelay() {
> + return idletimeoutdelay;
> }
> +
>
> -
> public void resume(){
> completionHandler.resumed(attachment);
> try{
> @@ -939,11 +939,21 @@
> LoggerUtils.getLogger().log(Level.FINEST,"resume",ex);
> }
> }
> -
> -
> - public void timeout(){
> +
> + @Override
> + public boolean timedOut(SelectionKey Key) {
> + Key.attach(null);
> + run();
> + //((WorkerThreadImpl)Thread.currentThread()).
> + // getPendingIOhandler().addPendingIO(this);
Remove commented code. Is the getPendingIOHandler() still used somewhere?
> + return false;
> + }
> +
> + @Override
> + public void run() {
> timeout(true);
> }
> +
>
> public void timeout(boolean forceClose){
> // If the buffers are empty, commit the response header
> Index: modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java
> ===================================================================
> --- modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java (revision 2805)
> +++ modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java (working copy)
> @@ -49,7 +49,7 @@
> public abstract class SelectionKeyAttachment {
> public static final long UNLIMITED_TIMEOUT = Long.MIN_VALUE;
>
> - private long timeout = UNLIMITED_TIMEOUT;
> + protected long timeout = UNLIMITED_TIMEOUT;
>
> public static Object getAttachment(SelectionKey key) {
> Object attachment = key.attachment();
> @@ -60,25 +60,45 @@
> return attachment;
> }
>
> - public long getTimeout() {
> - return timeout;
> - }
> -
> /**
> * returns the idle timeout delay.
> * default it returns Long.MIN_VALUE , meaning null.
> * -1 means no timeout.
> - * subclass need to implement it.
> + * subclass need to override it.
> * @return
> */
> public long getIdleTimeoutDelay(){
> return UNLIMITED_TIMEOUT;
> }
>
> + /**
> + * subclass need to override this method for it to work.
Subclass (uppercase S)
> + * Long.MIN_VALUE means null , and default value will be used.
> + * -1 means no timeout.
> + * @param idletimeoutdelay
> + */
> + public void setIdleTimeoutDelay(long idletimeoutdelay){
> + throw new IllegalStateException("setIdleTimeoutDelay not implemented in subclass");
> + }
> +
> +
> + public long getTimeout() {
> + return timeout;
> + }
> +
> public void setTimeout(long timeout) {
> this.timeout = timeout;
> }
>
> + /**
> + * called when idle timeout detected.
> + * return true if key should be canceled.
Add @return
> + */
> + public boolean timedOut(SelectionKey Key){
> + return true;
> + }
> +
> +
> public void release(SelectionKey selectionKey) {
> timeout = UNLIMITED_TIMEOUT;
> }
> Index: modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java
> ===================================================================
> --- modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java (revision 2805)
> +++ modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java (working copy)
> @@ -53,7 +53,7 @@
> */
> public class ThreadAttachment extends SelectionKeyActionAttachment
> implements AttributeHolder {
> -
> +
> /**
> * The maximum time this object can be associated with an active {_at_link Thread}
> */
> @@ -310,18 +310,19 @@
> * Set the time, in milliseconds, this object can be attached to a {_at_link Thread}
> * @param the time, in milliseconds, this object can be attached to a {_at_link Thread}
> */
> - public void setActiveThreadTimeout(long activeThreadTimeout){
> + @Override
> + public void setIdleTimeoutDelay(long activeThreadTimeout) {
> this.activeThreadTimeout = activeThreadTimeout;
> -
> - // As soon as we get invoked we grab the Thread
Leave the comment here :-)
> activeThread= Thread.currentThread();
> }
> -
> +
> +
> /**
> * Return the time, in milliseconds, this object can be attached to a {_at_link Thread}
> * @return the time, in milliseconds, this object can be attached to a {_at_link Thread}
> - */
> - public long getActiveThreadTimeout(){
> + */
> + @Override
> + public long getIdleTimeoutDelay() {
> return activeThreadTimeout;
> }
> }
> Index: modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java
> ===================================================================
> --- modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java (revision 2805)
> +++ modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java (working copy)
> @@ -37,6 +37,7 @@
> */
> package com.sun.grizzly.util;
>
> +import com.sun.grizzly.tcp.PendingIOhandler;
> import java.util.concurrent.Callable;
> import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType;
> import com.sun.grizzly.util.ThreadAttachment.Mode;
> @@ -79,8 +80,14 @@
> * The size of the ByteBuffer attached to this object.
> */
> private int initialByteBufferSize;
> +
> +
> + /**
> + * used by selectionkey attachments to enqueue io events that will be executed in
> + * selectorhandler.postselect by worker threads instead of the selector thread.
> + */
[minor] Lowercase "used by" -> "Used by"
> + private PendingIOhandler pendingIOhandler;
>
> -
> /**
> * Create a Thread that will synchronizes/block on
> * {_at_link DefaultThreadPool} instance.
> @@ -90,7 +97,14 @@
> public WorkerThreadImpl(ThreadGroup threadGroup, Runnable runnable){
> this(threadGroup, runnable, DEFAULT_BYTE_BUFFER_SIZE);
> }
> -
> +
> + public WorkerThreadImpl(Runnable runnable){
> + this(null, "workerthread", runnable, 0);
> + }
> +
> + public WorkerThreadImpl(String name, Runnable runnable){
> + this(null, name, runnable, 0);
> + }
> /**
> * Create a Thread that will synchronizes/block on
> * {_at_link DefaultThreadPool} instance.
> @@ -278,6 +292,22 @@
> }
>
>
> + /**
> + * used by selectionkey attachments to enqueue io events that will be executed in
> + * selectorhandler.postselect by worker threads instead of the selector thread.
> + */
> + public PendingIOhandler getPendingIOhandler() {
> + return pendingIOhandler;
> + }
> +
> + /**
> + * used by selectionkey attachments to enqueue io events that will be executed in
> + * selectorhandler.postselect by worker threads instead of the selector thread.
> + */
> + public void setPendingIOhandler(PendingIOhandler pendingIOhandler) {
> + this.pendingIOhandler = pendingIOhandler;
> + }
> +
> @Override
> protected void reset() {
> if (threadAttachment != null) {
> Index: modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java
> ===================================================================
> --- modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java (revision 2805)
> +++ modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java (working copy)
> @@ -40,9 +40,7 @@
> import com.sun.grizzly.arp.AsyncHandler;
> import com.sun.grizzly.arp.DefaultAsyncHandler;
> import com.sun.grizzly.http.SelectorThread;
> -import com.sun.grizzly.http.StatsThreadPool;
> import java.io.BufferedInputStream;
> -import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.lang.management.ManagementFactory;
> @@ -51,7 +49,6 @@
> import java.net.Socket;
> import java.net.SocketAddress;
> import java.util.concurrent.CountDownLatch;
> -import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicInteger;
> import junit.framework.TestCase;
> @@ -62,13 +59,13 @@
> * @author Gustav Trede
> */
> public class CometUnitTest extends TestCase {
> - private final int port = 19000;
> + private final int port = 19100;
> private SocketAddress connectadr;
> - private final int socketreusedelayMilliSec = 0;
> + private final int socketreusedelayMilliSec = 40;
> private static volatile boolean status;
> private static volatile boolean testisdone;
> private SelectorThread st;
> - private final String context = "/cometText";
> + private final String context = "/cometTextn";
> private final byte joinmessage = 126;
> private final byte[] connectstr=
> ("POST /index.html/comet HTTP/1.1\r\n"+
> @@ -81,9 +78,16 @@
> }
>
> @Override
> + protected void tearDown() throws Exception {
> + super.tearDown();
> + if (st != null)
> + st.stopEndpoint();
> + }
> +
> + @Override
> protected void setUp() throws Exception {
> - super.setUp();
> - init(false);
> + super.setUp();
> + init(false);
> }
>
> protected void init(boolean useconcurrentcomethandler) throws Exception{
> @@ -92,7 +96,7 @@
> System.err.println("JVM: "+rmx.getVmVendor()+" "+rmx.getVmName()+" "+rmx.getVmVersion()+" params: "+rmx.getInputArguments());
> st = new SelectorThread();
> st.setPort(port);
> - st.setDisplayConfiguration(true);
> + st.setDisplayConfiguration(false);
> st.setAdapter(new CometTestAdapter(context,useconcurrentcomethandler,-1));
> st.setEnableAsyncExecution(true);
> AsyncHandler asyncHandler = new DefaultAsyncHandler();
> @@ -100,10 +104,7 @@
> st.setAsyncHandler(asyncHandler);
> st.setTcpNoDelay(true);
> st.setLinger(-1);
> - /*st.setThreadPool( new StatsThreadPool(16,
> - 32, 50,
> - StatsThreadPool.DEFAULT_IDLE_THREAD_KEEPALIVE_TIMEOUT,
> - TimeUnit.MILLISECONDS));*/
> +
> try {
> st.listen();
> } catch (Exception ex) {
> @@ -113,44 +114,51 @@
> }
>
>
> - @Override
> - protected void tearDown() throws Exception {
> - super.tearDown();
> - if (st != null)
> - st.stopEndpoint();
> - }
> -
> /* public void testLongPollingSocketReuse() throws Exception{
> doActualLogic(true,false,40,20);
> }*/
>
>
> - /* public void testLongPollingNewSocket() throws Exception{
> - doActualLogic(false,false,64,5);
> - }
> -*/
> + /* public void testLongPollingNewSocket() throws Exception{
> + doActualLogic(false,false,6500,64);
> + }*/
>
>
> - public void testStreaming2() throws Exception{
> - doActualLogic(false,true,10,4);
> + public void testStreaming1() throws Throwable{
> + //doActualLogic(false,true,15,1,false);
> }
>
> + /* public void testStreaming2() throws Throwable{
> + doActualLogic(false,true,21,4, false);
> + }
> +
> + public void testStreaming3() throws Throwable{
> + doActualLogic(false,true,21,64, false);
> + }*/
> +
> + /* public void testStreaming5() throws Throwable{
> + doActualLogic(false,true, 15, 256);
> + }*/
Remove the /* or remove the code?
> +
> protected void doActualLogic(final boolean socketreuse,final boolean streaming,
> - final int secondspertest,final int threadcount) throws Exception{
> - System.err.println(streaming?"STREAMING-":"LONGPOLLING-"+(socketreuse?"SOCKETREUSE":"NEWSOCKET")+" client threads: "+threadcount);
> + final int secondspertest,final int threadcount, boolean spreadnotify) throws Throwable{
> + System.err.println((streaming?"STREAMING-":"LONGPOLLING-")+(socketreuse?"SOCKETREUSE":"NEWSOCKET")+" client threads: "+threadcount+" spreadNotifyToManyThreads: "+spreadnotify);
> //int cpus = Runtime.getRuntime().availableProcessors();
> + ((DefaultNotificationHandler)CometTestAdapter.cometContext.notificationHandler).
> + setSpreadNotifyToManyToThreads(spreadnotify);
> testisdone = false;
> msgc.set(0);
> - CometTestAdapter.usetreaming = streaming;
> - status = true;
> + CometTestAdapter.usetreaming = streaming;
> final CountDownLatch threadsaredone = new CountDownLatch(threadcount);
> try{
> + status = true;
> for (int i=0;i<threadcount;i++){
> + final boolean first = false;
> new Thread("cometUnitTestClient"){
> @Override
> public void run(){
> try {
> - connectClient(socketreuse,streaming);
> + connectClient(socketreuse,streaming, first);
> } catch (Exception ex) {
> if (!testisdone && status){
> status = false; //can happen a few times due to not hreadsafe. but itt ok
> @@ -164,91 +172,108 @@
> }
>
> Thread.currentThread().setPriority(Thread.currentThread().getPriority()+1);
> - ThreadPoolExecutor cometexecutor = (ThreadPoolExecutor)CometEngine.getEngine().threadPool;
> + //NewDefaultThreadPool tp = (NewDefaultThreadPool)CometEngine.getEngine().threadPool;
> + //ThreadPoolExecutor tp = (ThreadPoolExecutor)CometEngine.getEngine().threadPool;
> final long t0 = System.currentTimeMillis();
> long t1 = t0;
> int oldtotal = 0;
> - int eventbroadcasts = 100;
> + final int waittime = 20;
> + int eventbroadcasts = 900000/(threadcount*(1000/waittime));
> while(t1-t0 < secondspertest*1000 ){
> + //int queuesize = tp.getQueuedTasksCount();
> + // int queuesize = tp.getQueue().size();
> + int queuesize = 0;
> +
> long t2 = System.currentTimeMillis();
> long deltatime = t2-t1;
> - if (deltatime>2300){
> + if (deltatime>4500){
> t1 = t2;
> int currenttotalmsg = msgc.get();
> System.err.println(
> - " events/sec : "+((currenttotalmsg-oldtotal)*1000/deltatime)+
> + " K events/sec : "+((currenttotalmsg-oldtotal+500)/deltatime)+
> " comethandlers: "+CometTestAdapter.cometContext.handlers.size()+
> - " cometWorkqueue: "+cometexecutor.getQueue().size()
> + " workqueue: "+queuesize+
> + " broadcastsper: "+eventbroadcasts
> );
> oldtotal = currenttotalmsg;
> }
> - int queuesize = cometexecutor.getQueue().size();
> - if (queuesize < 10000){
> - eventbroadcasts = (eventbroadcasts*5)/4;
> +
> + if (streaming){
> +
> + /*if (queuesize < (spreadnotify?threadcount:1)*300 ){
> + eventbroadcasts = (eventbroadcasts*5)/4;
> + }*/
> + if (queuesize < (spreadnotify?threadcount:1)*100){
> + for (int i=0;i<eventbroadcasts;i++){
> + CometTestAdapter.cometContext.notify(joinmessage);
> + }
> + }
> + }else{
> + CometTestAdapter.cometContext.notify(joinmessage);
> }
> - if (queuesize < 30000){
> - for (int i=0;i<eventbroadcasts;i++){
> - CometTestAdapter.cometContext.notify(joinmessage);
> - }
> - }
> +
> synchronized(connectstr){
> - connectstr.wait(10);
> + connectstr.wait((waittime));
> }
> }
> testisdone = true;
> - threadsaredone.await(12,TimeUnit.SECONDS);
> - }catch(Exception ea){
> - status = false;
> + System.err.println("test is done. waiting for clients to die.");
> + threadsaredone.await(6,TimeUnit.SECONDS);
> + System.err.println("clients are done.");
> + assertTrue(status);
> + }catch(Throwable ea){
> throw ea;
> - }finally {
> - if (status)
> - assertTrue(true);
> - else
> - fail("error");
> }
> }
>
> static AtomicInteger msgc = new AtomicInteger();
>
> - protected void connectClient(final boolean socketreuse,final boolean streaming) throws Exception {
> + protected void connectClient(final boolean socketreuse,final boolean streaming,boolean notifyBeforeRead) throws Exception {
> InputStream in = null;
> OutputStream out = null;
> Socket socket = null;
> + final int deltaadd = 500;
> int msgcount = 0;
> try{
> while (!testisdone){
> if (socket == null){
> - socket = newSocket(10000);
> + socket = newSocket(5000);
> out = socket.getOutputStream();
> in = new BufferedInputStream(socket.getInputStream());
> }
>
> out.write(connectstr);
> out.flush();
> -
> - boolean _status = false;
> +
> +
> int b;
> + if (notifyBeforeRead)
> + CometTestAdapter.cometContext.notify(joinmessage);
> while ((b = in.read()) != joinmessage && !testisdone){
>
> }
> -
> +
> + if (!streaming)
> + msgc.getAndIncrement();
> + else{
> + if (msgcount++==deltaadd){ //lowers thread contention
> + msgc.getAndAdd(deltaadd);
> + msgcount = 0;
> + }
> + }
> //{
> //if (b==joinmessage){
> - if (msgcount++==10){ //lowers thread contention
> - msgc.getAndAdd(10);
> - msgcount = 0;
> - }
> +
> in.read();
> in.read();
> in.read();
> in.read();
> in.read();
> - _status = true;
> - //break;
> - //}
> - // }
> + boolean _status = true;
>
> - while(streaming && _status){
> + while(streaming && _status && !testisdone){
> + if (notifyBeforeRead)
> + CometTestAdapter.cometContext.notify(joinmessage);
> b = in.read();
> in.read();
> in.read();
> @@ -256,8 +281,8 @@
> in.read();
> in.read();
> _status = (b == joinmessage);
> - if (_status && msgcount++==10){ //lowers thread contention
> - msgc.getAndAdd(10);
> + if (_status && msgcount++==deltaadd){ //lowers thread contention
> + msgc.getAndAdd(deltaadd);
> msgcount = 0;
> }
> }
> @@ -268,12 +293,11 @@
> else
> fail("client did not recieve expected message, got:'"+b+"'");
> }
> -
> +
> if (!socketreuse){
> socket.close();
> socket = null;
> }
> -
> if (!streaming && socketreusedelayMilliSec > 0){
> Thread.sleep(socketreusedelayMilliSec);
> }
> @@ -287,11 +311,12 @@
>
> private Socket newSocket(int timeout) throws Exception{
> Socket socket = new Socket();
> - socket.setReuseAddress(false);
> - socket.setReceiveBufferSize(8192);
> - socket.setSendBufferSize(1024);
> + socket.setReuseAddress(true);
> + //socket.setReceiveBufferSize(2048);
> + //socket.setSendBufferSize(512);
> socket.setSoLinger(false, 0);
> socket.setSoTimeout(timeout);
> + socket.setTcpNoDelay(true);
> socket.connect(connectadr);
> return socket;
> }
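On the "lowers thread contention" counting in connectClient: the idea is to count locally and only touch the shared AtomicInteger once per batch. A small sketch of that pattern follows; BatchedCounter is a hypothetical helper, not something in the test. As I read the patch, msgcount++==deltaadd fires on message deltaadd+1 but adds only deltaadd, so the sketch uses a pre-increment and a final flush to keep the total exact.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper: each client thread owns one instance and
 * publishes to the shared counter once per batch.
 */
class BatchedCounter {
    private final AtomicInteger shared;
    private final int batchSize;
    private int local; // confined to the owning thread, no synchronization needed

    BatchedCounter(AtomicInteger shared, int batchSize) {
        this.shared = shared;
        this.batchSize = batchSize;
    }

    /** Counts one message; touches the shared counter only every batchSize calls. */
    void increment() {
        if (++local == batchSize) {
            shared.getAndAdd(batchSize);
            local = 0;
        }
    }

    /** Publishes whatever is left, for example when the client loop exits. */
    void flush() {
        if (local > 0) {
            shared.getAndAdd(local);
            local = 0;
        }
    }
}
```

With a batch size of 500 the shared counter sees one atomic update per 500 messages per thread; the price is that the printed events/sec figure can lag by up to one batch per client.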
> Index: modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java
> ===================================================================
> --- modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java (revision 2805)
> +++ modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java (working copy)
> @@ -71,31 +71,34 @@
> private int PORT = 18890;
> final CometContext test = CometEngine.getEngine().register("GrizzlyAdapter");
>
> + @Override
> + protected void setUp() throws Exception {
> + super.setUp();
> + test.setBlockingNotification(false);
> + }
>
> @Override
> protected void tearDown() throws Exception {
> super.tearDown();
> - test.activeTasks.clear();
> test.handlers.clear();
> - stopGrizzlyWebServer();
> -
> + stopGrizzlyWebServer();
> }
>
> public void testOnInterruptExpirationDelay() throws Exception {
> - System.out.println("testOnInterruptExpirationDelay - will wait 5 seconds");
> - final int delay = 5000;
> + System.out.println("testOnInterruptExpirationDelay - will wait 2 seconds");
> + final int delay = 2000;
> test.setExpirationDelay(delay);
> newGWS(PORT+=1);
> String alias = "/OnInterrupt";
> addAdapter(alias, false);
> gws.start();
>
> - HttpURLConnection conn = getConnection(alias);
> + HttpURLConnection conn = getConnection(alias,delay+4000);
> long t1 = System.currentTimeMillis();
> assertEquals(conn.getHeaderField(onInitialize), onInitialize);
> assertEquals(conn.getHeaderField(onInterrupt), onInterrupt);
> long delta = System.currentTimeMillis() - t1;
> - assertTrue("comet idletimeout was too fast,"+delta+"ms",delta > delay-200);
> + assertTrue("comet idletimeout was too fast,"+delta+"ms",delta > delay-250);
> assertTrue("comet idletimeout was too late,"+delta+"ms",delta < delay+3000);
> }
>
> @@ -109,7 +112,7 @@
>
> Socket s = new Socket("localhost", PORT);
> s.setSoLinger(false, 0);
> - s.setSoTimeout(5 * 1000);
> + s.setSoTimeout(1 * 1000);
> OutputStream os = s.getOutputStream();
> String a = "GET " + alias + " HTTP/1.1\n"+"Host: localhost:" + PORT + "\n\n";
> System.out.println(" "+a);
> @@ -120,66 +123,75 @@
> fail("client socket read did not read timeout");
> } catch (SocketTimeoutException ex) {
> s.close();
> - Thread.sleep(3 * 1000);
> + Thread.sleep(500);
> assertEquals(onInterrupt, ga.c.wasInterrupt);
> }
> + }
> +
> + public void testOnTerminate() throws IOException {
> + System.out.println("testOnTerminate ");
> + test.setExpirationDelay(-1);
> + newGWS(PORT+=3);
> + String alias = "/OnTerminate";
> + addAdapter(alias,true);
> + gws.start();
> + for (int i=0;i<10;i++){
> + new Thread() {
> + @Override
> + public void run() {
> + try {
> + Thread.sleep(200);
> + test.notify(onTerminate, CometEvent.TERMINATE);
> + } catch (Throwable ex) {
> + ex.printStackTrace();
> + fail("exception:"+ex.getMessage());
> + }
> + }
> + }.start();
> + HttpURLConnection conn = getConnection(alias,1000);
> + assertEquals(conn.getHeaderField(onInitialize) , onInitialize);
> + assertEquals(conn.getHeaderField(onTerminate), onTerminate);
> + conn.disconnect();
> + }
> }
> -
> - public void testOnEvent() throws IOException {
> - System.out.println("testOnEvent - will wait 5 seconds");
> - newGWS(PORT+=3);
> +
> + public void testOnEvent() throws Exception {
> + System.out.println("testOnEvent ");
> + newGWS(PORT+=4);
> String alias = "/OnEvent";
> addAdapter(alias, true);
> test.setExpirationDelay(-1);
> gws.start();
> - new Thread() {
> -
> - @Override
> - public void run() {
> - try {
> - Thread.sleep(5 * 1000);
> - test.notify(onEvent);
> - } catch (Throwable ex) {
> - Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex);
> - fail("sleep/notify exception:"+ex.getMessage());
> + int iter = 10;
> + while(iter-->0){
> + new Thread() {
> + @Override
> + public void run() {
> + try {
> + Thread.sleep(150);
> + test.notify(onEvent);
> + } catch (Throwable ex) {
> + Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex);
> + fail("sleep/notify exception:"+ex.getMessage());
> + }
> }
> - }
> - }.start();
> -
> - HttpURLConnection conn = getConnection(alias);
> - assertEquals(conn.getHeaderField(onInitialize), onInitialize);
> - assertEquals(conn.getHeaderField(onEvent), onEvent);
> + }.start();
> + HttpURLConnection conn = getConnection(alias,1000);
> + assertEquals(conn.getHeaderField(onInitialize), onInitialize);
> + assertEquals(conn.getHeaderField(onEvent), onEvent);
> + conn.disconnect();
> + }
> }
>
> - public void testOnTerminate() throws IOException {
> - System.out.println("testOnTerminate - will wait 5 seconds");
> - test.setExpirationDelay(-1);
> - newGWS(PORT+=4);
> - String alias = "/OnTerminate";
> - addAdapter(alias, false);
> - gws.start();
> - new Thread() {
> - @Override
> - public void run() {
> - try {
> - Thread.sleep(5 * 1000);
> - test.notify(onTerminate, CometEvent.TERMINATE);
> - } catch (Exception ex) {
> - fail("exception:"+ex.getMessage());
> - Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex);
> - }
> - }
> - }.start();
> - HttpURLConnection conn = getConnection(alias);
> - assertEquals(conn.getHeaderField(onInitialize) , onInitialize);
> - assertEquals(conn.getHeaderField(onTerminate), onTerminate);
> - }
> + private HttpURLConnection getConnection(String alias) throws IOException {
> + return getConnection(alias, 40*1000);
> + }
>
> - private HttpURLConnection getConnection(String alias) throws IOException {
> + private HttpURLConnection getConnection(String alias, int readtimeout) throws IOException {
> URL url = new URL("http", "localhost", PORT, alias);
> HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
> - urlConn.setConnectTimeout(10*1000);
> - urlConn.setReadTimeout(40*1000);
> + urlConn.setConnectTimeout(5*1000);
> + urlConn.setReadTimeout(readtimeout);
> urlConn.connect();
> return urlConn;
> }
> @@ -260,7 +272,6 @@
>
> response.addHeader(onTerminate, event.attachment().toString());
> response.getWriter().print(onTerminate);
> - event.getCometContext().resumeCometHandler(this);
> }
>
> public void onInterrupt(CometEvent event) throws IOException {
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java (revision 2805)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java (working copy)
> @@ -40,34 +40,36 @@
> import com.sun.grizzly.Controller;
> import com.sun.grizzly.comet.CometEvent;
> import com.sun.grizzly.comet.CometHandler;
> -import java.io.Closeable;
> +import com.sun.grizzly.comet.DefaultNotificationHandler;
> import java.io.IOException;
> import java.util.LinkedList;
> import java.util.logging.Logger;
>
> /**
> - *
> * we queue events in each comethandler to lower the probability
> * that slow or massive IO for one comethandler severly delays events to others.<br>
> * <br>
> + * only streaming mode can benefit from buffering messages like this. <br>
> * only 1 thread at a time is allowed to do IO,
> * other threads put events in the queue and return to the thread pool.<br>
> *<br>
> * a thread initially calls enqueueEvent and stay there until there are no more
> - * events in the queue, calling the onEVent method in unsynchronized context for each Event.<br>
> + * events in the queue, calling the onEVent method in synchronized context for each Event.<br>
> *<br>
> * on IOE in onEvent we terminate.<br>
> * we have a limit, to keep memory usage under control.<br>
> * <br>
> * if queue limit is reached onQueueFull is called, and then we terminate.<br>
> *<br>
> - * default implementation of onInterrupt and onTerminate performs a .close() if attachment instanceof Closeable<br><
> *<br>
> * whats not optimal is that a worker thread is sticky to the client depending
> * uppon available events in the handlers local queue,
> * that can in theory allow a few clients to block all threads for extended time.<br>
> - * The improvement is that only 1 thread is tied up to a client instead of several
> - * being blocked by synchronized.<br>
> + * that effect can make this implementation unusable depending on the scenario,
> + * its not a perfect design be any means.
> + * <br>
> + * The potential improvement is that only 1 worker thread is tied up to a client instead of several
> + * being blocked by synchronized io wait for one comethandler .<br>
> *
> * @author Gustav Trede
> */
> @@ -78,12 +80,8 @@
> /**
> * used for preventing othe worker threads from the executor event queue from adding events
> * to the comethandlers local queue or starting IO logic after shuttdown.<br>
> - * <br>
> - * {_at_link DefaultNotificationHandler} sets shuttingdown = true when needed.<br>
> - * this way we dont need subclasses to remember to do super calls in the onXX methods.<br>
> - * todo: CometEvent.INTERRUPT should do cometHandler.shuttingdown = true; ?
> */
> - protected volatile boolean shuttingdown;
> + private boolean shuttingdown;
>
> /**
> * max number of events to locally queue for this comethandler.<br>
> @@ -128,11 +126,8 @@
> * further events in the internal queue.
> */
> public void EnQueueEvent(CometEvent event){
> - if (shuttingdown)
> - return;
> synchronized(messageQueue){
> if (!isreadyforwork){
> - // to prevent add of event when we are shutdown
> if (!shuttingdown && queuesize < messageQueueLimit){
> messageQueue.add(event);
> queuesize++;
> @@ -149,12 +144,19 @@
> return;
> }
> try{
> - onEvent(event);
> - } catch (Throwable ex) {
> + //move synchronized outside the while loop ?
> + synchronized(this){
> + onEvent(event);
> + }
> + } catch (IOException ex) {
> shuttingdown = true;
> - event.getCometContext().resumeCometHandler(this);
> - return;
> + }finally{
> + if (shuttingdown){
> + event.getCometContext().resumeCometHandler(this);
> + return;
> + }
> }
> +
> synchronized(messageQueue){
> if (queuesize == messageQueueLimit){
> queuefull = true;
> @@ -175,7 +177,7 @@
> }
>
> /**
> - * called in unsynchronized context, not blocking other threads
> + * called in synchronized context.
> * when the comethandler's local event queue is full.<br>
> * default impl resumes the comethandler
> * @param event {_at_link CometEvent}
> @@ -183,14 +185,6 @@
> public void onQueueFull(CometEvent event){
> event.getCometContext().resumeCometHandler(this);
> }
> -
> - /**
> - * prevents further event handling in the enQueue method.<br>
> - * existing queued events will be discarded.
> - */
> - public void shutdownQueue() {
> - shuttingdown = true;
> - }
>
> /**
> * returns the attachment
> @@ -229,14 +223,14 @@
> }
>
> /**
> - * closes the connection if attachment instanceof Closable.
> + *
> */
> protected void terminate(){
> - if (attachment() instanceof Closeable){
> + /* if (attachment() instanceof Closeable){
> try {
> ((Closeable) attachment()).close();
> } catch (IOException ex) { }
> - }
> + }*/
> }
>
> }
> \ No newline at end of file
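To check that I read the class javadoc correctly (only one thread at a time does IO, the others queue their event and return to the pool), here is a stripped-down sketch of the pattern. SingleWriterQueue and its members are hypothetical names, and the queue-full handling is simplified to "drop and shut down" instead of calling onQueueFull and resuming the handler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch, not the actual DefaultConcurrentCometHandler. */
class SingleWriterQueue<E> {
    private final Queue<E> queue = new ArrayDeque<E>();
    private final int limit;
    private boolean busy;     // true while one thread is draining; guarded by queue
    private boolean shutdown; // guarded by queue

    /** Stands in for the IO done in onEvent; only the draining thread writes to it. */
    final List<E> delivered = new ArrayList<E>();

    SingleWriterQueue(int limit) {
        this.limit = limit;
    }

    /** Returns false if the event was dropped (shut down or queue full). */
    boolean enqueue(E event) {
        synchronized (queue) {
            if (shutdown) {
                return false;
            }
            if (busy) {
                // Another thread is doing IO: queue the event and return at once.
                if (queue.size() >= limit) {
                    shutdown = true;
                    queue.clear();
                    return false;
                }
                queue.add(event);
                return true;
            }
            busy = true;
        }
        // This thread is now the only one doing IO until the queue is empty.
        E current = event;
        while (true) {
            onEvent(current);
            synchronized (queue) {
                current = queue.poll();
                if (current == null) {
                    busy = false;
                    return true;
                }
            }
        }
    }

    /** Called by one thread at a time, outside the queue lock. */
    protected void onEvent(E event) {
        delivered.add(event);
    }
}
```

The sticky-worker drawback mentioned in the javadoc is visible here: the thread that won the busy flag stays in the loop for as long as other threads keep feeding the queue.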
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java (revision 2805)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java (working copy)
> @@ -76,7 +76,11 @@
> * everytime a {_at_link CometContext#notify} is invoked. The {_at_link CometEvent}
> * will contains the message that can be pushed back to the remote client,
> * cached or ignored. This method can also be used to resume a connection
> - * once a notified by invoking {_at_link CometContext#resumeCometHandler}.
> + * once a notified by invoking {_at_link CometContext#resumeCometHandler}.<br>
> + * its not optimal to flush outputstream in this method for long polling,
> + * flush is performed in each CometContext.resume call.<br>
> + * flushing multiple times can fragment the data into several tcp packets,
> + * that leads to extra IO and overhead in general due to client ack for each packet etc.
> */
> public void onEvent(CometEvent event) throws IOException;
>
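The flush note in the onEvent javadoc could be illustrated with something like the following, if we want an example in the docs. It is only a sketch: FlushCountingStream and SingleFlushWriter are hypothetical names, and the byte array stream stands in for the response output stream.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical sink that counts how often flush() reaches it. */
class FlushCountingStream extends ByteArrayOutputStream {
    int flushes;

    @Override
    public void flush() throws IOException {
        flushes++;
        super.flush();
    }
}

/** Hypothetical helper: buffer all chunks and flush once at the end. */
class SingleFlushWriter {
    static void writeAll(OutputStream raw, byte[][] chunks) throws IOException {
        BufferedOutputStream out = new BufferedOutputStream(raw, 8192);
        for (byte[] chunk : chunks) {
            out.write(chunk); // stays in the buffer while it fits
        }
        out.flush(); // one flush for the whole response
    }
}
```

Flushing once per response gives the stack the chance to send the data in as few packets as it can, which is the point the javadoc makes for long polling.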
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java (revision 2805)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java (working copy)
> @@ -1,376 +0,0 @@
> -/*
> - *
> - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
> - *
> - * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
> - *
> - * The contents of this file are subject to the terms of either the GNU
> - * General Public License Version 2 only ("GPL") or the Common Development
> - * and Distribution License("CDDL") (collectively, the "License"). You
> - * may not use this file except in compliance with the License. You can obtain
> - * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
> - * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
> - * language governing permissions and limitations under the License.
> - *
> - * When distributing the software, include this License Header Notice in each
> - * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
> - * Sun designates this particular file as subject to the "Classpath" exception
> - * as provided by Sun in the GPL Version 2 section of the License file that
> - * accompanied this code. If applicable, add the following below the License
> - * Header, with the fields enclosed by brackets [] replaced by your own
> - * identifying information: "Portions Copyrighted [year]
> - * [name of copyright owner]"
> - *
> - * Contributor(s):
> - *
> - * If you wish your version of this file to be governed by only the CDDL or
> - * only the GPL Version 2, indicate your decision by adding "[Contributor]
> - * elects to include this software in this distribution under the [CDDL or GPL
> - * Version 2] license." If you don't indicate a single choice of license, a
> - * recipient has the option to distribute your version of this file under
> - * either the CDDL, the GPL Version 2 or to extend the choice of license to
> - * its licensees as provided above. However, if you add GPL Version 2 code
> - * and therefore, elected the GPL Version 2 license, then the option applies
> - * only if the new code is made subject to such option by the copyright
> - * holder.
> - *
> - */
> -
> -package com.sun.grizzly.comet;
> -
> -import com.sun.grizzly.http.SelectorThread;
> -import com.sun.grizzly.util.LinkedTransferQueue;
> -import java.io.IOException;
> -import java.nio.ByteBuffer;
> -import java.nio.channels.CancelledKeyException;
> -import java.nio.channels.SelectableChannel;
> -import java.nio.channels.SelectionKey;
> -import java.nio.channels.Selector;
> -import java.nio.channels.SocketChannel;
> -import java.util.concurrent.CountDownLatch;
> -import java.util.logging.Level;
> -import java.util.logging.Logger;
> -
> -/**
> - * NIO {_at_link Selector} allowing {_at_link CometHandler} to receive
> - * non-blocking requests bytes during request polling.
> - *
> - * TODO: investigate if its possible to move this functionality to grizzly main
> - * selector inorder to lower the extra overhead this 2nd selector is.
> - *
> - * @author Jeanfrancois Arcand
> - * @author Gustav Trede
> - */
> -public class CometSelector {
> -
> - /**
> - * The {_at_link CometEngine} singleton
> - */
> - protected final CometEngine cometEngine;
> -
> -
> - /**
> - * The {_at_link Selector}
> - */
> - private Selector selector;
> -
> - /**
> - * timestamp when last expireIdleKeys() performed its check
> - */
> - private long lastIdleCheck;
> -
> - /**
> - * Logger.
> - */
> - private final Logger logger = SelectorThread.logger();
> -
> - /**
> - *
> - */
> - private final ByteBuffer dumybuffer = ByteBuffer.allocate(1);
> -
> - /**
> - * The list of {_at_link SelectionKey} to register with the
> - * {_at_link Selector}
> - * TODO: replace with LinkedTransferQueue
> - */
> - private final LinkedTransferQueue<CometTask> keysToRegister
> - = new LinkedTransferQueue<CometTask>();
> -
> -
> - /**
> - * New {_at_link CometSelector}
> - * @param cometEngine The {_at_link CometEngine} singleton
> - */
> - public CometSelector(CometEngine cometEngine) {
> - this.cometEngine = cometEngine;
> - }
> -
> -
> - /**
> - * Start the {_at_link Selector} running on its
> - * Thread.
> - */
> - public void start() throws InterruptedException{
> - final CountDownLatch isStartedLatch = new CountDownLatch(1);
> - new Thread("CometSelector"){{
> - setDaemon(true);
> - }
> -
> - @Override
> - public void run(){
> - try{
> - selector = Selector.open();
> - } catch(IOException ex){
> - // Most probably a fd leak.
> - logger.log(Level.SEVERE,"CometSelector.open()",ex);
> - return;
> - }
> - isStartedLatch.countDown();
> -
> - doSelection();
> - }
> - }.start();
> - isStartedLatch.await();
> - }
> -
> - /**
> - * the selection logic
> - */
> - private void doSelection(){
> - while (true){
> - int selectorState = 0;
> - try{
> - try{
> - selectorState = selector.select(1000);
> - } catch (CancelledKeyException ex){
> - if (logger.isLoggable(Level.FINEST)){
> - logger.log(Level.FINEST,"CometSelector.open()",ex);
> - }
> - }
> -
> - handleSelectedKeys();
> - expireIdleKeys();
> - registerNewKeys();
> -
> - } catch (Throwable t){
> - handleException(t,null);
> - }finally{
> - if (selectorState <= 0){ //todo why is this needed ?
> - selector.selectedKeys().clear();
> - }
> - }
> - }
> - }
> -
> - /**
> - * handle the selected keys
> - */
> - private void handleSelectedKeys(){
> - for (SelectionKey cometKey:selector.selectedKeys()) {
> - try{
> - if (cometKey.isValid()) {
> - CometTask cometTask = (CometTask)cometKey.attachment();
> - boolean asyncExec = cometTask.isComethandlerisAsyncregistered();
> - if (asyncExec){
> - cometTask.setComethandlerisAsyncregistered(false);
> - if (cometKey.isReadable()){
> - cometKey.interestOps(cometKey.interestOps() & (~SelectionKey.OP_READ));
> - cometTask.upcoming_op = CometTask.OP_EVENT.READ;
> - }
> -
> - if (cometKey.isWritable()){
> - cometKey.interestOps(cometKey.interestOps() & (~SelectionKey.OP_WRITE));
> - cometTask.upcoming_op = CometTask.OP_EVENT.WRITE;
> - }
> - }
> - if (cometTask.getSelectionKey().attachment() == null){
> - if (cometTask.cometHandlerNotResumed()){
> - if (asyncExec){
> - cometTask.execute();
> - }else{
> - checkIfclientClosedConnection(cometKey);
> - }
> - }
> - } else {
> - // logger.warning("cometselector comettask.mainkey has an attachment. ");
> - cancelKey(cometKey,false,true, true);
> - }
> - } else {
> - //logger.warning("cometselector select detected invalid cometKey.");
> - cancelKey(cometKey,false,true,true);
> - }
> - }catch(Throwable t){
> - handleException(t, cometKey);
> - }
> - }
> - // one shot clear is alot faster then removing each element one by one.
> - selector.selectedKeys().clear();
> - }
> -
> - /**
> - *
> - * @param cometKey
> - */
> - private void checkIfclientClosedConnection(SelectionKey cometKey) {
> - boolean connectionclosed = true;
> - try {
> - SocketChannel socketChannel = (SocketChannel)cometKey.channel();
> - dumybuffer.clear();
> - connectionclosed = socketChannel.read(dumybuffer) == -1;
> - } catch (Throwable ex) {
> - // null we dont want cancelkey to happen here, cause it does not cancel mainKey
> - handleException(ex, null);
> - }
> - finally{
> - if (connectionclosed)
> - cancelKey(cometKey, true, true, true);
> - }
> - }
> -
> - /**
> - * perform the registration of new keys.
> - * The mainKey is the SelectionKey returned by the
> - * Selector used in the SelectorThread class.
> - */
> - private void registerNewKeys(){
> - SelectionKey cometKey = null;
> - CometTask cometTask;
> - while ((cometTask = keysToRegister.poll()) != null ){
> - try{
> - SelectionKey mainKey = cometTask.getSelectionKey();
> - SocketChannel channel = (SocketChannel)mainKey.channel();
> - if (mainKey.isValid() && channel.isOpen()) {
> - cometKey = channel.register(selector,SelectionKey.OP_READ);
> - cometTask.setCometKey(cometKey);
> - cometKey.attach(cometTask);
> - cometTask.getCometContext().addActiveCometTask(cometTask);
> - cometTask.getCometContext().
> - addActiveHandler(cometTask.getCometHandler(), cometKey);
> - cometKey = null;
> - }
> - }catch(Throwable t){
> - handleException(t, cometKey);
> - }
> - }
> - }
> -
> - /**
> - * Expires registered {_at_link SelectionKey}. If a
> - * {_at_link SelectionKey} is expired, the request will be resumed and the
> - * HTTP request will complete,
> - */
> - private void expireIdleKeys(){
> - if (selector.keys().isEmpty()){
> - return;
> - }
> -
> - final long current = System.currentTimeMillis();
> - if (current - lastIdleCheck < 1000){
> - return;
> - }
> -
> - lastIdleCheck = current;
> - for (SelectionKey cometKey:selector.keys()){
> - try{
> - CometTask cometTask = (CometTask)cometKey.attachment();
> - if (cometTask == null)
> - continue;
> - if (cometTask.hasExpired(current)){
> - cancelKey(cometKey,false,true, true);
> - continue;
> - }
> - /**
> - * The connection has been resumed since the timeout is
> - * re-attached to the SelectionKey so cancel the Comet key.
> - */
> - if (cometTask.getSelectionKey().attachment() instanceof Long){
> - cometKey.attach(null);
> - cometKey.cancel();
> - }
> - }catch(Throwable t){
> - handleException(t, cometKey);
> - }
> - }
> - }
> -
> - /**
> - * handle exceptions for selection logic
> - * @param t
> - * @param key
> - */
> - private void handleException(Throwable t, SelectionKey key){
> - try{
> - cancelKey(key,false,true, true);
> - } catch (Throwable t2){
> - logger.log(Level.SEVERE,"CometSelector",t2);
> - }
> - if (logger.isLoggable(Level.FINEST)){
> - logger.log(Level.FINEST,"CometSelector",t);
> - }
> - }
> -
> -
> - /**
> - * Cancel the {_at_link SelectionKey} associated with a suspended connection.
> - */
> - protected boolean cancelKey(SelectionKey cometKey, boolean cancelMainKey,
> - boolean removeCometHandler, boolean notifyInterrupt){
> - if (cometKey == null){ //cometcontext.resume can give a null cometkey
> - return false;
> - }
> - boolean status = true;
> - CometTask cometTask = null;
> - // attach is only atomic since dolphin b06 , hence we must synchronize
> - // until we can require dolphin
> - // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6436220
> - synchronized(cometKey){
> - cometTask = (CometTask) cometKey.attach(null);
> - if (cometTask != null){
> - //synchronizes internally on itself and canceledkeyset,
> - //we want hotspot to use lock coarsening.
> - cometKey.cancel();
> - }
> - }
> - status = cometTask != null;
> - if (status){
> - status = cometTask.getCometContext().interrupt(cometTask,
> - removeCometHandler, notifyInterrupt);
> - cometEngine.flushPostExecute(cometTask.getAsyncProcessorTask());
> -
> - if (cancelMainKey){
> - cometTask.getSelectorThread().cancelKey(cometTask.getSelectionKey());
> - }
> - }
> - return status;
> - }
> -
> - /**
> - * Register the {_at_link SelectionKey} to the {_at_link Selector}. We
> - * cannot register the {_at_link SelectionKey} directy on the
> - * {_at_link Selector} because there is a deadlock in the VM (see bug XXX).
> - */
> - public void registerKey(CometTask cometTask){
> - if (cometTask.getSelectionKey().isValid() && selector != null){
> - cometTask.setExpireTime(System.currentTimeMillis());
> - keysToRegister.offer(cometTask);
> - selector.wakeup();
> - }
> - }
> -
> -
> - /**
> - * Wakes up the {_at_link Selector}
> - */
> - public void wakeup(){
> - selector.wakeup();
> - }
> -
> - /**
> - * Return the SelectionKey associated with this channel.
> - */
> - public SelectionKey cometKeyFor(SelectableChannel channel){
> - return channel.keyFor(selector);
> - }
> -
> -}
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java (revision 2805)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java (working copy)
> @@ -38,13 +38,15 @@
>
> package com.sun.grizzly.comet;
>
> +import java.io.Serializable;
> +
> /**
> * Simple event class used to pass information between {_at_link CometHandler}
> * and the Comet implementation.
> *
> * @author Jeanfrancois Arcand
> */
> -public class CometEvent<E> {
> +public class CometEvent<E> implements Serializable{
>
>
> /**
> @@ -98,7 +100,7 @@
> /**
> * The CometContext from where this instance was fired.
> */
> - private CometContext cometContext;
> + private transient CometContext cometContext;
>
>
> /**
> @@ -116,6 +118,12 @@
> this.type = type;
> this.cometContext = context;
> }
> +
> + public CometEvent(int type, CometContext cometContext, E attachment) {
> + this.type = type;
> + this.attachment = attachment;
> + this.cometContext = cometContext;
> + }
>
> /**
> * Return the <code>type</code> of this object.
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java (revision 2805)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java (working copy)
> @@ -48,7 +48,7 @@
>
> /**
> * Default Notificationhandler that uses a thread pool dedicated to the CometEngine
> - * to execute the notification process.
> + * to execute the notification process.<br>
> *
> * @author Jeanfrancois Arcand
> * @author Gustav Trede
> @@ -75,7 +75,7 @@
> /**
> * only used if blockingnotification == false and threadpool != null
> */
> - private boolean spreadNotifyToManyToThreads = true;
> + private boolean spreadNotifyToManyToThreads = false;
>
> public DefaultNotificationHandler() {
> }
> @@ -104,8 +104,21 @@
> public void setBlockingNotification(boolean blockingNotification) {
> this.blockingNotification = blockingNotification;
> }
> +
> + /**
> + * if true a notify to Iterator<CometHandler> will be spread into one runnable task for
> + * each comethandler.
> + * if false , all comethandlers notify will be executed in 1 Runnable, after each other,
> + *
> + * @param spreadNotifyToManyToThreads
> + */
> + public void setSpreadNotifyToManyToThreads(boolean spreadNotifyToManyToThreads) {
> + this.spreadNotifyToManyToThreads = spreadNotifyToManyToThreads;
> + }
I think we don't need this method as setBlockingNotification does the same?
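For reference, this is how I read the javadoc above, as a minimal sketch and not the actual DefaultNotificationHandler code: with the flag set, each handler gets its own Runnable; without it, a single Runnable walks all handlers one after the other. SpreadNotifySketch, Handler and notifyAll are made-up names for illustration only, and a plain Executor stands in for the Comet thread pool.

```java
import java.util.List;
import java.util.concurrent.Executor;

public class SpreadNotifySketch {

    /** Hypothetical stand-in for CometHandler; only the callback matters here. */
    interface Handler {
        void onEvent(String event);
    }

    /**
     * Dispatches the event to every handler through the given executor.
     * Returns the number of Runnables handed to the executor.
     */
    static int notifyAll(Executor pool, List<Handler> handlers,
                         String event, boolean spread) {
        if (spread) {
            // one Runnable per handler: handlers may run in parallel
            for (final Handler h : handlers) {
                pool.execute(() -> h.onEvent(event));
            }
            return handlers.size();
        }
        // a single Runnable: handlers are notified one after the other
        pool.execute(() -> {
            for (Handler h : handlers) {
                h.onEvent(event);
            }
        });
        return 1;
    }
}
```

If that reading is right, the flag only changes how the work is split across the pool, which is what I would like to compare against setBlockingNotification.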
>
> -
> +
> +
> +
> /**
> * Notify all {_at_link CometHandler}.
> * @param cometEvent the CometEvent used to notify CometHandler
> @@ -162,49 +175,36 @@
> protected void notify0(CometEvent cometEvent,CometHandler cometHandler) {
> try{
> switch (cometEvent.getType()) {
> - case CometEvent.INTERRUPT:
> - if (cometHandler instanceof DefaultConcurrentCometHandler){
> - ((DefaultConcurrentCometHandler)cometHandler).shutdownQueue();
> - //todo how do we synchronize ?, the defaultConcurrentcomethandler can do that, but we dont know if other implementations do
> - cometHandler.onInterrupt(cometEvent);
> - }else
> - synchronized(cometHandler){
> - cometHandler.onInterrupt(cometEvent);
> - }
> - break;
> + case CometEvent.INTERRUPT:
> + cometHandler.onInterrupt(cometEvent); break;
> case CometEvent.NOTIFY:
> case CometEvent.READ:
> case CometEvent.WRITE:
> - if (cometHandler instanceof DefaultConcurrentCometHandler)
> + if (cometHandler instanceof DefaultConcurrentCometHandler){
> ((DefaultConcurrentCometHandler)cometHandler).EnQueueEvent(cometEvent);
> - else
> - if (cometEvent.getCometContext().isActive(cometHandler))
> + break;
> + }
> + if (cometEvent.getCometContext().isActive(cometHandler)){
> synchronized(cometHandler){
> cometHandler.onEvent(cometEvent);
> }
> + }
> break;
> case CometEvent.INITIALIZE:
> - cometHandler.onInitialize(cometEvent);
> - break;
> + cometHandler.onInitialize(cometEvent); break;
> case CometEvent.TERMINATE:
> - if (cometHandler instanceof DefaultConcurrentCometHandler){
> - ((DefaultConcurrentCometHandler)cometHandler).shutdownQueue();
> - cometHandler.onTerminate(cometEvent); //todo how do we synchronize ?, the defaultConcurrentcomethandler can do that, but we dont know if other implementations do
> - }else
> - synchronized(cometHandler){
> - cometHandler.onTerminate(cometEvent);
> - }
> - break;
> + synchronized(cometHandler){
> + cometHandler.onTerminate(cometEvent); break;
> + }
> default:
> throw ISEempty;
> }
> } catch (Throwable ex) {
> try {
> - cometEvent.getCometContext().resumeCometHandler(cometHandler, true);
> + cometEvent.getCometContext().resumeCometHandler(cometHandler);
> } catch (Throwable t) {
> logger.log(Level.FINE, "Resume phase failed: ", t);
> }
> - //todo cant log this at WARNING level.. its normal to have alot of failed notifications. imagine several K users in the real world..
> logger.log(Level.FINE, "Notification failed: ", ex);
Add isLoggable logic here (missed from my original commit).
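Something along these lines, as a sketch only: GuardedLogging and logNotificationFailure are hypothetical names and the logger name is made up; the real code would keep using the logger the class already has. The boolean return value is only there to make the guard visible.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class GuardedLogging {

    // hypothetical logger name, used for this sketch only
    private static final Logger logger = Logger.getLogger("comet.sketch");

    /**
     * Logs a failed notification at FINE, but only when FINE is enabled.
     * Returns true if the record was handed to the logger.
     */
    static boolean logNotificationFailure(Throwable ex) {
        if (logger.isLoggable(Level.FINE)) {
            logger.log(Level.FINE, "Notification failed: ", ex);
            return true;
        }
        return false;
    }
}
```

The same guard would apply to the "Resume phase failed" log call just above.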
> }
> }
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java (revision 2805)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java (working copy)
> @@ -1,9 +1,9 @@
> /*
> - *
> + *
> * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
> - *
> + *
> * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
> - *
> + *
> * The contents of this file are subject to the terms of either the GNU
> * General Public License Version 2 only ("GPL") or the Common Development
> * and Distribution License("CDDL") (collectively, the "License"). You
> @@ -11,7 +11,7 @@
> * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
> * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
> * language governing permissions and limitations under the License.
> - *
> + *
> * When distributing the software, include this License Header Notice in each
> * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
> * Sun designates this particular file as subject to the "Classpath" exception
> @@ -20,9 +20,9 @@
> * Header, with the fields enclosed by brackets [] replaced by your own
> * identifying information: "Portions Copyrighted [year]
> * [name of copyright owner]"
> - *
> + *
> * Contributor(s):
> - *
> + *
> * If you wish your version of this file to be governed by only the CDDL or
> * only the GPL Version 2, indicate your decision by adding "[Contributor]
> * elects to include this software in this distribution under the [CDDL or GPL
> @@ -44,127 +44,179 @@
> import com.sun.grizzly.ProtocolChain;
> import com.sun.grizzly.arp.AsyncProcessorTask;
> import com.sun.grizzly.http.SelectorThread;
> -import com.sun.grizzly.util.InputReader;
> -import com.sun.grizzly.http.TaskBase;
> +import com.sun.grizzly.http.Task;
> +import com.sun.grizzly.util.SelectedKeyAttachmentLogic;
> import com.sun.grizzly.util.WorkerThread;
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.SelectionKey;
> import java.nio.channels.SocketChannel;
> import java.util.logging.Level;
> +import java.util.logging.Logger;
>
> /**
> * A {_at_link Task} implementation that allow Grizzly ARP to invokeCometHandler
> - * {_at_link CometHandler} when new data (bytes) are available from the
> + * {_at_link CometHandler} when new data (bytes) are available from the
> * {_at_link CometSelector}.
> *
> * @author Jeanfrancois Arcand
> + * @author Gustav Trede
> */
> -public class CometTask extends TaskBase{
> +public class CometTask extends SelectedKeyAttachmentLogic implements Runnable{
>
> -
> - public enum OP_EVENT { READ, WRITE }
> -
> -
> + private static final Logger logger = SelectorThread.logger();
> +
> /**
> - * The current non blocking operation.
> + * The {_at_link CometContext} associated with this instance.
> */
> - protected OP_EVENT upcoming_op = OP_EVENT.READ;
> -
> -
> + protected final CometContext cometContext;
> +
> /**
> - * The {_at_link CometContext} associated with this instance.
> + * The {_at_link CometHandler} associated with this task.
> */
> - private CometContext cometContext;
> -
> -
> + protected final CometHandler cometHandler;
> +
> /**
> - * The {_at_link CometSelector} .
> + * The {_at_link AsyncProcessorTask}
> */
> - private CometSelector cometSelector;
> + private AsyncProcessorTask asyncProcessorTask;
>
> -
> /**
> - * The time in milliseconds before this object was registered the
> - * {_at_link SelectionKey} on the {_at_link CometSelector}
> + * true if comethandler is registered for async IO in cometcontext.
> + * used to optmize:
> + * dont give simple read == -1 operations to thread pool
> */
> - private long expireTime ;
> + protected volatile boolean cometHandlerIsAsyncRegistered;
>
> -
> /**
> - * used by cometselector to optmize:
> - * dont give simple read == -1 operations to thread pool
> + * The current non blocking operation.
> */
> - private volatile boolean comethandlerisAsyncregistered;
> -
> + protected boolean upcoming_op_isread;
> +
> /**
> - * The InputStream used to read bytes from the {_at_link CometSelector}
> + * true if run() should call cometcontext.interrupt0
> */
> - private InputReader cometInputStream;
> -
> -
> + protected boolean callInterrupt;
> +
> /**
> - * The CometSelector registered key.
> + * true if interrupt should flushAPT
> */
> - private SelectionKey cometKey;
> + protected boolean interruptFlushAPT;
>
> +
> /**
> - * The {_at_link AsyncProcessorTask}
> + * New {_at_link CometTask}.
> */
> - private AsyncProcessorTask asyncProcessorTask;
> + public CometTask(CometContext cometContext, CometHandler cometHandler) {
> + this.cometContext = cometContext;
> + this.cometHandler = cometHandler;
> + }
>
> /**
> - * The {_at_link CometEvent} associated with this task.
> + * performs doTask() or cometContext.interrupt0
> */
> - private CometEvent event;
> -
> + public void run(){
> + if (callInterrupt){
> + cometContext.interrupt0(this, true, interruptFlushAPT, true);
> + }else{
> + try{
> + doTask();
> + } catch (IOException ex){
> + throw new RuntimeException(ex);
> + }
> + }
> + }
> +
> /**
> - * The {_at_link CometHandler} associated with this task.
> + * {_at_inheritDoc}
> */
> - private CometHandler cometHandler;
> + @Override
> + public long getIdleTimeoutDelay() {
> + return cometContext.getExpirationDelay();
> + }
>
> /**
> - * The CometWriter associated with this task.
> + * this should never be called for for comet, due to we are nulling the attachment
> + * and completely overriding the selector.select logic.<br>
> + * called by grizzly when the selectionkey is canceled and its socket closed.<br>
> + *
> + * @param selectionKey
> */
> - private CometWriter writer;
> -
> -
> + @Override
> + public void release(SelectionKey selectionKey) {
> + //logger.warning("cometTask.release() : isactive: "+cometContext.isActive(cometHandler)+" attachment:"+selectionKey.attachment());
> + //cometContext.interrupt(this, true, false,false, true);
> + }
> +
> /**
> - * The CometReader associated with this task.
> + * {_at_inheritDoc}
> */
> - private CometReader reader;
> -
> -
> + @Override
> + public boolean timedOut(SelectionKey key){
> + //System.err.println("cometTask.timedout() : isactive: "+cometContext.isActive(cometHandler)+" attachment:"+key.attachment());
> +
Remove the commented-out System.err code or log it at FINE.
> + cometContext.interrupt(this, true, true, true, true);
> + return false;
> + }
> +
> /**
> - * <tt>true</tt> if the CometHandler has been registered for OP_READ
> - * events.
> - * false by default. java lang specification states that.
> + * {_at_inheritDoc}
> */
> - private boolean asyncReadSupported ;
> -
> -
> + @Override
> + public void handleSelectedKey(SelectionKey selectionKey) {
> + if (!selectionKey.isValid()){
> + cometContext.interrupt(this, true, false,true, true);
> + return;
> + }
> + if (cometHandlerIsAsyncRegistered){
> + if (selectionKey.isReadable()){
> + selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_READ));
> + upcoming_op_isread = true;
> + }
> + if (selectionKey.isWritable()){
> + selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_WRITE));
> + upcoming_op_isread = false;
> + }
> + asyncProcessorTask.getThreadPool().execute(this);
> + }
> + else{
> + checkIfClientClosedConnection(selectionKey);
> + }
> + }
> +
> /**
> - * Is this Task suspended.
> + * checks if client has closed the connection.
> + * the check is done by trying to read 1 byte that is trown away.
> + * only used for non async registered comethandler.
> + * @param mainKey
> */
> - private boolean isSuspended = false;
> -
> - /**
> - * New {_at_link CometTask}.
> - */
> - public CometTask() {
> + private void checkIfClientClosedConnection(SelectionKey mainKey) {
> + boolean connectionclosed = true;
> + try {
> + connectionclosed = ((SocketChannel)mainKey.channel()).
> + read(ByteBuffer.allocate(1)) == -1;
> + } catch (IOException ex) {
> +
> + }
> + finally{
> + if (connectionclosed){
> + cometContext.interrupt(this, true, false,true, true);
> + }else{
> + //cometContext.interrupt(this, false, false, true,false, true);
> + //System.err.println("**** ready key detected : "+mainKey.attachment() +" isactive:"+cometContext.isActive(cometHandler));
> +
Log the above?
> + }
> + }
> }
>
> -
> +
> /**
> * Notify the {_at_link CometHandler} that bytes are available for read.
> * The notification will invoke all {_at_link CometContext}
> */
> - public void doTask() throws IOException{
> + public void doTask() throws IOException{
> // The CometHandler has been resumed.
> if (!cometContext.isActive(cometHandler) ){
> return;
> }
> -
> /**
> * The CometHandler in that case is **always** invoked using this
> * thread so we can re-use the Thread attribute safely.
> @@ -172,41 +224,31 @@
> ByteBuffer byteBuffer = null;
> boolean connectionClosed = false;
> boolean clearBuffer = true;
> + final SelectionKey key = getSelectionKey();
> try{
> -
> - if (cometInputStream == null){
> - cometInputStream = new InputReader();
> - }
> -
> - cometInputStream.setSelectionKey(cometKey);
> byteBuffer = ((WorkerThread)Thread.currentThread()).getByteBuffer();
> if (byteBuffer == null){
> - byteBuffer = ByteBuffer.allocate(selectorThread.getBufferSize());
> + byteBuffer = ByteBuffer.allocate(asyncProcessorTask.getSelectorThread().getBufferSize());
> ((WorkerThread)Thread.currentThread()).setByteBuffer(byteBuffer);
> } else {
> byteBuffer.clear();
> }
>
> - cometInputStream.setByteBuffer(byteBuffer);
> - SocketChannel socketChannel = (SocketChannel)cometKey.channel();
> - if (upcoming_op == OP_EVENT.READ){
> + SocketChannel socketChannel = (SocketChannel)key.channel();
> + if (upcoming_op_isread){
> /*
> * We must execute the first read to prevent client abort.
> - */
> - int nRead = socketChannel.read(byteBuffer);
> + */
> + int nRead = socketChannel.read(byteBuffer);
> if (nRead == -1 ){
> connectionClosed = true;
> - } else {
> - /*
> + } else {
> + /*
> * This is an HTTP pipelined request. We need to resume
> - * the continuation and invoke the http parsing
> + * the continuation and invoke the http parsing
> * request code.
> */
> - if (!asyncReadSupported){
> - // Don't let the main Selector (SelectorThread) starts
> - // handling the pipelined request.
> - key.attach(Long.MIN_VALUE);
> -
> + if (!cometHandlerIsAsyncRegistered){
> /**
> * Something when wrong, most probably the CometHandler
> * has been resumed or removed by the Comet implementation.
> @@ -214,57 +256,57 @@
> if (!cometContext.isActive(cometHandler)){
> return;
> }
> -
> +
> // Before executing, make sure the connection is still
> - // alive. This situation happens with SSL and there
> + // alive. This situation happens with SSL and there
> // is not a cleaner way fo handling the browser closing
> // the connection.
> - nRead = socketChannel.read(byteBuffer);
> + nRead = socketChannel.read(byteBuffer);
> if (nRead == -1){
> connectionClosed = true;
> return;
> }
> -
> - cometContext.resumeCometHandler(cometHandler, false);
> + //resume without remove:
> + try{
> + cometHandler.onInterrupt(cometContext.eventInterrupt);
> + }catch(IOException e) { }
> + CometEngine.cometEngine.flushPostExecute(this,true,false);
> +
> clearBuffer = false;
> -
> +
> Controller controller = getSelectorThread().getController();
> - ProtocolChain protocolChain =
> + ProtocolChain protocolChain =
> controller.getProtocolChainInstanceHandler().poll();
> - NIOContext ctx = (NIOContext)controller.pollContext(key);
> + NIOContext ctx = (NIOContext)controller.pollContext(key);
> ctx.setController(controller);
> ctx.setSelectionKey(key);
> ctx.setProtocolChain(protocolChain);
> ctx.setProtocol(Protocol.TCP);
> - protocolChain.execute(ctx);
> + protocolChain.execute(ctx);
> } else {
> - byteBuffer.flip();
> - reader = new CometReader();
> + byteBuffer.flip();
> + CometReader reader = new CometReader();
> reader.setNRead(nRead);
> reader.setByteBuffer(byteBuffer);
> - if (event == null)
> - event = new CometEvent();
> - event.type = CometEvent.READ;
> + CometEvent event = new CometEvent(CometEvent.READ,cometContext);
> event.attach(reader);
> cometContext.invokeCometHandler(event,cometHandler);
> reader.setByteBuffer(null);
> -
> +
> // This Reader is now invalid. Any attempt to use
> // it will results in an IllegalStateException.
> reader.setReady(false);
> }
> }
> - } else if (upcoming_op == OP_EVENT.WRITE){
> - if (event == null)
> - event = new CometEvent();
> - event.type = CometEvent.WRITE;
> - writer = new CometWriter();
> + } else {
> + CometEvent event = new CometEvent(CometEvent.WRITE,cometContext);
> + CometWriter writer = new CometWriter();
> writer.setChannel(socketChannel);
> event.attach(writer);
> - cometContext.invokeCometHandler(event,cometHandler);
> -
> + cometContext.invokeCometHandler(event,cometHandler);
> +
> // This Writer is now invalid. Any attempt to use
> - // it will results in an IllegalStateException.
> + // it will results in an IllegalStateException.
> writer.setReady(false);
> }
> } catch (IOException ex){
> @@ -275,170 +317,84 @@
> }
> } catch (Throwable t){
> connectionClosed = true;
> - SelectorThread.logger().log(Level.SEVERE,"Comet exception",t);
> - } finally {
> + SelectorThread.logger().log(Level.SEVERE,"Comet exception",t);
> + } finally {
> + cometHandlerIsAsyncRegistered = false;
> +
> // Bug 6403933
> if (connectionClosed){
> - cometSelector.cancelKey(cometKey,true,true, true);
> + asyncProcessorTask.getSelectorThread().cancelKey(key);
> }
> -
> +
> if (clearBuffer && byteBuffer != null){
> byteBuffer.clear();
> }
> - asyncReadSupported = false;
> }
> }
>
> - public void setComethandlerisAsyncregistered(boolean comethandlerisAsyncregistered) {
> - this.comethandlerisAsyncregistered = comethandlerisAsyncregistered;
> + /**
> + * sets the comettask async interest flag in the comettask
> + * @param
> + */
> + public void setComethandlerIsAsyncRegistered(boolean cometHandlerIsAsyncRegistered) {
> + this.cometHandlerIsAsyncRegistered = cometHandlerIsAsyncRegistered;
> }
>
> - public boolean isComethandlerisAsyncregistered() {
> - return comethandlerisAsyncregistered;
> - }
> -
> /**
> - * returns true if the CometHandler has not been resumed / removed.
> - * allows cometSelector to do a fast check before leting threadpool execute the comettask
> + * returns true if the comethandler is registered for async io
> * @return
> */
> - public boolean cometHandlerNotResumed(){
> - return cometContext.isActive(cometHandler);
> + public boolean isComethandlerAsyncRegistered() {
> + return cometHandlerIsAsyncRegistered;
> }
> -
> +
> /**
> * Return the {_at_link CometContext} associated with this instance.
> - * @return CometContext the {_at_link CometContext} associated with this
> + * @return CometContext the {_at_link CometContext} associated with this
> * instance.
> */
> public CometContext getCometContext() {
> return cometContext;
> }
> -
> -
> - /**
> - * Set the {_at_link CometContext} used to invokeCometHandler {_at_link CometHandler}.
> - * @param cometContext the {_at_link CometContext} used to invokeCometHandler {_at_link CometHandler}
> - */
> - public void setCometContext(CometContext cometContext) {
> - this.cometContext = cometContext;
> - }
>
> -
> /**
> - * Recycle this object.
> + * returns the {_at_link AsyncProcessorTask }
> + * @return {_at_lnk AsyncProcessorTask }
> */
> - @Override
> - public void recycle(){
> - isSuspended = false;
> - key = null;
> - cometContext = null;
> - asyncReadSupported = false;
> - if(cometInputStream != null) {
> - cometInputStream.recycle();
> - }
> + public AsyncProcessorTask getAsyncProcessorTask() {
> + return asyncProcessorTask;
> }
>
> -
> /**
> - * Return the {_at_link CometSelector}
> - * @return CometSelector the {_at_link CometSelector}
> + * sets the {_at_link AsyncProcessorTask }
> + * @param {_at_link AsyncProcessorTask }
> */
> - public CometSelector getCometSelector() {
> - return cometSelector;
> + public void setAsyncProcessorTask(AsyncProcessorTask asyncProcessorTask) {
> + this.asyncProcessorTask = asyncProcessorTask;
> }
>
> -
> /**
> - * Set the {_at_link CometSelector}
> - * @param cometSelector the {_at_link CometSelector}
> - */
> - public void setCometSelector(CometSelector cometSelector) {
> - this.cometSelector = cometSelector;
> - }
> -
> -
> - /**
> - * Return the time in milliseconds before this object was registered the
> - * {_at_link SelectionKey} on the {_at_link CometSelector}
> - * @return long Return the time in milliseconds before this object was
> - * registered the {_at_link SelectionKey} on the
> - * {_at_link CometSelector}
> + * returns selectionkey
> + * @return
> */
> - public long getExpireTime() {
> - return expireTime;
> + public SelectionKey getSelectionKey() {
> + return asyncProcessorTask.getAsyncExecutor().getProcessorTask().getSelectionKey();
> }
>
> -
> /**
> - * Set the time in milliseconds before this object was registered the
> - * {_at_link SelectionKey} on the {_at_link CometSelector}
> - * @param expireTime Return the time in milliseconds before this object was
> - * registered the {_at_link SelectionKey} on the
> - * {_at_link CometSelector}
> - */
> - public void setExpireTime(long expireTime) {
> - this.expireTime = expireTime;
> - }
> -
> -
> - /**
> - * Return the {_at_link CometSelector}'s {_at_link SelectionKey}.
> + * returns the {_at_link AsyncProcessorTask }
> + * @return {_at_link AsyncProcessorTask }
> */
> - public SelectionKey getCometKey() {
> - return cometKey;
> + private SelectorThread getSelectorThread(){
> + return asyncProcessorTask.getSelectorThread();
> }
>
> -
> /**
> - * Set the {_at_link CometSelector}'s {_at_link SelectionKey}.
> + * returns the {_at_link CometHandler }
> + * @return {_at_link CometHandler }
> */
> - public void setCometKey(SelectionKey cometKey) {
> - this.cometKey = cometKey;
> - }
> -
> -
> - public boolean isAsyncReadSupported() {
> - return asyncReadSupported;
> - }
> -
> -
> - public void setAsyncReadSupported(boolean asyncReadSupported) {
> - this.asyncReadSupported = asyncReadSupported;
> - }
> -
> - /**
> - * Return true if cometContext.getExpirationDelay() != -1
> - * && timestamp - expireTime >= cometContext.getExpirationDelay();
> - * @param timestamp
> - * @return
> - */
> - protected boolean hasExpired(long timestamp){
> - long expdelay = cometContext.getExpirationDelay();
> - return expdelay != -1 && timestamp - expireTime >= expdelay;
> - }
> -
> - public AsyncProcessorTask getAsyncProcessorTask() {
> - return asyncProcessorTask;
> - }
> -
> - public void setAsyncProcessorTask(AsyncProcessorTask asyncProcessorTask) {
> - this.asyncProcessorTask = asyncProcessorTask;
> - }
> -
> public CometHandler getCometHandler() {
> return cometHandler;
> }
>
> - public void setCometHandler(CometHandler cometHandler) {
> - this.cometHandler = cometHandler;
> - }
> -
> - public boolean isSuspended() {
> - return isSuspended;
> - }
> -
> - public void setSuspended(boolean isSuspended) {
> - this.isSuspended = isSuspended;
> - }
> }
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java (revision 2805)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java (working copy)
> @@ -40,6 +40,7 @@
>
> import com.sun.grizzly.comet.concurrent.DefaultConcurrentCometHandler;
> import com.sun.grizzly.http.SelectorThread;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.IOException;
> import java.nio.channels.SelectionKey;
> import java.util.Iterator;
> @@ -100,7 +101,7 @@
> * is doing. It is not recommended to use attributes if this
> * {_at_link CometContext} is not shared amongs multiple
> * context path (uses {_at_link HttpServletSession} instead).
> - * </p>
> + * </p>
> * @author Jeanfrancois Arcand
> * @author Gustav Trede
> */
> @@ -135,13 +136,6 @@
>
>
> /**
> - * The {_at_link CometSelector} used to register {_at_link SelectionKey}
> - * for upcoming bytes.
> - */
> - protected CometSelector cometSelector;
> -
> -
> - /**
> * The {_at_link CometContext} continuationType. See {_at_link CometEngine}
> */
> protected int continuationType = CometEngine.AFTER_SERVLET_PROCESSING;
> @@ -151,14 +145,14 @@
> * The default delay expiration before a {_at_link CometContext}'s
> * {_at_link CometHandler} are interrupted.
> */
> - private long expirationDelay = 30 * 1000;
> + private long expirationDelay ;
>
>
> /**
> * <tt>true</tt> if the caller of {_at_link #notify} should block when
> * notifying other CometHandler.
> */
> - protected boolean blockingNotification = false;
> + protected boolean blockingNotification;
>
>
> /**
> @@ -168,23 +162,20 @@
>
>
> /**
> - * timestamp for last performed resetSuspendidletimeout.
> + * timestamp for next idlecheck
> * used to limit the frequency of actual performed resets.
> */
> - private volatile long lastIdleReset;
> + private volatile long nextidleclear;
Please use camelCase here: nextIdleClear
>
> /**
> - * Current associated list of {_at_link CometTask}
> - */
> - protected final ConcurrentHashMap<CometTask,Object> activeTasks;
> -
> - /**
> * The list of registered {_at_link CometHandler}
> */
> - protected final ConcurrentHashMap<CometHandler,SelectionKey> handlers;
> + protected final ConcurrentHashMap<CometHandler,CometTask> handlers;
>
> - private final CometEvent eventInterrupt;
> + protected final CometEvent eventInterrupt;
>
> + protected final CometEvent eventTerminate;
> +
> private final CometEvent eventInitialize;
>
> private static final IllegalStateException ISE = new IllegalStateException(INVALID_COMET_HANDLER);
> @@ -193,26 +184,34 @@
> new IllegalStateException("Make sure you have enabled Comet " +
> "or make sure the Thread invoking that method is the same " +
> "as the Servlet.service() Thread.");
> -
> - // ---------------------------------------------------------------------- //
> -
> -
> +
> /**
> * Create a new instance
> * @param topic the context path
> * @param type when the Comet processing will happen (see {_at_link CometEngine}).
> */
> public CometContext(String topic, int continuationType) {
> - this.topic = topic;
> + this.topic = topic;
> this.continuationType = continuationType;
> - this.attributes = new ConcurrentHashMap();
> - this.handlers = new ConcurrentHashMap<CometHandler,SelectionKey>(16,0.75f,64);
> - this.activeTasks = new ConcurrentHashMap<CometTask,Object>(16,0.75f,64);
> - this.eventInterrupt = new CometEvent<E>(CometEvent.INTERRUPT,this);
> - this.eventInitialize = new CometEvent<E>(CometEvent.INITIALIZE,this);
> + this.attributes = new ConcurrentHashMap();
> + this.handlers = new ConcurrentHashMap<CometHandler,CometTask>(8,0.75f,64);
> + this.eventInterrupt = new CometEvent<CometContext>(CometEvent.INTERRUPT,this);
> + this.eventInitialize = new CometEvent<CometContext>(CometEvent.INITIALIZE,this);
> + this.eventTerminate = new CometEvent<CometContext>(CometEvent.TERMINATE,this);
> + initDefaultValues();
> }
>
> /**
> + * init of default values.
> + * used by constructor and the cache recycle mechanism
> + */
> + private void initDefaultValues() {
> + blockingNotification = false;
> + expirationDelay = 30*1000;
> + nextidleclear = 0;
> + }
> +
> + /**
> * Get the context path associated with this instance.
> * @return topic the context path associated with this instance
> * @deprecated - use getTopic.
> @@ -289,14 +288,11 @@
> if (!CometEngine.getEngine().isCometEnabled()){
> throw cometNotEnabled;
> }
> - handlers.put(handler, CometEngine.dumykey);
> - // is it ok that we only manage one adcomethandler call ?
> - CometTask cometTask = new CometTask();
> - cometTask.setCometContext(this);
> - cometTask.setCometHandler(handler);
> - cometTask.setSuspended(alreadySuspended);
> + // is it ok that we only manage one addcomethandler call per thread ?
Can you elaborate?
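To make the question concrete: if I read the second comment line right, the alternative is to keep a list of tasks in the ThreadLocal instead of a single CometTask. A minimal sketch of that idea is below. PendingTasks, add and drain are hypothetical names, and Object stands in for CometTask; none of this is in the patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the patch: remembers every task that the
// current thread registered, instead of only the most recent one.
final class PendingTasks {
    private static final ThreadLocal<List<Object>> PENDING =
            new ThreadLocal<List<Object>>() {
                @Override
                protected List<Object> initialValue() {
                    return new ArrayList<Object>(2);
                }
            };

    private PendingTasks() { }

    // Called once per addCometHandler invocation on this thread.
    static void add(Object task) {
        PENDING.get().add(task);
    }

    // Returns a copy of the tasks added by this thread and empties the slot.
    static List<Object> drain() {
        List<Object> list = PENDING.get();
        List<Object> copy = new ArrayList<Object>(list);
        list.clear();
        return copy;
    }
}
```

The engine would then presumably call drain() once after Servlet.service() returns, rather than get() followed by set(null).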
> + // else we can use a list of handlers to add inside tlocal
> + CometTask cometTask = new CometTask(this,handler);
> + cometTask.upcoming_op_isread = alreadySuspended;
> CometEngine.updatedContexts.set(cometTask);
> -
> return handler.hashCode();
> }
>
> @@ -316,6 +312,7 @@
> /**
> * Retrieve a {_at_link CometHandler} using its based on its {_at_link CometHandler#hashCode};
> */
> + @Deprecated
Why is it deprecated, and what replaces it?
> public CometHandler getCometHandler(int hashCode){
> for (CometHandler handler:handlers.keySet()){
> if (handler.hashCode() == hashCode )
> @@ -328,20 +325,28 @@
> * Recycle this object.
> */
> public void recycle(){
> + try{
> + notify(this,CometEvent.TERMINATE);
> + } catch (IOException ex) {
> +
> + }
I like that one!
> handlers.clear();
> attributes.clear();
> - activeTasks.clear();
> topic = null;
> + notificationHandler = null;
> + initDefaultValues();
> + // add check for datastructure size, if cometcontext had large
> + // datastructes its probably not optimal to waste RAM with caching it
> + CometEngine.cometEngine.cometContextCache.offer(this);
> }
>
>
> /**
> - * adds a {_at_link CometHandler} to the active set
> - * @param handler {_at_link CometHandler}
> - * @param cometKey {_at_link SelectionKey}
> + * adds a {_at_link CometTask} to the active set
> + * @param cometTask {_at_link CometTask}
> */
> - protected void addActiveHandler(CometHandler handler, SelectionKey cometKey){
> - handlers.put(handler, cometKey);
> + protected void addActiveHandler(CometTask cometTask){
> + handlers.put(cometTask.cometHandler, cometTask);
> }
>
> /**
> @@ -358,7 +363,9 @@
> if (cometHandler instanceof DefaultConcurrentCometHandler){
> ((DefaultConcurrentCometHandler)cometHandler).EnQueueEvent(event);
> }else{
> - cometHandler.onEvent(event);
> + synchronized(cometHandler){
> + cometHandler.onEvent(event);
> + }
> }
> }
>
> @@ -385,12 +392,10 @@
> * @return <tt>true</tt> if the operation succeeded.
> */
> public boolean removeCometHandler(CometHandler handler,boolean resume){
> - CometEngine.updatedContexts.set(null);
> - SelectionKey key = handlers.remove(handler);
> - if (key != null){
> + CometTask task = handlers.remove(handler);
> + if (task != null){
> if (resume){
> - CometEngine.getEngine().flushPostExecute(
> - ((CometTask)key.attachment()).getAsyncProcessorTask());
> + CometEngine.getEngine().flushPostExecute(task,true,false);
> }
> return true;
> }
> @@ -405,22 +410,18 @@
> * @param hashCode The hashcode of the CometHandler to remove.
> * @return <tt>true</tt> if the operation succeeded.
> */
> + @Deprecated
Replaced with what? Please add the replacement to the documentation.
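What I am asking for is a Javadoc @deprecated tag next to the annotation, naming the replacement. A sketch on a stand-in class follows. HandlerRegistry is hypothetical, and I am only assuming that the by-reference overload is the intended replacement and that hash-code collisions are the reason for deprecating; please correct both if I guessed wrong.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for CometContext, reduced to the handler map.
final class HandlerRegistry {
    private final Map<Object, String> handlers =
            new ConcurrentHashMap<Object, String>();

    void add(Object handler, String task) {
        handlers.put(handler, task);
    }

    /**
     * Remove a handler by reference.
     * @return <tt>true</tt> if the handler was registered.
     */
    boolean remove(Object handler) {
        return handlers.remove(handler) != null;
    }

    /**
     * Remove a handler by its hash code.
     * @deprecated hash codes are not guaranteed to be unique;
     *             use {@link #remove(Object)} instead.
     */
    @Deprecated
    boolean remove(int hashCode) {
        for (Object handler : handlers.keySet()) {
            if (handler.hashCode() == hashCode) {
                return remove(handler);
            }
        }
        return false;
    }
}
```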
> public boolean removeCometHandler(int hashCode){
> - CometEngine.updatedContexts.set(null);
> - Iterator<CometHandler> iterator = handlers.keySet().iterator();
> - CometHandler handler = null;
> - while (iterator.hasNext()){
> - handler = iterator.next();
> + CometHandler handler_ = null;
> + for (CometHandler handler:handlers.keySet()){
> if (handler.hashCode() == hashCode){
> - SelectionKey key = handlers.get(handler);
> - if (key == null){
> - logger.warning(ALREADY_REMOVED);
> - return false;
> - }
> - iterator.remove();
> - return true;
> - }
> + handler_ = handler;
> + break;
> + }
> }
> + if (handler_ != null){
> + return handlers.remove(handler_) != null;
> + }
> return false;
> }
>
> @@ -435,70 +436,68 @@
> * @return <tt>true</tt> if the operation succeeded.
> */
> public boolean resumeCometHandler(CometHandler handler){
> - return resumeCometHandler(handler,true);
> - }
> -
> -
> - /**
> - * Resume the suspended response. A response can only be suspended when
> - * {_at_link CometContext#addCometHandler} was called first.
> - *
> - * @param handler The CometHandler associated with the current continuation.
> - * @param remove true if the CometHandler needs to be removed.
> - * @return <tt>true</tt> if the operation succeeded.
> - */
> - protected boolean resumeCometHandler(CometHandler handler, boolean remove){
> - CometEngine.updatedContexts.set(null);
> - boolean b= cometSelector.cancelKey(handlers.get(handler), false, remove, false);
> -
> - // Try a second time to locate the associated CometTask
> - if (!b){
> - for (CometTask cometTask:activeTasks.keySet()){
> - if (cometTask.getCometHandler() == handler){
> - interrupt(cometTask, remove, false);
> - CometEngine.getEngine().flushPostExecute(
> - cometTask.getAsyncProcessorTask());
> - activeTasks.remove(cometTask);
> - return true;
> - }
> - }
> + boolean status = interrupt(handlers.get(handler),false,true,false,false);
> + if (status){
> + try {
> + handler.onTerminate(eventTerminate);
> + } catch (IOException ex) { }
> }
> - return b;
> + return status;
> }
>
> /**
> * Interrupt a {_at_link CometHandler} by invoking {_at_link CometHandler#onInterrupt}
> */
> - protected boolean interrupt(CometTask task,boolean removeCometHandler,
> - boolean notifyInterrupt) {
> -
> - boolean status = true;
> - try{
> - if (removeCometHandler){
> - status = (handlers.remove(task.getCometHandler()) != null);
> - if (status && notifyInterrupt){
> - task.getCometHandler().onInterrupt(eventInterrupt);
> - }else{
> - logger.fine(ALREADY_REMOVED);
> + protected boolean interrupt(final CometTask task,
> + final boolean notifyInterrupt, final boolean flushAPT,
> + final boolean cancelkey, boolean asyncExecution) {
> + if (task != null && handlers.remove(task.cometHandler) != null){
> + final SelectionKey key = task.getSelectionKey();
> + // setting attachment non asynced to ensure grizzly dont keep calling us
> + key.attach(System.currentTimeMillis());
> + if (asyncExecution){
> + if (cancelkey){
> + // dont want to do that in non selector thread:
> + // canceled key wont get canceled again due to isvalid check
> + key.cancel();
> }
> + task.callInterrupt = true;
> + task.interruptFlushAPT = flushAPT;
> + //((WorkerThreadImpl)Thread.currentThread()).
> + // getPendingIOhandler().addPendingIO(task);
Remove commented code.
> + task.run();
> +
> + }else{
> + interrupt0(task, notifyInterrupt, flushAPT, cancelkey);
> }
> - } catch (Throwable ex){
> - status = false;
> - logger.log(Level.FINE,"Unable to interrupt",ex);
> - }finally{
> - activeTasks.remove(task);
> - return status;
> + return true;
> }
> + return false;
> }
> -
> +
> /**
> + * interrupt logic in its own method, so it can be executed either async or sync.<br>
"Interrupt" (capitalized). Do we need the <br> at the end?
> + * cometHandler.onInterrupt is performed async due to its functionality is unknown,
> + * hence not safe to run in the performance critical selector thread.
> + */
> + protected void interrupt0(CometTask task,
> + boolean notifyInterrupt, boolean flushAPT, boolean cancelkey){
> + if (notifyInterrupt){
> + try{
> + task.cometHandler.onInterrupt(eventInterrupt);
> + }catch(IOException e) { }
Log the exception at FINEST?
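Something along these lines is what I have in mind, following the isLoggable guard the patch already uses in flushPostExecute. This is only a sketch: the logger name and the Interruptible interface are placeholders, and the real code would use the existing logger and CometHandler.onInterrupt.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Placeholder types for illustration only; not part of the patch.
final class InterruptLogging {
    static final Logger logger = Logger.getLogger("comet.sketch");

    interface Interruptible {
        void onInterrupt() throws IOException;
    }

    private InterruptLogging() { }

    // Invokes the callback; an IOException is logged at FINEST, not swallowed silently.
    static boolean notifyInterrupt(Interruptible handler) {
        try {
            handler.onInterrupt();
            return true;
        } catch (IOException e) {
            if (logger.isLoggable(Level.FINEST)) {
                logger.log(Level.FINEST, "onInterrupt failed", e);
            }
            return false;
        }
    }
}
```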
> + }
> + CometEngine.cometEngine.flushPostExecute(task,flushAPT,cancelkey);
> + }
> +
> + /**
> * Return true if this {_at_link CometHandler} is still active, e.g. there is
> * still a suspended connection associated with it.
> *
> * @return true
> */
> public boolean isActive(CometHandler handler){
> - return handlers.containsKey(handler) || CometEngine.updatedContexts.get() != null;
> + return handlers.containsKey(handler);
> }
>
> /**
> @@ -507,7 +506,7 @@
> * of type NOTIFY.
> * @param attachment An object shared amongst {_at_link CometHandler}.
> */
> - public void notify(final E attachment) throws IOException{
> + public void notify(final Object attachment) throws IOException{
> notify(attachment, CometEvent.NOTIFY);
> }
>
> @@ -527,7 +526,7 @@
> * @param cometHandlerID Notify a single CometHandler.
> * @deprecated - use notify(attachment,eventType,CometHandler;
> */
> - public void notify(final E attachment,final int eventType,final int cometHandlerID)
> + public void notify(final Object attachment,final int eventType,final int cometHandlerID)
> throws IOException{
> notify(attachment,eventType,getCometHandler(cometHandlerID));
> }
> @@ -537,7 +536,7 @@
> * @param attachment An object shared amongst {_at_link CometHandler}.
> * @param {_at_link CometHandler} to notify.
> */
> - public void notify(final E attachment,final CometHandler cometHandler)
> + public void notify(final Object attachment,final CometHandler cometHandler)
> throws IOException{
> notify(attachment,CometEvent.NOTIFY,cometHandler);
> }
> @@ -556,21 +555,20 @@
> * @param type The type of notification.
> * @param {_at_link CometHandler} to notify.
> */
> - public void notify(final E attachment,final int eventType,final CometHandler cometHandler)
> + public void notify(final Object attachment,final int eventType,final CometHandler cometHandler)
> throws IOException{
> if (cometHandler == null){
> throw ISE;
> }
> - CometEvent event = new CometEvent<E>(eventType,this);
> - event.attach(attachment);
> + CometEvent event = new CometEvent(eventType,this,attachment);
> notificationHandler.setBlockingNotification(blockingNotification);
> notificationHandler.notify(event,cometHandler);
> - if (event.getType() == CometEvent.TERMINATE
> + if (event.getType() == CometEvent.TERMINATE
> || event.getType() == CometEvent.INTERRUPT) {
> resumeCometHandler(cometHandler);
> } else {
> resetSuspendIdleTimeout();
> - }
> + }
> }
>
>
> @@ -588,14 +586,12 @@
> * @param attachment An object shared amongst {_at_link CometHandler}.
> * @param type The type of notification.
> */
> - public void notify(final E attachment,final int eventType)
> - throws IOException{
> - CometEvent event = new CometEvent<E>(eventType,this);
> - event.attach(attachment);
> + public void notify(Object attachment,int eventType)throws IOException {
> + CometEvent event = new CometEvent(eventType,this,attachment);
> Iterator<CometHandler> iterator = handlers.keySet().iterator();
> notificationHandler.setBlockingNotification(blockingNotification);
> notificationHandler.notify(event,iterator);
> - if (event.getType() == CometEvent.TERMINATE
> + if (event.getType() == CometEvent.TERMINATE
> || event.getType() == CometEvent.INTERRUPT) {
> while(iterator.hasNext()){
> resumeCometHandler(iterator.next());
> @@ -625,16 +621,18 @@
> protected void resetSuspendIdleTimeout(){
> if (expirationDelay != -1){
> long timestamp = System.currentTimeMillis();
> - // not threadsafe, but that will only lead to a few extra idle checks.
> - // it will still be a major win.
> - if (timestamp - lastIdleReset >= 1000){
> - lastIdleReset = timestamp;
> - for (CometTask cometTask:activeTasks.keySet()){
> - cometTask.setExpireTime(timestamp);
> + if (timestamp > nextidleclear){
> + synchronized(handlers){
> + if (timestamp > nextidleclear){
> + nextidleclear = timestamp+1000;
> + for (CometTask cometTask:handlers.values()){
> + cometTask.setTimeout(timestamp);
> + }
> + }
> }
> }
> }
> - }
> + }
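For anyone following the locking change in this hunk: as I read it, the new code is a double-checked throttle on a volatile timestamp, so the handler map is walked roughly once per second no matter how many threads call notify. A standalone sketch of just that pattern is below. IdleThrottle and tryAcquire are hypothetical names, and the clock value is passed in so the logic can be exercised without sleeping.

```java
// Hypothetical illustration of the throttle pattern; not part of the patch.
final class IdleThrottle {
    private final Object lock = new Object();
    private final long intervalMillis;
    // volatile so the unsynchronized first check sees the latest value
    private volatile long next;

    IdleThrottle(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Returns true for the one caller that should do the work in this interval.
    boolean tryAcquire(long now) {
        if (now > next) {                // cheap check, no lock taken
            synchronized (lock) {
                if (now > next) {        // re-check under the lock
                    next = now + intervalMillis;
                    return true;
                }
            }
        }
        return false;
    }
}
```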
>
>
> /**
> @@ -675,38 +673,22 @@
> * @return true if the operation worked.
> */
> private boolean doAsyncRegister(CometHandler handler, int interest){
> - SelectionKey cometKey = null;
> if (handler != null) {
> - cometKey = handlers.get(handler);
> - }
> - if (handler == null || cometKey == null) {
> - throw ISE;
> - }
> -
> - CometTask cometTask = (CometTask)cometKey.attachment();
> - if (cometTask != null){
> - cometKey.interestOps(cometKey.interestOps() | interest);
> - if (interest == SelectionKey.OP_READ){
> - cometTask.setAsyncReadSupported(true);
> + CometTask task = handlers.get(handler);
> + if (task != null) {
> + SelectionKey mainKey = task.getSelectionKey();
> + if (mainKey != null){
> + mainKey.interestOps(mainKey.interestOps() | interest);
> + task.setComethandlerIsAsyncRegistered(true);
> + return true;
> + }
> }
> - cometTask.setComethandlerisAsyncregistered(true);
> - return true;
> }
> throw ISE;
> }
>
>
> /**
> - * Set the {_at_link CometSelector} associated with this instance.
> - * @param CometSelector the {_at_link CometSelector} associated with
> - * this instance.
> - */
> - protected void setCometSelector(CometSelector cometSelector) {
> - this.cometSelector = cometSelector;
> - }
> -
> -
> - /**
> * Helper.
> */
> @Override
> @@ -741,19 +723,9 @@
> public Set<CometHandler> getCometHandlers(){
> return handlers.keySet();
> }
> +
>
> -
> -
> /**
> - * Add a {_at_link CometTask} to the active list.
> - * @param cometTask
> - */
> - protected void addActiveCometTask(CometTask cometTask){
> - activeTasks.put(cometTask,Boolean.TRUE);
> - }
> -
> -
> - /**
> * Return <tt>true</tt> if the invoker of {_at_link #notify} should block when
> * notifying Comet Handlers.
> */
> @@ -788,13 +760,4 @@
> return notificationHandler;
> }
>
> - /**
> - * Return the current set of active {_at_link CometTask}
> - * @return
> - */
> - protected Set<CometTask> getActiveTasks() {
> - return activeTasks.keySet();
> - }
> -
> }
> -
> Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java
> ===================================================================
> --- modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java (revision 2805)
> +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java (working copy)
> @@ -41,12 +41,14 @@
> import com.sun.grizzly.arp.AsyncTask;
> import com.sun.grizzly.http.SelectorThread;
> import com.sun.grizzly.arp.AsyncProcessorTask;
> +import com.sun.grizzly.http.ProcessorTask;
> import com.sun.grizzly.util.LinkedTransferQueue;
> +import com.sun.grizzly.util.TestThreadPool;
Can we rename the ThreadPool? I don't like seeing TestThreadPool inside
the code.
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.IOException;
> import java.nio.channels.SelectableChannel;
> import java.nio.channels.SelectionKey;
> import java.nio.channels.Selector;
> -import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.LinkedBlockingQueue;
> @@ -123,7 +125,7 @@
> /**
> * The single instance of this class.
> */
> - private final static CometEngine cometEngine = new CometEngine();
> + protected final static CometEngine cometEngine = new CometEngine();
>
>
> /**
> @@ -133,11 +135,6 @@
>
>
> /**
> - * The {_at_link CometSelector} used to poll requests.
> - */
> - protected final CometSelector cometSelector;
> -
> - /**
> * cached CometContexts
> */
> protected final LinkedTransferQueue<CometContext> cometContextCache;
> @@ -154,7 +151,7 @@
> */
> protected final static ThreadLocal<CometTask> updatedContexts = new ThreadLocal<CometTask>();
>
> - protected static final SelectionKey dumykey = new SelectionKey() {
> + private static final SelectionKey dumykey = new SelectionKey() {
> public SelectableChannel channel() {throw ISE;}
> public int interestOps() {throw ISE;}
> public SelectionKey interestOps(int ops) {throw ISE;}
> @@ -165,33 +162,31 @@
> };
>
> /**
> - * Creat a singleton and initialize all lists required. Also create and
> - * start the {_at_link CometSelector}
> + * Creat a singleton and initialize all lists required.
> */
> protected CometEngine() {
> - cometSelector = new CometSelector(this);
> - try{
> - cometSelector.start();
> - } catch(InterruptedException ex){
> - logger.log(Level.SEVERE,"Unable to start CometSelector",ex);
> - }
> -
> cometContextCache = new LinkedTransferQueue<CometContext>();
> activeContexts = new ConcurrentHashMap<String,CometContext>(16,0.75f,64);
>
> - ThreadPoolExecutor tpe = new ThreadPoolExecutor(
> - 8,
> + /*ExecutorService tpe = new ThreadPoolExecutor(
Remove, no commented code :-)
> 64,
> + 64,
> 30L,
> TimeUnit.SECONDS,
> - new LinkedBlockingQueue<Runnable>(),
> + //new LinkedTransferQueue(),
> + new LinkedBlockingQueue<Runnable>(),
> new ThreadFactory() {
> private final AtomicInteger counter = new AtomicInteger();
> public Thread newThread(Runnable r) {
> - return new Thread(r, "CometWorker-"+counter.incrementAndGet());
> + //return new Thread(r, "CometWorker-"+counter.incrementAndGet());
> + return new WorkerThreadImpl(null, "CometWorker-"+counter.incrementAndGet(), r, 0);
> }
> - });
> - //tpe.allowCoreThreadTimeOut(true);
> + }); */
> +
> + //ExecutorService tpe = threadPool = new DefaultExecutorService(4, 8, 30,
> + // TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), "CometWorker-");
> + ExecutorService tpe = new com.sun.grizzly.util.FixedThreadPool(8,"CometWorker");
> + //ExecutorService tpe = new NewDefaultThreadPool("CometWorker",4,64,15,TimeUnit.SECONDS);
Remove all of the commented-out code above.
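With the dead code gone, the constructor only needs one line to build the pool. If the named worker threads from the old ThreadFactory are still wanted, a small factory method keeps that readable. The sketch below uses the plain JDK fixed pool as a stand-in for com.sun.grizzly.util.FixedThreadPool, whose constructor I am not reproducing here; CometWorkers and newPool are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory; the JDK pool stands in for Grizzly's FixedThreadPool.
final class CometWorkers {
    private CometWorkers() { }

    // Builds a fixed-size pool whose threads are named CometWorker-1, -2, ...
    static ExecutorService newPool(int size) {
        final AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(size, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "CometWorker-" + counter.incrementAndGet());
                t.setDaemon(true); // do not keep the JVM alive for idle workers
                return t;
            }
        });
    }
}
```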
> threadPool = tpe;
> }
>
> @@ -215,6 +210,7 @@
>
> /**
> * sets the default threadpool that DefaultNotificationHandler use.
> + * shuttdownnow is called on the existing threadpool.
Typo: "shutdownNow" (one "t").
> * does notupdate existing notificationhandlers
> */
> public void setThreadPool(ExecutorService threadPool) {
> @@ -224,7 +220,15 @@
> }
> }
>
> + /**
> + * returns the threadpool comet is using
"Return", to match the wording of the other Javadoc comments.
> + * @return ExecutorService
> + */
> + public ExecutorService getThreadPool() {
> + return threadPool;
> + }
>
> +
> /**
> * Unregister the {_at_link CometHandler} to the list of the
> * {_at_link CometContext}. Invoking this method will invoke all
> @@ -238,18 +242,7 @@
> public CometContext unregister(String topic){
> CometContext cometContext = activeContexts.remove(topic);
> if (cometContext != null){
> - try{
> - cometContext.notify(cometContext,CometEvent.TERMINATE);
> - } catch (IOException ex) {}
> - Set<CometTask> tasks = cometContext.getActiveTasks();
> - for (CometTask cometTask: tasks){
> - // does this work ? the notify above might be async.
> - flushResponse(cometTask.getAsyncProcessorTask());
> - }
> - //TODO: add check for datastructure size, if cometcontext had large
> - // datastructes its probably not optimal to waste RAM with caching it
> - cometContext.recycle();
> - cometContextCache.offer(cometContext);
> + cometContext.recycle();
> }
> return cometContext;
> }
> @@ -279,7 +272,7 @@
> * {_at_link AFTER_SERVLET_PROCESSING} or {_at_link AFTER_RESPONSE_PROCESSING}
> * @return CometContext a configured {_at_link CometContext}.
> */
> - public CometContext register(String topic, int type){
> + public CometContext register(String topic, int type){
> return register(topic, type, DefaultNotificationHandler.class);
> }
>
> @@ -287,14 +280,13 @@
> * Instanciate a new {_at_link CometContext}.
> * @param topic the topic the new {_at_link CometContext} will represent.
> * @param type when the request will be suspended, e.g. {_at_link BEFORE_REQUEST_PROCESSING},
> - * {_at_link AFTER_SERVLET_PROCESSING} or {_at_link AFTER_RESPONSE_PROCESSING}
> + * {_at_link AFTER_SERVLET_PROCESSING} or {_at_link AFTER_RESPONSE_PROCESSING}
> * @return a new {_at_link CometContext} if not already created, or the
> * existing one.
> */
> public CometContext register(String topic, int type,
> Class<? extends NotificationHandler> notificationClass ) {
> -
> - // Double checked locking used used to prevent the otherwise static/global
> + // Double checked locking used used to prevent the otherwise static/global
> // locking, cause example code does heavy usage of register calls
> // for existing topics from http get calls etc.
> CometContext cometContext = activeContexts.get(topic);
> @@ -303,30 +295,31 @@
> cometContext = activeContexts.get(topic);
> if (cometContext == null){
> cometContext = cometContextCache.poll();
> - if (cometContext != null)
> + if (cometContext != null){
> cometContext.topic = topic;
> - if (cometContext == null){
> + }else{
> cometContext = new CometContext(topic, type);
> - NotificationHandler notificationHandler = null;
> - try{
> - notificationHandler = notificationClass.newInstance();
> - } catch (Throwable t) {
> - logger.log(Level.SEVERE,"Invalid NotificationHandler class : "
> - + notificationClass.getName() + " Using default.",t);
> - notificationHandler = new DefaultNotificationHandler();
> - }
> - cometContext.setCometSelector(cometSelector);
> - cometContext.setNotificationHandler(notificationHandler);
> - if (notificationHandler != null && (notificationHandler
> - instanceof DefaultNotificationHandler)){
> - ((DefaultNotificationHandler)notificationHandler)
> - .setThreadPool(threadPool);
> - }
> }
> + NotificationHandler notificationHandler = null;
> + try{
> + notificationHandler = notificationClass.newInstance();
> + } catch (Throwable t) {
> + logger.log(Level.SEVERE,"Invalid NotificationHandler class : "
> + + notificationClass.getName() + " Using default.",t);
> + notificationHandler = new DefaultNotificationHandler();
> + }
> + cometContext.setNotificationHandler(notificationHandler);
> + if (notificationHandler != null && (notificationHandler
> + instanceof DefaultNotificationHandler)){
> + ((DefaultNotificationHandler)notificationHandler)
> + .setThreadPool(threadPool);
> + }
> activeContexts.put(topic,cometContext);
> }
> +
> }
> }
> + cometContext.continuationType = type;
> return cometContext;
> }
>
> @@ -356,7 +349,7 @@
> * to the current thread so we can later retrieve the associated
> * SelectionKey. The SelectionKey is required in order to park the request.
> */
> - int continuationType = (cometContext == null)?
> + int continuationType = (cometContext == null)?
> AFTER_SERVLET_PROCESSING:cometContext.continuationType;
>
> /* Execute the Servlet.service method. CometEngine.register() or
> @@ -370,22 +363,27 @@
> */
> CometTask cometTask = updatedContexts.get();
> if (cometTask != null) {
> + //need to impl tlocal that gets and sets null in one efficent operation
> updatedContexts.set(null);
> - if (cometTask.isSuspended()){ //alreadySuspended)
> - cometTask.setSuspended(false);
> - cometTask.getCometContext().addActiveHandler(cometTask.getCometHandler(), dumykey);
> + cometContext = cometTask.getCometContext();
> + if (cometTask.upcoming_op_isread){ //alreadySuspended
> + cometTask.upcoming_op_isread = false;
> + //need to set dumykey in cometTask ?
Remove comment
> + cometContext.addActiveHandler(cometTask);
> return false;
> + }
> + cometTask.setAsyncProcessorTask(apt);
> + if (cometContext.getExpirationDelay() != -1){
> + cometTask.setTimeout(System.currentTimeMillis());
> }
> - SelectionKey key = apt.getAsyncExecutor().getProcessorTask().getSelectionKey();
> - key.attach("comet"); // Disable keep-alive
> - cometTask.getCometContext().initialize(cometTask.getCometHandler());
> - cometTask.setAsyncProcessorTask(apt);
> - cometTask.setSelectionKey(key);
> - cometTask.setCometSelector(cometSelector);
> - cometTask.setSelectorThread(apt.getSelectorThread());
> - cometTask.setThreadPool(apt.getThreadPool());
> - cometSelector.registerKey(cometTask);
> - return true;
> + SelectionKey mainKey = apt.getAsyncExecutor().getProcessorTask().getSelectionKey();
> + if (mainKey.isValid()){
> + mainKey.interestOps(SelectionKey.OP_READ);
> + mainKey.attach(cometTask);
> + cometContext.initialize(cometTask.getCometHandler());
> + cometContext.addActiveHandler(cometTask);
> + return true;
> + }
> }
> return false;
> }
> @@ -400,28 +398,33 @@
> }
>
> /**
> - * flush if AsyncTask.POST_EXECUTE
> + * flush if AsyncTask.POST_EXECUTE .<br>
> * {_at_link AsyncProcessorTask}
> */
> - protected void flushPostExecute(AsyncProcessorTask apt) {
> - if (apt != null && apt.getStage() == AsyncTask.POST_EXECUTE){
> - flushResponse(apt);
> - }
> - }
> -
> - /**
> - * Complete the asynchronous request.
> - */
> - protected void flushResponse(AsyncProcessorTask apt){
> - apt.setStage(AsyncTask.POST_EXECUTE);
> - try{
> - apt.doTask();
> - } catch (IllegalStateException ex){
> - if (logger.isLoggable(Level.FINEST)){
> - logger.log(Level.FINEST,"Resuming Response failed",ex);
> + protected void flushPostExecute(final CometTask task, boolean aptflush,boolean cancelkey) {
> + AsyncProcessorTask apt = task.getAsyncProcessorTask();
> + ProcessorTask p = task.getAsyncProcessorTask().getAsyncExecutor().getProcessorTask();
> + p.setReRegisterSelectionKey(false);
> + p.setAptCancelKey(cancelkey);
> + if (!aptflush){
> + p.terminateProcess();
> + }else{
> + if (apt.getStage() == AsyncTask.POST_EXECUTE){
> + try{
> + //All comet IO operations sync on handler except close
> + synchronized(task.cometHandler){
> + apt.doTask();
> + }
> + } catch (IllegalStateException ex){
> + if (logger.isLoggable(Level.FINEST)){
> + logger.log(Level.FINEST,"Resuming Response failed at aptflush",ex);
> + }
> + } catch (Throwable ex) {
> + logger.log(Level.SEVERE,"Resuming failed at aptflush",ex);
> + }
> + }else{
> + logger.warning("APTflush called at wrong stage");
> }
> - } catch (Throwable ex) {
> - logger.log(Level.SEVERE,"Resuming failed",ex);
> }
> }
>
> Index: modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java
> ===================================================================
> --- modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java (revision 2805)
> +++ modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java (working copy)
> @@ -39,6 +39,7 @@
> package com.sun.grizzly.utils;
>
> import com.sun.grizzly.*;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.IOException;
> import java.util.Arrays;
> import java.util.Collection;
> @@ -74,7 +75,7 @@
> }
> });
>
> - new Thread(controller).start();
> + new WorkerThreadImpl("ControllerWorker", controller).start();
>
> try {
> latch.await();
> Index: modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java
> ===================================================================
> --- modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java (revision 2805)
> +++ modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java (working copy)
> @@ -41,6 +41,7 @@
> import com.sun.grizzly.filter.EchoFilter;
> import com.sun.grizzly.filter.LogFilter;
> import com.sun.grizzly.filter.ReadFilter;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import com.sun.grizzly.utils.ControllerUtils;
> import com.sun.grizzly.utils.TCPIOClient;
> import java.io.IOException;
> @@ -113,7 +114,7 @@
>
> ControllerUtils.startController(controller);
>
> - Thread restartThread = new Thread() {
> + Thread restartThread = new WorkerThreadImpl(new Runnable() {
> @Override
> public void run() {
> try {
> @@ -123,7 +124,7 @@
> exceptionHolder[0] = ex;
> }
> }
> - };
> + });
>
> restartThread.start();
>
> Index: modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java (revision 2805)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java (working copy)
> @@ -124,35 +124,38 @@
> }
>
> if (selector == null){
> - try{
> - isShutDown.set(false);
> + initSelector(ctx);
> + } else {
> + processPendingOperations(ctx);
> + }
> + }
>
> - connectorInstanceHandler = new ConnectorInstanceHandler.
> - ConcurrentQueueDelegateCIH(
> - getConnectorInstanceHandlerDelegate());
> -
> - datagramChannel = DatagramChannel.open();
> - selector = Selector.open();
> - if (role != Role.CLIENT){
> - datagramSocket = datagramChannel.socket();
> - datagramSocket.setReuseAddress(reuseAddress);
> - if (inet == null)
> - datagramSocket.bind(new InetSocketAddress(port));
> - else
> - datagramSocket.bind(new InetSocketAddress(inet,port));
> + private void initSelector(Context ctx) throws IOException{
> + try{
> + isShutDown.set(false);
>
> - datagramChannel.configureBlocking(false);
> - datagramChannel.register( selector, SelectionKey.OP_READ );
> -
> - datagramSocket.setSoTimeout(serverTimeout);
> - }
> - ctx.getController().notifyReady();
> - } catch (SocketException ex){
> - throw new BindException(ex.getMessage() + ": " + port);
> + connectorInstanceHandler = new ConnectorInstanceHandler.
> + ConcurrentQueueDelegateCIH(
> + getConnectorInstanceHandlerDelegate());
> +
> + datagramChannel = DatagramChannel.open();
> + selector = Selector.open();
> + if (role != Role.CLIENT){
> + datagramSocket = datagramChannel.socket();
> + datagramSocket.setReuseAddress(reuseAddress);
> + if (inet == null)
> + datagramSocket.bind(new InetSocketAddress(port));
> + else
> + datagramSocket.bind(new InetSocketAddress(inet,port));
> +
> + datagramChannel.configureBlocking(false);
> + datagramChannel.register( selector, SelectionKey.OP_READ );
> +
> + datagramSocket.setSoTimeout(serverTimeout);
> }
> -
> - } else {
> - processPendingOperations(ctx);
> + ctx.getController().notifyReady();
> + } catch (SocketException ex){
> + throw new BindException(ex.getMessage() + ": " + port);
> }
> }
>
> Index: modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java (revision 2805)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java (working copy)
> @@ -39,9 +39,8 @@
> package com.sun.grizzly;
>
> import java.util.ArrayList;
> -import java.util.Collection;
> -import java.util.HashSet;
> import java.util.List;
> +import java.util.concurrent.ExecutorService;
> import java.util.logging.Level;
>
> /**
> @@ -56,13 +55,13 @@
> /**
> * The list of ProtocolFilter this chain will invoke.
> */
> - protected List<ProtocolFilter> protocolFilters;
> + protected final List<ProtocolFilter> protocolFilters;
>
> /**
> * The list of {@link EventHandler}s, which will be notified about this
> * {@link ProtocolChain} events
> */
> - protected Collection<EventHandler> eventHandlers;
> + protected final List<EventHandler> eventHandlers;
>
> /**
> * <tt>true</tt> if a pipelined execution is required. A pipelined execution
> @@ -75,8 +74,9 @@
>
>
> public DefaultProtocolChain() {
> - protocolFilters = new ArrayList<ProtocolFilter>();
> - eventHandlers = new HashSet<EventHandler>();
> + protocolFilters = new ArrayList<ProtocolFilter>(4);
> + //ArrayList is faster than HashSet for small datasets.
> + eventHandlers = new ArrayList<EventHandler>(4);
> }
>
>
> @@ -296,9 +296,9 @@
> */
> protected void notifyException(Phase phase, ProtocolFilter filter,
> Throwable throwable) {
> - for(EventHandler eventHandler : eventHandlers) {
> + for(int i=0;i<eventHandlers.size();i++) {
> try {
> - eventHandler.onException(phase, filter, throwable);
> + eventHandlers.get(i).onException(phase, filter, throwable);
> } catch(Exception e) {
> Controller.logger().log(Level.SEVERE,
> "ProtocolChain notifyException exception", e);
> Index: modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java (revision 2805)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java (working copy)
> @@ -39,6 +39,7 @@
>
> import com.sun.grizzly.async.AsyncQueueReader;
> import com.sun.grizzly.async.AsyncQueueWriter;
> +import com.sun.grizzly.tcp.PendingIOhandler;
> import com.sun.grizzly.util.AttributeHolder;
> import com.sun.grizzly.util.Copyable;
> import com.sun.grizzly.util.State;
> @@ -59,8 +60,22 @@
> * @author Jeanfrancois Arcand
> */
> public interface SelectorHandler extends Handler, Copyable,
> - AttributeHolder, SupportStateHolder<State> {
> + AttributeHolder, SupportStateHolder<State> {
> +
> + /**
> + * enqueues runnable for later execution in postSelect <br>
Nit: capitalize, "Enqueue".
> + * this is not to be a threadsafe method, must be called from within the same SelectorHandler thread.<br>
> + * @param runnable
>
> + public void addPendingIO(Runnable runnable);
> +
> +
> + * enqueues SlectionKey for later cancel and close .<br>
Nit: "Enqueue SelectionKey" (capitalize, and fix the "SlectionKey" typo).
> + * this is not to be a threadsafe method, must be called from within the same SelectorHandler thread.<br>
> + * @param runnable
> +
> + public void addPendingKeyCancel(SelectionKey key);*/
Can we remove the commented-out code above?
> +
> /**
> * A token decribing the protocol supported by an implementation of this
> * interface
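For reference, the commented-out addPendingIO Javadoc above describes a queue that is filled and drained on the same SelectorHandler thread, so it needs no locking. A minimal sketch of that idea, not taken from the patch: PendingIoQueue and drain() are hypothetical names, and drain() only stands in for whatever postSelect would end up calling.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical helper, not part of Grizzly: a pending-IO queue that is
 * deliberately NOT thread safe. Only the owning selector thread may call it.
 */
final class PendingIoQueue {
    // Plain ArrayDeque: no synchronization, single-thread access is assumed.
    private final Queue<Runnable> pending = new ArrayDeque<Runnable>();

    /** Enqueue a runnable for later execution (e.g. from postSelect). */
    void addPendingIO(Runnable runnable) {
        pending.add(runnable);
    }

    /** Run all queued tasks in FIFO order and return how many were executed. */
    int drain() {
        int executed = 0;
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
            executed++;
        }
        return executed;
    }
}
```

The point of the single-thread restriction is that the selector loop can queue work while iterating the selected keys and run it afterwards without paying for a concurrent queue.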
> Index: modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
> ===================================================================
> --- modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (revision 2805)
> +++ modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (working copy)
> @@ -45,9 +45,11 @@
> import com.sun.grizzly.util.DefaultThreadPool;
> import com.sun.grizzly.util.LinkedTransferQueue;
> import com.sun.grizzly.util.LoggerUtils;
> +import com.sun.grizzly.util.SelectedKeyAttachmentLogic;
> import com.sun.grizzly.util.State;
> import com.sun.grizzly.util.StateHolder;
> import com.sun.grizzly.util.SupportStateHolder;
> +import com.sun.grizzly.util.WorkerThreadImpl;
> import java.io.IOException;
> import java.nio.channels.ClosedChannelException;
> import java.nio.channels.ClosedSelectorException;
> @@ -58,7 +60,6 @@
> import java.util.Iterator;
> import java.util.Map;
> import java.util.Set;
> -import java.util.concurrent.ConcurrentLinkedQueue;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.atomic.AtomicInteger;
> @@ -371,6 +372,13 @@
> private void handleSelectedKeys(Set<SelectionKey> readyKeys,SelectorHandler selectorHandler,NIOContext serverCtx){
> for(SelectionKey key:readyKeys) {
> try{
> +
> + Object attachment = key.attachment();
> + if (attachment instanceof SelectedKeyAttachmentLogic){
> + ((SelectedKeyAttachmentLogic)attachment).handleSelectedKey(key);
> + continue;
> + }
> +
> if (!key.isValid()){
> selectorHandler.getSelectionKeyHandler().close(key);
> continue;
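The handleSelectedKeys hunk above lets a key's attachment take over handling before the usual validity check runs. A rough illustration of that dispatch follows. AttachmentLogic is a hypothetical stand-in for SelectedKeyAttachmentLogic (its real definition is not shown in this part of the patch), and AttachmentDispatch is an invented name.

```java
import java.nio.channels.SelectionKey;

/** Hypothetical stand-in for the patch's SelectedKeyAttachmentLogic. */
interface AttachmentLogic {
    void handleSelectedKey(SelectionKey key);
}

final class AttachmentDispatch {
    private AttachmentDispatch() {}

    /**
     * Returns true if the attachment handled the key itself, in which case
     * the caller should skip its normal processing (the "continue" in the hunk).
     */
    static boolean dispatch(SelectionKey key) {
        Object attachment = key.attachment();
        if (attachment instanceof AttachmentLogic) {
            ((AttachmentLogic) attachment).handleSelectedKey(key);
            return true;
        }
        // Null or any other attachment type: fall through to the default path.
        return false;
    }
}
```

Since instanceof is false for null, keys without an attachment take the default path with no extra null check.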
GREAT WORK!!!!
--Jeanfrancois