/* * The contents of this file are subject to the terms * of the Common Development and Distribution License * (the License). You may not use this file except in * compliance with the License. * * You can obtain a copy of the license at * https://glassfish.dev.java.net/public/CDDLv1.0.html or * glassfish/bootstrap/legal/CDDLv1.0.txt. * See the License for the specific language governing * permissions and limitations under the License. * * When distributing Covered Code, include this CDDL * Header Notice in each file and include the License file * at glassfish/bootstrap/legal/CDDLv1.0.txt. * If applicable, add the following below the CDDL Header, * with the fields enclosed by brackets [] replaced by * you own identifying information: * "Portions Copyrighted [year] [name of copyright owner]" * * Copyright 2006 Sun Microsystems, Inc. All rights reserved. */ package com.sun.grizzly.impl; import java.io.IOException; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.sun.grizzly.api.AcceptServiceHandler; import org.sun.grizzly.api.GrizzlyException; import org.sun.grizzly.api.GrizzlyInvalidStateException; import org.sun.grizzly.api.ReadServiceHandler; import org.sun.grizzly.api.TransportConfiguration; import org.sun.grizzly.api.TransportDispatcher; import org.sun.grizzly.api.TransportDispatcherInterceptor; import org.sun.grizzly.api.TransportHandle; // TODO: Needs more JavaDoc /** * * @author Charlie Hunt */ public class TransportDispatcherImpl extends Thread implements TransportDispatcher { protected Selector selector; private long timeout; private ConcurrentLinkedQueue keysToEnable; protected ReadServiceHandler readServiceHandler; /* * Run dispatch event loop until false */ private volatile boolean runDispatchLoop; private TransportHandle acceptHandle; private List activeHandles; private AcceptServiceHandler acceptServiceHandler; protected enum State { STOPPED, STARTED, PAUSED } protected volatile State state; final protected ReentrantLock stateLock; final protected TransportDispatcherInterceptor transportDispatcherInterceptor; final protected int configurationId; static final private AtomicInteger transportDispatcherUniqueId = new AtomicInteger(0); static final private String idStr = "TransportDispatcherImpl-"; /** * Creates a new instance of a TransportDispatcher */ public TransportDispatcherImpl(TransportConfiguration transportConfiguration) { stateLock = new ReentrantLock(); stateLock.lock(); try { state = State.STOPPED; } finally { stateLock.unlock(); } timeout = TransportDispatcher.defaultTimeout; keysToEnable = new ConcurrentLinkedQueue(); activeHandles = Collections.synchronizedList(new LinkedList()); runDispatchLoop = Boolean.FALSE; try { selector = Selector.open(); } catch (IOException ex) { // TODO: Need to log SEVERE message here, failed to open Selector. throw new GrizzlyException("An unexpected IO error occurred " + "when opening TransportDispatcher's " + "Selector", ex); } transportDispatcherInterceptor = transportConfiguration.getTransportDispatcherInterceptor(); acceptServiceHandler = null; readServiceHandler = null; configurationId = transportDispatcherUniqueId.incrementAndGet(); } public void registerReadServiceHandler(TransportHandle transportHandle, ReadServiceHandler serviceHandler) { if (readServiceHandler != null) { // Note: allows a TransportDispatcher to change its ReadServiceHandler // TODO: log a FINE message saying TransportDispatcher's ReadServiceHandler is being changed } this.readServiceHandler = serviceHandler; try { // Register TransportHandle to handle read events // NOTE: Write events are not registered here. SelectionKey aKey = transportHandle.getSocket().getChannel() .register(selector, SelectionKey.OP_READ); transportHandle.setSelectionKey(aKey); } catch (ClosedChannelException ex) { // TODO: Need GrizzlyException and logging ex.printStackTrace(); } } public void unregister(TransportHandle transportHandle) { SelectionKey key = transportHandle.getSelectionKey(); if (key != null && key.isValid()){ key.attach(null); key.cancel(); transportHandle.setSelectionKey(null); if (selector != null) { selector.wakeup(); } } } public void timeout(long timeout) { this.timeout = timeout; } public long timeout() { return timeout; } public void registerAcceptServiceHandler(TransportHandle transportHandle, AcceptServiceHandler acceptServiceHandler) { if (this.acceptHandle != null) { // TODO: log at FINE, an attempt to change AcceptHandler from/to, } acceptHandle = transportHandle; if (this.acceptServiceHandler != null) { // TODO: log at FINE, an attempt to change AcceptServiceHandler from/to, } this.acceptServiceHandler = acceptServiceHandler; // register theTransportHandle to listen for "accept" events try { transportHandle.setSelectionKey( transportHandle.getSelectableChannel().register( selector,SelectionKey.OP_ACCEPT)); } catch (ClosedChannelException ex) { // TODO: Need to log SEVERE error, not able to register accept service handler throw new GrizzlyException("Failed to register AcceptServiceHandler" + " with TransportDispatcher due to: " + ex.toString(), ex); } } /* * Return a string that uniquely identifies a TransportDispatcher. The form * of the string is "TransportDispatcher-#" where # is unique integer for * every instance of every TransportDispatcher. */ public String toString() { StringBuffer sb = new StringBuffer(idStr.length() + 16) .append(idStr).append(getConfigurationId()); return sb.toString(); } /* * Return a unique id for a TransportDispatcher. */ public int getConfigurationId() { return configurationId; } public TransportHandle acceptHandle() { return acceptHandle; } public List passiveHandles() { // TODO: Need to implement return null; } public List activeHandles() { return activeHandles; } // public UnaryVoidFunction selectCallback() { // // TODO: Need to implement // return null; // } // public void selectCallback(UnaryVoidFunction cb) { // // TODO: Need to implement // } // Lifecycle interface(s) public void startProcessing() { stateLock.lock(); try { if ( state == State.STOPPED) { try { // start Selector event loop if (!selector.isOpen()) { selector = Selector.open(); } runDispatchLoop = Boolean.TRUE; super.start(); state = State.STARTED; } catch (IOException ex) { // TODO: Need a SEVERE log here saying Selector failed to open throw new GrizzlyException("Unexpected fatal i/o exception when " + "attempting to open TransportDispatcher's Selector", ex); } } else if (state == State.PAUSED) { // Not a valid operation when a TransportDispatcher is paused throw new GrizzlyInvalidStateException( "TransportDispatcher.startProcessing() cannot not " + "allowed when TransportDispatcher is in a paused " + "state. Must use TransportDispatcher.resumeProcessing()"); } else { // it's already started // TODO: log FINE message saying TransportDispatcher already started } } finally { stateLock.unlock(); } } public void stopProcessing() { stateLock.lock(); try { if (state != State.STOPPED) { // stop the Selector event loop runDispatchLoop = Boolean.FALSE; // close all registered SocketChannels Set keySet = selector.keys(); Iterator itr = keySet.iterator(); while (itr.hasNext()) { SelectionKey key = itr.next(); if (key != null && key.isValid()) { if (key.channel() instanceof SocketChannel) { SocketChannel channel = (SocketChannel)key.channel(); try { channel.socket().shutdownInput(); } catch (IOException ex) { // TODO: log FINE message, shutdown input i/o error } try { channel.socket().shutdownOutput(); } catch (IOException ex) { // TODO: log FINE message, shutdown output i/o error } try { channel.socket().close(); } catch (IOException ex){ // TODO: log FINE message, socket close i/o error } try { channel.close(); } catch (IOException ex) { // TODO: log FINE message, SocketChannel close i/o error } key.attach(null); key.cancel(); } } } selector.wakeup(); try { selector.close(); } catch (IOException ex) { // TODO: Log FINE message, i/o exception on Selector.close() } state = State.STOPPED; } else { // TODO: log FINE message saying TransportDispatcher is already in stopped state } } finally { stateLock.unlock(); } } public void pauseProcessing() { // pause the select even loop // TODO: needs to be implemented throw new GrizzlyUnsupportedOperationException( "TransportDispatcher.pauseProcessing() is not supported"); } public void resumeProcessing() { // resume the Selector event loop // TODO: needs to be implemented throw new GrizzlyUnsupportedOperationException( "TransportDispatcher.resumeProcessing() is not supported"); } // Implement Thread's run() public void run() { // TODO: Add doPriv block to set Thread name to toString(), (test to // make sure TransportDispatcherImpl and // DividedTransportDispatcherImpl get set right) do { try { doSelect(); } catch (Throwable ex) { // TODO: log FINE message for any / all exceptions caught here // TODO: May need to catch additional exception types ? } } while (runDispatchLoop); } /* * Add a SelectionKey to the list of SelectionKeys to be enabled. */ void addSelectionKeyToEnableList(SelectionKey theSelectionKey) { keysToEnable.add(theSelectionKey); // wakeup the Selector selector.wakeup(); } protected void doSelect() throws Throwable { SelectionKey key = null; Set readyKeys; Iterator iterator; int selectorState; try{ selectorState = 0; enableSelectionKeys(); try{ selectorState = selector.select(TransportDispatcher.defaultTimeout); } catch (CancelledKeyException ex){ // TODO: log FINE message saying a SelectionKey has been cancelled } readyKeys = selector.selectedKeys(); iterator = readyKeys.iterator(); while (iterator.hasNext()) { key = iterator.next(); iterator.remove(); if (key.isValid()) { handleSelectedOp(key); } else { cancelSelectionKey(key); } } // execute TransportDispatcherInterceptor if one is configured if (transportDispatcherInterceptor != null) { transportDispatcherInterceptor.execute(this); } if (selectorState <= 0){ selector.selectedKeys().clear(); return; } } catch (ClosedSelectorException cse) { // TODO: This could indicate that the TransportDispatcher is // shutting down. Hence, we need to handle this Exception // appropriately. Perhaps check the state before logging // what's happening ? stateLock.lock(); try { if (state != State.STOPPED) { // TODO: log SEVERE message, something bad happened to the Selector } else { // TODO: log FINE message, TransportDispatcher select loop shutting down } } finally { stateLock.unlock(); } clearSelectionKey(key); } catch (Throwable t){ // TODO: Need to figure out what to do if we see a Throwable here, perhaps nothing ? clearSelectionKey(key); } } /* * Enable all registered interestOps. Due an NIO bug, all interestOps * invocations need to occur in the same thread as the thread the Selector * is running. */ protected void enableSelectionKeys(){ SelectionKey selectionKey; int size = keysToEnable.size(); for (int i=0; i < size; i++) { selectionKey = keysToEnable.poll(); selectionKey.interestOps( selectionKey.interestOps() | SelectionKey.OP_READ); } } protected void handleSelectedOp(SelectionKey selectionKey) { if ((selectionKey.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { // TODO: log FINE message, info // handling OP_ACCEPT for ... try { acceptServiceHandler.handleAcceptEvent(); } catch (IOException ex) { // TODO: Need to handle this Exception ex.printStackTrace(); } } else if ((selectionKey.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { // handle read event // TODO: log FINE message, info // handling OP_READ event for ... // disable OP_READ on key before doing anything else selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ); TransportHandle aHandle = new TransportHandleImpl(this, selectionKey); // An alternative to creating aHandle? // TransportHandle aHandle = itsPassiveHandles.remove(theSelectionKey); // itsActiveHandles.put(theSelectionKey, aHandle); readServiceHandler.handleReadEvent(aHandle); } else { // TODO: What do we do if we get something other than a OP_ACCEPT or OP_READ throw new GrizzlyUnsupportedOperationException("Unsupported event: " + selectionKey.toString()); } } protected void cancelSelectionKey(SelectionKey selectionKey) { // log FINE message, info // cancel SelectionKey on .... selectionKey.cancel(); } protected void clearSelectionKey(SelectionKey selectionKey) { if (selectionKey != null){ selectionKey.attach(null); selectionKey.cancel(); } } // Package visible Selector getSelector() { return selector; } }