O Scaling Out Your Data Grid Aggregations Linearly

Coherence provides a data grid by dynamically, transparently, and automatically partitioning the data set across all storage-enabled nodes in a cluster. We have been doing some scale-out testing on our new Dual 2.3GHz PowerPC G5 Xserve cluster, and here is one of the tests that we performed using the data grid aggregation feature.

The new InvocableMap tightly binds the concepts of a data grid (that is, partitioned cache) and the processing of the data stored in the grid. When you combine the InvocableMap with the linear scalability of Coherence itself, you get an extremely powerful solution. The following tests show that Coherence gives you (the developer, the engineer, the architect, and so on) the ability to build an application that can scale out to handle any SLA requirement and any increase in throughput requirements. For example, the following test demonstrates Coherence's ability to linearly increase the number of trades aggregated per second as you add hardware. That is, if one machine can aggregate X trades per second, then adding a second machine to the data grid lets you aggregate 2X trades per second, adding a third machine lets you aggregate 3X trades per second, and so on.

All of the Data Grid capabilities described below are features of Coherence Enterprise Edition and higher.

O.1 The Data

First, we need some data to aggregate. Example O-1 illustrates a Trade object with three properties: Id, Price, and Symbol.

Example O-1 Trade Object Defining Three Properties

package com.tangosol.examples.coherence.data;


import com.tangosol.util.Base;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.io.ExternalizableLite;

import java.io.IOException;
import java.io.NotActiveException;
import java.io.DataInput;
import java.io.DataOutput;


/**
* Example Trade class
*
* @author erm 2005.12.27
*/
public class Trade
        extends Base
        implements ExternalizableLite
    {
    /**
    * Default Constructor
    */
    public Trade()
        {
        }

    public Trade(int iId, double dPrice, String sSymbol)
        {
        setId(iId);
        setPrice(dPrice);
        setSymbol(sSymbol);
        }

    public int getId()
        {
        return m_iId;
        }

    public void setId(int iId)
        {
        m_iId = iId;
        }

    public double getPrice()
        {
        return m_dPrice;
        }

    public void setPrice(double dPrice)
        {
        m_dPrice = dPrice;
        }

    public String getSymbol()
        {
        return m_sSymbol;
        }

    public void setSymbol(String sSymbol)
        {
        m_sSymbol = sSymbol;
        }

    /**
    * Restore the contents of this object by loading the object's state from the
    * passed DataInput object.
    *
    * @param in the DataInput stream to read data from to restore the
    *           state of this object
    *
    * @throws IOException        if an I/O exception occurs
    * @throws NotActiveException if the object is not in its initial state, and
    *                            therefore cannot be deserialized into
    */
    public void readExternal(DataInput in)
            throws IOException
        {
        m_iId     = ExternalizableHelper.readInt(in);
        m_dPrice  = in.readDouble();
        m_sSymbol = ExternalizableHelper.readSafeUTF(in);
        }

    /**
    * Save the contents of this object by storing the object's state into the
    * passed DataOutput object.
    *
    * @param out the DataOutput stream to write the state of this object to
    *
    * @throws IOException if an I/O exception occurs
    */
    public void writeExternal(DataOutput out)
            throws IOException
        {
        ExternalizableHelper.writeInt(out, m_iId);
        out.writeDouble(m_dPrice);
        ExternalizableHelper.writeSafeUTF(out, m_sSymbol);
        }

    private int    m_iId;
    private double m_dPrice;
    private String m_sSymbol;
    }
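The readExternal and writeExternal methods above must read and write the fields in the same order. The following is a minimal sketch of that round trip using only JDK streams, so it can be run without Coherence on the classpath. The PlainTrade class and the roundTrip helper are hypothetical stand-ins, not part of the example above, and the plain writeInt/writeUTF calls are an assumption that replaces the ExternalizableHelper calls (whose wire format may differ). The code assumes Java 8 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class TradeRoundTripSketch {

    /** Hypothetical JDK-only stand-in for the Trade class (no Coherence types). */
    public static final class PlainTrade {
        public int id;
        public double price;
        public String symbol;

        public PlainTrade() {
        }

        public PlainTrade(int id, double price, String symbol) {
            this.id = id;
            this.price = price;
            this.symbol = symbol;
        }

        /** Writes the fields in a fixed order: id, price, symbol. */
        public void writeExternal(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeDouble(price);
            out.writeUTF(symbol); // plain writeUTF rejects null, unlike a null-safe writer
        }

        /** Reads the fields back in exactly the order they were written. */
        public void readExternal(DataInput in) throws IOException {
            id = in.readInt();
            price = in.readDouble();
            symbol = in.readUTF();
        }
    }

    /** Serializes a trade to a byte array and restores a copy from it. */
    public static PlainTrade roundTrip(PlainTrade original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.writeExternal(out);
            }
            PlainTrade copy = new PlainTrade();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readExternal(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PlainTrade copy = roundTrip(new PlainTrade(7, 101.25, "ORCL"));
        System.out.println(copy.id + " " + copy.price + " " + copy.symbol);
    }
}
```

If the read order and the write order ever diverge, the restored object silently carries wrong values or the read fails, which is why both methods list the fields in the same sequence.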

O.2 Configure a Partitioned Cache

The cache is configured through the XML cache configuration elements. Example O-2 defines one wildcard cache-mapping that maps to one caching-scheme with unlimited capacity:

Example O-2 Mapping a cache-mapping to a caching-scheme with Unlimited Capacity

<?xml version="1.0"?>

<!DOCTYPE cache-config SYSTEM "cache-config.dtd">

<cache-config>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>*</cache-name>
      <scheme-name>example-distributed</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <!-- 
    Distributed caching scheme.
    -->
    <distributed-scheme>
      <scheme-name>example-distributed</scheme-name>
      <service-name>DistributedCache</service-name>

      <backing-map-scheme>
        <class-scheme>
          <scheme-ref>unlimited-backing-map</scheme-ref>
        </class-scheme>
      </backing-map-scheme>

      <autostart>true</autostart>
    </distributed-scheme>

    <!-- 
    Backing map scheme definition used by all the caches that do 
    not require any eviction policies 
    -->
    <class-scheme>
      <scheme-name>unlimited-backing-map</scheme-name>

      <class-name>com.tangosol.util.SafeHashMap</class-name>
      <init-params></init-params>
    </class-scheme>
  </caching-schemes>
</cache-config>

O.3 Add an Index to the Price Property

Example O-3 illustrates the code to add an index to the Price property. Adding an index to this property increases performance by allowing Coherence to access the values directly rather than having to deserialize each item to perform the calculation.

Example O-3 Adding an Index to the Price Property

ReflectionExtractor extPrice  = new ReflectionExtractor("getPrice");
m_cache.addIndex(extPrice, true, null);

In our tests the aggregation speed was increased by more than 2x after an index was applied.

O.4 Code to Perform a Parallel Aggregation

Example O-4 illustrates the code to perform a parallel aggregation across all JVMs in the data grid. The aggregation is initiated and results received by a single client. That is, a single "low-power" client is able to use the full processing power of the cluster/data grid in aggregate to perform this aggregation in parallel with just one line of code.

Example O-4 Perform a Parallel Aggregation Across all JVMs in the Grid

Double DResult;
DResult = (Double) m_cache.aggregate((Filter) null, new DoubleSum("getPrice"));
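Conceptually, a parallel aggregation lets each storage node compute a partial result over the data it holds, and the caller then combines those partial results. The sketch below mimics that shape with plain JDK classes only. It is not how Coherence implements DoubleSum. The class name PartitionedSumSketch, the double[] arrays standing in for the trades held by each node, and the thread pool standing in for the cluster are all assumptions made for illustration. The code assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionedSumSketch {

    /** Sums the prices held by one (simulated) storage node. */
    static double partialSum(double[] partition) {
        double sum = 0.0;
        for (double price : partition) {
            sum += price;
        }
        return sum;
    }

    /** Runs one summing task per partition in parallel, then combines the partial sums. */
    public static double aggregate(List<double[]> partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<Double>> partials = new ArrayList<>();
            for (double[] partition : partitions) {
                Callable<Double> task = () -> partialSum(partition);
                partials.add(pool.submit(task));
            }
            double total = 0.0;
            for (Future<Double> partial : partials) {
                total += partial.get(); // the "client" only adds up small partial results
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("aggregation interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("aggregation failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<double[]> partitions = new ArrayList<>();
        partitions.add(new double[] {10.0, 20.0});       // hypothetical node 1
        partitions.add(new double[] {30.0});             // hypothetical node 2
        partitions.add(new double[] {40.0, 50.0, 60.0}); // hypothetical node 3
        System.out.println(aggregate(partitions));       // prints 210.0
    }
}
```

The point of the shape is that the caller receives one number per partition rather than every trade, which is consistent with a "low-power" client driving the aggregation.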

O.5 The Testing Environment and Process

O.5.1 Performing a "Test Run"

A test run does several things:

  1. Loads 200,000 trade objects into the data grid.

  2. Adds an index to the Price property.

  3. Performs a parallel aggregation of all trade objects stored in the data grid. This aggregation step is done 20 times to obtain an "average run time" to ensure that the test takes into account garbage collection.

  4. Loads 400,000 trade objects into the data grid.

  5. Repeats steps 2 and 3.

  6. Loads 600,000 trade objects into the data grid.

  7. Repeats steps 2 and 3.

  8. Loads 800,000 trade objects into the data grid.

  9. Repeats steps 2 and 3.

  10. Loads 1,000,000 trade objects into the data grid.

  11. Repeats steps 2 and 3.
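The steps above can be sketched as a small timing loop. This is a hedged illustration of the procedure only, not the TradeTest client used for the published results. The class name TestRunSketch, the timeRuns helper, and the in-memory double[] that stands in for the data grid are all hypothetical, and the sketch assumes that each "Loads N trade objects" step refers to the total number of trades present for that step. The code assumes Java 8 or later.

```java
import java.util.Arrays;
import java.util.function.DoubleSupplier;

public class TestRunSketch {

    /** Number of timed aggregations per data-set size, as in step 3. */
    static final int RUNS_PER_STEP = 20;

    /** Holds the last result so the aggregation call is not optimized away. */
    static volatile double sink;

    /** Runs the aggregation the given number of times and records each elapsed time. */
    public static long[] timeRuns(DoubleSupplier aggregation, int runs) {
        long[] elapsedNanos = new long[runs];
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            sink = aggregation.getAsDouble();
            elapsedNanos[i] = System.nanoTime() - start;
        }
        return elapsedNanos;
    }

    public static void main(String[] args) {
        // 200,000 to 1,000,000 trades in increments of 200,000.
        for (int size = 200_000; size <= 1_000_000; size += 200_000) {
            double[] prices = new double[size]; // stand-in for trades loaded into the grid
            Arrays.fill(prices, 1.0);
            long[] nanos = timeRuns(() -> Arrays.stream(prices).sum(), RUNS_PER_STEP);
            System.out.println(size + " trades: " + nanos.length + " timed aggregations");
        }
    }
}
```

In a real run, the DoubleSupplier would wrap the cache aggregation call rather than a local array sum, and the index would be added after each load and before the timed runs.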

Client Considerations: The test client itself is run on an Intel Core Duo iMac which is marked as local storage disabled. The command line used to start the client was:

java ... -Dtangosol.coherence.distributed.localstorage=false -Xmx128m -Xms128m com.tangosol.examples.coherence.invocable.TradeTest

O.5.2 This "Test Suite" (and Subsequent Results) Includes Data from Four "Test Runs":

  1. Start 4 JVMs on one Xserve - Perform a "test run"

  2. Start 4 JVMs on each of two Xserves - Perform a "test run"

  3. Start 4 JVMs on each of three Xserves - Perform a "test run"

  4. Start 4 JVMs on each of four Xserves - Perform a "test run"

Server Considerations: In this case a "JVM" refers to a cache server instance (that is, a data grid node) that is a standalone JVM responsible for managing/storing the data. We used the DefaultCacheServer helper class to accomplish this.

The command line used to start the server was:

java ... -Xmx384m -Xms384m -server com.tangosol.net.DefaultCacheServer

O.5.3 JDK Version

The JDK used on both the client and the servers was Java 2 Runtime Environment, Standard Edition (build 1.5.0_05-84).

O.6 The Results

As you can see in the following graph, the average aggregation time decreases linearly as more cache servers/machines are added to the data grid!

Note:

The lowest and highest times were not used in the calculations below, resulting in a data set of eighteen results used to create an average.
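The averaging rule in the note above (drop the single lowest and single highest time, then average the rest) can be sketched as follows. The class and method names are hypothetical, and the sample timings in main are made-up values chosen only to make the arithmetic easy to follow; they are not measurements from these tests.

```java
import java.util.Arrays;

public class TrimmedAverageSketch {

    /** Averages the samples after discarding the single lowest and single highest value. */
    public static double trimmedAverage(long[] samples) {
        if (samples.length < 3) {
            throw new IllegalArgumentException("need at least three samples");
        }
        long[] sorted = samples.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        long total = 0;
        for (int i = 1; i < sorted.length - 1; i++) {
            total += sorted[i];
        }
        return (double) total / (sorted.length - 2);
    }

    public static void main(String[] args) {
        long[] millis = {12, 9, 11, 10, 30};        // hypothetical timings, not test data
        System.out.println(trimmedAverage(millis)); // (10 + 11 + 12) / 3, prints 11.0
    }
}
```

With twenty timed runs per step, this leaves eighteen values in the average, so a single slow run (for example, one that coincides with a garbage collection pause) does not skew the result.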

Figure O-1 Average Aggregation Time


Similarly, the following graph illustrates how the aggregations per second scale linearly as you add more machines! When moving from 1 machine to 2 machines, the trades aggregated per second double; when moving from 2 machines to 4 machines, the trades aggregated per second double again.

Figure O-2 Aggregation Scale-Out


Note:

FAILOVER!

The above aggregations will complete successfully and correctly even if one of the cache servers or an entire machine fails during the aggregation!

O.7 Conclusion

Combining the Coherence data grid (that is, partitioned cache) with the InvocableMap features enables:

  • Applications to scale out data grid calculations linearly;

  • Groups to meet increasingly aggressive SLAs by dynamically/transparently adding more resources to the data grid. That is, if you need to achieve 1,837,932 trade aggregations per second, all that is required is to start 16 more cache servers across four more machines.
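If throughput does scale linearly with machine count, sizing the grid for a target rate is a simple division. The sketch below shows that arithmetic. The class name, the method name, and the per-machine rate of 250,000 trades per second are hypothetical values chosen for illustration; substitute the rate you measure on your own hardware.

```java
public class ScaleOutEstimateSketch {

    /**
     * Estimates how many machines are needed for a target aggregation rate,
     * assuming throughput grows linearly with the number of machines.
     */
    public static int machinesNeeded(double targetPerSecond, double perMachinePerSecond) {
        if (targetPerSecond < 0 || perMachinePerSecond <= 0) {
            throw new IllegalArgumentException("rates must be positive");
        }
        return (int) Math.ceil(targetPerSecond / perMachinePerSecond);
    }

    public static void main(String[] args) {
        double perMachine = 250_000.0; // hypothetical measured rate for one machine
        System.out.println(machinesNeeded(1_000_000.0, perMachine)); // prints 4
        System.out.println(machinesNeeded(1_000_001.0, perMachine)); // prints 5
    }
}
```

The estimate rounds up because a fractional machine cannot be added, and it holds only as long as the linear-scaling assumption does for your workload.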