Hello Oleksiy,
Please see the essential classes below. This is the state of the code after many desperate attempts to solve the issue, but it still does not work.
Let me know if I need to provide something else.
Basically, what should happen is that I receive some information from one server and pass it on to the next, but in between I must do some checks. The data is XML, but the first 4 bytes are the length of the incoming data in little-endian order (least significant byte first).
(little-endian size of incoming data) + XML data.
...<upms><msg type="bind"/></upms>….<upms><msg type="bind"/></upms>...<upms><msg type="bind"/></upms>
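The framing described above (a 4-byte little-endian length followed by the XML) can be sketched with plain java.nio, independent of Grizzly. This is only an illustration under stated assumptions: the class name FrameCodec is hypothetical, and UTF-8 is assumed for the XML bytes, whereas the classes below use the platform default charset.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the proxy classes below.
public class FrameCodec {

    // Prefix the XML with its encoded byte length as a 4-byte little-endian integer.
    public static byte[] encode(String xml) {
        byte[] payload = xml.getBytes(StandardCharsets.UTF_8); // assumption: UTF-8
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length)
                .order(ByteOrder.LITTLE_ENDIAN);
        frame.putInt(payload.length); // least significant byte is written first
        frame.put(payload);
        return frame.array();
    }

    // Read the 4-byte little-endian length, then decode that many bytes as XML.
    public static String decode(byte[] frame) {
        ByteBuffer in = ByteBuffer.wrap(frame).order(ByteOrder.LITTLE_ENDIAN);
        int length = in.getInt();
        byte[] payload = new byte[length];
        in.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

Note that the length is taken from the encoded byte array, not from String.length(), so the prefix stays correct when the XML contains non-ASCII characters.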
I need the StringFilter to delimit the stream; then I process each message and return the result. My result must be framed the same way, with the length in bytes placed before the XML, and sent to whichever party needs it. I use this delimiter because, for some reason, when I try to do the little-endian length calculation myself, it fails while creating the buffers. So I delimit the stream and process each message, and if it is OK I pass on the original XML with the size that I received. But if the stream is continuous, I need to split it up, process each message, and then pass the separated messages along, each with its length calculated in little-endian format.
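One possible way to split a continuous stream without relying on the closing tag is to use the length prefix itself. The sketch below assumes the whole input is available in a java.nio.ByteBuffer, that the XML is UTF-8, and that a partial trailing frame should be left unread until more data arrives. FrameSplitter and its methods are hypothetical names, not Grizzly API.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the proxy classes below.
public class FrameSplitter {

    // Build one frame: 4-byte little-endian length followed by the XML bytes.
    public static byte[] frame(String xml) {
        byte[] payload = xml.getBytes(StandardCharsets.UTF_8); // assumption: UTF-8
        return ByteBuffer.allocate(4 + payload.length)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Extract every complete frame; an incomplete trailing frame stays in the buffer.
    public static List<String> split(ByteBuffer in) {
        in.order(ByteOrder.LITTLE_ENDIAN);
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // not enough bytes yet: rewind to the start of this frame
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

Each extracted message can then be checked and re-framed with frame(...) before it is forwarded, so every outgoing message carries its own recalculated length.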
If I need to provide something else, let me know. I really need this.
Thanks, Donald
public class ProxyServer {
private static final Logger log = Logger.getLogger("PROXY");
public static void main(String[] args) throws IOException {
// Parse command line options.
int localPort = Integer.parseInt(config.getProperty(Config.RUNTIME_LOCAL_PORT));
String remoteHost = config.getProperty(Config.RUNTIME_REMOTE_HOST);
int remotePort = Integer.parseInt(config.getProperty(Config.RUNTIME_REMOTE_PORT));
log.info(
"Proxying *:" + localPort + " to "
+ remoteHost + ':' + remotePort + " ...");
// Create TCP transport
final TCPNIOTransport transport =
TCPNIOTransportBuilder.newInstance().build();
// Create a FilterChain using FilterChainBuilder
FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
// Add TransportFilter, which is responsible
// for reading and writing data to the connection
filterChainBuilder.add(new TransportFilter());
// delimit requests coming in
filterChainBuilder.add(new StringFilter( Charset.defaultCharset(),"</upms>"));
//to convert string to Buffer
filterChainBuilder.add(new InterceptingFilter());
filterChainBuilder.add(new CoreFilter());
//proxy filter for communication between end points
filterChainBuilder.add(new ProxyFilter(TCPNIOConnectorHandler.builder(transport).build(), remoteHost, remotePort));
transport.setProcessor(filterChainBuilder.build());
try {
// binding transport to start listen on certain host and port
transport.bind("localhost", localPort);
// start the transport
transport.start();
} catch(Exception e){
log.error("error in startup ", e);
}
}
}
===========================
public class InterceptingFilter extends BaseFilter {
private static final Logger log = Logger.getLogger(InterceptingFilter.class);
@Override
public NextAction handleRead(FilterChainContext ctx)
throws IOException {
Object peerAddress = ctx.getAddress();
String message = ctx.getMessage();
log.debug("incoming "+message.getBytes().length + "\n" +message);
ByteArrayOutputStream writer = new ByteArrayOutputStream();
//
writer.write(message.getBytes());
String out = writer.toString();
log.debug(out.toString());
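byte[] bytes = out.getBytes();
HeapBuffer buf = new HeapMemoryManager().allocate(bytes.length);
buf.put(bytes);
// Rewind to the start; calling buf.get() here would consume the first byte before it is forwarded
buf.position(0);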
log.debug(buf.toStringContent());
ctx.setMessage(buf);
// ctx.write(peerAddress, message, null);
return ctx.getInvokeAction();
}
}
====================================
public class CoreFilter extends BaseFilter {
private static final String UPMS = "<upms>";
private static final Logger log = Logger.getLogger(CoreFilter.class.getName());
private static final Logger eventslog = Logger.getLogger("events");
private Executor scheduler = Executors.newCachedThreadPool();
@Override
public NextAction handleRead(final FilterChainContext ctx)
throws IOException {
if (ctx.state() == FilterChainContext.State.RUNNING) {
return ctx.getInvokeAction();
}
if (ctx.getMessage() == null) {
return ctx.getStopAction();
}
// String raw = ctx.getMessage();
final Buffer raw = ctx.getMessage();
eventslog.debug("raw message \n" + raw.toStringContent());
final NextAction suspendAction = ctx.getSuspendAction();
// suspend the current execution
ctx.suspend();
// schedule async work
scheduler.execute(new ProxyHandler(ctx, raw));
// return suspend status
return suspendAction;
}
private boolean isBegin(String message) {
if(GenericValidator.isBlankOrNull(message))
return false;
boolean begin = false;
try {
XPathFactory xpathf = XPathFactory.newInstance();
XPath xpath = xpathf.newXPath();
String expr = "/upms/msg[@type='begin']/@type";
String result = xpath.evaluate(expr, new InputSource(new StringReader(message)));
if (!GenericValidator.isBlankOrNull(result)) {
begin = true;
}
} catch (Exception e) {
log.error("\n"+message);
log.error("error checking incoming message ", e);
}
return begin;
}
private void sendEndMessage(FilterChainContext context, String incomingXml, String msgId)
throws Exception {
try {
String endXml = Util.getEndMessage(msgId, incomingXml);
// The length prefix counts encoded bytes, not characters
byte[] payload = endXml.getBytes();
int msgLen = payload.length;
StreamWriter writer = StandaloneProcessor.INSTANCE.getStreamWriter(context.getConnection());
// Write the length as 4 bytes, least significant byte first (little-endian)
for (int i = 0; i < 4; i++) {
writer.writeByte((byte) (msgLen >> i * 8));
}
writer.writeByteArray(payload, 0, msgLen);
Future writeFuture = writer.flush();
writeFuture.get();
eventslog.debug("write proxy end message " + endXml);
} catch (Exception e) {
log.error("error writing end message", e);
throw new Exception(e);
}
}
private class ProxyHandler implements Runnable {
private FilterChainContext ctx;
private Buffer raw;
public ProxyHandler(FilterChainContext context, Buffer incomingXml) {
this.ctx = context;
this.raw = incomingXml;
}
@Override
public void run() {
try {
String msg = raw.toStringContent();
msg = msg.substring(msg.indexOf(UPMS));
if(!msg.contains("</upms>"))
msg += "</upms>";
log.debug("parsed message \n" + msg);
if (isBegin(msg.trim())) {
int status = CoreHandler.charge(msg);
log.debug("PROXY Charging status=" + status);
switch (status) {
case CoreHandler.ERROR:
sendEndMessage(ctx, msg, "error");
ctx.setMessage(null);
break;
case CoreHandler.INSUFFICEINT_FUNDS:
sendEndMessage(ctx, msg, "no.funds");
ctx.setMessage(null);
break;
default:
log.debug("successful--passing on " + msg);
ctx.setMessage(raw);
break;
}
} else {
ctx.setMessage(raw);
}
} catch (Exception e) {
log.error("error processing", e);
ctx.setMessage(null);
}
// Resume the FilterChain IOEvent processing
ctx.resume();
}
}
}
=====================
public class ProxyFilter extends BaseFilter {
private static final Logger logger = Logger.getLogger(ProxyFilter.class);
private Attribute<Connection> peerConnectionAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("TunnelFilter.peerConnection");
private final SocketConnectorHandler transport;
private final SocketAddress redirectAddress;
public ProxyFilter(SocketConnectorHandler transport, String host, int port) {
this(transport, new InetSocketAddress(host, port));
}
public ProxyFilter(SocketConnectorHandler transport, SocketAddress redirectAddress) {
this.transport = transport;
this.redirectAddress = redirectAddress;
}
@Override
public NextAction handleRead(final FilterChainContext ctx)
throws IOException {
Connection connection = ctx.getConnection();
final Connection peerConnection = (Connection) this.peerConnectionAttribute.get(connection);
if (!connection.isOpen()) {
return ctx.getStopAction();
}
if (peerConnection == null) {
this.transport.connect(this.redirectAddress, new ConnectCompletionHandler(ctx));
return ctx.getSuspendAction();
}
redirectToPeer(ctx, peerConnection);
return ctx.getStopAction();
}
@Override
public NextAction handleClose(FilterChainContext ctx)
throws IOException {
Connection connection = ctx.getConnection();
Connection peerConnection = (Connection) this.peerConnectionAttribute.get(connection);
if ((peerConnection != null) && (peerConnection.isOpen())) {
peerConnection.close();
}
return ctx.getInvokeAction();
}
private static void redirectToPeer(FilterChainContext context, Connection peerConnection)
throws IOException {
Connection srcConnection = context.getConnection();
Buffer message = context.getMessage();
logger.debug("Proxying messaging from " + srcConnection.getPeerAddress() + " to " + peerConnection.getPeerAddress());
// String message = context.getMessage();
logger.debug(message.toStringContent());
peerConnection.write(message);
// sendMessage(peerConnection, message.toStringContent());
}
private static void sendMessage(Connection connection, String outgoingXml){
try {
String endXml = outgoingXml.substring(outgoingXml.indexOf("<upms>")).trim()+"</upms>";
ByteArrayOutputStream writer = new ByteArrayOutputStream();
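byte[] payload = endXml.getBytes();
int msgLen = payload.length;
// Write the length as 4 bytes, least significant byte first (little-endian)
for (int i = 0; i < 4; i++) {
writer.write((byte) (msgLen >> i * 8));
}
writer.write(payload);
// Keep the frame as raw bytes: converting it to a String would corrupt the binary length prefix
byte[] frame = writer.toByteArray();
logger.debug(frame.length + " final \n" + endXml);
HeapBuffer buf = new HeapMemoryManager().allocate(frame.length);
buf.put(frame);
buf.position(0);
connection.write(buf);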
} catch (Exception e) {
logger.error("error writing end message", e);
}
}
private class ConnectCompletionHandler implements CompletionHandler<Connection> {
private final FilterChainContext context;
private ConnectCompletionHandler(FilterChainContext context) {
this.context = context;
}
@Override
public void cancelled() {
close(this.context.getConnection());
resumeContext();
}
@Override
public void failed(Throwable throwable) {
close(this.context.getConnection());
resumeContext();
}
@Override
public void completed(Connection peerConnection) {
Connection connection = this.context.getConnection();
ProxyFilter.this.peerConnectionAttribute.set(connection, peerConnection);
ProxyFilter.this.peerConnectionAttribute.set(peerConnection, connection);
resumeContext();
}
@Override
public void updated(Connection peerConnection) {
}
private void resumeContext() {
this.context.resume();
}
private void close(Connection connection) {
try {
connection.close();
} catch (IOException e) {
logger.error("error closing connection", e);
}
}
}
}
On Sep 12, 2011, at 4:22 AM, Oleksiy Stashok wrote:
> Hi,
>
> can you pls. share the entire project sources, or at least part of it so we can reproduce the problem?
> What are you using as peer?
>
> Thanks.
>
> WBR,
> Alexey.
>
> On 09/11/2011 07:12 PM, donald.walters_at_gmail.com wrote:
>> I created a filter chain which includes a StringFilter and the
>> TunnelFilter example but for some reason the redirecttoPeer method in
>> the TunnelFilter is not working.
>>
>> filterChainBuilder.add(new StringFilter(Charset.defaultCharset(),
>> "</jps>"));
>> filterChainBuilder.add(new
>> TunnelFilter(TCPNIOConnectorHandler.builder(transport).build(),
>> remoteHost, remotePort));
>>
>> ....
>>
>> in the TunnelFilter
>>
>> private static void redirectToPeer(FilterChainContext context,
>> Connection peerConnection)
>> throws IOException {
>> Connection srcConnection = context.getConnection();
>> logger.debug("Proxying messaging from " +
>> srcConnection.getPeerAddress() + " to " +
>> peerConnection.getPeerAddress());
>>
>> String message = context.getMessage();
>> logger.debug(message);
>>
>> peerConnection.write(message);
>>
>> }
>>
>> when that method is called nothing happens. I see the output and the
>> messages are ok but nothing is passed to the peer. Please let me know
>> what I am doing wrong here.
>