/*
 * The contents of this file are subject to the terms
 * of the Common Development and Distribution License
 * (the License).  You may not use this file except in
 * compliance with the License.
 *
 * You can obtain a copy of the license at
 * https://glassfish.dev.java.net/public/CDDLv1.0.html or
 * glassfish/bootstrap/legal/CDDLv1.0.txt.
 * See the License for the specific language governing
 * permissions and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL
 * Header Notice in each file and include the License file
 * at glassfish/bootstrap/legal/CDDLv1.0.txt.
 * If applicable, add the following below the CDDL Header,
 * with the fields enclosed by brackets [] replaced by
 * you own identifying information:
 * "Portions Copyrighted [year] [name of copyright owner]"
 *
 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
 */

package com.sun.grizzly.impl;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.sun.grizzly.api.AcceptServiceHandler;
import org.sun.grizzly.api.GrizzlyException;
import org.sun.grizzly.api.GrizzlyInvalidStateException;
import org.sun.grizzly.api.ReadServiceHandler;
import org.sun.grizzly.api.TransportConfiguration;
import org.sun.grizzly.api.TransportDispatcher;
import org.sun.grizzly.api.TransportDispatcherInterceptor;
import org.sun.grizzly.api.TransportHandle;


// TODO: Needs more JavaDoc


/**
 *
 * @author Charlie Hunt
 */

public class TransportDispatcherImpl extends Thread 
                     implements TransportDispatcher {
    
    protected Selector selector;
    private long timeout;
    private ConcurrentLinkedQueue<SelectionKey> keysToEnable;
    protected ReadServiceHandler readServiceHandler;
    /*
     * Run dispatch event loop until false
     */
    private volatile boolean runDispatchLoop;
    private TransportHandle acceptHandle;
    private List<TransportHandle> activeHandles;
    private AcceptServiceHandler acceptServiceHandler;

    protected enum State { STOPPED, STARTED, PAUSED }
    protected volatile State state;
    final protected ReentrantLock stateLock;
    final protected TransportDispatcherInterceptor transportDispatcherInterceptor;
    final protected int configurationId;
    
    static final private AtomicInteger transportDispatcherUniqueId = 
                                                           new AtomicInteger(0);
    static final private String idStr = "TransportDispatcherImpl-";
    
    
    /**
     * Creates a new instance of a TransportDispatcher
     */
    public TransportDispatcherImpl(TransportConfiguration transportConfiguration) {
        stateLock = new ReentrantLock();
        stateLock.lock();
        try {
            state = State.STOPPED;
        } finally {
            stateLock.unlock();
        }
        timeout = TransportDispatcher.defaultTimeout;
        keysToEnable = new ConcurrentLinkedQueue<SelectionKey>();
        activeHandles = Collections.synchronizedList(new LinkedList<TransportHandle>());
        runDispatchLoop = Boolean.FALSE;

        try {
            selector = Selector.open();
        } catch (IOException ex) {
            // TODO: Need to log SEVERE message here, failed to open Selector.
            throw new GrizzlyException("An unexpected IO error occurred " +
                                       "when opening TransportDispatcher's " +
                                       "Selector", ex);
        }

        transportDispatcherInterceptor =
                     transportConfiguration.getTransportDispatcherInterceptor();
        acceptServiceHandler = null;
        readServiceHandler = null;
        configurationId = transportDispatcherUniqueId.incrementAndGet();
    }
 
    
    public void registerReadServiceHandler(TransportHandle transportHandle,
                                           ReadServiceHandler serviceHandler) {
        
        if (readServiceHandler != null) {
            // Note: allows a TransportDispatcher to change its ReadServiceHandler
            // TODO: log a FINE message saying TransportDispatcher's ReadServiceHandler is being changed
        }
        
        this.readServiceHandler = serviceHandler;
        
        try {
            // Register TransportHandle to handle read events
            // NOTE: Write events are not registered here.
            SelectionKey aKey =
                    transportHandle.getSocket().getChannel()
                    .register(selector, SelectionKey.OP_READ);
            transportHandle.setSelectionKey(aKey);
        } catch (ClosedChannelException ex) {
            // TODO: Need GrizzlyException and logging
            ex.printStackTrace();
        }
    }
    

    public void unregister(TransportHandle transportHandle) {
        SelectionKey key = transportHandle.getSelectionKey();
        if (key != null && key.isValid()){
            key.attach(null);
            key.cancel();
            transportHandle.setSelectionKey(null);
            if (selector != null) {
                selector.wakeup();
            }
        }
    }
 

    public void timeout(long timeout) {
        this.timeout = timeout;
    }
    

    public long timeout() {
        return timeout;
    }
    

    public void registerAcceptServiceHandler(TransportHandle transportHandle,
                                       AcceptServiceHandler acceptServiceHandler) {
        if (this.acceptHandle != null) {
            // TODO: log at FINE, an attempt to change AcceptHandler from/to,
        }
        acceptHandle = transportHandle;
        if (this.acceptServiceHandler != null) {
            // TODO: log at FINE, an attempt to change AcceptServiceHandler from/to,
        }
        this.acceptServiceHandler = acceptServiceHandler;
        
        // register theTransportHandle to listen for "accept" events
        try {
            transportHandle.setSelectionKey(
                    transportHandle.getSelectableChannel().register(
                                              selector,SelectionKey.OP_ACCEPT));
        } catch (ClosedChannelException ex) {
            // TODO: Need to log SEVERE error, not able to register accept service handler
            throw new GrizzlyException("Failed to register AcceptServiceHandler" +
                    " with TransportDispatcher due to: " + ex.toString(), ex);
        }
    }


    /*
     * Return a string that uniquely identifies a TransportDispatcher. The form
     * of the string is "TransportDispatcher-#" where # is unique integer for
     * every instance of every TransportDispatcher.
     */
    public String toString() {
        StringBuffer sb = new StringBuffer(idStr.length() + 16)
                                    .append(idStr).append(getConfigurationId());
        return sb.toString();
    }


    /*
     * Return a unique id for a TransportDispatcher.
     */
    public int getConfigurationId() {
        return configurationId;
    }


    public TransportHandle acceptHandle() {
        return acceptHandle;
    }
    

    public List<TransportHandle> passiveHandles() {
        // TODO: Need to implement
        return null;
    }
    

    public List<TransportHandle> activeHandles() {
        return activeHandles;
    }
    
//    public UnaryVoidFunction<TransportDispatcher> selectCallback() {
//        // TODO: Need to implement
//        return null;
//    }
    
//    public void selectCallback(UnaryVoidFunction<TransportDispatcher> cb) {
//        // TODO: Need to implement
//    }
    
    // Lifecycle interface(s)
    
    public void startProcessing() {
        stateLock.lock();
        try {
            if ( state == State.STOPPED) {
                try {
                    // start Selector event loop
                    if (!selector.isOpen()) {
                        selector = Selector.open();
                    }
                    runDispatchLoop = Boolean.TRUE;
                    super.start();
                    state = State.STARTED;
                } catch (IOException ex) {
                    // TODO: Need a SEVERE log here saying Selector failed to open
                    throw new GrizzlyException("Unexpected fatal i/o exception when " +
                            "attempting to open TransportDispatcher's Selector",
                            ex);
                }
            } else if (state == State.PAUSED) {
                // Not a valid operation when a TransportDispatcher is paused
                throw new GrizzlyInvalidStateException(
                        "TransportDispatcher.startProcessing() cannot not " +
                        "allowed when TransportDispatcher is in a paused " +
                        "state. Must use TransportDispatcher.resumeProcessing()");
            } else {
                // it's already started
                // TODO: log FINE message saying TransportDispatcher already started
            }
        } finally {
            stateLock.unlock();
        }
    }
    

    public void stopProcessing() {
        stateLock.lock();
        try {
            if (state != State.STOPPED) {
                // stop the Selector event loop
                runDispatchLoop = Boolean.FALSE;
                // close all registered SocketChannels
                Set<SelectionKey> keySet = selector.keys();
                Iterator<SelectionKey> itr = keySet.iterator();
                while (itr.hasNext()) {
                    SelectionKey key = itr.next();
                    if (key != null && key.isValid()) {
                        if (key.channel() instanceof SocketChannel) {
                            SocketChannel channel =
                                    (SocketChannel)key.channel();
                            try {
                                channel.socket().shutdownInput();
                            } catch (IOException ex) {
                                // TODO: log FINE message, shutdown input i/o error
                            }
                            try {
                                channel.socket().shutdownOutput();
                            } catch (IOException ex) {
                                // TODO: log FINE message, shutdown output i/o error
                            }
                            try {
                                channel.socket().close();
                            } catch (IOException ex){
                                // TODO: log FINE message, socket close i/o error
                            }
                            try {
                                channel.close();
                            } catch (IOException ex) {
                                // TODO: log FINE message, SocketChannel close i/o error
                            }
                            
                            key.attach(null);
                            key.cancel();
                        }
                    }
                }
                selector.wakeup();
                try {
                    selector.close();
                } catch (IOException ex) {
                    // TODO: Log FINE message, i/o exception on Selector.close()
                }
                state = State.STOPPED;
            } else {
                // TODO: log FINE message saying TransportDispatcher is already in stopped state
            }
        } finally {
            stateLock.unlock();
        }
    }
    

    public void pauseProcessing() {
        // pause the select even loop
        // TODO: needs to be implemented
        throw new GrizzlyUnsupportedOperationException(
                "TransportDispatcher.pauseProcessing() is not supported");
    }
    
    
    public void resumeProcessing() {
        // resume the Selector event loop
        // TODO: needs to be implemented
        throw new GrizzlyUnsupportedOperationException(
                "TransportDispatcher.resumeProcessing() is not supported");
    }
    

    // Implement Thread's run()
    public void run() {
        // TODO: Add doPriv block to set Thread name to toString(), (test to
        //       make sure TransportDispatcherImpl and 
        //       DividedTransportDispatcherImpl get set right)
        do {
            try {
                doSelect();
            } catch (Throwable ex) {
                // TODO: log FINE message for any / all exceptions caught here
                // TODO: May need to catch additional exception types ?
            }
        } while (runDispatchLoop);
    }
    

    /*
     * Add a SelectionKey to the list of SelectionKeys to be enabled.
     */
    void addSelectionKeyToEnableList(SelectionKey theSelectionKey) {
        keysToEnable.add(theSelectionKey);
        // wakeup the Selector
        selector.wakeup();
    }
    
 
    protected void doSelect() throws Throwable {
        SelectionKey key = null;
        Set<SelectionKey> readyKeys;
        Iterator<SelectionKey> iterator;
        int selectorState;
        try{
            selectorState = 0;
            enableSelectionKeys();
            
            try{
                selectorState = selector.select(TransportDispatcher.defaultTimeout);
            } catch (CancelledKeyException ex){
                // TODO: log FINE message saying a SelectionKey has been cancelled
            }
            
            readyKeys = selector.selectedKeys();
            iterator = readyKeys.iterator();
            
            while (iterator.hasNext()) {
                key = iterator.next();
                iterator.remove();
                if (key.isValid()) {
                    handleSelectedOp(key);
                } else {
                    cancelSelectionKey(key);
                }
            }
            
            // execute TransportDispatcherInterceptor if one is configured
            if (transportDispatcherInterceptor != null) {
                transportDispatcherInterceptor.execute(this);
            }
            
            if (selectorState <= 0){
                selector.selectedKeys().clear();
                return;
            }
            
        } catch (ClosedSelectorException cse) {
            // TODO: This could indicate that the TransportDispatcher is
            //       shutting down. Hence, we need to handle this Exception
            //       appropriately. Perhaps check the state before logging
            //       what's happening ?
            stateLock.lock();
            try {
                if (state != State.STOPPED) {
                    // TODO: log SEVERE message, something bad happened to the Selector
                } else {
                    // TODO: log FINE message, TransportDispatcher select loop shutting down
                }
            } finally {
                stateLock.unlock();
            }
            
            clearSelectionKey(key);
            
        } catch (Throwable t){
            // TODO: Need to figure out what to do if we see a Throwable here, perhaps nothing ?
            clearSelectionKey(key);            
        }
    }
    

    /*
     * Enable all registered interestOps. Due an NIO bug, all interestOps
     * invocations need to occur in the same thread as the thread the Selector
     * is running.
     */
    protected void enableSelectionKeys(){
        SelectionKey selectionKey;
        int size = keysToEnable.size();
        for (int i=0; i < size; i++) {
            selectionKey = keysToEnable.poll();
            selectionKey.interestOps(
                    selectionKey.interestOps() | SelectionKey.OP_READ);
        }
    }
    
    
    protected void handleSelectedOp(SelectionKey selectionKey) {
        if ((selectionKey.readyOps() & SelectionKey.OP_ACCEPT) == 
                                                       SelectionKey.OP_ACCEPT) {
            // TODO: log FINE message, info
            // handling OP_ACCEPT for ...
            try {
                acceptServiceHandler.handleAcceptEvent();
            } catch (IOException ex) {
                // TODO: Need to handle this Exception
                ex.printStackTrace();
            }
        } else if ((selectionKey.readyOps() & SelectionKey.OP_READ) == 
                                                         SelectionKey.OP_READ) {
            // handle read event
            // TODO: log FINE message, info
            // handling OP_READ event for ...
            // disable OP_READ on key before doing anything else
            selectionKey.interestOps(selectionKey.interestOps() &
                                     ~SelectionKey.OP_READ);
            TransportHandle aHandle = new TransportHandleImpl(this, selectionKey);
            // An alternative to creating aHandle?
            // TransportHandle aHandle = itsPassiveHandles.remove(theSelectionKey);
            // itsActiveHandles.put(theSelectionKey, aHandle);
            readServiceHandler.handleReadEvent(aHandle);
        } else {
            // TODO: What do we do if we get something other than a OP_ACCEPT or OP_READ
            throw new GrizzlyUnsupportedOperationException("Unsupported event: " +
                                                       selectionKey.toString());
        }
    }
    

    protected void cancelSelectionKey(SelectionKey selectionKey) {
        // log FINE message, info
        // cancel SelectionKey on ....
        selectionKey.cancel();
    }    


    protected void clearSelectionKey(SelectionKey selectionKey) {
        if (selectionKey != null){
            selectionKey.attach(null);
            selectionKey.cancel();
        }
    }    


    // Package visible
    Selector getSelector() {
        return selector;
    }
}