Coverage Report - com.sun.grizzly.suspendable.SuspendableMonitor
 
Classes in this File Line Coverage Branch Coverage Complexity
SuspendableMonitor
88 %
14/16
50 %
1/2
0
SuspendableMonitor$1
59 %
43/73
47 %
16/34
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 package com.sun.grizzly.suspendable;
 39  
 
 40  
 import com.sun.grizzly.Controller;
 41  
 import java.io.IOException;
 42  
 import java.nio.channels.CancelledKeyException;
 43  
 import java.nio.channels.SelectionKey;
 44  
 import java.nio.channels.Selector;
 45  
 import java.util.Iterator;
 46  
 import java.util.Set;
 47  
 import java.util.logging.Level;
 48  
 import java.util.logging.Logger;
 49  
 
 50  
 import com.sun.grizzly.suspendable.SuspendableFilter.KeyHandler;
 51  
 import java.nio.channels.ClosedChannelException;
 52  
 import java.nio.channels.SelectableChannel;
 53  
 import java.util.concurrent.ConcurrentLinkedQueue;
 54  
 
 55  
 /**
 56  
  * A secondary {@link Selector} used to keep the state of a suspended
 57  
  * connection ({@link SelectionKey}). See  {@link SuspendableFilter} for more info.
 58  
  *
 59  
  * TODO: Add Pipelining/Multiplexing support.
 60  
  * @author Jeanfrancois Arcand
 61  
  */
 62  1222395
 public class SuspendableMonitor {
 63  
 
 64  
     /**
 65  
      * The {@link Selector}
 66  
      */
 67  
     private Selector selector;
 68  
     
 69  
 
 70  1
     private ConcurrentLinkedQueue<KeyHandler> keysToRegister 
 71  
             = new ConcurrentLinkedQueue<KeyHandler>();    
 72  
     
 73  
     /**
 74  
      * Logger.
 75  
      */
 76  1
     private Logger logger = Controller.logger();
 77  
 
 78  
     /**
 79  
      * Start a new Thread with a Selector running.
 80  
      */
 81  1
     public SuspendableMonitor() {
 82  1
         start();
 83  1
     }
 84  
 
 85  
     public void start() {
 86  1
         new Thread("GrizzlySuspendableMonitor") {
 87  
 
 88  
             {
 89  1
                 setDaemon(true);
 90  1
             }
 91  
 
 92  
             @SuppressWarnings("empty-statement")
 93  
             @Override
 94  
             public void run() {
 95  
                 try {
 96  1
                     selector = Selector.open();
 97  0
                 } catch (IOException ex) {
 98  
                     // Most probably a fd leak.
 99  0
                     logger.log(Level.SEVERE, "SuspendableMonitor.open()", ex);
 100  0
                     return;
 101  1
                 }
 102  
                 while (true) {
 103  349238
                     SelectionKey foreignKey = null;
 104  349238
                     KeyHandler kh = null;                    
 105  349238
                     int selectorState = 0;
 106  
 
 107  
                     try {
 108  
                         try {
 109  349238
                             selectorState = selector.select(1000);
 110  0
                         } catch (CancelledKeyException ex) {
 111  
                             ;
 112  349237
                         }
 113  
                         
 114  349237
                         Iterator<KeyHandler> keys = 
 115  
                                 keysToRegister.iterator();
 116  
 
 117  
                         SelectableChannel channel;
 118  349246
                         while (keys.hasNext()){
 119  9
                             kh = keys.next();
 120  9
                             channel =  kh.getKey().channel();
 121  9
                             if (kh.getKey().isValid() && channel.isOpen()) {
 122  9
                                 foreignKey = channel
 123  
                                     .register(selector,SelectionKey.OP_READ,kh);  
 124  9
                                 kh.setForeignKey(foreignKey);
 125  9
                                 keys.remove();
 126  
                             } 
 127  
                         }  
 128  
                         
 129  
                       /*  readyKeys = selector.selectedKeys();
 130  
                         iterator = readyKeys.iterator();
 131  
                         // TODO: Support pipelining
 132  
                        /* while (iterator.hasNext()) {
 133  
                             key = iterator.next();
 134  
                             if (key.isReadable()) {
 135  
                             //SuspendableMonitor.this.interrupted(key);
 136  
                             }
 137  
                         }*/
 138  349237
                         expireIdleKeys();
 139  
 
 140  349237
                         if (selectorState <= 0) {
 141  174670
                             selector.selectedKeys().clear();
 142  
                         }
 143  0
                     } catch (Throwable t) {
 144  0
                         logger.log(Level.SEVERE,"SuspendableMonitor",t);
 145  
                         try{
 146  0
                             if (kh != null) {
 147  
                                 try {
 148  0
                                     interrupted(kh.getKey());
 149  0
                                 } catch (Throwable t2) {
 150  0
                                     logger.log(Level.SEVERE, "SuspendableMonitor", t2);
 151  0
                                 }
 152  
                             }
 153  
 
 154  0
                             if (selectorState <= 0) {
 155  0
                                 selector.selectedKeys().clear();
 156  
                             }
 157  0
                         } catch (Throwable t2){
 158  0
                             logger.log(Level.SEVERE, "SuspendableMonitor", t2);
 159  0
                         }
 160  349237
                     }
 161  349237
                 }
 162  
             }
 163  
 
 164  
 
 165  
             /**
 166  
              * Expire the SelectionKey?
 167  
              */
 168  
             protected void expireIdleKeys() {
 169  349237
                 Set<SelectionKey> readyKeys = selector.keys();
 170  349237
                 if (readyKeys.isEmpty()) {
 171  77
                     return;
 172  
                 }
 173  349160
                 long current = System.currentTimeMillis();
 174  349160
                 Iterator<SelectionKey> iterator = readyKeys.iterator();
 175  
                 SelectionKey key;
 176  698320
                 while (iterator.hasNext()) {
 177  349160
                     key = iterator.next();
 178  349160
                     KeyHandler kh = (KeyHandler) key.attachment();
 179  349160
                     if (kh == null) {
 180  0
                         return;
 181  
                     }
 182  
 
 183  349160
                     long expire = kh.getRegistrationTime();
 184  
                      
 185  349160
                     if (expire == -1){
 186  23
                         continue;
 187  
                     }
 188  
 
 189  349137
                     if (current - expire >= kh.getSuspendableHandler().getExpireTime()) {
 190  3
                         kh.setRegistrationTime(-1);
 191  3
                         if (logger.isLoggable(Level.FINE)) {
 192  0
                             logger.log(Level.FINE, "Expiring: " 
 193  
                                     + key + " attachment: " + key.attachment());
 194  
                         }
 195  
                         try {
 196  3
                             kh.getSuspendableHandler().getSuspendableHandler()
 197  
                                     .expired(kh.getSuspendableHandler().getAttachment());
 198  0
                         } catch (Throwable t) {
 199  0
                             if (logger.isLoggable(Level.FINE) && kh != null) {
 200  0
                                 logger.log(Level.FINE, "Interrupting: " + t);
 201  
                             }
 202  3
                         }
 203  3
                         kh.getSuspendableHandler().getSuspendableFilter()
 204  
                                 .resume(kh.getKey());                        
 205  
                     }
 206  349137
                 }
 207  349160
             }
 208  
 
 209  
             /**
 210  
              * Interrupt a suspended SelectionKey that have timed out.
 211  
              */
 212  
             protected void interrupted(SelectionKey key) {
 213  0
                 key.cancel();
 214  
 
 215  0
                 KeyHandler kh = (KeyHandler) key.attachment();
 216  0
                 kh.getSuspendableHandler().getSelectorHandler()
 217  
                         .getSelectionKeyHandler().cancel(kh.getKey());
 218  0
                 if (logger.isLoggable(Level.FINE) && kh != null) {
 219  0
                     logger.log(Level.FINE, "Interrupting: " + kh.getKey());
 220  
                 }
 221  
 
 222  0
                 if (kh != null) {
 223  0
                     kh.getSuspendableHandler().getSuspendableHandler().
 224  
                             interupted(kh.getSuspendableHandler().getAttachment());
 225  0
                     kh.getSuspendableHandler().getSuspendableFilter()
 226  
                             .suspendedKeys.remove(kh.getKey());
 227  
                 }
 228  0
             }
 229  
         }.start();
 230  1
     }
 231  
 
 232  
     
 233  
     /**
 234  
      * Suspend the {@link ReadableChannel} represented by this {@link SuspendableFilter.KeyHandler}
 235  
      * by registering it on secondary Selector.
 236  
      * @param kh The KeyHandler which hold the current SelectionKey.
 237  
      */
 238  
     protected void suspend(KeyHandler kh)
 239  
             throws ClosedChannelException {
 240  
         try{            
 241  9
             kh.setRegistrationTime(System.currentTimeMillis());
 242  9
             if (kh.getForeignKey() == null){
 243  9
                 keysToRegister.offer(kh);
 244  9
                 selector.wakeup();
 245  
             }
 246  0
         } catch (Throwable ex){
 247  0
             logger.log(Level.SEVERE,"suspend exception: " + kh.getKey(), ex);
 248  9
         }
 249  9
     }
 250  
 }