Timeout and Synchronous processes

A BPEL White Paper
Written by Olivier Le Diouris, Oracle Corporation
December, 2005

It is not possible - and if it was it would be highly decommended - to use the inMemoryOptimization with an Asynchronous process. An asynchronous process is also known as a durable process.
A durable process makes use of the dehydration datastore, so that the state of a process is maintained, even if the BPEL Process Manager goes down, is stopped or bounced.
Anyhow, for some processes, it could be useful, like the ones waiting for a short time before raising a timeout flag. Even the processes waiting for one second or so are not possible to implement using the inMemoryOptimization flag in bpel.xml.

The bottom line is that any process implementing somehow a Wait activity cannot use the inMemoryOptimization flag. receive, onAlarm, onMessage, and as a result pick, are using the Wait activity.
If you try to deply a process using such an activity with the inMemoryOptimization flag set, you will have a message like the following one:

Error(49): 
 [Error ORABPEL-10102]: Incompatible inMemoryOptimization config
 [Description]: in line 49 of "C:\_myWork\Projects\MiscTests\SynchronousTimeOut\SynchronousTimeOut.bpel", 
 wait, receive, onMessage or onAlarm creates dehydration point. 
 This process cannot be marked to do inMemoryOptimization.
 [Potential fix]: Remove this activity, or set inMemoryOptimization to be false in deployment descriptor.
        

We suggest here a workaround this aspect, so that inMemoryOptimization can be used for some processes, even if they are implementing a timeout-like pattern.

In the example we use to illustrate this workaround, we are talking about using a partner link actually accessing a Web Service. But the same pattern could probably be reproduced for any kind of partner link, using an Adapter or not.

The main idea is to push the timeout feature in the code to be exposed as a Web Service, and have the timeout value sent to this code as an extra parameter.
What we suggest here is an approach as generic as possible, in order to minimize the amount of code to write in the case a timeout is to be implemented along with a Web Service call.

Content

Example

Content

We are going here to start with a very simple web service, and see how to implement a timeout for its response.
We are actually going to implement a Web Service Client using an Observer design pattern, and trigger a timer as the client will be waiting for the reponse to the Web Service request.

The Web Service of the example will be the usual Greetings one, returning "Hello XXXX!" when you provide an "XXXX" value for the parameter of its "sayHello" method...

This Web Service is deployed, and exposed through its WSDL document.
In a nutshell, the path we will follow is the following one:

Infrastructure Classes and Interfaces

Content

ws.client.wait.WSException
          
package ws.client.wait;

public class WSException extends Exception 
{
  public WSException()
  {
    super();
  }
  public WSException(String s)
  {
    super(s);
  }
}
          

ws.client.wait.WSEvent
          
package ws.client.wait;

import java.util.EventObject;

public class WSEvent extends EventObject
{
  private String content = "";

  public WSEvent(Object source)
  {
    super(source);
  }
  public WSEvent(Object source, String s)
  {
    super(source);
    content = s;
  }

  public String getContent()
  { return this.content; }
}          

ws.client.wait.WSListener
          
package ws.client.wait;

import java.util.EventListener;
import ws.client.wait.WSEvent;

public class WSListener implements EventListener 
{
  public void dataDetected(WSEvent e)
  {
  }
  public void timeoutExpired(WSEvent e)
  {
  }
}
          

ws.client.wait.WSReader
          
package ws.client.wait;

import java.util.ArrayList;

public abstract class WSReader extends Thread
{
  private transient ArrayList WSListeners = null;

  boolean goRead = true;
    
  public WSReader(ArrayList al)
  {
//  System.out.println("Creating reader");
    WSListeners = al;
    this.addWSListener(new WSListener()
      {
        public void timeoutExpired(WSEvent e)
        {
          System.out.println("Stopping reading (cancel)");
          goRead = false;
          /** 
           * @todo Some action should be taken if goRead is set to false...
           */ 
        }
      });
  }

  protected void fireDataRead(WSEvent e)
  {
    for (int i=0; i<WSListeners.size(); i++)
    {
      WSListener l = (WSListener)WSListeners.get(i);
      l.dataDetected(e);
    }
  }

  protected void fireTimeout(WSEvent e)
  {
    for (int i=0; i<WSListeners.size(); i++)
    {
      WSListener l = (WSListener)WSListeners.get(i);
      l.timeoutExpired(e);
    }
  }

  public synchronized void addWSListener(WSListener l)
  {
    if (!WSListeners.contains(l))
    {
      WSListeners.add(l);
    }
  }

  public synchronized void removeWSListener(WSListener l)
  {
    WSListeners.remove(l);
  }

  public abstract void read();
  public abstract void stopReading();

  public void run()
  {
//  System.out.println("Reader Running");
    read();
  }
}
          

ws.client.wait.WSClient
          
package ws.client.wait;

import java.util.ArrayList;

public abstract class WSClient 
{
  private transient ArrayList WSListeners = new ArrayList(2);

  private WSReader reader;
  
  public WSClient()
  {
  }

  public void initClient()
  {
    this.addWSListener(new WSListener()
      {
        public void dataDetected(WSEvent e)
        {
          dataDetectedEvent(e); 
        }
      });
  }

  public void setReader(WSReader r)
  { this.reader = r; }
  public WSReader getReader()
  { return this.reader; }

  public ArrayList getListeners()
  {  return this.WSListeners; }
  
  public void startWorking()
  {
    this.reader.start();
  }

  public abstract void dataDetectedEvent(WSEvent e);
  
  public synchronized void addWSListener(WSListener l)
  {
    if (!WSListeners.contains(l))
    {
      WSListeners.add(l);
    }
  }

  public synchronized void removeWSListener(WSListener l)
  {
    WSListeners.remove(l);
  }
}
          

And finally

ws.client.wait.GenericTopClientInterface
          
package ws.client.wait;

public interface GenericTopClientInterface 
{
  public void wakeUp();
}
          

All the classes and interfaces above are generic, and will be used as they are by the classes actually extending or implementing them.

Implementation Classes

Content

ws.client.wait.example.CustomClient
          
package ws.client.wait.example;
import ws.client.wait.GenericTopClientInterface;
import ws.client.wait.WSClient;
import ws.client.wait.WSEvent;

public class CustomClient extends WSClient
{
  public CustomClient()
  {
    super();
  }
  
  GenericTopClientInterface parent = null;
  
  public void setClient(GenericTopClientInterface gtci)
  {
    parent = gtci;  
  }
  
  public void dataDetectedEvent(WSEvent e)
  {
    returnedValue = e.getContent();
    dataDetected = true;
    if (parent != null)
      parent.wakeUp();
  }

  private String returnedValue = "";
  private boolean dataDetected = false;
  public boolean isDataDetected()
  { return dataDetected; }
  public String getReturnedValue()
  { return returnedValue; }
}
          

ws.client.wait.example.CustomReader
          
package ws.client.wait.example;
import java.util.ArrayList;
import ws.client.AllPurposeWebServiceStub;
import ws.client.wait.WSEvent;
import ws.client.wait.WSReader;

public class CustomReader extends WSReader
{
  public CustomReader(ArrayList al)
  {
    super(al);
  }
  
  private String name = "";
  
  public void setInputName(String name)
  {
    this.name = name;
  }
  
  public void read(String s)
  {
    setInputName(s);
    read();
  }
  
  public void read()
  {
    AllPurposeWebServiceStub stub = new AllPurposeWebServiceStub();
    String resp = "";
    try 
    { 
      resp = stub.sayHello(name); 
      fireDataRead(new WSEvent(this, resp));
    }
    catch (Exception e) { e.printStackTrace(); }
  }

  public void stopReading()
  {
    fireTimeout(new WSEvent(this, "Expired"));
  }
}
          
This is where the stub for the original web service is invoked.

The class to expose as the new Web Service

Content

ws.client.wait.example.to.expose.WebService
          
package ws.client.wait.example.to.expose;

import ws.client.wait.GenericTopClientInterface;
import ws.client.wait.WSException;
import ws.client.wait.example.CustomClient;
import ws.client.wait.example.CustomReader;

public class WebService implements GenericTopClientInterface 
{
  CustomClient cc = null;  
  
  public void wakeUp()
  {
    if (toNotify != null)
    {
      synchronized (toNotify)
      {
        toNotify.notify();
      }
    }
  }
  
  Thread toNotify = null;

  /**
   * timemout: in milliseconds
   * @webmethod 
   */
  public String invokeWebService(String s, long timeout) throws WSException
  {
    String returnedValue = "";
    
    cc = new CustomClient();
    cc.initClient();
    CustomReader cr = new CustomReader(cc.getListeners());
    cr.setInputName(s);
    cc.setReader(cr);
    cc.startWorking();
    cc.setClient(this);
    toNotify = Thread.currentThread();
    synchronized(toNotify)
    {
      try { toNotify.wait(timeout); } 
      catch (Exception e) 
      { 
        System.err.println("In the wait block");
        e.printStackTrace(); 
      }
    }
    if (cc.isDataDetected())
    {
      returnedValue = cc.getReturnedValue();
    }    
    else
    {
      returnedValue = "Timeout Expired"; // Useless...
      throw new WSException("Timeout Expired (" + timeout + " ms)");
    }
    return (returnedValue);
  }
}
          

The method to call on this web service will then be invokeWebService.

Sample Projects

Content

The original Web Service.

The new Web Service, implementing the patterns above.

The BPEL Process, synchronously calling a Web Service, along with a Timer.

Sample Snapshots

Content

First call
We call the process, with a time out big enough not to expire. The value is in milliseconds.

First call
We have a reply. But as inMemoryOptimization is set to true, and as the instance is not faulted, not audit trace is kept.

Second call
We call the process, with a time out small enough to expire. The value is in milliseconds.

Second call
No reply is returned, but as the instance is faulted, we have a trace.

Second call
This is the raised exception, timeout has expired.


© 2005, Oracle