Hello Jeanfrancois,
By "UDP problem", do you mean some specific scenario that doesn't work?
I ask because I wrote a simple UDPConnectorHandlerTest (similar to the TCP one),
and it works fine.
WBR,
Alexey.
/*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
*
* You can obtain a copy of the license at
*
https://glassfish.dev.java.net/public/CDDLv1.0.html or
* glassfish/bootstrap/legal/CDDLv1.0.txt.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* at glassfish/bootstrap/legal/CDDLv1.0.txt.
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* you own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
*
* Copyright 2006 Sun Microsystems, Inc. All rights reserved.
*/
package com.sun.grizzly;
import com.sun.grizzly.filter.UDPReadFilter;
import com.sun.grizzly.utils.ControllerUtils;
import com.sun.grizzly.utils.EchoFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
/**
 * Tests the default <code>UDPConnectorHandler</code>.
 *
 * The test starts a <code>Controller</code> listening on {@link #PORT} with a
 * <code>UDPReadFilter</code> followed by an <code>EchoFilter</code>, sends a
 * small payload through a UDP <code>ConnectorHandler</code> and checks that
 * the bytes read back are identical to the bytes sent.
 *
 * @author Jeanfrancois Arcand
 */
public class UDPConnectorHandlerTest extends TestCase {
    /** Local UDP port the test server side is bound to. */
    public static final int PORT = 18888;

    /** Not referenced by the test visible here; kept for API compatibility. */
    public static final int PACKETS_COUNT = 100;

    /**
     * A <code>CallbackHandler</code> handler invoked by the UDPSelectorHandler
     * when a non blocking operation is ready to be processed.
     */
    private CallbackHandler callbackHandler;

    /**
     * Sends "Hello" to the server started by {@link #createController()} and
     * asserts that the same bytes come back.
     *
     * The response is first read synchronously on the calling thread; if it
     * has not fully arrived yet, the test waits up to five seconds for the
     * callback handler's <code>onRead</code> to signal completion.
     *
     * @throws IOException if starting the controller, writing or reading fails
     */
    public void testSimplePacket() throws IOException {
        final Controller controller = createController();
        ControllerUtils.startController(controller);
        final ConnectorHandler udpConnector =
                controller.acquireConnectorHandler(Controller.Protocol.UDP);

        try {
            final byte[] testData = "Hello".getBytes();
            final byte[] response = new byte[testData.length];

            final ByteBuffer writeBB = ByteBuffer.wrap(testData);
            final ByteBuffer readBB = ByteBuffer.wrap(response);

            // Released by onRead once a read attempt has completed.
            final CountDownLatch responseArrivedLatch = new CountDownLatch(1);

            callbackHandler = new CallbackHandler<SelectionKey>(){

                // Number of empty reads seen so far; bounds the re-registrations.
                private int readTry;

                public void onConnect(IOEvent<SelectionKey> ioEvent) {
                    SelectionKey key = ioEvent.attachment();
                    udpConnector.finishConnect(key);
                    controller.registerKey(key,
                            SelectionKey.OP_WRITE|SelectionKey.OP_READ);
                }

                public void onRead(IOEvent<SelectionKey> ioEvent) {
                    SelectionKey key = ioEvent.attachment();
                    DatagramChannel datagramChannel = (DatagramChannel)key.channel();
                    try {
                        int nRead = datagramChannel.read(readBB);
                        if (nRead == 0 && readTry++ < 2){
                            // Nothing available yet: ask to be called back again.
                            key.attach(callbackHandler);
                            controller.getSelectorHandler(Controller.Protocol.UDP)
                                    .register(key,SelectionKey.OP_READ);
                        } else {
                            responseArrivedLatch.countDown();
                        }
                    } catch (IOException ex){
                        ex.printStackTrace();
                        controller.cancelKey(key);
                    }
                }

                public void onWrite(IOEvent<SelectionKey> ioEvent) {
                    SelectionKey key = ioEvent.attachment();
                    DatagramChannel datagramChannel = (DatagramChannel)key.channel();
                    try{
                        while(writeBB.hasRemaining()){
                            int nWrite = datagramChannel.write(writeBB);

                            if (nWrite == 0){
                                // Channel not writable right now: retry on the
                                // next OP_WRITE notification.
                                key.attach(callbackHandler);
                                controller.getSelectorHandler(Controller.Protocol.UDP)
                                        .register(key,SelectionKey.OP_WRITE);
                                return;
                            }
                        }
                        udpConnector.read(readBB,false);
                    } catch (IOException ex){
                        ex.printStackTrace();
                        controller.cancelKey(key);
                    }
                }
            };

            try{
                udpConnector.connect(new InetSocketAddress("localhost",PORT)
                        ,callbackHandler);
            } catch (Throwable t){
                t.printStackTrace();
            }

            long nWrite = udpConnector.write(writeBB,false);

            long nRead = -1;

            // All bytes written
            if (nWrite == testData.length){
                nRead = udpConnector.read(readBB,false);
            }

            // The echo was not (fully) read synchronously: wait for onRead.
            if (nRead != nWrite){
                try {
                    responseArrivedLatch.await(5,TimeUnit.SECONDS);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                    // Keep the interruption visible to the test runner.
                    Thread.currentThread().interrupt();
                }
            }

            assertTrue("Echoed bytes differ from the bytes sent",
                    Arrays.equals(testData, readBB.array()));
        } finally {
            // Each cleanup step is isolated so that a failure in one of them
            // cannot prevent the controller from being stopped, which would
            // leave PORT bound for the tests that follow.
            try{
                udpConnector.close();
            } catch (Throwable t){
                t.printStackTrace();
            }

            try{
                controller.releaseConnectorHandler(udpConnector);
            } catch (Throwable t){
                t.printStackTrace();
            }

            try{
                controller.stop();
            } catch (Throwable t){
                t.printStackTrace();
            }
        }
    }

    /**
     * Builds the server side <code>Controller</code> used by the test: a
     * <code>UDPSelectorHandler</code> on {@link #PORT} whose protocol chain
     * runs a <code>UDPReadFilter</code> followed by an <code>EchoFilter</code>.
     *
     * @return the configured, not yet started, controller
     */
    private Controller createController() {
        final ProtocolFilter readFilter = new UDPReadFilter();
        final ProtocolFilter echoFilter = new EchoFilter();

        UDPSelectorHandler selectorHandler = new UDPSelectorHandler();
        selectorHandler.setPort(PORT);

        final Controller controller = new Controller();
        controller.setSelectorHandler(selectorHandler);

        controller.setProtocolChainInstanceHandler(
                new DefaultProtocolChainInstanceHandler(){
            public ProtocolChain poll() {
                ProtocolChain protocolChain = protocolChains.poll();
                if (protocolChain == null){
                    // Pool is empty: create a chain with the read + echo filters.
                    protocolChain = new DefaultProtocolChain();
                    protocolChain.addFilter(readFilter);
                    protocolChain.addFilter(echoFilter);
                }
                return protocolChain;
            }
        });

        return controller;
    }
}
attached mail follows:
Author: jfarcand
Date: 2007-05-16 21:47:36+0000
New Revision: 181
Added:
trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java
trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java
Modified:
trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java
trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java
trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java
trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
Log:
Add UDP client-side support (still not fully working... need a UDP expert :-)). Also tie the SelectorHandler to the ConnectorHandler so that a client instance can be retrieved using the Protocol.
Hopefully I will figure out what's wrong with the UDP client (receive doesn't work).
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java 2007-05-16 21:47:36+0000
@@ -39,7 +39,15 @@
* @author Jeanfrancois Arcand
*/
public interface ConnectorHandler<E extends SelectorHandler> extends Handler{
-
+
+
+ /**
+ * A token decribing the protocol supported by an implementation of this
+ * interface
+ */
+ public Controller.Protocol protocol();
+
+
/**
* Connect to hostname:port. When an aysnchronous event happens (e.g
* OP_READ or OP_WRITE), the <code>Controller</code> will invoke
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java 2007-05-16 21:47:36+0000
@@ -113,6 +113,9 @@
*/
public class Controller implements Runnable, Lifecycle, Copyable {
+ public enum Protocol { UDP, TCP , TLS, CUSTOM }
+
+
/**
* A cached list of Context. Context are by default stateless.
*/
@@ -138,13 +141,6 @@
* of the TCPSelectorHandler will be created.
*/
protected ConcurrentLinkedQueue<SelectorHandler> selectorHandlers;
-
-
- /**
- * The ConnectorInstanceHandler used to return a new or pooled
- * ConnectorHandler
- */
- private ConnectorInstanceHandler connectorInstanceHandler;
/**
@@ -408,10 +404,15 @@
/**
- * Return the first <code>SelectorHandler</code>
+ * Return the <code>SelectorHandler</code> associated with the protocol.
*/
- public SelectorHandler getSelectorHandler(){
- return selectorHandlers.peek();
+ public SelectorHandler getSelectorHandler(Protocol protocol){
+ for (SelectorHandler selectorHandler: selectorHandlers){
+ if (selectorHandler.protocol() == protocol){
+ return selectorHandler;
+ }
+ }
+ return null;
}
@@ -492,11 +493,7 @@
if (selectionKeyHandler == null){
selectionKeyHandler = new DefaultSelectionKeyHandler();
}
-
- if (connectorInstanceHandler == null){
- connectorInstanceHandler = new TCPConnectorInstanceHandler();
- }
-
+
if (readThreadsCount > 1) {
initReadThreads();
selectorHandlers.clear();
@@ -599,22 +596,28 @@
* Return an instance of the default <code>ConnectorHandler</code>,
* which is the <code>TCPConnectorHandler</code>
*/
- public ConnectorHandler acquireConnectorHandler(){
- if (state != state.STARTED){
- throw new IllegalStateException("Controller not yet started");
+ public ConnectorHandler acquireConnectorHandler(Protocol protocol){
+ ConnectorHandler ch = null;
+ for (SelectorHandler selectorHandler: selectorHandlers){
+ if (selectorHandler.protocol() == protocol){
+ ch = selectorHandler.acquireConnectorHandler();
+ ch.setController(this);
+ break;
+ }
}
-
- ConnectorHandler connectorHandler = connectorInstanceHandler.acquire();
- connectorHandler.setController(this);
- return connectorHandler;
+ return ch;
}
/**
* Release a ConnectorHandler.
*/
- public void releaseConnectorHandler(ConnectorHandler connectorHandler){
- connectorInstanceHandler.release(connectorHandler);
+ public void releaseConnectorHandler(ConnectorHandler connectorHandler){
+ for (SelectorHandler selectorHandler: selectorHandlers){
+ if (selectorHandler.protocol() == connectorHandler.protocol()){
+ selectorHandler.releaseConnectorHandler(connectorHandler);
+ }
+ }
}
@@ -646,12 +649,4 @@
logger.log(Level.SEVERE,"Controller.start()",ex);
}
}
-
- public ConnectorInstanceHandler getConnectorInstanceHandler() {
- return connectorInstanceHandler;
- }
-
- public void setConnectorInstanceHandler(ConnectorInstanceHandler connectorInstanceHandler) {
- this.connectorInstanceHandler = connectorInstanceHandler;
- }
}
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java 2007-05-16 21:47:36+0000
@@ -53,7 +53,7 @@
channels.add(channel);
}
- getSelectorHandler().getSelector().wakeup();
+ getSelectorHandler(Protocol.TCP).getSelector().wakeup();
}
/**
@@ -62,7 +62,8 @@
private void registerNewChannels() throws IOException {
synchronized(channels) {
int size = channels.size();
- TCPSelectorHandler selectorHandler = (TCPSelectorHandler) getSelectorHandler();
+ TCPSelectorHandler selectorHandler =
+ (TCPSelectorHandler) getSelectorHandler(Protocol.TCP);
Selector auxSelector = selectorHandler.getSelector();
for (int i = 0; i < size; i++) {
SocketChannel channel = channels.get(i);
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java 2007-05-16 21:47:36+0000
@@ -39,17 +39,26 @@
public interface SelectorHandler extends Handler, Copyable {
/**
+ * A token decribing the protocol supported by an implementation of this
+ * interface
+ */
+ public Controller.Protocol protocol();
+
+
+ /**
* Gets the underlying selector.
* @return underlying <code>Selector</code>
*/
public Selector getSelector();
+
/**
* Sets the underlying <code>Selector</code>
* @param selector underlying <code>Selector</code>
*/
public void setSelector(Selector selector);
+
/**
* The SelectionKey that has been registered.
* @return <code>Set</code> of <code>SelectionKey</code>
@@ -149,4 +158,16 @@
*/
public boolean onConnectInterest(SelectionKey key,Context controllerCtx)
throws IOException;
+
+
+ /**
+ * Return an instance of the <code>ConnectorHandler</code>
+ */
+ public ConnectorHandler acquireConnectorHandler();
+
+
+ /**
+ * Release a ConnectorHandler.
+ */
+ public void releaseConnectorHandler(ConnectorHandler connectorHandler);
}
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java 2007-05-16 21:47:36+0000
@@ -179,6 +179,15 @@
/**
+ * A token decribing the protocol supported by an implementation of this
+ * interface
+ */
+ public Controller.Protocol protocol(){
+ return Controller.Protocol.TLS;
+ }
+
+
+ /**
* Set the SSLContext required to support SSL over NIO.
* @param sslContext <code>SSLContext</code>
*/
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java 2007-05-16 21:47:36+0000
@@ -146,7 +146,7 @@
}
connect(remoteAddress,localAddress,callbackHandler,
- (TCPSelectorHandler)controller.getSelectorHandler());
+ (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
}
@@ -281,7 +281,7 @@
}
connect(remoteAddress,localAddress,callbackHandler,
- (TCPSelectorHandler)controller.getSelectorHandler());
+ (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
}
@@ -402,6 +402,15 @@
/**
+ * A token decribing the protocol supported by an implementation of this
+ * interface
+ */
+ public Controller.Protocol protocol(){
+ return Controller.Protocol.TCP;
+ }
+
+
+ /**
* Is the underlying SocketChannel connected.
*/
public boolean isConnected(){
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java 2007-05-16 21:47:36+0000
@@ -56,7 +56,15 @@
* @author Jeanfrancois Arcand
*/
public class TCPSelectorHandler implements SelectorHandler {
-
+
+
+ /**
+ * The ConnectorInstanceHandler used to return a new or pooled
+ * ConnectorHandler
+ */
+ protected ConnectorInstanceHandler connectorInstanceHandler;
+
+
/**
* The list of SelectionKey to register next time the Selector.select is
* invoked.
@@ -181,6 +189,7 @@
copyHandler.socketTimeout = socketTimeout;
copyHandler.logger = logger;
copyHandler.reuseAddress = reuseAddress;
+ copyHandler.connectorInstanceHandler = connectorInstanceHandler;
}
@@ -215,6 +224,9 @@
public void preSelect(Context ctx) throws IOException {
if (selector == null){
try{
+
+ connectorInstanceHandler = new TCPConnectorInstanceHandler();
+
// Create the socket listener
selector = Selector.open();
@@ -358,7 +370,8 @@
if (opConnectToRegister== null){
opConnectToRegister = new ConcurrentHashMap
<SocketAddress[],CallbackHandler>();
- }
+ }
+
opConnectToRegister.put(new SocketAddress[]{remoteAddress,localAddress},
callBackHandler);
selector.wakeup();
@@ -491,7 +504,37 @@
}
return false;
}
-
+
+
+ /**
+ * Return an instance of the default <code>ConnectorHandler</code>,
+ * which is the <code>TCPConnectorHandler</code>
+ */
+ public ConnectorHandler acquireConnectorHandler(){
+ if (selector == null && !selector.isOpen()){
+ throw new IllegalStateException("SelectorHandler not yet started");
+ }
+
+ ConnectorHandler connectorHandler = connectorInstanceHandler.acquire();
+ return connectorHandler;
+ }
+
+
+ /**
+ * Release a ConnectorHandler.
+ */
+ public void releaseConnectorHandler(ConnectorHandler connectorHandler){
+ connectorInstanceHandler.release(connectorHandler);
+ }
+
+
+ /**
+ * A token decribing the protocol supported by an implementation of this
+ * interface
+ */
+ public Controller.Protocol protocol(){
+ return Controller.Protocol.TCP;
+ }
// ------------------------------------------------------ Utils ----------//
Added: trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java?view=auto&rev=181
==============================================================================
--- (empty file)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java 2007-05-16 21:47:36+0000
@@ -0,0 +1,449 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ *
https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly;
+
+import com.sun.grizzly.util.ByteBufferInputStream;
+import com.sun.grizzly.util.OutputWriter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.AlreadyConnectedException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.NotYetConnectedException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.DatagramChannel;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Client side interface used to implement non blocking client operation.
+ * Implementation of this class must make sure the following methods are
+ * invoked in that order:
+ *
+ * (1) connect()
+ * (2) read() or write().
+ *
+ * @author Jeanfrancois Arcand
+ */
+public class UDPConnectorHandler implements ConnectorHandler<UDPSelectorHandler>{
+
+ /**
+ * The underlying UDPSelectorHandler used to mange SelectionKeys.
+ */
+ private UDPSelectorHandler selectorHandler;
+
+
+ /**
+ * A <code>CallbackHandler</code> handler invoked by the UDPSelectorHandler
+ * when a non blocking operation is ready to be processed.
+ */
+ private CallbackHandler callbackHandler;
+
+
+ /**
+ * The connection's DatagramChannel.
+ */
+ private DatagramChannel datagramChannel;
+
+
+ /**
+ * Is the connection established.
+ */
+ private volatile boolean isConnected;
+
+
+ /**
+ * The internal Controller used (in case not specified).
+ */
+ private Controller controller;
+
+
+ /**
+ * IsConnected Latch related
+ */
+ private CountDownLatch isConnectedLatch;
+
+
+ /**
+ * Are we creating a controller every run.
+ */
+ private boolean isStandalone = false;
+
+
+ /**
+ * A blocking <code>InputStream</code> that use a pool of Selector
+ * to execute a blocking read operation.
+ */
+ private ByteBufferInputStream inputStream;
+
+
+ /**
+ * Connect to hostname:port. When an aysnchronous event happens (e.g
+ * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
+ * the CallBackHandler.
+ * @param remoteAddress remote address to connect
+ * @param callbackHandler the handler invoked by the Controller when
+ * an non blocking operation is ready to be handled.
+ */
+ public void connect(SocketAddress remoteAddress,
+ CallbackHandler callbackHandler) throws IOException {
+
+ connect(remoteAddress,null,callbackHandler);
+ }
+
+
+ /**
+ * Connect to hostname:port. When an aysnchronous event happens (e.g
+ * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
+ * the CallBackHandler.
+ * @param remoteAddress remote address to connect
+ * @param localAddress local address to bind
+ * @param callbackHandler the handler invoked by the Controller when
+ * an non blocking operation is ready to be handled.
+ */
+ public void connect(SocketAddress remoteAddress, SocketAddress localAddress,
+ CallbackHandler callbackHandler) throws IOException {
+
+ if (controller == null){
+ throw new IllegalStateException("Controller cannot be null");
+ }
+
+ connect(remoteAddress,localAddress,callbackHandler,
+ (UDPSelectorHandler)controller.getSelectorHandler(protocol()));
+ }
+
+
+ /**
+ * Connect to hostname:port. When an aysnchronous event happens (e.g
+ * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
+ * the CallBackHandler.
+ * @param remoteAddress remote address to connect
+ * @param callbackHandler the handler invoked by the Controller when
+ * an non blocking operation is ready to be handled.
+ * @param selectorHandler an instance of SelectorHandler.
+ */
+ public void connect(SocketAddress remoteAddress,
+ CallbackHandler callbackHandler,
+ UDPSelectorHandler selectorHandler) throws IOException {
+
+ connect(remoteAddress,null,callbackHandler,selectorHandler);
+ }
+
+ /**
+ * Connect to hostname:port. When an aysnchronous event happens (e.g
+ * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
+ * the CallBackHandler.
+ * @param remoteAddress remote address to connect
+ * @param localAddress local address to bin
+ * @param callbackHandler the handler invoked by the Controller when
+ * an non blocking operation is ready to be handled.
+ * @param selectorHandler an instance of SelectorHandler.
+ */
+ public void connect(SocketAddress remoteAddress, SocketAddress localAddress,
+ CallbackHandler callbackHandler,
+ UDPSelectorHandler selectorHandler) throws IOException {
+
+ if (isConnected){
+ throw new AlreadyConnectedException();
+ }
+
+ if (controller == null){
+ throw new IllegalStateException("Controller cannot be null");
+ }
+
+ if (selectorHandler == null){
+ throw new IllegalStateException("Controller cannot be null");
+ }
+
+ this.selectorHandler = selectorHandler;
+ this.callbackHandler = callbackHandler;
+
+ // Wait for the onConnect to be invoked.
+ isConnectedLatch = new CountDownLatch(1);
+
+ selectorHandler.connect(remoteAddress,localAddress,callbackHandler);
+ inputStream = new ByteBufferInputStream();
+
+ try {
+ isConnectedLatch.await();
+ } catch (InterruptedException ex) {
+ ; // LOG ME
+ }
+ }
+
+
+ /**
+ * Connect to hostname:port. Internally an instance of Controller and
+ * its default SelectorHandler will be created everytime this method is
+ * called. This method should be used only and only if no external
+ * Controller has been initialized.
+ * @param remoteAddress remote address to connect
+ */
+ public void connect(SocketAddress remoteAddress)
+ throws IOException {
+ connect(remoteAddress,(SocketAddress)null);
+ }
+
+
+ /**
+ * Connect to hostname:port. Internally an instance of Controller and
+ * its default SelectorHandler will be created everytime this method is
+ * called. This method should be used only and only if no external
+ * Controller has been initialized.
+ * @param remoteAddress remote address to connect
+ * @param localAddress local address to bin
+ */
+ public void connect(SocketAddress remoteAddress, SocketAddress localAddress)
+ throws IOException {
+
+ if (isConnected){
+ throw new AlreadyConnectedException();
+ }
+
+ if (controller == null){
+ isStandalone = true;
+ controller = new Controller();
+ controller.setSelectorHandler(new UDPSelectorHandler(true));
+ DefaultPipeline pipeline = new DefaultPipeline();
+ pipeline.setMaxThreads(1);
+ pipeline.initPipeline();
+ pipeline.startPipeline();
+ controller.setPipeline(pipeline);
+
+ callbackHandler = new CallbackHandler<SelectionKey>(){
+ public void onConnect(IOEvent<SelectionKey> ioEvent) {
+ SelectionKey key = ioEvent.attachment();
+ finishConnect(key);
+ }
+ public void onRead(IOEvent<SelectionKey> ioEvent) {
+ }
+ public void onWrite(IOEvent<SelectionKey> ioEvent) {
+ }
+ };
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ try{
+ pipeline.execute(new Context(){
+ public Object call() throws Exception {
+ latch.countDown();
+ controller.start();
+ return null;
+ }
+ });
+ } catch (PipelineFullException ex){
+ throw new IOException(ex.getMessage());
+ }
+
+ try {
+ latch.await();
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ }
+ }
+
+ connect(remoteAddress,localAddress,callbackHandler,
+ (UDPSelectorHandler)controller.getSelectorHandler(protocol()));
+ }
+
+
+ /**
+ * Register the <code>CallbackHandler</code> so when an asynchonous
+ * event occurs, the <code>CallbackHandler</code> will be invoked.
+ * @param callbackHandler the handler invoked by the Controller when
+ * an non blocking operation is ready to be handled.
+ */
+ public void register(CallbackHandler callbackHandler){
+ this.callbackHandler = callbackHandler;
+ }
+
+
+ /**
+ * Read bytes. If blocking is set to <tt>true</tt>, a pool of temporary
+ * <code>Selector</code> will be used to read bytes.
+ * @param byteBuffer The byteBuffer to store bytes.
+ * @param blocking <tt>true</tt> if a a pool of temporary Selector
+ * is required to handle a blocking read.
+ */
+ public long read(ByteBuffer byteBuffer, boolean blocking) throws IOException {
+ if (!isConnected){
+ throw new NotYetConnectedException();
+ }
+
+ SelectionKey key = datagramChannel.keyFor(selectorHandler.getSelector());
+ if (blocking){
+ inputStream.setSelectionKey(key);
+ inputStream.setChannelType(
+ ByteBufferInputStream.ChannelType.DatagramChannel);
+ int nRead = inputStream.read(byteBuffer);
+ return nRead;
+ } else {
+ if (callbackHandler == null){
+ throw new IllegalStateException
+ ("Non blocking read needs a CallbackHandler");
+ }
+ int nRead = datagramChannel.read(byteBuffer);
+
+ if (nRead == 0){
+ key.attach(callbackHandler);
+ selectorHandler.register(key,
+ SelectionKey.OP_READ|SelectionKey.OP_WRITE);
+ }
+ return nRead;
+ }
+ }
+
+
+ /**
+ * Writes bytes. If blocking is set to <tt>true</tt>, a pool of temporary
+ * <code>Selector</code> will be used to writes bytes.
+ * @param byteBuffer The byteBuffer to write.
+ * @param blocking <tt>true</tt> if a a pool of temporary Selector
+ * is required to handle a blocking write.
+ */
+ public long write(ByteBuffer byteBuffer, boolean blocking) throws IOException {
+ if (!isConnected){
+ throw new NotYetConnectedException();
+ }
+
+ SelectionKey key = datagramChannel.keyFor(selectorHandler.getSelector());
+ if (blocking){
+ throw new IllegalStateException("Blocking mode not supported");
+ } else {
+ if (callbackHandler == null){
+ throw new IllegalStateException
+ ("Non blocking write needs a CallbackHandler");
+ }
+ int nWrite = datagramChannel.write(byteBuffer);
+
+ if (nWrite == 0){
+ key.attach(callbackHandler);
+ selectorHandler.register(key,
+ SelectionKey.OP_READ|SelectionKey.OP_WRITE);
+ }
+ return nWrite;
+ }
+ }
+
+
+ /**
+ * Receive bytes.
+ * @param byteBuffer The byteBuffer to store bytes.
+ */
+ public long send(ByteBuffer byteBuffer, SocketAddress socketAddress)
+ throws IOException {
+ if (!isConnected){
+ throw new NotYetConnectedException();
+ }
+
+ if (callbackHandler == null){
+ throw new IllegalStateException
+ ("Non blocking read needs a CallbackHandler");
+ }
+
+ return datagramChannel.send(byteBuffer,socketAddress);
+ }
+
+
+ /**
+ * Receive bytes.
+ * @param byteBuffer The byteBuffer to store bytes.
+ */
+ public SocketAddress receive(ByteBuffer byteBuffer) throws IOException {
+ if (!isConnected){
+ throw new NotYetConnectedException();
+ }
+
+ SelectionKey key = datagramChannel.keyFor(selectorHandler.getSelector());
+
+ if (callbackHandler == null){
+ throw new IllegalStateException
+ ("Non blocking read needs a CallbackHandler");
+ }
+
+ SocketAddress socketAddress = datagramChannel.receive(byteBuffer);
+ return socketAddress;
+ }
+
+
+ /**
+ * Close the underlying connection.
+ */
+ public void close() throws IOException{
+ if (datagramChannel != null){
+ if (selectorHandler != null){
+ SelectionKey key =
+ datagramChannel.keyFor(selectorHandler.getSelector());
+
+ if (key == null) return;
+
+ key.cancel();
+ key.attach(null);
+ }
+ datagramChannel.close();
+ }
+
+ if (controller != null && isStandalone){
+ controller.stop();
+ controller = null;
+ }
+ isStandalone = false;
+ }
+
+
+ /**
+ * Finish handling the OP_CONNECT interest ops.
+ */
+ public void finishConnect(SelectionKey key){
+ datagramChannel = (DatagramChannel)key.channel();
+ isConnected = datagramChannel.isConnected();
+ isConnectedLatch.countDown();
+ }
+
+
+ /**
+ * A token decribing the protocol supported by an implementation of this
+ * interface
+ */
+ public Controller.Protocol protocol(){
+ return Controller.Protocol.UDP;
+ }
+
+
+ /**
+ * Is the underlying DatagramChannel connected.
+ */
+ public boolean isConnected(){
+ return isConnected;
+ }
+
+
+ public Controller getController() {
+ return controller;
+ }
+
+
+ public void setController(Controller controller) {
+ this.controller = controller;
+ }
+
+}
Added: trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java?view=auto&rev=181
==============================================================================
--- (empty file)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java 2007-05-16 21:47:36+0000
@@ -0,0 +1,66 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Default <code>ConnectorInstanceHandler</code> which uses a
+ * <code>ConcurrentLinkedQueue</code> to pool
+ * <code>UDPConnectorHandler</code> instances.
+ *
+ * @author Jeanfrancois
+ */
+public class UDPConnectorInstanceHandler implements
+        ConnectorInstanceHandler<UDPConnectorHandler> {
+
+    /**
+     * Simple queue used to pool <code>UDPConnectorHandler</code>
+     */
+    private final ConcurrentLinkedQueue<UDPConnectorHandler> pool;
+
+
+    /**
+     * Create an instance handler with an empty pool.
+     */
+    public UDPConnectorInstanceHandler(){
+        pool = new ConcurrentLinkedQueue<UDPConnectorHandler>();
+    }
+
+    /**
+     * Acquire a <code>UDPConnectorHandler</code>, reusing a pooled instance
+     * when one is available and creating a new one otherwise.
+     *
+     * @return a pooled or newly created handler, never <code>null</code>
+     */
+    public UDPConnectorHandler acquire(){
+        UDPConnectorHandler connectorHandler = pool.poll();
+        if (connectorHandler == null){
+            connectorHandler = new UDPConnectorHandler();
+        }
+        return connectorHandler;
+    }
+
+
+    /**
+     * Release a <code>UDPConnectorHandler</code> back into the pool.
+     *
+     * @param connectorHandler the handler to pool; the backing queue
+     *        rejects <code>null</code> with a NullPointerException
+     */
+    public void release(UDPConnectorHandler connectorHandler){
+        pool.offer(connectorHandler);
+    }
+
+}
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java 2007-05-16 21:47:36+0000
@@ -28,10 +28,15 @@
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
@@ -66,6 +71,11 @@
public UDPSelectorHandler() {
}
+
+
+ public UDPSelectorHandler(boolean isClient) {
+ super(isClient);
+ }
@Override
@@ -84,6 +94,7 @@
public void preSelect(Context ctx) throws IOException {
if (selector == null){
try{
+ connectorInstanceHandler = new UDPConnectorInstanceHandler();
datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
selector = Selector.open();
@@ -103,7 +114,76 @@
datagramSocket.setSoTimeout(serverTimeout);
} else {
- super.preSelect(ctx);
+ if (opReadToRegister == null){
+ opReadToRegister = new ConcurrentLinkedQueue<SelectionKey>();
+ }
+
+ SelectionKey key;
+ int size = opReadToRegister.size();
+ long currentTime = 0L;
+ if (size > 0){
+ currentTime = System.currentTimeMillis();
+ }
+
+ for (int i=0; i < size; i++) {
+ key = opReadToRegister.poll();
+
+ key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+
+ if (key.attachment() == null)
+ key.attach(currentTime);
+ }
+
+ if (opWriteToRegister== null){
+ opWriteToRegister = new ConcurrentLinkedQueue<SelectionKey>();
+ }
+
+ size = opWriteToRegister.size();
+ currentTime = 0L;
+ if (size > 0){
+ currentTime = System.currentTimeMillis();
+ }
+
+ for (int i=0; i < size; i++) {
+ key = opWriteToRegister.poll();
+
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+
+ if (key.attachment() == null)
+ key.attach(currentTime);
+ }
+
+ if (opConnectToRegister== null){
+ opConnectToRegister = new ConcurrentHashMap
+ <SocketAddress[],CallbackHandler>();
+ }
+
+ size = opConnectToRegister.size();
+ currentTime = 0L;
+ if (size > 0){
+ currentTime = System.currentTimeMillis();
+ } else {
+ return;
+ }
+
+ CallbackHandler<SocketChannel> callbackHandler;
+ Iterator<SocketAddress[]> iterator =
+ opConnectToRegister.keySet().iterator();
+ SocketAddress[] remoteLocal;
+ while(iterator.hasNext()){
+ remoteLocal = iterator.next();
+ final DatagramChannel datagramChannel = DatagramChannel.open();
+ datagramChannel.socket().setReuseAddress(true);
+ if (remoteLocal[1] != null){
+ datagramChannel.socket().bind(remoteLocal[1]);
+ }
+ datagramChannel.configureBlocking(false);
+ datagramChannel.connect(remoteLocal[0]);
+ key = datagramChannel.register(selector,
+ SelectionKey.OP_READ|SelectionKey.OP_WRITE,
+ opConnectToRegister.remove(remoteLocal));
+ onConnectInterest(key, ctx);
+ }
}
}
@@ -153,28 +233,88 @@
/**
* Handle OP_READ.
*/
- @Override
- public boolean onReadInterest(SelectionKey key,Context ctx)
+ public boolean onReadInterest(final SelectionKey key,Context ctx)
throws IOException{
-
// disable OP_READ on key before doing anything else
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
- return true;
+ if (key.attachment() instanceof CallbackHandler){
+ ((CallbackHandler)key.attachment())
+ .onRead(new IOEvent<SelectionKey>(){
+
+ public SelectionKey attach(SelectionKey sc){
+ return key;
+ }
+
+ public SelectionKey attachment(){
+ return key;
+ }
+ });
+ return false;
+ } else {
+ return true;
+ }
}
/**
* Handle OP_WRITE.
+ *
*/
- @Override
- public boolean onWriteInterest(SelectionKey key, Context ctx)
+ public boolean onWriteInterest(final SelectionKey key, Context ctx)
throws IOException{
-
+ // disable OP_WRITE on key before doing anything else
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
- return true;
+ if (key.attachment() instanceof CallbackHandler){
+ ((CallbackHandler)key.attachment())
+ .onWrite(new IOEvent<SelectionKey>(){
+
+ public SelectionKey attach(SelectionKey sc){
+ return key;
+ }
+
+ public SelectionKey attachment(){
+ return key;
+ }
+ });
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+
+ /**
+ * Handle OP_CONNECT.
+ *
+ * When the key's attachment is a <code>CallbackHandler</code>, its
+ * <code>onConnect</code> is invoked with an <code>IOEvent</code> that
+ * exposes the key.
+ *
+ * @return always <code>false</code>
+ */
+ public boolean onConnectInterest(final SelectionKey key, Context ctx)
+         throws IOException{
+     final Object attached = key.attachment();
+     if (!(attached instanceof CallbackHandler)){
+         return false;
+     }
+
+     // The event always hands back the key; attach() ignores its argument.
+     final IOEvent<SelectionKey> connectEvent = new IOEvent<SelectionKey>(){
+
+         public SelectionKey attach(SelectionKey sc){
+             return key;
+         }
+
+         public SelectionKey attachment(){
+             return key;
+         }
+     };
+     ((CallbackHandler) attached).onConnect(connectEvent);
+     return false;
+ }
+ /**
+ * A token describing the protocol supported by an implementation of this
+ * interface.
+ *
+ * @return <code>Controller.Protocol.UDP</code>
+ */
+ public Controller.Protocol protocol(){
+ return Controller.Protocol.UDP;
+ }
+
+
public int getSsBackLog() {
throw new IllegalStateException(NOT_SUPPORTED);
}
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java 2007-05-16 21:47:36+0000
@@ -26,8 +26,10 @@
import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
/**
@@ -39,6 +41,12 @@
*/
public class ByteBufferInputStream extends InputStream {
+ public enum ChannelType { SocketChannel, DatagramChannel }
+
+
+ private ChannelType defaultChannelType = ChannelType.SocketChannel;
+
+
private static int defaultReadTimeout = 15000;
/**
@@ -238,9 +246,9 @@
SelectionKey tmpKey = null;
try{
- SocketChannel socketChannel = (SocketChannel)key.channel();
+ ReadableByteChannel readableChannel = (ReadableByteChannel)key.channel();
while (count > 0){
- count = socketChannel.read(byteBuffer);
+ count = readableChannel.read(byteBuffer);
if ( count > -1 )
byteRead += count;
else
@@ -254,8 +262,15 @@
return 0;
}
count = 1;
- tmpKey = socketChannel
- .register(readSelector,SelectionKey.OP_READ);
+
+ tmpKey = null;
+ if (defaultChannelType == ChannelType.SocketChannel){
+ tmpKey = ((SocketChannel)readableChannel)
+ .register(readSelector,SelectionKey.OP_READ);
+ } else {
+ tmpKey = ((DatagramChannel)readableChannel)
+ .register(readSelector,SelectionKey.OP_READ);
+ }
tmpKey.interestOps(tmpKey.interestOps() | SelectionKey.OP_READ);
int code = readSelector.select(readTimeout);
tmpKey.interestOps(
@@ -266,7 +281,7 @@
}
while (count > 0){
- count = socketChannel.read(byteBuffer);
+ count = readableChannel.read(byteBuffer);
if ( count > -1 )
byteRead += count;
else
@@ -318,5 +333,15 @@
public static void setDefaultReadTimeout(int aDefaultReadTimeout) {
defaultReadTimeout = aDefaultReadTimeout;
}
+
+
+ public ChannelType getChannelType() {
+ return defaultChannelType;
+ }
+
+
+ public void setChannelType(ChannelType channelType) {
+ this.defaultChannelType = channelType;
+ }
}
Modified: trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java&p2=trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java (original)
+++ trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java 2007-05-16 21:47:36+0000
@@ -53,7 +53,8 @@
public void testSimplePacket() throws IOException {
final Controller controller = createController();
ControllerUtils.startController(controller);
- final ConnectorHandler tcpConnector = controller.acquireConnectorHandler();
+ final ConnectorHandler tcpConnector =
+ controller.acquireConnectorHandler(Controller.Protocol.TCP);
try {
final byte[] testData = "Hello".getBytes();
@@ -82,7 +83,7 @@
int nRead = socketChannel.read(readBB);
if (nRead == 0 && readTry++ < 2){
key.attach(callbackHandler);
- controller.getSelectorHandler()
+ controller.getSelectorHandler(Controller.Protocol.TCP)
.register(key,SelectionKey.OP_READ);
} else {
responseArrivedLatch.countDown();
@@ -102,7 +103,7 @@
if (nWrite == 0){
key.attach(callbackHandler);
- controller.getSelectorHandler()
+ controller.getSelectorHandler(Controller.Protocol.TCP)
.register(key,SelectionKey.OP_WRITE);
return;
}
@@ -144,6 +145,7 @@
} finally {
try{
tcpConnector.close();
+ controller.releaseConnectorHandler(tcpConnector);
controller.stop();
} catch (Throwable t){
t.printStackTrace();
Added: trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java?view=auto&rev=181
==============================================================================
--- (empty file)
+++ trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java 2007-05-16 21:47:36+0000
@@ -0,0 +1,75 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.utils;
+
+import com.sun.grizzly.UDPConnectorHandler;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+/**
+ * UDP client that exercises the non blocking ConnectorHandler.
+ *
+ * @author Jeanfrancois Arcand
+ */
+public class NonBlockingUDPIOClient {
+    private String host;
+    private int port;
+
+    private UDPConnectorHandler udpConnectorHandler;
+
+    public NonBlockingUDPIOClient(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    /**
+     * Create a new handler and connect it to the configured host and port.
+     */
+    public void connect() throws IOException {
+        final UDPConnectorHandler handler = new UDPConnectorHandler();
+        udpConnectorHandler = handler;
+        handler.connect(new InetSocketAddress(host, port));
+    }
+
+    /**
+     * Send the given bytes to the configured host and port.
+     */
+    public void send(byte[] msg) throws IOException {
+        final ByteBuffer payload = ByteBuffer.wrap(msg);
+        final InetSocketAddress target = new InetSocketAddress(host, port);
+        udpConnectorHandler.send(payload, target);
+    }
+
+    /**
+     * Receive into the whole of <code>buf</code>.
+     */
+    public int receive(byte[] buf) throws IOException {
+        return receive(buf, buf.length);
+    }
+
+    /**
+     * Receive into the first <code>length</code> bytes of <code>buf</code>,
+     * print the sender address and byte count, and return the buffer
+     * position after the receive.
+     */
+    public int receive(byte[] buf, int length) throws IOException {
+        final ByteBuffer target = ByteBuffer.wrap(buf, 0, length);
+        final SocketAddress sender = udpConnectorHandler.receive(target);
+        final int bytesRead = target.position();
+        System.out.println("sa: " + sender);
+        System.out.println("nRead: " + bytesRead);
+        return bytesRead;
+    }
+
+    /**
+     * Close the handler if one has been created.
+     */
+    public void close() throws IOException {
+        if (udpConnectorHandler == null) {
+            return;
+        }
+        udpConnectorHandler.close();
+    }
+}
Modified: trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java&p2=trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java&r1=180&r2=181
==============================================================================
--- trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java (original)
+++ trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java 2007-05-16 21:47:36+0000
@@ -45,6 +45,8 @@
public void connect() throws IOException {
socket = new DatagramSocket();
socket.connect(InetAddress.getByName(host), port);
+
+
}
public void send(byte[] msg) throws IOException {
Modified: trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
Url:
https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java?view=diff&rev=181&p1=trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java&p2=trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java&r1=180&r2=181
==============================================================================
--- trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java (original)
+++ trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java 2007-05-16 21:47:36+0000
@@ -597,7 +597,8 @@
* running of this thread.
*/
public void registerKey(SelectionKey key){
- controller.getSelectorHandler().register(key,SelectionKey.OP_READ);
+ controller.getSelectorHandler(Controller.Protocol.TCP)
+ .register(key,SelectionKey.OP_READ);
}
// -------------------------------------------------------------- Init //
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe_at_grizzly.dev.java.net
For additional commands, e-mail: commits-help_at_grizzly.dev.java.net