Coverage Report - com.sun.grizzly.async.AbstractAsyncQueueReader
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractAsyncQueueReader
72 %
86/119
48 %
42/88
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 /*
 39  
  * To change this template, choose Tools | Templates
 40  
  * and open the template in the editor.
 41  
  */
 42  
 
 43  
 package com.sun.grizzly.async;
 44  
 
 45  
 import com.sun.grizzly.Controller;
 46  
 import com.sun.grizzly.SelectorHandler;
 47  
 import com.sun.grizzly.async.AsyncQueue.AsyncQueueEntry;
 48  
 import java.io.IOException;
 49  
 import java.net.SocketAddress;
 50  
 import java.nio.ByteBuffer;
 51  
 import java.nio.channels.ReadableByteChannel;
 52  
 import java.nio.channels.SelectableChannel;
 53  
 import java.nio.channels.SelectionKey;
 54  
 import java.util.concurrent.ConcurrentLinkedQueue;
 55  
 import java.util.concurrent.atomic.AtomicReference;
 56  
 import java.util.concurrent.locks.ReentrantLock;
 57  
 import java.util.logging.Level;
 58  
 
 59  
 /**
 60  
  *
 61  
  * @author oleksiys
 62  
  */
 63  
 public abstract class AbstractAsyncQueueReader implements AsyncQueueReader {
 64  
     private SelectorHandler selectorHandler;
 65  
     private AsyncQueue<SelectableChannel, AsyncReadQueueRecord> readQueue;
 66  
     private ConcurrentLinkedQueue<AsyncReadQueueRecord> recordQueue;
 67  
     
 68  136
     public AbstractAsyncQueueReader(SelectorHandler selectorHandler) {
 69  136
         this.selectorHandler = selectorHandler;
 70  136
         readQueue = new AsyncQueue<SelectableChannel, AsyncReadQueueRecord>();
 71  136
         recordQueue = new ConcurrentLinkedQueue<AsyncReadQueueRecord>();
 72  136
     }
 73  
     
 74  
     /**
 75  
      * {@inheritDoc}
 76  
      */
 77  
     public void read(SelectionKey key, ByteBuffer buffer, 
 78  
             AsyncReadCallbackHandler callbackHandler) throws IOException {
 79  0
         read(key, buffer, callbackHandler, null);
 80  0
     }
 81  
 
 82  
     /**
 83  
      * {@inheritDoc}
 84  
      */
 85  
     public void read(SelectionKey key, ByteBuffer buffer, 
 86  
             AsyncReadCallbackHandler callbackHandler, 
 87  
             AsyncReadCondition condition) throws IOException {
 88  0
         read(key, buffer, callbackHandler, condition, null);
 89  0
     }
 90  
     
 91  
     /**
 92  
      * {@inheritDoc}
 93  
      */
 94  
     public void read(SelectionKey key, ByteBuffer buffer, 
 95  
             AsyncReadCallbackHandler callbackHandler, 
 96  
             AsyncReadCondition condition, 
 97  
             AsyncQueueDataProcessor readPostProcessor) throws IOException {
 98  
 
 99  300
         if (key == null) {
 100  0
             throw new IOException("SelectionKey is null! " +
 101  
                     "Probably key was cancelled or connection was closed?");
 102  
         }
 103  
 
 104  300
         SelectableChannel channel = (SelectableChannel) key.channel();
 105  300
         AsyncQueueEntry channelEntry = 
 106  
                 readQueue.obtainAsyncQueueEntry(channel);
 107  
         
 108  300
         ConcurrentLinkedQueue<AsyncReadQueueRecord> queue = channelEntry.queue;
 109  300
         AtomicReference<AsyncReadQueueRecord> currentElement = channelEntry.currentElement;
 110  300
         ReentrantLock lock = channelEntry.queuedActionLock;
 111  
         
 112  
         // If AsyncQueue is empty - try to read ByteBuffer here
 113  
         try {
 114  300
             AsyncReadQueueRecord record = null;
 115  300
             SocketAddress address = null;
 116  300
             boolean isDirectReadCompleted = false;
 117  
 
 118  300
             if (currentElement.get() == null && // Weak comparison for null
 119  
                     lock.tryLock()) {
 120  300
                 record = obtainRecord();
 121  
                 // Strong comparison for null, because we're in locked region
 122  300
                 if (currentElement.compareAndSet(null, record)) {
 123  
                     
 124  
                     // Do direct reading
 125  
                     do {
 126  256440
                         address = doRead((ReadableByteChannel) channel, buffer, readPostProcessor);
 127  
                         // If some data was read - we need to check "condition"
 128  
                         // Check is performed for each message separately, not like for TCP
 129  256440
                         if (address != null &&
 130  
                                 (!buffer.hasRemaining() || (condition != null &&
 131  
                                 condition.checkAsyncReadCompleted(key, address, buffer)))) {
 132  200
                             isDirectReadCompleted = true;
 133  200
                             break;
 134  
                         }
 135  256240
                     } while (address != null);
 136  
                 } else {
 137  0
                     lock.unlock();
 138  
                 }
 139  
             }
 140  
 
 141  300
             if (!isDirectReadCompleted && buffer.hasRemaining()) {
 142  100
                 if (record == null) {
 143  0
                     record = obtainRecord();
 144  
                 }
 145  
 
 146  100
                 record.set(buffer, callbackHandler, condition, readPostProcessor);
 147  
 
 148  100
                 boolean isRegisterForReading = false;
 149  
                 
 150  
                 // add new element to the queue, if it's not current
 151  100
                 if (currentElement.get() != record) {
 152  0
                     queue.offer(record); // add to queue
 153  0
                     if (!lock.isLocked()) {
 154  0
                         isRegisterForReading = true;
 155  
                     }
 156  
                 } else {  // if element was read direct (not fully read)
 157  100
                     isRegisterForReading = true;
 158  100
                     lock.unlock();
 159  
                 }
 160  
                 
 161  100
                 if (isRegisterForReading) {
 162  100
                     registerForReading(key);
 163  
                 }
 164  100
             } else { // If there are no bytes available for reading
 165  
                 
 166  
                 // Notify callback handler
 167  200
                 if (callbackHandler != null) {
 168  200
                     callbackHandler.onReadCompleted(key,
 169  
                             address, buffer);
 170  
                 }
 171  
                 
 172  
                 // If buffer was read directly - set next queue element as current
 173  200
                 if (lock.isHeldByCurrentThread()) {
 174  200
                     AsyncReadQueueRecord nextRecord = queue.poll();
 175  200
                     if (nextRecord != null) { // if there is something in queue
 176  0
                         currentElement.set(nextRecord); 
 177  0
                         lock.unlock();
 178  0
                         registerForReading(key);
 179  
                     } else { // if nothing in queue
 180  200
                         currentElement.set(null);
 181  200
                         lock.unlock();  // unlock
 182  200
                         if (queue.peek() != null) {  // check one more time
 183  0
                             registerForReading(key);
 184  
                         }
 185  
                     }
 186  
                 }
 187  
                 
 188  
                 // Release record element
 189  200
                 if (record != null) {
 190  200
                     recordQueue.offer(record);
 191  
                 }
 192  
             }
 193  0
         } catch(IOException e) {
 194  0
             onClose(channel);
 195  0
             throw e;
 196  
         } finally {
 197  300
             if (lock.isHeldByCurrentThread()) {
 198  0
                 lock.unlock();
 199  
             }
 200  
         }
 201  300
     }
 202  
     
 203  
     /**
 204  
      * {@inheritDoc}
 205  
      */
 206  
     public boolean isAsyncQueueReaderEnabledFor(SelectionKey key) {
 207  153611
         AsyncQueueEntry channelEntry = 
 208  
                 readQueue.getAsyncQueueEntry(key.channel());
 209  
         
 210  153611
         return channelEntry != null && (channelEntry.currentElement != null || 
 211  
                 (channelEntry.queue != null && !channelEntry.queue.isEmpty()));
 212  
     }
 213  
 
 214  
     /**
 215  
      * {@inheritDoc}
 216  
      */
 217  
     public void onRead(SelectionKey key) throws IOException {
 218  9737
         SelectableChannel channel = key.channel();
 219  
         
 220  9737
         AsyncQueueEntry channelEntry = 
 221  
                 readQueue.obtainAsyncQueueEntry(channel);
 222  
         
 223  9737
         ConcurrentLinkedQueue<AsyncReadQueueRecord> queue = channelEntry.queue;
 224  9737
         AtomicReference<AsyncReadQueueRecord> currentElement = channelEntry.currentElement;
 225  9737
         ReentrantLock lock = channelEntry.queuedActionLock;
 226  
 
 227  9737
         if (currentElement.get() == null) {
 228  0
             AsyncReadQueueRecord nextRecord = queue.peek();
 229  0
             if (nextRecord != null && lock.tryLock()) {
 230  0
                 if (!queue.isEmpty() &&
 231  
                         currentElement.compareAndSet(null, nextRecord)) {
 232  0
                     queue.remove();
 233  
                 }
 234  
             } else {
 235  0
                 return;
 236  
             }
 237  0
         } else if (!lock.tryLock()) {
 238  2
             return;
 239  
         }
 240  
 
 241  
         try {
 242  9735
             while (currentElement.get() != null) {
 243  9735
                 AsyncReadQueueRecord queueRecord = currentElement.get();
 244  
 
 245  9735
                 ByteBuffer byteBuffer = queueRecord.byteBuffer;
 246  9735
                 SocketAddress address = null;
 247  9735
                 AsyncQueueDataProcessor readPostProcessor = queueRecord.readPostProcessor;
 248  
                 try {
 249  9735
                     address = doRead((ReadableByteChannel) channel, byteBuffer, readPostProcessor);
 250  0
                 } catch (IOException e) {
 251  0
                     if (queueRecord.callbackHandler != null) {
 252  0
                         queueRecord.callbackHandler.onIOException(e, key,
 253  
                                 byteBuffer, queue);
 254  
                     } else {
 255  0
                         Controller.logger().log(Level.SEVERE,
 256  
                                 "Exception occured when executing " +
 257  
                                 "asynchronous queue reading", e);
 258  
                     }
 259  
 
 260  0
                     onClose(channel);
 261  9735
                 }
 262  
 
 263  
                 // check if buffer was completely read
 264  9735
                 AsyncReadCondition condition = queueRecord.condition;
 265  9735
                 if (!byteBuffer.hasRemaining() || (condition != null &&
 266  
                         condition.checkAsyncReadCompleted(key, address, byteBuffer))) {
 267  100
                     if (queueRecord.callbackHandler != null) {
 268  100
                         queueRecord.callbackHandler.onReadCompleted(
 269  
                                 key, address, byteBuffer);
 270  
                     }
 271  
 
 272  100
                     currentElement.set(queue.poll());
 273  100
                     recordQueue.offer(queueRecord);
 274  
 
 275  
                     // If last element in queue is null - we have to be careful
 276  100
                     if (currentElement.get() == null) {
 277  100
                         lock.unlock();
 278  100
                         AsyncReadQueueRecord nextRecord = queue.peek();
 279  100
                         if (nextRecord != null && lock.tryLock()) {
 280  0
                             if (!queue.isEmpty() &&
 281  
                                     currentElement.compareAndSet(null, nextRecord)) {
 282  0
                                 queue.remove();
 283  
                             }
 284  
                             
 285  
                             continue;
 286  
                         } else {
 287  
                             break;
 288  
                         }
 289  
                     }
 290  
                 } else { // if there is still some data in current buffer
 291  9635
                     lock.unlock();
 292  9635
                     registerForReading(key);
 293  9635
                     break;
 294  
                 }
 295  0
             }
 296  
         } finally {
 297  9735
             if (lock.isHeldByCurrentThread()) {
 298  0
                 channelEntry.queuedActionLock.unlock();
 299  
             }
 300  
         }
 301  9735
     }
 302  
 
 303  
     /**
 304  
      * {@inheritDoc}
 305  
      */
 306  
     public void onClose(SelectableChannel channel) {
 307  423
         readQueue.removeEntry(channel);
 308  423
     }
 309  
     
 310  
     /**
 311  
      * {@inheritDoc}
 312  
      */
 313  
     public void close() {
 314  136
         readQueue.clear();
 315  136
         readQueue = null;
 316  136
     }
 317  
 
 318  
     protected abstract SocketAddress doRead(ReadableByteChannel channel, ByteBuffer byteBuffer, 
 319  
             AsyncQueueDataProcessor readPostProcessor) throws IOException;
 320  
 
 321  
     private void registerForReading(SelectionKey key) {
 322  9735
         selectorHandler.register(key, SelectionKey.OP_READ);
 323  9735
     }
 324  
     
 325  
     private AsyncReadQueueRecord obtainRecord() {
 326  300
         AsyncReadQueueRecord record = recordQueue.poll();
 327  300
         if (record == null) {
 328  3
             record = new AsyncReadQueueRecord();
 329  
         }
 330  
         
 331  300
         return record;
 332  
     }
 333  
 }