Coverage Report - com.sun.grizzly.connectioncache.impl.transport.OutboundConnectionCacheBlockingImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
OutboundConnectionCacheBlockingImpl
67 %
156/233
58 %
106/184
0
OutboundConnectionCacheBlockingImpl$1
100 %
1/1
N/A
0
OutboundConnectionCacheBlockingImpl$CacheEntry
100 %
6/6
N/A
0
OutboundConnectionCacheBlockingImpl$ConnectionState
90 %
9/10
N/A
0
OutboundConnectionCacheBlockingImpl$ConnectionStateValue
100 %
1/1
N/A
0
 
 1  
 /*
 2  
  * 
 3  
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 4  
  * 
 5  
  * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
 6  
  * 
 7  
  * The contents of this file are subject to the terms of either the GNU
 8  
  * General Public License Version 2 only ("GPL") or the Common Development
 9  
  * and Distribution License("CDDL") (collectively, the "License").  You
 10  
  * may not use this file except in compliance with the License. You can obtain
 11  
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 12  
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 13  
  * language governing permissions and limitations under the License.
 14  
  * 
 15  
  * When distributing the software, include this License Header Notice in each
 16  
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 17  
  * Sun designates this particular file as subject to the "Classpath" exception
 18  
  * as provided by Sun in the GPL Version 2 section of the License file that
 19  
  * accompanied this code.  If applicable, add the following below the License
 20  
  * Header, with the fields enclosed by brackets [] replaced by your own
 21  
  * identifying information: "Portions Copyrighted [year]
 22  
  * [name of copyright owner]"
 23  
  * 
 24  
  * Contributor(s):
 25  
  * 
 26  
  * If you wish your version of this file to be governed by only the CDDL or
 27  
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
 28  
  * elects to include this software in this distribution under the [CDDL or GPL
 29  
  * Version 2] license."  If you don't indicate a single choice of license, a
 30  
  * recipient has the option to distribute your version of this file under
 31  
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
 32  
  * its licensees as provided above.  However, if you add GPL Version 2 code
 33  
  * and therefore, elected the GPL Version 2 license, then the option applies
 34  
  * only if the new code is made subject to such option by the copyright
 35  
  * holder.
 36  
  *
 37  
  */
 38  
 
 39  
 package com.sun.grizzly.connectioncache.impl.transport;
 40  
 
 41  
 import com.sun.grizzly.connectioncache.spi.concurrent.ConcurrentQueue;
 42  
 import com.sun.grizzly.connectioncache.spi.transport.ConnectionFinder;
 43  
 import com.sun.grizzly.connectioncache.spi.transport.ContactInfo;
 44  
 import com.sun.grizzly.connectioncache.spi.transport.OutboundConnectionCache;
 45  
 import java.io.Closeable;
 46  
 import java.io.IOException ;
 47  
 
 48  
 import java.util.Map ;
 49  
 import java.util.HashMap ;
 50  
 import java.util.Queue ;
 51  
 import java.util.Collection ;
 52  
 import java.util.Collections ;
 53  
 
 54  
 import java.util.concurrent.LinkedBlockingQueue ;
 55  
 
 56  
 import java.util.logging.Logger ;
 57  
 
 58  266
 public final class OutboundConnectionCacheBlockingImpl<C extends Closeable>
 59  
         extends ConnectionCacheBlockingBase<C>
 60  
         implements OutboundConnectionCache<C> {
 61  
     
 62  
     // Configuration data
 63  
     // XXX we may want this data to be dynamically re-configurable
 64  
     private final int maxParallelConnections ;        // Maximum number of
 65  
     // connections we will open
 66  
     // to the same endpoint
 67  
     
 68  
     private Map<ContactInfo<C>,CacheEntry<C>> entryMap ;
 69  
     private Map<C,ConnectionState<C>> connectionMap ;
 70  
     
 71  
     public int maxParallelConnections() {
 72  0
         return maxParallelConnections ;
 73  
     }
 74  
     
 75  
     protected String thisClassName() {
 76  0
         return "OutboundConnectionCacheBlockingImpl" ;
 77  
     }
 78  
     
 79  
     // NEW: connection was just created; currently not queued
 80  
     // BUSY: connection queued on busyConnections queue
 81  
     // IDLE: connection queued on idleConnections queue
 82  5
     private enum ConnectionStateValue { NEW, BUSY, IDLE }
 83  
     
 84  
     private static final class ConnectionState<C extends Closeable> {
 85  
         ConnectionStateValue csv ;                        // Indicates state of
 86  
         // connection
 87  
         final ContactInfo<C> cinfo ;                        // ContactInfo used to
 88  
         // create this
 89  
         // Connection
 90  
         final C connection ;                                // Connection of the
 91  
         // ConnectionState
 92  
         final CacheEntry<C> entry ;                        // This Connection's
 93  
         // CacheEntry
 94  
         
 95  
         int busyCount ;                                        // Number of calls to
 96  
         // get without release
 97  
         int expectedResponseCount ;                        // Number of expected
 98  
         // responses not yet
 99  
         // received
 100  
         
 101  
         // At all times, a connection is either on the busy or idle queue in
 102  
         // its ConnectionEntry.  If the connection is on the idle queue,
 103  
         // reclaimableHandle may also be non-null if the Connection is also on
 104  
         // the reclaimableConnections queue.
 105  
         ConcurrentQueue.Handle<C> reclaimableHandle ;        // non-null iff
 106  
         // connection is not
 107  
         // in use and has no
 108  
         // outstanding requests
 109  
         
 110  
         ConnectionState( final ContactInfo<C> cinfo, final CacheEntry<C> entry,
 111  36
                 final C conn ) {
 112  
             
 113  36
             this.csv = ConnectionStateValue.NEW ;
 114  36
             this.cinfo = cinfo ;
 115  36
             this.connection = conn ;
 116  36
             this.entry = entry ;
 117  
             
 118  36
             busyCount = 0 ;
 119  36
             expectedResponseCount = 0 ;
 120  36
             reclaimableHandle = null ;
 121  36
         }
 122  
         
 123  
         public String toString() {
 124  0
             return "ConnectionState["
 125  
                     + "cinfo=" + cinfo
 126  
                     + " connection=" + connection
 127  
                     + " busyCount=" + busyCount
 128  
                     + " expectedResponseCount=" + expectedResponseCount
 129  
                     + "]" ;
 130  
         }
 131  
     }
 132  
     
 133  
     // Represents an entry in the outbound connection cache.
 134  
     // This version handles normal shareable ContactInfo
 135  
     // (we also need to handle no share).
 136  20
     private static final class CacheEntry<C extends Closeable> {
 137  10
         final Queue<C> idleConnections = new LinkedBlockingQueue<C>() ;
 138  10
         final Collection<C> idleConnectionsView =
 139  
                 Collections.unmodifiableCollection( idleConnections ) ;
 140  
         
 141  10
         final Queue<C> busyConnections = new LinkedBlockingQueue<C>() ;
 142  10
         final Collection<C> busyConnectionsView =
 143  
                 Collections.unmodifiableCollection( busyConnections ) ;
 144  
         
 145  
         public int totalConnections() {
 146  38
             return idleConnections.size() + busyConnections.size() ;
 147  
         }
 148  
     }
 149  
     
 150  
     public OutboundConnectionCacheBlockingImpl( final String cacheType,
 151  
             final int highWaterMark, final int numberToReclaim,
 152  
             final int maxParallelConnections, Logger logger ) {
 153  
         
 154  2
         super( cacheType, highWaterMark, numberToReclaim, logger ) ;
 155  
         
 156  2
         if (maxParallelConnections < 1)
 157  0
             throw new IllegalArgumentException(
 158  
                     "maxParallelConnections must be > 0" ) ;
 159  
         
 160  2
         this.maxParallelConnections = maxParallelConnections ;
 161  
         
 162  2
         this.entryMap = new HashMap<ContactInfo<C>,CacheEntry<C>>() ;
 163  2
         this.connectionMap = new HashMap<C,ConnectionState<C>>() ;
 164  
         
 165  2
         if (debug()) {
 166  0
             dprint(".constructor completed: " + cacheType );
 167  
         }
 168  2
     }
 169  
     
 170  
     public boolean canCreateNewConnection( ContactInfo<C> cinfo ) {
 171  0
         CacheEntry<C> entry = entryMap.get( cinfo ) ;
 172  0
         if (entry == null)
 173  0
             return true ;
 174  
         
 175  0
         return internalCanCreateNewConnection( entry ) ;
 176  
     }
 177  
     
 178  
     private boolean internalCanCreateNewConnection( final CacheEntry<C> entry ) {
 179  38
         final int totalConnectionsInEntry = entry.totalConnections() ;
 180  
         
 181  38
         final boolean createNewConnection =
 182  
                 (totalConnectionsInEntry == 0) ||
 183  
                 ((numberOfConnections() < highWaterMark()) &&
 184  
                 (totalConnectionsInEntry < maxParallelConnections)) ;
 185  
         
 186  38
         return createNewConnection ;
 187  
     }
 188  
     
 189  
     private CacheEntry<C> getEntry( final ContactInfo<C> cinfo
 190  
             ) throws IOException {
 191  
         
 192  52
         if (debug()) {
 193  0
             dprint( "->getEntry: " + cinfo ) ;
 194  
         }
 195  
         
 196  
         try {
 197  
             // This should be the only place a CacheEntry is constructed.
 198  52
             CacheEntry<C> result = entryMap.get( cinfo ) ;
 199  52
             if (result == null) {
 200  10
                 if (debug()) {
 201  0
                     dprint( ".getEntry: " + cinfo
 202  
                             + " creating new CacheEntry" ) ;
 203  
                 }
 204  
                 
 205  10
                 result = new CacheEntry<C>() ;
 206  10
                 entryMap.put( cinfo, result ) ;
 207  
             } else {
 208  42
                 if (debug()) {
 209  0
                     dprint( ".getEntry: " + cinfo +
 210  
                             " re-using existing CacheEntry" ) ;
 211  
                 }
 212  
             }
 213  
             
 214  52
             return result ;
 215  
         } finally {
 216  52
             if (debug()) {
 217  0
                 dprint( "<-getEntry: " + cinfo ) ;
 218  
             }
 219  
         }
 220  
     }
 221  
     
 222  
     // Note that tryNewConnection will ALWAYS create a new connection if
 223  
     // no connection currently exists.
 224  
     private C tryNewConnection( final CacheEntry<C> entry,
 225  
             final ContactInfo<C> cinfo ) throws IOException {
 226  
         
 227  38
         if (debug())
 228  0
             dprint( "->tryNewConnection: " + cinfo ) ;
 229  
         
 230  
         try {
 231  38
             C conn = null ;
 232  
             
 233  38
             if (internalCanCreateNewConnection(entry)) {
 234  
                 // If this throws an exception just let it
 235  
                 // propagate: let a higher layer handle a
 236  
                 // connection creation failure.
 237  36
                 conn = cinfo.createConnection() ;
 238  
                 
 239  35
                 if (debug()) {
 240  0
                     dprint( ".tryNewConnection: " + cinfo
 241  
                             + " created connection " + conn ) ;
 242  
                 }
 243  
             }
 244  
             
 245  37
             return conn ;
 246  
         } finally {
 247  38
             if (debug())
 248  0
                 dprint( "<-tryNewConnection: " + cinfo ) ;
 249  
         }
 250  
     }
 251  
     
 252  
     private void decrementTotalIdle() {
 253  23
         if (debug())
 254  0
             dprint( "->decrementTotalIdle: totalIdle = "
 255  
                     + totalIdle ) ;
 256  
         
 257  
         try {
 258  23
             if (totalIdle > 0) {
 259  23
                 totalIdle-- ;
 260  
             } else {
 261  0
                 if (debug()) {
 262  0
                     dprint( ".decrementTotalIdle: "
 263  
                             + "incorrect idle count: was already 0" ) ;
 264  
                 }
 265  
             }
 266  
         } finally {
 267  23
             if (debug()) {
 268  0
                 dprint( "<-decrementTotalIdle: totalIdle = "
 269  
                         + totalIdle ) ;
 270  
             }
 271  
         }
 272  23
     }
 273  
     
 274  
     private void decrementTotalBusy() {
 275  47
         if (debug())
 276  0
             dprint( "->decrementTotalBusy: totalBusy = "
 277  
                     + totalBusy ) ;
 278  
         
 279  
         try {
 280  47
             if (totalBusy > 0) {
 281  47
                 totalBusy-- ;
 282  
             } else {
 283  0
                 if (debug()) {
 284  0
                     dprint( ".decrementTotalBusy: "
 285  
                             + "incorrect idle count: was already 0" ) ;
 286  
                 }
 287  
             }
 288  
         } finally {
 289  47
             if (debug()) {
 290  0
                 dprint( "<-decrementTotalBusy: totalBusy = "
 291  
                         + totalBusy ) ;
 292  
             }
 293  
         }
 294  47
     }
 295  
     
 296  
     // Update queues and counts to make the result busy.
 297  
     private void makeResultBusy( C result, ConnectionState<C> cs,
 298  
             CacheEntry<C> entry ) {
 299  
         
 300  50
         if (debug())
 301  0
             dprint( "->makeResultBusy: " + result
 302  
                     + " was previously " + cs.csv ) ;
 303  
         
 304  
         try {
 305  50
             switch (cs.csv) {
 306  
                 case NEW :
 307  36
                     totalBusy++ ;
 308  36
                     break ;
 309  
                     
 310  
                 case IDLE :
 311  11
                     totalBusy++ ;
 312  11
                     decrementTotalIdle() ;
 313  
                     
 314  11
                     final ConcurrentQueue.Handle<C> handle =
 315  
                             cs.reclaimableHandle ;
 316  
                     
 317  11
                     if (handle != null) {
 318  11
                         if (!handle.remove()) {
 319  0
                             if (debug()) {
 320  0
                                 dprint( ".makeResultBusy: " + cs.cinfo
 321  
                                         + " result was not on reclaimable Q" ) ;
 322  
                             }
 323  
                         }
 324  11
                         cs.reclaimableHandle = null ;
 325  
                     }
 326  
                     break ;
 327  
                     
 328  
                 case BUSY :
 329  
                     // Nothing to do here
 330  
                     break ;
 331  
             }
 332  
             
 333  50
             entry.busyConnections.offer( result ) ;
 334  50
             cs.csv = ConnectionStateValue.BUSY ;
 335  50
             cs.busyCount++ ;
 336  
         } finally {
 337  50
             if (debug())
 338  0
                 dprint( "<-makeResultBusy: " + result ) ;
 339  
         }
 340  50
     }
 341  
     
 342  
     private C tryIdleConnections( CacheEntry<C> entry ) {
 343  48
         if (debug()) {
 344  0
             dprint( "->tryIdleConnections" ) ;
 345  
         }
 346  
         
 347  
         try {
 348  48
             return entry.idleConnections.poll() ;
 349  
         } finally {
 350  48
             if (debug()) {
 351  0
                 dprint( "<-tryIdleConnections" ) ;
 352  
             }
 353  
         }
 354  
     }
 355  
     
 356  
     private C tryBusyConnections( CacheEntry<C> entry ) {
 357  
         // Use a busy connection.  Note that there MUST be a busy
 358  
         // connection available at this point, because
 359  
         // tryNewConnection did not create a new connection.
 360  2
         if (debug()) {
 361  0
             dprint( "->tryBusyConnections" ) ;
 362  
         }
 363  
         
 364  
         try {
 365  2
             C result = entry.busyConnections.poll() ;
 366  
             
 367  2
             if (result == null) {
 368  0
                 throw new RuntimeException(
 369  
                         "INTERNAL ERROR: no busy connection available" ) ;
 370  
             }
 371  
             
 372  2
             return result ;
 373  
         } finally {
 374  2
             if (debug()) {
 375  0
                 dprint( "<-tryBusyConnections" ) ;
 376  
             }
 377  
         }
 378  
     }
 379  
     
 380  
     public synchronized C get( final ContactInfo<C> cinfo
 381  
             ) throws IOException {
 382  
         
 383  37
         return get( cinfo, null ) ;
 384  
     }
 385  
     
 386  
     public synchronized ConnectionState<C> getConnectionState(
 387  
             ContactInfo<C> cinfo, CacheEntry<C> entry, C conn ) {
 388  
         
 389  50
         if (debug())
 390  0
             dprint( "->getConnectionState: " + conn ) ;
 391  
         
 392  
         try {
 393  50
             ConnectionState<C> cs = connectionMap.get( conn ) ;
 394  50
             if (cs == null) {
 395  36
                 if (debug())
 396  0
                     dprint( ".getConnectionState: " + conn
 397  
                             + " creating new ConnectionState" + cs ) ;
 398  
                 
 399  36
                 cs = new ConnectionState<C>( cinfo, entry, conn ) ;
 400  36
                 connectionMap.put( conn, cs ) ;
 401  
             } else {
 402  14
                 if (debug())
 403  0
                     dprint( ".getConnectionState: " + conn
 404  
                             + " found ConnectionState" + cs ) ;
 405  
             }
 406  
             
 407  50
             return cs ;
 408  
         } finally {
 409  50
             if (debug())
 410  0
                 dprint( "<-getConnectionState: " + conn ) ;
 411  
         }
 412  
     }
 413  
     
 414  
     public synchronized C get( final ContactInfo<C> cinfo,
 415  
             final ConnectionFinder<C> finder ) throws IOException {
 416  
         
 417  52
         if (debug()) {
 418  0
             dprint( "->get: " + cinfo ) ;
 419  
         }
 420  
         
 421  52
         ConnectionState<C> cs = null ;
 422  
         
 423  
         try {
 424  52
             final CacheEntry<C> entry = getEntry( cinfo ) ;
 425  52
             C result = null ;
 426  
             
 427  52
             if (numberOfConnections() >= highWaterMark()) {
 428  
                 // This reclaim probably does nothing, because
 429  
                 // connections are reclaimed on release in the
 430  
                 // overflow state.
 431  4
                 reclaim() ;
 432  
             }
 433  
             
 434  52
             if (finder != null) {
 435  
                 // Try the finder if present.
 436  5
                 if (debug()) {
 437  0
                     dprint( ".get: " + cinfo +
 438  
                             " Calling the finder to get a connection" ) ;
 439  
                 }
 440  
                 
 441  5
                 result = finder.find( cinfo, entry.idleConnectionsView,
 442  
                         entry.busyConnectionsView ) ;
 443  
                 
 444  4
                 if (result != null) {
 445  3
                     cs = getConnectionState( cinfo, entry, result ) ;
 446  
                     
 447  
                     // Dequeue from cache entry if not NEW
 448  3
                     if (cs.csv == ConnectionStateValue.BUSY)
 449  1
                         entry.busyConnections.remove( result ) ;
 450  2
                     else if (cs.csv == ConnectionStateValue.IDLE)
 451  1
                         entry.idleConnections.remove( result ) ;
 452  
                 }
 453  
             }
 454  
             
 455  51
             if (result == null) {
 456  48
                 result = tryIdleConnections( entry ) ;
 457  
             }
 458  
             
 459  51
             if (result == null) {
 460  38
                 result = tryNewConnection( entry, cinfo ) ;
 461  
             }
 462  
             
 463  50
             if (result == null) {
 464  2
                 result = tryBusyConnections( entry ) ;
 465  
             }
 466  
             
 467  50
             if (cs == null)
 468  47
                 cs = getConnectionState( cinfo, entry, result ) ;
 469  
             
 470  50
             makeResultBusy( result, cs, entry ) ;
 471  50
             return result ;
 472  
         } finally {
 473  52
             if (debug()) {
 474  0
                 dprint( ".get " + cinfo
 475  
                         + " totalIdle=" + totalIdle
 476  
                         + " totalBusy=" + totalBusy ) ;
 477  
                 
 478  0
                 dprint( "<-get " + cinfo + " ConnectionState=" + cs ) ;
 479  
             }
 480  
         }
 481  
     }
 482  
     
 483  
     // If overflow, close conn and return true,
 484  
     // otherwise enqueue on reclaimable queue and return false.
 485  
     private boolean reclaimOrClose( ConnectionState<C> cs, final C conn ) {
 486  26
         if (debug())
 487  0
             dprint( "->reclaimOrClose: " + conn ) ;
 488  
         
 489  
         try {
 490  26
             final boolean isOverflow = numberOfConnections() >
 491  
                     highWaterMark() ;
 492  
             
 493  26
             if (isOverflow) {
 494  3
                 if (debug()) {
 495  0
                     dprint( ".reclaimOrClose: closing overflow connection "
 496  
                             + conn ) ;
 497  
                 }
 498  
                 
 499  3
                 close( conn ) ;
 500  
             } else {
 501  23
                 if (debug()) {
 502  0
                     dprint( ".reclaimOrClose: queuing reclaimable connection "
 503  
                             + conn ) ;
 504  
                 }
 505  
                 
 506  23
                 cs.reclaimableHandle =
 507  
                         reclaimableConnections.offer( conn ) ;
 508  
             }
 509  
             
 510  26
             return isOverflow ;
 511  
         } finally {
 512  26
             if (debug())
 513  0
                 dprint( "<-reclaimOrClose: " + conn ) ;
 514  
         }
 515  
     }
 516  
     
 517  
     public synchronized void release( final C conn,
 518  
             final int numResponsesExpected ) {
 519  
         
 520  29
         if (debug()) {
 521  0
             dprint( "->release: " + conn
 522  
                     + " expecting " + numResponsesExpected + " responses" ) ;
 523  
         }
 524  
         
 525  29
         final ConnectionState<C> cs = connectionMap.get( conn ) ;
 526  
         
 527  
         try {
 528  29
             if (cs == null) {
 529  0
                 if (debug()) {
 530  0
                     dprint( ".release: " + conn + " was closed" ) ;
 531  
                 }
 532  
                 
 533  
                 return ;
 534  
             } else {
 535  29
                 cs.expectedResponseCount += numResponsesExpected ;
 536  29
                 int numResp = cs.expectedResponseCount ;
 537  29
                 int numBusy = --cs.busyCount ;
 538  29
                 if (numBusy < 0) {
 539  0
                     if (debug()) {
 540  0
                         dprint( ".release: " + conn + " numBusy=" +
 541  
                                 numBusy + " is < 0: error" ) ;
 542  
                     }
 543  
                     
 544  0
                     cs.busyCount = 0 ;
 545  
                     return ;
 546  
                 }
 547  
                 
 548  29
                 if (debug()) {
 549  0
                     dprint( ".release: " + numResp + " responses expected" ) ;
 550  0
                     dprint( ".release: " + numBusy + " busy count" ) ;
 551  
                 }
 552  
                 
 553  29
                 if (numBusy == 0) {
 554  26
                     final CacheEntry<C> entry = cs.entry ;
 555  26
                     boolean wasOnBusy = entry.busyConnections.remove( conn ) ;
 556  26
                     if (!wasOnBusy)
 557  0
                         if (debug())
 558  0
                             dprint( ".release: " + conn
 559  
                                     + " was NOT on busy queue, "
 560  
                                     + "but should have been" ) ;
 561  
                     
 562  26
                     boolean connectionClosed = false ;
 563  26
                     if (numResp == 0) {
 564  19
                         connectionClosed = reclaimOrClose( cs, conn ) ;
 565  
                     }
 566  
                     
 567  26
                     decrementTotalBusy() ;
 568  
                     
 569  26
                     if (!connectionClosed) {
 570  24
                         if (debug()) {
 571  0
                             dprint( ".release: queuing idle connection "
 572  
                                     + conn ) ;
 573  
                         }
 574  
                         
 575  24
                         totalIdle++ ;
 576  24
                         entry.idleConnections.offer( conn ) ;
 577  24
                         cs.csv = ConnectionStateValue.IDLE ;
 578  
                     }
 579  
                 }
 580  
             }
 581  
         } finally {
 582  29
             if (debug()) {
 583  0
                 dprint( ".release " + conn
 584  
                         + " cs=" + cs
 585  
                         + " totalIdle=" + totalIdle
 586  
                         + " totalBusy=" + totalBusy ) ;
 587  
                 
 588  0
                 dprint( "<-release" + conn ) ;
 589  
             }
 590  
         }
 591  29
     }
 592  
     
 593  
     /** Decrement the number of expected responses.  When a connection is idle
 594  
      * and has no expected responses, it can be reclaimed.
 595  
      * @param conn  a connection
 596  
      */
 597  
     public synchronized void responseReceived( final C conn ) {
 598  8
         if (debug()) {
 599  0
             dprint( "->responseReceived: " + conn ) ;
 600  
         }
 601  
         
 602  
         try {
 603  8
             final ConnectionState<C> cs = connectionMap.get( conn ) ;
 604  8
             if (cs == null) {
 605  0
                 if (debug()) {
 606  0
                     dprint(
 607  
                             ".responseReceived: "
 608  
                             + "received response on closed connection "
 609  
                             + conn ) ;
 610  
                 }
 611  
                 
 612  
                 return ;
 613  
             }
 614  
             
 615  8
             final int waitCount = --cs.expectedResponseCount ;
 616  
             
 617  8
             if (debug())  {
 618  0
                 dprint( ".responseReceived: " + conn
 619  
                         + " waitCount=" + waitCount ) ;
 620  
             }
 621  
             
 622  8
             if (waitCount < 0) {
 623  0
                 if (debug())  {
 624  0
                     dprint( ".responseReceived: " + conn
 625  
                             + " incorrect call: error" ) ;
 626  
                 }
 627  0
                 cs.expectedResponseCount = 0 ;
 628  
                 return ;
 629  
             }
 630  
             
 631  8
             if ((waitCount == 0) && (cs.busyCount == 0)) {
 632  7
                 reclaimOrClose( cs, conn ) ;
 633  
             }
 634  
         } finally {
 635  8
             if (debug()) {
 636  0
                 dprint( "<-responseReceived: " + conn ) ;
 637  
             }
 638  
         }
 639  8
     }
 640  
     
 641  
     /** Close a connection, regardless of whether the connection is busy
 642  
      * or not.
 643  
      * @param conn  a connection
 644  
      */
 645  
     public synchronized void close( final C conn ) {
 646  35
         if (debug()) {
 647  0
             dprint( "->close: " + conn ) ;
 648  
         }
 649  
         
 650  
         try {
 651  35
             final ConnectionState<C> cs = connectionMap.remove( conn ) ;
 652  35
             if (cs == null) {
 653  0
                 if (debug()) {
 654  0
                     dprint( ".close: " + conn + " was already closed" ) ;
 655  
                 }
 656  
                 
 657  
                 return ;
 658  
             }
 659  
             
 660  35
             if (debug()) {
 661  0
                 dprint( ".close: " + conn
 662  
                         + "Connection state=" + cs ) ;
 663  
             }
 664  
             
 665  35
             final ConcurrentQueue.Handle rh = cs.reclaimableHandle ;
 666  35
             if (rh != null) {
 667  11
                 boolean result = rh.remove() ;
 668  11
                 if (debug()) {
 669  0
                     dprint( ".close: " + conn
 670  
                             + "reclaimableHandle .remove = " + result ) ;
 671  
                 }
 672  
             }
 673  
             
 674  35
             if (cs.entry.busyConnections.remove( conn )) {
 675  21
                 if (debug()) {
 676  0
                     dprint( ".close: " + conn
 677  
                             + " removed from busyConnections" ) ;
 678  
                 }
 679  
                 
 680  21
                 decrementTotalBusy() ;
 681  
             }
 682  
             
 683  35
             if (cs.entry.idleConnections.remove( conn )) {
 684  12
                 if (debug()) {
 685  0
                     dprint( ".close: " + conn
 686  
                             + " removed from idleConnections" ) ;
 687  
                 }
 688  
                 
 689  12
                 decrementTotalIdle() ;
 690  
             }
 691  
             
 692  
             try {
 693  35
                 conn.close() ;
 694  0
             } catch (IOException exc) {
 695  0
                 if (debug())
 696  0
                     dprint( ".close: " + conn + ": Caught IOException on close:"
 697  
                             + exc ) ;
 698  35
             }
 699  
         } finally {
 700  35
             if (debug()) {
 701  0
                 dprintStatistics() ;
 702  0
                 dprint( "<-close: " + conn ) ;
 703  
             }
 704  
         }
 705  35
     }
 706  
 }
 707  
 
 708  
 // End of file.