Sorry about that, it seems I wasn't precise enough about where in my
code the problem lies. It occurs when I invoke
System.err.println("obtaining channel");
InetAddress localAddress = channel.socket().getLocalAddress();
in the BusinessLogicResponseHandler.processResponse() method.
Thanks,
Simon
-----Original Message-----
From: Simon Trudeau [mailto:strudeau_at_bluetreewireless.com]
Sent: March-11-08 10:49 AM
To: users_at_grizzly.dev.java.net
Subject: RE: Re: [Q] Adding delays to protocol filter
I have tried implementing your solution, but I ran into some problems:
I don't know where to save and reattach my keys! The truth of the
matter is that I understand very little about the registration and
deregistration of keys and their interest! :)
I have tried implementing an EchoFilter with a delayed response.
Unfortunately, I have made a mistake somewhere in the key registration /
deregistration, so when I invoke
SocketChannel channel = (SocketChannel)k.channel();
it blocks right there and my protocol filter can't handle any more
packets.
Here's my filter code; let me know what you think. I'm pretty sure the
problem lies in the key registration.
Thanks,
Simon
P.S.: Is there a way to send properly formatted source code and the
like to the mailing list? It must be very hard for you to read our code
snippets without pasting them into an editor.
**********
public class JitterEchoFilter implements ProtocolFilter {
    private ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private CountDownLatch latch;

    public JitterEchoFilter(CountDownLatch latch) {
        this.latch = latch;
    }

    private static int counter = 0;

    public boolean execute(Context ctx) throws IOException {
        final WorkerThread workerThread =
                ((WorkerThread) Thread.currentThread());
        ByteBuffer buffer = workerThread.getByteBuffer();
        buffer.flip();
        System.err.println("executing filter: " + counter++);
        System.err.println("buffer has remaining: " + buffer.hasRemaining()
                + " remainingSize: " + buffer.remaining());
        /*
        SelectionKey k = ctx.getSelectionKey();
        InetAddress localAddress =
                ((SocketChannel) k.channel()).socket().getLocalAddress();
        int localPort =
                ((SocketChannel) k.channel()).socket().getLocalPort();
        byte[] buf = new byte[buffer.remaining()];
        //System.err.println("remaining: " + buffer.remaining());
        buffer.get(buf);
        System.err.println("processing response --> ctx instance: " + ctx
                + " Buffer content: " + new String(buf)
                + " Received at: " + localAddress.getHostAddress()
                + ":" + localPort);
        System.err.println("buffer.hasRemaining: " + buffer.hasRemaining());
        */
        if (buffer.hasRemaining()) {
            /*
             * Since response processing is performed asynchronously, we
             * need to save Grizzly's buffer into a new buffer for response
             * processing so we can release the buffer for message
             * reception.
             */
            ByteBuffer duplicate = buffer.duplicate();
            byte[] bufferAsBytes = new byte[duplicate.remaining()];
            duplicate.get(bufferAsBytes);
            ByteBuffer copyBuffer = ByteBuffer.wrap(bufferAsBytes);
            ctx.setKeyRegistrationState(
                    Context.KeyRegistrationState.NONE);
            System.err.println("copyBuffer remaining: "
                    + copyBuffer.remaining());
            executor.schedule(
                    new BusinessLogicResponseHandler(ctx, copyBuffer),
                    3, TimeUnit.SECONDS);
        }
        buffer.clear();
        return false;
    }

    private class BusinessLogicResponseHandler implements Callable<Void> {
        private Context ctx;
        private ByteBuffer buffer;

        public BusinessLogicResponseHandler(Context ctx, ByteBuffer buffer) {
            this.ctx = ctx;
            this.buffer = buffer;
        }

        @Override
        public Void call() throws Exception {
            System.err.println("calling!");
            ByteBuffer buffer = processResponse(ctx, this.buffer);
            /*
             * Send the response back to the requester
             */
            SelectionKey key = ctx.getSelectionKey();
            SelectorHandler sl = ctx.getSelectorHandler();
            sl.getSelectionKeyHandler().register(key, 15000L);
            // Hasn't this method been deprecated?
            SelectableChannel channel = ctx.getSelectionKey().channel();
            try {
                if (ctx.getProtocol() == Controller.Protocol.TCP) {
                    // TCP case
                    OutputWriter.flushChannel(channel, buffer);
                } else if (ctx.getProtocol() == Controller.Protocol.UDP) {
                    // UDP case
                    DatagramChannel datagramChannel =
                            (DatagramChannel) channel;
                    SocketAddress address = (SocketAddress)
                            ctx.getAttribute(ReadFilter.UDP_SOCKETADDRESS);
                    OutputWriter.flushChannel(datagramChannel, address,
                            buffer);
                }
            } catch (IOException ex) {
                // Store incoming data in byte[]
                byte[] data = new byte[buffer.remaining()];
                int position = buffer.position();
                this.buffer.get(data);
                this.buffer.position(position);
                Controller.logger().log(Level.WARNING,
                        "Exception. Echo \"" + new String(data) + "\"");
                throw ex;
            }
            System.err.println("has returned!");
            return null;
        }

        protected ByteBuffer processResponse(Context ctx, ByteBuffer buffer) {
            System.err.println("processing!");
            /*
             * This implements an Echo operation but with delay!
             */
            SelectionKey k = ctx.getSelectionKey();
            System.err.println("obtaining selectionKey");
            SocketChannel channel = (SocketChannel) k.channel();
            System.err.println("obtaining channel");
            InetAddress localAddress = channel.socket().getLocalAddress();
            System.err.println("obtaining localAddress");
            int localPort =
                    ((SocketChannel) k.channel()).socket().getLocalPort();
            System.err.println("obtaining local port");
            byte[] buf = new byte[buffer.remaining()];
            System.err.println("remaining: " + buffer.remaining());
            buffer.get(buf);
            System.err.println("processing response --> ctx instance: " + ctx
                    + " Buffer content: " + new String(buf)
                    + " Received at: " + localAddress.getHostAddress()
                    + ":" + localPort);
            System.err.println("latch.countDown(): " + latch.getCount());
            System.err.println("Buffer hasRemaining: "
                    + buffer.hasRemaining());
            latch.countDown();
            return buffer;
        }
    }

    public boolean postExecute(Context ctx) throws IOException {
        return true;
    }
}
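One detail in processResponse() above may be worth checking: buffer.get(buf)
advances the buffer's position to its limit, so the buffer handed back to
call() appears to have nothing remaining by the time it reaches
OutputWriter.flushChannel(). The sketch below uses only java.nio (no Grizzly
classes) to show one way to copy the readable bytes out of a shared buffer
and to log them without consuming them. BufferCopySketch, copyRemaining and
peek are hypothetical names chosen for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BufferCopySketch {

    /** Copies the readable bytes of src into a new buffer; src is not consumed. */
    static ByteBuffer copyRemaining(ByteBuffer src) {
        // duplicate() shares the content but has its own position and limit
        ByteBuffer view = src.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);               // advances only the view, not src
        return ByteBuffer.wrap(bytes); // position 0, limit bytes.length
    }

    /** Decodes the readable bytes for logging without moving buf's position. */
    static String peek(ByteBuffer buf) {
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.allocate(64);
        shared.put("hello".getBytes(StandardCharsets.UTF_8));
        shared.flip();

        ByteBuffer copy = copyRemaining(shared);
        shared.clear(); // the shared buffer can be reused for the next read

        System.out.println(peek(copy));
        System.out.println(copy.remaining()); // still readable after peek()
    }
}
```

Reading through a duplicate keeps the copy's position at 0, so the same
buffer can still be written to the channel afterwards.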
-----Original Message-----
From: Jeanfrancois.Arcand_at_Sun.COM [mailto:Jeanfrancois.Arcand_at_Sun.COM]
Sent: March-07-08 2:30 PM
To: users_at_grizzly.dev.java.net
Subject: Re: [Q] Adding delays to protocol filter
Hi Simon,
Simon Trudeau wrote:
> I want to test the performance of my application. To do so, I intend on
> building an Echo server with delay. Basically, once my request reaches
> my server, I want to delay (simulate response processing) the sending of
> a response. My first guess is to wrap the content of the execute method
> of the EchoFilter inside a callable and have it executed by a scheduled
> executor. Is that thread safe?
Yes you can, but make sure you call:
    ctx.setKeyRegistrationState(
            Context.KeyRegistrationState.NONE);
    // Store those values somewhere:
    SelectionKey key = ctx.getSelectionKey();
    SelectorHandler sl = ctx.getSelectorHandler();
inside your Filter, because otherwise the key will be registered back and
it will give unexpected results. Once you have flushed the response,
just do:
    sl.getSelectionKeyHandler().register(key);
so that you can receive the other requests that are coming in.
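For readers unfamiliar with key interest, the following is a minimal sketch
in plain java.nio of what suspending and resuming read interest on a
SelectionKey looks like. It is only an analogy for the advice above, not
Grizzly's actual implementation: a Pipe stands in for the client
connection, and InterestOpsSketch, isReported and demo are made-up names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class InterestOpsSketch {

    /** Runs one select (up to 200 ms) and says whether the key was reported. */
    static boolean isReported(Selector selector, SelectionKey key)
            throws IOException {
        selector.selectedKeys().clear(); // forget results of earlier selects
        selector.select(200);
        return selector.selectedKeys().contains(key);
    }

    /** Returns what the selector reported: {before, suspended, resumed}. */
    static boolean[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open(); // stands in for a client connection
            try {
                pipe.source().configureBlocking(false);
                SelectionKey key =
                        pipe.source().register(selector, SelectionKey.OP_READ);
                // Leave one unread byte pending on the channel.
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));

                boolean before = isReported(selector, key);

                // Suspend: with an empty interest set the selector stops
                // reporting the key even though data is still pending.
                key.interestOps(0);
                boolean suspended = isReported(selector, key);

                // Resume. If another thread were blocked in select(), a
                // selector.wakeup() would typically be needed as well.
                key.interestOps(SelectionKey.OP_READ);
                boolean resumed = isReported(selector, key);

                return new boolean[] {before, suspended, resumed};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("before=" + r[0] + " suspended=" + r[1]
                + " resumed=" + r[2]);
    }
}
```

The point of the sketch is the middle step: while the key's interest set is
empty, pending data on the connection is not delivered to the selector
loop, which is why the key has to be registered again once the delayed
response has been written.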
>
> What happens to the context object if another packet arrives from the
> same connection while my server is waiting to send back the response?
>
> The issue I want to simulate is a fast producer (client), slow consumer
> (server).
Right. Try the above :-)
Thanks
-- Jeanfrancois
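As a rough illustration of the scheduled-executor approach discussed in
this thread, here is a sketch using only java.util.concurrent. It assumes a
fixed delay per reply and uses a CountDownLatch to wait for completion;
the network write is replaced by a latch countdown, and DelayedReplySketch
and run are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayedReplySketch {

    /**
     * Schedules the given number of simulated replies, each delayMillis in
     * the future, and returns the elapsed milliseconds until all of them
     * have run, or -1 if they did not finish in time.
     */
    static long run(int requests, long delayMillis) {
        ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(requests);
        Runnable reply = done::countDown; // placeholder for the real write
        long start = System.nanoTime();
        try {
            for (int i = 0; i < requests; i++) {
                // schedule() returns at once, so the caller is never blocked
                executor.schedule(reply, delayMillis, TimeUnit.MILLISECONDS);
            }
            boolean finished =
                    done.await(delayMillis * 10 + 1000, TimeUnit.MILLISECONDS);
            long elapsed = TimeUnit.NANOSECONDS.toMillis(
                    System.nanoTime() - start);
            return finished ? elapsed : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return -1;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("elapsed ms: " + run(3, 50));
    }
}
```

Note that schedule() does not occupy the executor thread during the delay,
so several replies scheduled together complete after roughly one delay
period rather than one period each. To model a consumer that is genuinely
busy for the whole delay, the task itself would have to do the waiting, for
example with Thread.sleep.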
>
> Can I just wrap
>
> public boolean execute(Context ctx) throws IOException {
>     final WorkerThread workerThread =
>             ((WorkerThread) Thread.currentThread());
>     ByteBuffer buffer = workerThread.getByteBuffer();
>     buffer.flip();
>     if (buffer.hasRemaining()) {
>         // Depending on protocol perform echo
>         SelectableChannel channel = ctx.getSelectionKey().channel();
>         try {
>             if (ctx.getProtocol() == Controller.Protocol.TCP) {
>                 // TCP case
>                 OutputWriter.flushChannel(channel, buffer);
>             } else if (ctx.getProtocol() == Controller.Protocol.UDP) {
>                 // UDP case
>                 DatagramChannel datagramChannel =
>                         (DatagramChannel) channel;
>                 SocketAddress address = (SocketAddress)
>                         ctx.getAttribute(ReadFilter.UDP_SOCKETADDRESS);
>                 OutputWriter.flushChannel(datagramChannel, address,
>                         buffer);
>             }
>         } catch (IOException ex) {
>             // Store incoming data in byte[]
>             byte[] data = new byte[buffer.remaining()];
>             int position = buffer.position();
>             buffer.get(data);
>             buffer.position(position);
>             Controller.logger().log(Level.WARNING,
>                     "Exception. Echo \"" + new String(data) + "\"");
>             throw ex;
>         }
>     }
>     buffer.clear();
>     return false;
> }
>
> inside a callable and execute it from a scheduled executor, or are there
> states that will be messed up (the context maybe...?). What do you think?
>
> Thanks,
>
> Simon
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
For additional commands, e-mail: users-help_at_grizzly.dev.java.net