Oracle® Enterprise Service Bus Developer's Guide 10g (10.1.3.4.0) Part Number E12638-02
Oracle Application Server release 10.1.3.4 introduces support for implementing the Resequencer in ESB. This section contains the following topics:
A Resequencer is used to rearrange a stream of related but out-of-sequence messages back into order. The messages are rearranged based on an identifier that is part of the message. The Resequencer must know the identifier sequence to rearrange the messages, because it uses that sequence to decide the position of each message. This section describes the following:
Caution:
The Resequencer is available in preview mode and is disabled by default. When disabled, it does not affect the existing product functionality. If you want to use the Resequencer, then you must enable it by setting the value of the ESB configuration parameter EnableResequencer to true. For more information, refer to Development.
ESB can resequence the incoming messages in a user-specified order. This implementation allows you to specify three types of order:
First In First Out (FIFO)
The FIFO Oracle ESB Resequencer supports the FIFO pattern, which is relevant for applications that need sequencing based on the time of arrival of the message to the ESB Routing Service configured as a Resequencer. The FIFO Resequencer receives a stream of messages that are in order, and uses an external data store to store the messages in order until they are processed asynchronously.
When the messages are processed, they are processed in sequence based on the time of arrival to the routing service. It is important to note that the messages to outbound services of the FIFO Resequencer routing service are guaranteed to arrive in order based on the time of arrival. The FIFO Resequencer does not modify the message contents; it just delivers them in FIFO order.
Note:
The ESB FIFO Resequencer delivers messages in the same order in which messages are received by the ESB routing service. However, in a flow where ESB receives the input from a Database adapter or a File adapter, the processing may not be in a FIFO fashion because the Database adapter and the File adapter do not process messages in a FIFO fashion. Refer to Configuring Database Adapters and File Adapters for FIFO for workarounds that need to be applied on the File adapter and the Database adapter to make them process messages in a FIFO fashion.
Standard Resequencer
The Standard Oracle ESB Resequencer supports a standard Resequencer pattern, which is relevant for applications that use identifiers from a simple numeric identifier sequence in their messages. The Standard Resequencer receives a stream of messages that may not arrive in order. It uses an internal data store to store the messages until a complete sequence based on the SequenceId is obtained. The messages are processed as soon as they are in order.
The in-sequence messages, based on SequenceId, are then processed asynchronously in their own transaction one by one. It is important to note that the messages to outbound services of the Standard Resequencer routing service are guaranteed to arrive in sequence. The Standard Resequencer does not modify the message contents; it just orders them.
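The buffering behavior described above can be sketched as follows. This is a minimal illustration, not the ESB implementation: the class name, the in-memory dictionary standing in for the data store, and the `receive` method are all assumptions made for the example. Only the idea of a numeric sequence with a start value and an increment comes from this guide.

```python
class StandardResequencerSketch:
    """Illustrative only: buffers messages and releases them in SequenceId order."""

    def __init__(self, start=1, increment=1):
        self.expected = start      # next SequenceId that may be released
        self.increment = increment
        self.pending = {}          # SequenceId -> message (stand-in for the data store)

    def receive(self, sequence_id, message):
        """Store a message and return every message that is now in order."""
        self.pending[sequence_id] = message
        released = []
        # Release messages for as long as the next expected identifier is present.
        while self.expected in self.pending:
            released.append(self.pending.pop(self.expected))
            self.expected += self.increment
        return released


reseq = StandardResequencerSketch(start=1, increment=1)
out_of_order = [(3, "c"), (1, "a"), (2, "b"), (5, "e"), (4, "d")]
delivered = []
for seq_id, body in out_of_order:
    delivered.extend(reseq.receive(seq_id, body))
print(delivered)  # ['a', 'b', 'c', 'd', 'e']
```

Message 3 is held until messages 1 and 2 have arrived, which mirrors the statement that messages are processed as soon as they are in order.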
BestEffort Resequencer
The Oracle ESB Resequencer supports a BestEffort pattern, which is relevant for applications that produce a large number of messages in a short span of time and cannot inform the Oracle ESB Resequencer about the identifier sequence. In this case, ESB makes a best effort to ensure that the messages are delivered in sequence. The Oracle ESB BestEffort Resequencer can receive a stream of messages that are in order or just a little bit out of order. The messages are processed as soon as they are in order. This Resequencer can also reorder messages without prior knowledge of the SequenceId sequence. This means that you do not define the sequence as you do with the Standard Resequencer.
The BestEffort Resequencer uses an internal data store to store the messages until they are processed asynchronously. When the messages are processed, they are processed in sequence based on the specified SequenceId and the messages that have arrived at that point in time, even if a true sequence cannot be obtained. It is important to note that the messages to outbound services of the BestEffort routing service are not guaranteed to arrive in order based on SequenceId. The BestEffort Resequencer does not modify the message contents; it just tries to deliver them in sequence.
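As a rough sketch of the BestEffort behavior described above, the following example orders only the messages that have arrived at the time of each processing pass. The function name and the list of `(sequence_id, message)` tuples are assumptions for illustration and do not reflect the internal ESB data store.

```python
def best_effort_batch(arrived):
    """Return the messages that have arrived so far, ordered by SequenceId.

    arrived: list of (sequence_id, message) tuples currently in the store.
    Gaps are not waited for, so a late message can be delivered after a
    message that carries a higher SequenceId.
    """
    ordered = sorted(arrived, key=lambda item: item[0])
    return [message for _, message in ordered]


# Two processing passes: message 2 arrives too late for the first pass.
first_pass = best_effort_batch([(4, "d"), (1, "a"), (3, "c")])
second_pass = best_effort_batch([(2, "b"), (5, "e")])
print(first_pass + second_pass)  # ['a', 'c', 'd', 'b', 'e']
```

Each pass is internally ordered, but the overall delivery is not, which is why delivery in SequenceId order is not guaranteed for this type.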
You can use date and time data to sequence the messages when the resequencing order is BestEffort. For this, you need to add the endpoint property ResequencerSequenceIdDataType, which specifies that the data type of ResequencerSequenceIdXPath is xsd:dateTime. The Resequencer then interprets ResequencerSequenceIdXPath based on the xsd:dateTime type and sequences the messages.
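To illustrate what ordering by an xsd:dateTime value means, the following sketch sorts messages by a timestamp field. The helper name and the sample messages are hypothetical, and the parser covers only the common `YYYY-MM-DDTHH:MM:SS` form with an optional `Z` or numeric offset rather than the full xsd:dateTime lexical space.

```python
from datetime import datetime


def parse_xsd_datetime(value):
    """Parse a common subset of xsd:dateTime into a datetime object."""
    # Older Python versions do not accept a trailing "Z", so normalize it.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


# All sample values carry a time zone; mixing zoned and unzoned values
# would raise TypeError when they are compared.
messages = [
    ("2008-06-25T09:36:47+08:00", "third"),
    ("2008-06-25T09:36:45+08:00", "first"),
    ("2008-06-25T01:36:46Z", "second"),
]
ordered = [body for stamp, body in
           sorted(messages, key=lambda m: parse_xsd_datetime(m[0]))]
print(ordered)  # ['first', 'second', 'third']
```

Comparing the values as timestamps rather than as strings is what places the `Z` value between the two `+08:00` values.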
Oracle Resequencer adds three new thread pools to the ESB run-time, which are used to deliver queued messages for resequencing:
Locker Thread
The locker threads are used to populate a work queue. The locker threads periodically search the database tables to find group IDs that can be processed by the worker threads. The locker threads put these group IDs in a work queue.
Worker Thread
The worker threads pick up the group IDs from the work queue and run the resequencing logic.
HeartBeat Thread
The heartbeat thread is used for unlocking groups that were locked by a crashed container. There will be only one instance of this thread.
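The division of work between locker and worker threads can be sketched with a shared work queue. This is an assumed, simplified model: the function names, the in-memory `store`, and the sentinel used to stop the workers are inventions for the example. A real locker thread polls database tables periodically instead of running once.

```python
import queue
import threading


def locker(ready_group_ids, work_queue, worker_count):
    """Stand-in for a locker thread: publishes group IDs that can be processed."""
    for group_id in ready_group_ids:
        work_queue.put(group_id)
    for _ in range(worker_count):
        work_queue.put(None)  # sentinel: no more work in this sketch


def worker(work_queue, store, delivered, lock):
    """Stand-in for a worker thread: resequences one group at a time."""
    while True:
        group_id = work_queue.get()
        if group_id is None:
            break
        in_order = [msg for _, msg in sorted(store[group_id])]
        with lock:
            delivered[group_id] = in_order


# group ID -> list of (sequence_id, message) waiting in the store
store = {"A": [(2, "a2"), (1, "a1")], "B": [(1, "b1"), (3, "b3"), (2, "b2")]}
work_queue = queue.Queue()
delivered = {}
lock = threading.Lock()

workers = [threading.Thread(target=worker, args=(work_queue, store, delivered, lock))
           for _ in range(2)]
for t in workers:
    t.start()
locker_thread = threading.Thread(target=locker, args=(list(store), work_queue, len(workers)))
locker_thread.start()
locker_thread.join()
for t in workers:
    t.join()
print(delivered)
```

Because each group ID is taken from the queue by exactly one worker, messages within a group are never processed concurrently, while different groups can be processed in parallel.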
Note:
For Oracle ESB samples using Resequencer, visit the following URL: http://www.oracle.com/technology/products/integration/esb/index.html
This section describes how you can use the resequencing feature. You will use the resequencing feature at the application design and development phase, when developing an ESB-based application and also while administering the deployed application. This section includes the following topics:
Development
In the application design and development phase, you can enable resequencing support for the routing service by performing the following steps:
Enable resequencing support in ESB - by setting the value of the ESB configuration parameter EnableResequencer (in the esb_config.ini file or using the -DEnableResequencer option when ESB starts) to true. You must also set the value of this ESB configuration parameter to provide sequencing for a routing service.
Enable resequencing support for a routing service - by setting the ResequencerType endpoint property to Standard, BestEffort, or FIFO. You also need to set the value of the ESB configuration parameter EnableResequencer (in esb_config.ini or using the -DEnableResequencer option when ESB starts) to true.
Specify the Group ID field - by setting the ResequencerGroupXPath endpoint property to an expression that specifies the group identifier field in the incoming messages. All the messages with the same value for the group ID field form the set of incoming messages on which resequencing (of ResequencerType) is done. If ResequencerGroupXPath is not specified, then all the messages received by the routing service belong to one group.
The group ID XPath and sequence ID XPath use the following format:
{<<expression>>};{namespace ns1=<namespace url> namespace ns2=<namespace url>...}.
The format is the same as that used in the subscription filters. An example of XPath is as follows:
{//ns1:owner};{namespace ns1=http://www.w3schools.com}
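The following sketch shows one way to read the format above and apply the expression to a message. It is an assumption-laden illustration: the helper name, the regular expressions, and the sample `<order>` message are hypothetical, and ESB's own parser may accept forms that this sketch does not.

```python
import re
import xml.etree.ElementTree as ET


def parse_resequencer_xpath(spec):
    """Split '{expression};{namespace p=url ...}' into (expression, {prefix: url})."""
    match = re.fullmatch(r"\{(.*?)\};\{(.*)\}", spec.strip())
    if match is None:
        raise ValueError("expected {expression};{namespace prefix=url ...}")
    expression, ns_part = match.groups()
    namespaces = dict(re.findall(r"namespace\s+(\w+)=(\S+)", ns_part))
    return expression, namespaces


expr, ns = parse_resequencer_xpath(
    "{//ns1:owner};{namespace ns1=http://www.w3schools.com}")

# Hypothetical incoming message whose elements are in the declared namespace.
doc = ET.fromstring(
    '<order xmlns="http://www.w3schools.com"><owner>alice</owner></order>')

# ElementTree requires a relative path, so "//..." is evaluated as ".//...".
group_id = doc.find("." + expr, ns).text
print(expr, ns, group_id)
```

In this example, every message whose `owner` element is `alice` would fall into the same group.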
You will have to set additional endpoint properties to configure the desired resequencing. These additional properties will vary with the desired resequencing and are mentioned in the table in step 4.
Configure Standard Resequencing - by specifying the endpoint properties mentioned in the following table:
Sl. no | Endpoint Property Name | Description |
---|---|---|
1. | ResequencerSequenceIdXPath | XPath that points to the field in the incoming message on which resequencing will be done. |
2. | ResequencerTimeout | The time period in milliseconds to wait for an expected message. The Resequencer will lock the group as errored out if timeout happens. The group will be unlocked when a new message for the locked group arrives. The default value for timeout is Long.MAX_VALUE. See Also: Refer to "Resequencer Timeout Feature" for limitations of this endpoint property. |
3. | ResequencerSequenceStart and ResequencerSequenceIncrement | The identifier sequence will be a natural number sequence, and the user will have to specify the SEQUENCE_START and SEQUENCE_INCREMENT values. The default value of both of these properties is 1. |
Configure BestEffort Resequencing - by specifying the endpoint property mentioned in the following table:
Sl. no | Endpoint Property Name | Description |
---|---|---|
1. | ResequencerSequenceIdXPath | XPath that points to the field in the incoming message on which resequencing will be done. |
2. | ResequencerSequenceIdDataType | Set its value to dateTime to pass an input of type xsd:dateTime (http://www.w3schools.com/Schema/schema_dtypes_date.asp). By default, the data type is assumed to be a number. |
Configure FIFO Resequencing - by specifying the endpoint property mentioned in the following table:
Sl. no | Endpoint Property Name | Description |
---|---|---|
1. | ResequencerSequenceIdXPath | XPath that points to the field in the incoming message on which resequencing will be done. |
Configuring the Dequeue System
For a Resequencer-enabled routing service, the ESB enqueues the incoming messages to the database. The dequeue system of the Resequencer consists of a locker thread and a worker thread pool. The locker thread and the worker thread pool are tied to a system. Each system has its own locker thread and worker thread pool. The locker thread locks the groups to avoid concurrent access and then passes the locked groups to the worker thread pool. The worker thread pool retrieves the messages in bulk for a locked group and processes the messages. The following table describes the parameters to configure the dequeue system:
Name | Type | Description | Default |
---|---|---|---|
ResequencerMaxGroupsLocked | ECP | Number of groups locked by the dequeue system for processing. | Number of all available groups. |
ResequencerMaxRowsRetrieved | EPP | Number of in-order messages that Resequencer should pick from the data store at a time. | 1 for BestEffort and 5 for Standard and FIFO. |
ResequencerWorkerThreadPoolSize | ECP | Number of threads per system for processing messages. | 1 |
systemName_ResequencerWorkerThreadPoolSize | ECP | Number of threads per system for processing messages. The value of this parameter overrides the value of the ResequencerWorkerThreadPoolSize parameter for the particular system. | 1 |
ResequencerLockerThreadSleep | ECP | Time in milliseconds for which the dequeue system thread sleeps, if it does not find a group for processing. | 2000 milliseconds |
where EPP is End Point Property and ECP is ESB Configuration Parameter.
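The override relationship between the two worker thread pool parameters can be sketched as a simple lookup. The function name, the dictionary used to hold configuration values, and the system names `OrderSystem` and `BillingSystem` are hypothetical; only the parameter names and the default of 1 come from the table.

```python
DEFAULT_WORKER_POOL_SIZE = 1  # default listed in the table


def worker_pool_size(config, system_name):
    """Resolve the worker thread pool size for one system.

    A systemName_ResequencerWorkerThreadPoolSize entry, when present,
    overrides the global ResequencerWorkerThreadPoolSize value.
    """
    specific = config.get(f"{system_name}_ResequencerWorkerThreadPoolSize")
    if specific is not None:
        return int(specific)
    return int(config.get("ResequencerWorkerThreadPoolSize",
                          DEFAULT_WORKER_POOL_SIZE))


config = {
    "ResequencerWorkerThreadPoolSize": "4",
    "OrderSystem_ResequencerWorkerThreadPoolSize": "8",
}
print(worker_pool_size(config, "OrderSystem"))    # 8
print(worker_pool_size(config, "BillingSystem"))  # 4
print(worker_pool_size({}, "BillingSystem"))      # 1
```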
Configuring the Heartbeat Infrastructure
The Resequencer's dequeue system locks a group and then processes the messages of the group. The dequeue system uses an instance identifier (ID) to lock a group. The instance ID is created and maintained by the heartbeat infrastructure. There is a single instance ID for every ESB instance. The heartbeat infrastructure creates this unique instance ID for the ESB instance. It inserts the instance ID, along with the current time, into a table called ESB_CONTAINERID_LEASE.
The heartbeat infrastructure has one heartbeat thread. The heartbeat thread periodically updates the time associated with the instance ID to announce its presence to other ESB instances. The ESB configuration parameter ResequencerContainerIdLeaseRefresh, specified in minutes, is used for this purpose. The default value of this parameter is 10 minutes.
The heartbeat thread also looks for instance IDs that have not been updated within a configurable time period. The ESB configuration parameter ResequencerContainerIdLeaseTimeout, specified in minutes, is used for this purpose. The default value of this parameter is 15 minutes. The thread releases the locks held by these instance IDs. Once the groups are unlocked, the dequeue system of other ESB instances can pick up the groups for processing.
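The lease check performed by the heartbeat thread can be sketched as follows. The function names, the dictionaries standing in for the ESB_CONTAINERID_LEASE table and the group locks, and the instance IDs `esb-1` and `esb-2` are assumptions for illustration; only the 15-minute default timeout comes from this section.

```python
from datetime import datetime, timedelta

LEASE_TIMEOUT = timedelta(minutes=15)  # default of ResequencerContainerIdLeaseTimeout


def expired_instances(leases, now, timeout=LEASE_TIMEOUT):
    """Return instance IDs whose last refresh is older than the timeout."""
    return sorted(instance for instance, last_refresh in leases.items()
                  if now - last_refresh > timeout)


def release_locks(group_locks, dead_instances):
    """Unlock groups held by expired instances; return the unlocked group IDs."""
    dead = set(dead_instances)
    released = sorted(group for group, owner in group_locks.items() if owner in dead)
    for group in released:
        group_locks[group] = None  # group can now be picked up by another instance
    return released


now = datetime(2008, 6, 25, 10, 0)
leases = {
    "esb-1": now - timedelta(minutes=5),   # refreshed recently
    "esb-2": now - timedelta(minutes=20),  # missed its refresh, for example after a crash
}
group_locks = {"groupA": "esb-1", "groupB": "esb-2"}

dead = expired_instances(leases, now)
released = release_locks(group_locks, dead)
print(dead, released)  # ['esb-2'] ['groupB']
```

Keeping the refresh interval (10 minutes by default) shorter than the timeout (15 minutes by default) gives a healthy instance time to refresh its lease before it would be treated as expired.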
Administration
The administrative tasks deal with the management of Resequencer support in ESB. In this release, the following administrative tasks are supported:
Resubmitting an error-generating message.
The message that generates an error is moved to the error hospital. The group is moved to an error state and is not picked up by the dequeue system of any ESB instance. The user can resubmit the error-generating message using the error hospital. On resubmission of the message, the group is moved to a normal state, so that it can be picked up by the dequeue system of any ESB instance.
Managing timed out groups.
A timed out group is not picked for processing. The resequencer_list_timedout_groups.sql SQL script can be used to list the routing service GUID, the group ID, and the group name of timed out groups. You can then pass this information to the resequencer_restart_processing_group.sql SQL script, so that a group is reopened for processing.
Purging the message store.
Resequencer does not delete any message that it receives. So, you must manually run a purge script to remove the processed messages that are older than a given time stamp. The purge script is a PL/SQL procedure called resequencer_purge_messages and is available in the resequencer_purge_message_store.sql file.
Caution:
Use this script with caution to avoid unintended deletion of messages.
For example, use the following command to delete all instances older than a given time stamp:
resequencer_purge_messages(to_timestamp('2006-05-11 00:00:00','YYYY-MM-DD HH24:MI:SS'))
Again, use the following command to delete all instances that are older than 120 days:
resequencer_purge_messages(SYSDATE - 120)
Managing Locked Groups.
When a project is undeployed, the messages for a Resequencer-enabled routing service are rejected. If messages that already exist in the data store are picked up for processing, then the Resequencer dequeue system is unable to process the messages and locks the groups because metadata for the routing service is not available. Such locked groups can be listed by using the PL/SQL procedure in the file resequencer_list_erroredout_groups.sql. This procedure lists the group identifier and routing service GUID for the groups that have reached an error state because the project was undeployed. You can pass the group ID and routing service GUID to the procedure resequencer_restart_processing_group.sql to reopen a group for processing.
The following limitations of Resequencer have been noted in this release:
Change in Resequencer Type Not Supported
In the 10.1.3.4 release, ESB provides three types of resequencing. The type of resequencing required by a routing service is specified at design time by setting the value of the ResequencerType endpoint property to any of the following: Standard, BestEffort, or FIFO. The Resequencer run-time maintains information about the state of the message sequence that is used to pick the next message for processing.
In the 10.1.3.4 release, the Resequencer does not support a change in the Resequencer type at design time, because the change in the Resequencer type will corrupt the state information about the message sequence. This may lead to undesirable effects such as messages not picked up for processing.
This issue is tracked with Oracle bug 7117296.
Resequencer Timeout Feature
For the Standard Resequencer, an endpoint property called ResequencerTimeout can be used to set the time that the Resequencer should wait for a group's next message. If the next message is not received within the ResequencerTimeout period, then the Resequencer marks the group as timed out and the group is not picked up for processing.
In the 10.1.3.4 release, the timeout feature is not available. This is because of the absence of a user-friendly way to open the groups in the timed out state for processing. Currently, the timeout feature requires the use of SQL to open the groups in the timed out state. In this release, the default value of the ResequencerTimeout endpoint property is set to Long.MAX_VALUE (http://java.sun.com/j2se/1.4.2/docs/api/java/lang/Long.html#MAX_VALUE).
This issue is tracked with Oracle bug 7117346.
Resequencer Endpoints Need to Be on the Same System
When the Resequencer is enabled on a routing service, all routing rules in that routing service should connect to endpoints that are on the same system.
Therefore, routing rule targets that belong to a different system should be fronted by a Resequencer-enabled routing service.
This issue is tracked with Oracle bug 7143449.
Resequencer Fails If XSD File Contains Multibyte Characters That the Server Locale Encoding Does Not Support
If the XSD file contains multibyte characters that the server locale encoding does not support, then the Resequencer execution fails after triggering the project flow. ESB Control also shows the failure.
This issue is tracked with Oracle bug 7218685.
This section describes how to make the Database adapters and the File adapters work in a FIFO fashion.
Perform the following for a File adapter:
Set the number of processor threads to 1 by performing the following steps:
Rename the $ORACLE_HOME\integration\esb\config\pc.properties.esb file to $ORACLE_HOME\integration\esb\config\pc.properties and set the value of the oracle.tip.adapter.file.numProcessorThreads parameter to 1.
Edit the $ORACLE_HOME\bpel\system\service\config\pc.properties file and set the value of the oracle.tip.adapter.file.numProcessorThreads parameter to 1. The default value of this parameter is 4.
Note:
For SOA installations, the settings specified in the BPEL file $ORACLE_HOME\bpel\system\service\config\pc.properties take precedence over the corresponding ESB settings. So, you must edit this BPEL file to have the correct configuration settings for ESB.
Configure the inbound adapter by performing the following steps:
In JDeveloper, edit the WSDL file of the inbound file adapter to add Sorter="oracle.tip.adapter.file.sorter.TimestampSorterAscending" to ActivationSpec in the following way:
<jca:operation PhysicalDirectory="/home/Xiaocheng/oracle/product/10.1.3.1soa/resequencer/resequencer/F2Fdata/in" ActivationSpec="oracle.tip.adapter.file.inbound.FileActivationSpec" Sorter="oracle.tip.adapter.file.sorter.TimestampSorterAscending" DeleteFile="true" IncludeFiles=".*\.xml" PollingFrequency="3" MinimumAge="2" OpaqueSchema="false" > </jca:operation>
Note:
The changes made to this file will not take effect at this point.
Save the file.
In JDeveloper, open the .esbsvc file of the inbound file adapter.
Click Configure tech adapter service wsdl and follow the wizard without making any modification to the WSDL file.
Save the file.
Open the WSDL file of the inbound file adapter again and check whether or not the settings you made in Step 1 are intact and the file looks like the following:
<jca:operation PhysicalDirectory="/home/Xiaocheng/oracle/product/10.1.3.1soa/resequencer/resequencer/F2Fdata/in" ActivationSpec="oracle.tip.adapter.file.inbound.FileActivationSpec" Sorter="oracle.tip.adapter.file.sorter.TimestampSorterAscending" DeleteFile="true" IncludeFiles=".*\.xml" PollingFrequency="3" MinimumAge="2" OpaqueSchema="false" > </jca:operation>
Register the project with ESB again.
You will receive a notification regarding successful registration.
Note:
After you complete the settings, the file adapter guarantees to send the files to the ESB Routing Service based on the time stamp of the files. However, if you check the time stamp of the files by using the ls --full-time command, then you will see a time such as '2008-06-25 09:36:45.000000000 +0800', that is, the granularity is at the level of seconds. So, if files are copied into the folder of the adapter too quickly, then they will have the same time stamp. In such a situation, this workaround cannot guarantee that the order is FIFO because the messages will have the same time of arrival.
If you make the thread that copies the files sleep for 1000 milliseconds between files, then each file is guaranteed to have a different time stamp.
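The effect of second-level time stamp granularity described in the note above can be checked with a small helper. This is a hedged sketch: the function name and the sample modification times are hypothetical, and in practice the times would come from a call such as `os.stat(path).st_mtime` on the files in the adapter's input directory.

```python
from collections import Counter


def files_with_ambiguous_order(mtimes):
    """Return file names that share a whole-second time stamp with another file.

    mtimes: mapping of file name to modification time in seconds.
    Files returned here cannot be ordered reliably by time stamp alone.
    """
    whole_seconds = {name: int(t) for name, t in mtimes.items()}
    counts = Counter(whole_seconds.values())
    return sorted(name for name, second in whole_seconds.items()
                  if counts[second] > 1)


# a.xml and b.xml were copied within the same second; c.xml one second later.
sample = {"a.xml": 1214357805.0, "b.xml": 1214357805.0, "c.xml": 1214357806.0}
print(files_with_ambiguous_order(sample))  # ['a.xml', 'b.xml']
```

An empty result means that every file has a distinct second-level time stamp, which is the condition the 1000-millisecond sleep is meant to ensure.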
Perform the following for a Database adapter:
Set the number of Database threads to 1 by performing the following steps:
In JDeveloper, open the WSDL file of the inbound database adapter.
Set the value of the NumberOfThreads parameter to 1 in the following way:
<jca:operation ActivationSpec="oracle.tip.adapter.db.DBActivationSpec" DescriptorName="GetTransactions.OrderTransactionsFifo1" QueryName="GetTransactions" PollingStrategyName="DeletePollingStrategy" SequencingFieldName="TS" MaxRaiseSize="1" MaxTransactionSize="unlimited" PollingInterval="5" NumberOfThreads="1" UseBatchDestroy="false" ReturnSingleResultSet="false" MappingsMetaDataURL="GetTransactions_toplink_mappings.xml" />
Save the file.
Add a column to store the time stamp when a row is inserted into the table so that you can sort by this column while configuring the Database adapter. Perform the following steps to achieve this:
In JDeveloper, open the .esbsvc file of the inbound database adapter.
Click Configure tech adapter service wsdl and follow the wizard.
On the Select Table page, import the table of the Database adapter again to add the newly added timestamp column.
If the table does not have a primary key, then specify the time stamp column as the virtual primary key.
On the Polling Options page, specify time stamp as the value of the Order By field.
Save the file.
Open the WSDL file again to check whether or not SequencingFieldName is set to the column name of the time stamp column (TS). The WSDL file should look like the following:
<jca:operation ActivationSpec="oracle.tip.adapter.db.DBActivationSpec" DescriptorName="GetTransactions.OrderTransactionsFifo1" QueryName="GetTransactions" PollingStrategyName="DeletePollingStrategy" SequencingFieldName="TS" MaxRaiseSize="1" MaxTransactionSize="unlimited" PollingInterval="5" NumberOfThreads="1" UseBatchDestroy="false" ReturnSingleResultSet="false" MappingsMetaDataURL="GetTransactions_toplink_mappings.xml" />
Register the project with ESB again.
You will receive a notification regarding successful registration.