1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
|
21 | |
|
22 | |
|
23 | |
|
24 | |
|
25 | |
|
26 | |
|
27 | |
|
28 | |
|
29 | |
|
30 | |
|
31 | |
|
32 | |
|
33 | |
|
34 | |
|
35 | |
|
36 | |
|
37 | |
|
38 | |
|
39 | |
package com.sun.grizzly.connectioncache.impl.transport; |
40 | |
|
41 | |
import com.sun.grizzly.connectioncache.spi.concurrent.ConcurrentQueue; |
42 | |
import com.sun.grizzly.connectioncache.spi.transport.InboundConnectionCache; |
43 | |
|
44 | |
import java.io.Closeable; |
45 | |
import java.io.IOException ; |
46 | |
|
47 | |
import java.util.logging.Logger ; |
48 | |
|
49 | |
import java.util.Map ; |
50 | |
import java.util.HashMap ; |
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | 84 | public final class InboundConnectionCacheBlockingImpl<C extends Closeable> |
58 | |
extends ConnectionCacheBlockingBase<C> |
59 | |
implements InboundConnectionCache<C> { |
60 | |
|
61 | |
private final Map<C,ConnectionState<C>> connectionMap ; |
62 | |
|
63 | |
protected String thisClassName() { |
64 | 0 | return "InboundConnectionCacheBlockingImpl" ; |
65 | |
} |
66 | |
|
67 | |
private static final class ConnectionState<C extends Closeable> { |
68 | |
final C connection ; |
69 | |
|
70 | |
int busyCount ; |
71 | |
|
72 | |
int expectedResponseCount ; |
73 | |
|
74 | |
|
75 | |
|
76 | |
ConcurrentQueue.Handle<C> reclaimableHandle ; |
77 | |
|
78 | |
|
79 | |
|
80 | 38 | ConnectionState( final C conn ) { |
81 | 38 | this.connection = conn ; |
82 | |
|
83 | 38 | busyCount = 0 ; |
84 | 38 | expectedResponseCount = 0 ; |
85 | 38 | reclaimableHandle = null ; |
86 | 38 | } |
87 | |
} |
88 | |
|
89 | |
public InboundConnectionCacheBlockingImpl( final String cacheType, |
90 | |
final int highWaterMark, final int numberToReclaim, |
91 | |
Logger logger ) { |
92 | |
|
93 | 3 | super( cacheType, highWaterMark, numberToReclaim, logger ) ; |
94 | |
|
95 | 3 | this.connectionMap = new HashMap<C,ConnectionState<C>>() ; |
96 | |
|
97 | 3 | if (debug()) { |
98 | 0 | dprint(".constructor completed: " + getCacheType() ); |
99 | |
} |
100 | 3 | } |
101 | |
|
102 | |
|
103 | |
|
104 | |
public synchronized void requestReceived( final C conn ) { |
105 | 1029 | if (debug()) |
106 | 0 | dprint( "->requestReceived: connection " + conn ) ; |
107 | |
|
108 | |
try { |
109 | 1029 | ConnectionState<C> cs = getConnectionState( conn ) ; |
110 | |
|
111 | 1029 | final int totalConnections = totalBusy + totalIdle ; |
112 | 1029 | if (totalConnections > highWaterMark()) |
113 | 4 | reclaim() ; |
114 | |
|
115 | 1029 | ConcurrentQueue.Handle<C> reclaimHandle = cs.reclaimableHandle ; |
116 | 1029 | if (reclaimHandle != null) { |
117 | 990 | if (debug()) |
118 | 0 | dprint( ".requestReceived: " + conn |
119 | |
+ " removed from reclaimableQueue" ) ; |
120 | 990 | reclaimHandle.remove() ; |
121 | |
} |
122 | |
|
123 | 1029 | int count = cs.busyCount++ ; |
124 | 1029 | if (count == 0) { |
125 | 1028 | if (debug()) |
126 | 0 | dprint( ".requestReceived: " + conn |
127 | |
+ " transition from idle to busy" ) ; |
128 | |
|
129 | 1028 | totalIdle-- ; |
130 | 1028 | totalBusy++ ; |
131 | |
} |
132 | |
} finally { |
133 | 1029 | if (debug()) |
134 | 0 | dprint( "<-requestReceived: connection " + conn ) ; |
135 | |
} |
136 | 1029 | } |
137 | |
|
138 | |
public synchronized void requestProcessed( final C conn, |
139 | |
final int numResponsesExpected ) { |
140 | |
|
141 | 1009 | if (debug()) |
142 | 0 | dprint( "->requestProcessed: connection " + conn |
143 | |
+ " expecting " + numResponsesExpected + " responses" ) ; |
144 | |
|
145 | |
try { |
146 | 1009 | final ConnectionState<C> cs = connectionMap.get( conn ) ; |
147 | |
|
148 | 1009 | if (cs == null) { |
149 | 1 | if (debug()) |
150 | 0 | dprint( ".release: connection " + conn + " was closed" ) ; |
151 | |
|
152 | |
return ; |
153 | |
} else { |
154 | 1008 | cs.expectedResponseCount += numResponsesExpected ; |
155 | 1008 | int numResp = cs.expectedResponseCount ; |
156 | 1008 | int numBusy = --cs.busyCount ; |
157 | |
|
158 | 1008 | if (debug()) { |
159 | 0 | dprint( ".release: " + numResp + " responses expected" ) ; |
160 | 0 | dprint( ".release: " + numBusy + |
161 | |
" busy count for connection" ) ; |
162 | |
} |
163 | |
|
164 | 1008 | if (numBusy == 0) { |
165 | 1007 | totalBusy-- ; |
166 | 1007 | totalIdle++ ; |
167 | |
|
168 | 1007 | if (numResp == 0) { |
169 | 2 | if (debug()) |
170 | 0 | dprint( ".release: " |
171 | |
+ "queuing reclaimable connection " |
172 | |
+ conn ) ; |
173 | |
|
174 | 2 | if ((totalBusy+totalIdle) > highWaterMark()) { |
175 | 2 | close( conn ) ; |
176 | |
} else { |
177 | 0 | cs.reclaimableHandle = |
178 | |
reclaimableConnections.offer( conn ) ; |
179 | |
} |
180 | |
} |
181 | |
} |
182 | |
} |
183 | |
} finally { |
184 | 1009 | if (debug()) |
185 | 0 | dprint( "<-requestProcessed" ) ; |
186 | |
} |
187 | 1008 | } |
188 | |
|
189 | |
|
190 | |
|
191 | |
|
192 | |
|
193 | |
public synchronized void responseSent( final C conn ) { |
194 | 1006 | if (debug()) |
195 | 0 | dprint( "->responseSent: " + conn ) ; |
196 | |
|
197 | |
try { |
198 | 1006 | final ConnectionState<C> cs = connectionMap.get( conn ) ; |
199 | 1006 | if (cs == null) { |
200 | 1 | if (debug()) |
201 | 0 | dprint( ".release: connection " + conn + " was closed" ) ; |
202 | |
|
203 | |
return ; |
204 | |
} |
205 | |
|
206 | 1005 | final int waitCount = --cs.expectedResponseCount ; |
207 | 1005 | if (waitCount == 0) { |
208 | 1005 | if (debug()) |
209 | 0 | dprint( ".responseSent: " + conn + " is now reclaimable" ) ; |
210 | |
|
211 | 1005 | if ((totalBusy+totalIdle) > highWaterMark()) { |
212 | 1 | if (debug()) { |
213 | 0 | dprint( ".responseSent: " + conn |
214 | |
+ " closing connection" ) ; |
215 | |
} |
216 | 1 | close( conn ) ; |
217 | |
} else { |
218 | 1004 | cs.reclaimableHandle = |
219 | |
reclaimableConnections.offer( conn ) ; |
220 | |
|
221 | 1004 | if (debug()) { |
222 | 0 | dprint( ".responseSent: " + conn |
223 | |
+ " is now reclaimable" ) ; |
224 | |
} |
225 | |
} |
226 | |
} else { |
227 | 0 | if (debug()) { |
228 | 0 | dprint( ".responseSent: " + conn + " waitCount=" |
229 | |
+ waitCount ) ; |
230 | |
} |
231 | |
} |
232 | |
} finally { |
233 | 1006 | if (debug()) { |
234 | 0 | dprint( "<-responseSent: " + conn ) ; |
235 | |
} |
236 | |
} |
237 | 1005 | } |
238 | |
|
239 | |
|
240 | |
|
241 | |
|
242 | |
|
243 | |
public synchronized void close( final C conn ) { |
244 | 40 | if (debug()) |
245 | 0 | dprint( "->close: " + conn ) ; |
246 | |
|
247 | |
try { |
248 | 40 | final ConnectionState<C> cs = connectionMap.remove( conn ) ; |
249 | 40 | if (cs != null) { |
250 | 38 | int count = cs.busyCount ; |
251 | 38 | if (debug()) |
252 | 0 | dprint( ".close: " + conn + " count = " + count ) ; |
253 | |
|
254 | 38 | if (count == 0) |
255 | 17 | totalIdle-- ; |
256 | |
else |
257 | 21 | totalBusy-- ; |
258 | |
|
259 | 38 | final ConcurrentQueue.Handle rh = cs.reclaimableHandle ; |
260 | 38 | if (rh != null) { |
261 | 14 | if (debug()) |
262 | 0 | dprint( ".close: " + conn + " connection was reclaimable" ) ; |
263 | |
|
264 | 14 | rh.remove() ; |
265 | |
} |
266 | |
|
267 | |
try { |
268 | 38 | conn.close() ; |
269 | 0 | } catch (IOException exc) { |
270 | 0 | if (debug()) |
271 | 0 | dprint( ".close: " + conn + " close threw " + exc ) ; |
272 | 38 | } |
273 | |
} |
274 | |
} finally { |
275 | 40 | if (debug()) |
276 | 0 | dprint( "<-close: " + conn ) ; |
277 | |
} |
278 | 40 | } |
279 | |
|
280 | |
|
281 | |
|
282 | |
private ConnectionState<C> getConnectionState( C conn ) { |
283 | |
|
284 | 1029 | if (debug()) |
285 | 0 | dprint( "->getConnectionState: " + conn ) ; |
286 | |
|
287 | |
try { |
288 | 1029 | ConnectionState<C> result = connectionMap.get( conn ) ; |
289 | 1029 | if (result == null) { |
290 | 38 | if (debug()) |
291 | 0 | dprint( ".getConnectionState: " + conn + |
292 | |
" creating new ConnectionState instance" ) ; |
293 | 38 | result = new ConnectionState<C>( conn ) ; |
294 | 38 | connectionMap.put( conn, result ) ; |
295 | 38 | totalIdle++ ; |
296 | |
} |
297 | |
|
298 | 1029 | return result ; |
299 | |
} finally { |
300 | 1029 | if (debug()) |
301 | 0 | dprint( "<-getConnectionState: " + conn ) ; |
302 | |
} |
303 | |
} |
304 | |
} |
305 | |
|
306 | |
|