Coverage Report - com.sun.grizzly.async.AbstractAsyncQueueWriter
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractAsyncQueueWriter
77 %
105/136
68 %
63/92
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 /*
 39  
  * To change this template, choose Tools | Templates
 40  
  * and open the template in the editor.
 41  
  */
 42  
 
 43  
 package com.sun.grizzly.async;
 44  
 
 45  
 import com.sun.grizzly.Controller;
 46  
 import com.sun.grizzly.SelectorHandler;
 47  
 import com.sun.grizzly.async.AsyncQueue.AsyncQueueEntry;
 48  
 import com.sun.grizzly.util.ByteBufferFactory;
 49  
 import java.io.IOException;
 50  
 import java.net.SocketAddress;
 51  
 import java.nio.ByteBuffer;
 52  
 import java.nio.channels.SelectableChannel;
 53  
 import java.nio.channels.SelectionKey;
 54  
 import java.nio.channels.WritableByteChannel;
 55  
 import java.util.concurrent.ConcurrentLinkedQueue;
 56  
 import java.util.concurrent.atomic.AtomicReference;
 57  
 import java.util.concurrent.locks.ReentrantLock;
 58  
 import java.util.logging.Level;
 59  
 
 60  
 /**
 61  
  *
 62  
  * @author oleksiys
 63  
  */
 64  
 public abstract class AbstractAsyncQueueWriter implements AsyncQueueWriter {
 65  
     private SelectorHandler selectorHandler;
 66  
     private AsyncQueue<SelectableChannel, AsyncWriteQueueRecord> writeQueue;
 67  
     private ConcurrentLinkedQueue<AsyncWriteQueueRecord> recordQueue;
 68  
     
 69  136
     public AbstractAsyncQueueWriter(SelectorHandler selectorHandler) {
 70  136
         this.selectorHandler = selectorHandler;
 71  136
         writeQueue = new AsyncQueue<SelectableChannel, AsyncWriteQueueRecord>();
 72  136
         recordQueue = new ConcurrentLinkedQueue<AsyncWriteQueueRecord>();
 73  136
     }
 74  
 
 75  
     /**
 76  
      * {@inheritDoc}
 77  
      */
 78  
     public void write(SelectionKey key, ByteBuffer buffer) throws IOException {
 79  0
         write(key, null, buffer, null);
 80  0
     }
 81  
     
 82  
     /**
 83  
      * {@inheritDoc}
 84  
      */
 85  
     public void write(SelectionKey key, ByteBuffer buffer, 
 86  
             AsyncWriteCallbackHandler callbackHandler) throws IOException {
 87  0
         write(key, null, buffer, callbackHandler, null);
 88  0
     }
 89  
 
 90  
     /**
 91  
      * {@inheritDoc}
 92  
      */
 93  
     public void write(SelectionKey key, ByteBuffer buffer, 
 94  
             AsyncWriteCallbackHandler callbackHandler, 
 95  
             AsyncQueueDataProcessor writePreProcessor) throws IOException {
 96  0
         write(key, null, buffer, callbackHandler, writePreProcessor, false);
 97  0
     }
 98  
 
 99  
     /**
 100  
      * {@inheritDoc}
 101  
      */
 102  
     public void write(SelectionKey key, ByteBuffer buffer, 
 103  
             AsyncWriteCallbackHandler callbackHandler, 
 104  
             AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer)
 105  
             throws IOException {
 106  886661
         write(key, null, buffer, callbackHandler, writePreProcessor, isCloneByteBuffer);
 107  886660
     }
 108  
 
 109  
     /**
 110  
      * {@inheritDoc}
 111  
      */
 112  
     public void write(SelectionKey key, SocketAddress dstAddress,
 113  
             ByteBuffer buffer) throws IOException {
 114  0
         write(key, dstAddress, buffer, null);
 115  0
     }
 116  
     
 117  
     /**
 118  
      * {@inheritDoc}
 119  
      */
 120  
     public void write(SelectionKey key, SocketAddress dstAddress, 
 121  
             ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) 
 122  
             throws IOException {
 123  0
         write(key, dstAddress, buffer, callbackHandler, null);
 124  0
     }
 125  
 
 126  
     /**
 127  
      * {@inheritDoc}
 128  
      */
 129  
     public void write(SelectionKey key, SocketAddress dstAddress, 
 130  
             ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler,
 131  
             AsyncQueueDataProcessor writePreProcessor) throws IOException {
 132  0
         write(key, dstAddress, buffer, callbackHandler, writePreProcessor, false);
 133  0
     }
 134  
 
 135  
     /**
 136  
      * {@inheritDoc}
 137  
      */
 138  
     public void write(SelectionKey key, SocketAddress dstAddress, 
 139  
             ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler,
 140  
             AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer)
 141  
             throws IOException {
 142  
         
 143  906660
         if (key == null) {
 144  0
             throw new IOException("SelectionKey is null! " +
 145  
                     "Probably key was cancelled or connection was closed?");
 146  
         }
 147  
 
 148  906661
         SelectableChannel channel = key.channel();
 149  906662
         AsyncQueueEntry channelEntry = 
 150  
                 writeQueue.obtainAsyncQueueEntry(channel);
 151  
         
 152  906397
         ConcurrentLinkedQueue<AsyncWriteQueueRecord> queue = channelEntry.queue;
 153  906662
         AtomicReference<AsyncWriteQueueRecord> currentElement = channelEntry.currentElement;
 154  906662
         ReentrantLock lock = channelEntry.queuedActionLock;
 155  
         
 156  
         // If AsyncQueue is empty - try to write ByteBuffer here
 157  
         try {
 158  906662
             AsyncWriteQueueRecord record = null;
 159  
 
 160  906662
             if (currentElement.get() == null && // Weak comparison for null
 161  
                     lock.tryLock()) {
 162  89237
                 record = obtainRecord();
 163  
                 // Strong comparison for null, because we're in locked region
 164  89237
                 if (currentElement.compareAndSet(null, record)) {
 165  89237
                     doWrite((WritableByteChannel) channel, dstAddress, buffer, 
 166  
                             writePreProcessor);
 167  
                 } else {
 168  0
                     lock.unlock();
 169  
                 }
 170  
             }
 171  
 
 172  906662
             if (buffer.hasRemaining() || 
 173  
                     (lock.isHeldByCurrentThread() && writePreProcessor != null && 
 174  
                     writePreProcessor.getInternalByteBuffer().hasRemaining())) {
 175  817425
                 if (record == null) {
 176  817425
                     record = obtainRecord();
 177  
                 }
 178  
                 
 179  
                 // clone ByteBuffer if required
 180  817162
                 if (isCloneByteBuffer) {
 181  275
                     int size = buffer.remaining();
 182  275
                     ByteBuffer newBuffer = ByteBufferFactory.allocateView(
 183  
                             size, buffer.isDirect());
 184  
 
 185  275
                     newBuffer.put(buffer);
 186  275
                     newBuffer.position(0);
 187  275
                     buffer = newBuffer;
 188  
                 }
 189  
 
 190  817075
                 record.set(buffer, callbackHandler, writePreProcessor, dstAddress);
 191  
 
 192  817425
                 boolean isRegisterForWriting = false;
 193  
                 
 194  
                 // add new element to the queue, if it's not current
 195  817425
                 if (currentElement.get() != record) {
 196  817425
                     queue.offer(record); // add to queue
 197  817425
                     if (!lock.isLocked()) {
 198  379506
                         isRegisterForWriting = true;
 199  
                     }
 200  
                 } else {  // if element was written direct (not fully written)
 201  0
                     isRegisterForWriting = true;
 202  0
                     lock.unlock();
 203  
                 }
 204  
                 
 205  817425
                 if (isRegisterForWriting) {
 206  379397
                     registerForWriting(key);
 207  
                 }
 208  817425
             } else { // If there are no bytes available for writing
 209  
                 
 210  
                 // Notify callback handler
 211  89237
                 if (callbackHandler != null) {
 212  0
                     callbackHandler.onWriteCompleted(key, buffer);
 213  
                 }
 214  
                 
 215  
                 // If buffer was written directly - set next queue element as current
 216  89237
                 if (lock.isHeldByCurrentThread()) {
 217  89237
                     AsyncWriteQueueRecord nextRecord = queue.poll();
 218  89237
                     if (nextRecord != null) { // if there is something in queue
 219  1413
                         currentElement.set(nextRecord); 
 220  1413
                         lock.unlock();
 221  1413
                         registerForWriting(key);
 222  
                     } else { // if nothing in queue
 223  87824
                         currentElement.set(null);
 224  87824
                         lock.unlock();  // unlock
 225  87824
                         if (queue.peek() != null) {  // check one more time
 226  7
                             registerForWriting(key);
 227  
                         }
 228  
                     }
 229  
                 }
 230  
                 
 231  
                 // Release record element
 232  89237
                 if (record != null) {
 233  89237
                     recordQueue.offer(record);
 234  
                 }
 235  
             }
 236  0
         } catch(IOException e) {
 237  0
             onClose(channel);
 238  0
             throw e;
 239  
         } finally {
 240  906025
             if (lock.isHeldByCurrentThread()) {
 241  0
                 lock.unlock();
 242  
             }
 243  
         }
 244  906662
     }
 245  
 
 246  
     /**
 247  
      * {@inheritDoc}
 248  
      */
 249  
     public boolean hasReadyAsyncWriteData(SelectionKey key) {
 250  2241
         AsyncQueueEntry channelEntry = 
 251  
                 writeQueue.getAsyncQueueEntry(key.channel());
 252  
         
 253  2241
         return channelEntry != null && (channelEntry.currentElement.get() != null || 
 254  
                 (channelEntry.queue != null && !channelEntry.queue.isEmpty()));
 255  
     }
 256  
     
 257  
     /**
 258  
      * {@inheritDoc}
 259  
      */
 260  
     public void onWrite(SelectionKey key) throws IOException {
 261  2183
         SelectableChannel channel = key.channel();
 262  
         
 263  2183
         AsyncQueueEntry channelEntry = 
 264  
                 writeQueue.obtainAsyncQueueEntry(channel);
 265  
         
 266  2183
         ConcurrentLinkedQueue<AsyncWriteQueueRecord> queue = channelEntry.queue;
 267  2183
         AtomicReference<AsyncWriteQueueRecord> currentElement = channelEntry.currentElement;
 268  2183
         ReentrantLock lock = channelEntry.queuedActionLock;
 269  
 
 270  2183
         if (currentElement.get() == null) {
 271  40
             AsyncWriteQueueRecord nextRecord = queue.peek();
 272  40
             if (nextRecord != null && lock.tryLock()) {
 273  16
                 if (!queue.isEmpty() && 
 274  
                         currentElement.compareAndSet(null, nextRecord)) {
 275  16
                     queue.remove();
 276  
                 }
 277  
             } else {
 278  24
                 return;
 279  
             }
 280  16
         } else if (!lock.tryLock()) {
 281  730
             return;
 282  
         }
 283  
 
 284  
         try {
 285  817425
             while (currentElement.get() != null) {
 286  817425
                 AsyncWriteQueueRecord queueRecord = currentElement.get();
 287  
 
 288  817425
                 ByteBuffer byteBuffer = queueRecord.byteBuffer;
 289  817425
                 AsyncQueueDataProcessor writePreProcessor = queueRecord.writePreProcessor;
 290  
                 try {
 291  817425
                     doWrite((WritableByteChannel) channel, 
 292  
                             queueRecord.dstAddress, byteBuffer, writePreProcessor);
 293  0
                 } catch (IOException e) {
 294  0
                     if (queueRecord.callbackHandler != null) {
 295  0
                         queueRecord.callbackHandler.onIOException(e, key,
 296  
                                 byteBuffer, queue);
 297  
                     } else {
 298  0
                         Controller.logger().log(Level.SEVERE,
 299  
                                 "Exception occured when executing " +
 300  
                                 "asynchronous queue writing", e);
 301  
                     }
 302  
 
 303  0
                     onClose(channel);
 304  817425
                 }
 305  
 
 306  
                 // check if buffer was completely written
 307  817425
                 if (!byteBuffer.hasRemaining() &&
 308  
                         (writePreProcessor == null ||
 309  
                         !writePreProcessor.getInternalByteBuffer().hasRemaining())) {
 310  817425
                     if (queueRecord.callbackHandler != null) {
 311  0
                         queueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
 312  
                     }
 313  
 
 314  817425
                     currentElement.set(queue.poll());
 315  817425
                     recordQueue.offer(queueRecord);
 316  
 
 317  
                     // If last element in queue is null - we have to be careful
 318  817425
                     if (currentElement.get() == null) {
 319  1430
                         lock.unlock();
 320  1430
                         AsyncWriteQueueRecord nextRecord = queue.peek();
 321  1430
                         if (nextRecord != null && lock.tryLock()) {
 322  1
                             if (!queue.isEmpty() && 
 323  
                                     currentElement.compareAndSet(null, nextRecord)) {
 324  1
                                 queue.remove();
 325  
                             }
 326  
                             
 327  
                             continue;
 328  
                         } else {
 329  
                             break;
 330  
                         }
 331  
                     }
 332  
                 } else { // if there is still some data in current buffer
 333  0
                     lock.unlock();
 334  0
                     registerForWriting(key);
 335  0
                     break;
 336  
                 }
 337  815995
             }
 338  
         } finally {
 339  1429
             if (lock.isHeldByCurrentThread()) {
 340  0
                 channelEntry.queuedActionLock.unlock();
 341  
             }
 342  
         }
 343  1429
     }
 344  
     
 345  
     /**
 346  
      * {@inheritDoc}
 347  
      */
 348  
     public void onClose(SelectableChannel channel) {
 349  439
         writeQueue.removeEntry(channel);
 350  439
     }
 351  
     
 352  
     /**
 353  
      * {@inheritDoc}
 354  
      */
 355  
     public void close() {
 356  136
         writeQueue.clear();
 357  136
     }
 358  
     
 359  
     protected void doWrite(WritableByteChannel channel, SocketAddress dstAddress,
 360  
             ByteBuffer byteBuffer, AsyncQueueDataProcessor writePreProcessor)
 361  
             throws IOException {
 362  906657
         if (writePreProcessor != null) {
 363  652741
             ByteBuffer resultByteBuffer = null;
 364  
             do {
 365  652741
                 if (byteBuffer.hasRemaining()) {
 366  652740
                     writePreProcessor.process(byteBuffer);
 367  
                 }
 368  
                 
 369  652741
                 resultByteBuffer = writePreProcessor.getInternalByteBuffer();
 370  652741
                 doWrite(channel, dstAddress, resultByteBuffer);
 371  652735
             } while(byteBuffer.hasRemaining() && 
 372  
                     !resultByteBuffer.hasRemaining());
 373  652738
         } else {
 374  253921
             doWrite(channel, dstAddress, byteBuffer);
 375  
         }
 376  906655
     }
 377  
 
 378  
     protected abstract void doWrite(WritableByteChannel channel, SocketAddress dstAddress,
 379  
             ByteBuffer byteBuffer) throws IOException;
 380  
     
 381  
     protected void registerForWriting(SelectionKey key) {
 382  380926
         selectorHandler.register(key, SelectionKey.OP_WRITE);
 383  380926
     }
 384  
     
 385  
     private AsyncWriteQueueRecord obtainRecord() {
 386  906662
         AsyncWriteQueueRecord record = recordQueue.poll();
 387  906655
         if (record == null) {
 388  4202
             record = new AsyncWriteQueueRecord();
 389  
         }
 390  
         
 391  906662
         return record;
 392  
     }    
 393  
 }