Coverage Report - com.sun.grizzly.suspendable.SuspendableMonitor
 
Classes in this File Line Coverage Branch Coverage Complexity
SuspendableMonitor
88 %
14/16
50 %
1/2
0
SuspendableMonitor$1
59 %
44/74
47 %
16/34
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 package com.sun.grizzly.suspendable;
 39  
 
 40  
 import com.sun.grizzly.Controller;
 41  
 import java.io.IOException;
 42  
 import java.nio.channels.CancelledKeyException;
 43  
 import java.nio.channels.SelectionKey;
 44  
 import java.nio.channels.Selector;
 45  
 import java.util.Iterator;
 46  
 import java.util.Set;
 47  
 import java.util.logging.Level;
 48  
 import java.util.logging.Logger;
 49  
 
 50  
 import com.sun.grizzly.suspendable.SuspendableFilter.KeyHandler;
 51  
 import java.nio.channels.ClosedChannelException;
 52  
 import java.nio.channels.SelectableChannel;
 53  
 import java.util.concurrent.ConcurrentLinkedQueue;
 54  
 
 55  
 /**
 56  
  * A secondary {@link Selector} used to keep the state of a suspended
 57  
  * connection ({@link SelectionKey}). See {@link SuspendableFilter} for more info.
 58  
  *
 59  
  * TODO: Add Pipelining/Multiplexing support.
 60  
  * @author Jeanfrancois Arcand
 61  
  */
 62  1196551
 public class SuspendableMonitor {
 63  
 
 64  
     /**
 65  
      * The {@link Selector}
 66  
      */
 67  
     private Selector selector;
 68  
     
 69  
 
 70  1
     private ConcurrentLinkedQueue<KeyHandler> keysToRegister 
 71  
             = new ConcurrentLinkedQueue<KeyHandler>();    
 72  
     
 73  
     /**
 74  
      * Logger.
 75  
      */
 76  1
     private Logger logger = Controller.logger();
 77  
 
 78  
     /**
 79  
      * Start a new Thread with a Selector running.
 80  
      */
 81  1
     public SuspendableMonitor() {
 82  1
         start();
 83  1
     }
 84  
 
 85  
     public void start() {
 86  1
         new Thread("GrizzlySuspendableMonitor") {
 87  
 
 88  
             {
 89  1
                 setDaemon(true);
 90  1
             }
 91  
 
 92  
             @SuppressWarnings("empty-statement")
 93  
             @Override
 94  
             public void run() {
 95  
                 try {
 96  1
                     selector = Selector.open();
 97  0
                 } catch (IOException ex) {
 98  
                     // Most probably a fd leak.
 99  0
                     logger.log(Level.SEVERE, "SuspendableMonitor.open()", ex);
 100  0
                     return;
 101  1
                 }
 102  
                 while (true) {
 103  341853
                     SelectionKey foreignKey = null;
 104  341853
                     KeyHandler kh = null;                    
 105  
                     Set readyKeys;
 106  
                     Iterator<SelectionKey> iterator;
 107  341853
                     int selectorState = 0;
 108  
 
 109  
                     try {
 110  341853
                         selectorState = 0;
 111  
 
 112  
                         try {
 113  341853
                             selectorState = selector.select(1000);
 114  0
                         } catch (CancelledKeyException ex) {
 115  
                             ;
 116  341852
                         }
 117  
                         
 118  341852
                         Iterator<KeyHandler> keys = 
 119  
                                 keysToRegister.iterator();
 120  
 
 121  
                         SelectableChannel channel;
 122  341861
                         while (keys.hasNext()){
 123  9
                             kh = keys.next();
 124  9
                             channel =  kh.getKey().channel();
 125  9
                             if (kh.getKey().isValid() && channel.isOpen()) {
 126  9
                                 foreignKey = channel
 127  
                                     .register(selector,SelectionKey.OP_READ,kh);  
 128  9
                                 kh.setForeignKey(foreignKey);
 129  9
                                 keys.remove();
 130  
                             } 
 131  
                         }  
 132  
                         
 133  
                       /*  readyKeys = selector.selectedKeys();
 134  
                         iterator = readyKeys.iterator();
 135  
                         // TODO: Support pipelining
 136  
                        /* while (iterator.hasNext()) {
 137  
                             key = iterator.next();
 138  
                             if (key.isReadable()) {
 139  
                             //SuspendableMonitor.this.interrupted(key);
 140  
                             }
 141  
                         }*/
 142  341852
                         expireIdleKeys();
 143  
 
 144  341852
                         if (selectorState <= 0) {
 145  170981
                             selector.selectedKeys().clear();
 146  
                         }
 147  0
                     } catch (Throwable t) {
 148  0
                         t.printStackTrace();
 149  
                         try{
 150  0
                             if (kh != null) {
 151  
                                 try {
 152  0
                                     interrupted(kh.getKey());
 153  0
                                 } catch (Throwable t2) {
 154  0
                                     logger.log(Level.SEVERE, "SuspendableMonitor", t2);
 155  0
                                 }
 156  
                             }
 157  
 
 158  0
                             if (selectorState <= 0) {
 159  0
                                 selector.selectedKeys().clear();
 160  
                             }
 161  0
                         } catch (Throwable t2){
 162  0
                             logger.log(Level.SEVERE, "SuspendableMonitor", t2);
 163  0
                         }
 164  341852
                     }
 165  341852
                 }
 166  
             }
 167  
 
 168  
 
 169  
             /**
 170  
              * Expire the SelectionKey?
 171  
              */
 172  
             protected void expireIdleKeys() {
 173  341852
                 Set<SelectionKey> readyKeys = selector.keys();
 174  341852
                 if (readyKeys.isEmpty()) {
 175  86
                     return;
 176  
                 }
 177  341766
                 long current = System.currentTimeMillis();
 178  341766
                 Iterator<SelectionKey> iterator = readyKeys.iterator();
 179  
                 SelectionKey key;
 180  683532
                 while (iterator.hasNext()) {
 181  341766
                     key = iterator.next();
 182  341766
                     KeyHandler kh = (KeyHandler) key.attachment();
 183  341766
                     if (kh == null) {
 184  0
                         return;
 185  
                     }
 186  
 
 187  341766
                     long expire = kh.getRegistrationTime();
 188  
                      
 189  341766
                     if (expire == -1){
 190  23
                         continue;
 191  
                     }
 192  
 
 193  341743
                     if (current - expire >= kh.getSuspendableHandler().getExpireTime()) {
 194  3
                         kh.setRegistrationTime(-1);
 195  3
                         if (logger.isLoggable(Level.FINE)) {
 196  0
                             logger.log(Level.FINE, "Expiring: " 
 197  
                                     + key + " attachment: " + key.attachment());
 198  
                         }
 199  
                         try {
 200  3
                             kh.getSuspendableHandler().getSuspendableHandler()
 201  
                                     .expired(kh.getSuspendableHandler().getAttachment());
 202  0
                         } catch (Throwable t) {
 203  0
                             if (logger.isLoggable(Level.FINE) && kh != null) {
 204  0
                                 logger.log(Level.FINE, "Interrupting: " + t);
 205  
                             }
 206  3
                         }
 207  3
                         kh.getSuspendableHandler().getSuspendableFilter()
 208  
                                 .resume(kh.getKey());                        
 209  
                     }
 210  341743
                 }
 211  341766
             }
 212  
 
 213  
             /**
 214  
              * Interrupt a suspended SelectionKey that have timed out.
 215  
              */
 216  
             protected void interrupted(SelectionKey key) {
 217  0
                 key.cancel();
 218  
 
 219  0
                 KeyHandler kh = (KeyHandler) key.attachment();
 220  0
                 kh.getSuspendableHandler().getSelectorHandler()
 221  
                         .getSelectionKeyHandler().cancel(kh.getKey());
 222  0
                 if (logger.isLoggable(Level.FINE) && kh != null) {
 223  0
                     logger.log(Level.FINE, "Interrupting: " + kh.getKey());
 224  
                 }
 225  
 
 226  0
                 if (kh != null) {
 227  0
                     kh.getSuspendableHandler().getSuspendableHandler().
 228  
                             interupted(kh.getSuspendableHandler().getAttachment());
 229  0
                     kh.getSuspendableHandler().getSuspendableFilter()
 230  
                             .suspendedKeys.remove(kh.getKey());
 231  
                 }
 232  0
             }
 233  
         }.start();
 234  1
     }
 235  
 
 236  
     
 237  
     /**
 238  
      * Suspend the {@link ReadableChannel} represented by this {@link SuspendableFilter.KeyHandler}
 239  
      * by registering it on secondary Selector.
 240  
      * @param kh The KeyHandler which hold the current SelectionKey.
 241  
      */
 242  
     protected void suspend(KeyHandler kh)
 243  
             throws ClosedChannelException {
 244  
         try{            
 245  9
             kh.setRegistrationTime(System.currentTimeMillis());
 246  9
             if (kh.getForeignKey() == null){
 247  
                 
 248  
                 /**
 249  
                  * Why this call block sometimes? Almost getting mad!!!
 250  
                  *                                
 251  
                 SelectionKey foreignKey = 
 252  
                         kh.getKey().channel()
 253  
                         .register(selector, SelectionKey.OP_READ, kh);
 254  
                 kh.setForeignKey(foreignKey);                
 255  
                 selector.wakeup();
 256  
                  */
 257  9
                 keysToRegister.offer(kh);
 258  9
                 selector.wakeup();
 259  
             }
 260  0
         } catch (Throwable ex){
 261  0
             logger.log(Level.SEVERE,"suspend exception: " + kh.getKey(), ex);
 262  9
         }
 263  9
     }
 264  
 }