dev@glassfish.java.net

[gf-dev] Non-Blocking DataStructure for resource pooling

From: <Sven.Diedrichsen_at_edict.de>
Date: Wed, 3 Dec 2014 10:15:39 +0100

Hi everyone,

at my company we had a problem with a JDBC connection using the
RWDataStructure putting too many requesting threads into waits for
resources.
Our system's performance needs demanded a higher throughput than the
above-mentioned data structure could deliver. So we wrote a non-blocking
data structure which sacrifices the accuracy of the max size for higher
performance. The overall system throughput was 3 times higher, and the
maximum number of connections that can be created is the configured
number of connections plus the number of threads that can access the pool
concurrently.

Also the classloading of the custom datastructure within the
DataStructureFactory had to be modified.

What do you think about this attempt?

Cheers,

Sven Diedrichsen

-- code snipped

package com.sun.enterprise.resource.pool.datastructure;

import com.sun.appserv.connectors.internal.api.PoolingException;
import com.sun.enterprise.resource.ResourceHandle;
import com.sun.enterprise.resource.allocator.ResourceAllocator;
import com.sun.enterprise.resource.pool.ResourceHandler;
import
com.sun.enterprise.resource.pool.datastructure.strategy.ResourceSelectionStrategy
;
import com.sun.logging.LogDomains;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class NonBlockingDataStructure implements DataStructure {

    private ResourceSelectionStrategy strategy;
    private final ResourceHandler handler;
    private final AtomicInteger maxSize;

    private final ConcurrentLinkedDeque<ResourceHandle>
freeResourceHandles = new ConcurrentLinkedDeque<ResourceHandle>();
    private final ConcurrentLinkedDeque<ResourceHandle> allResourceHandles
= new ConcurrentLinkedDeque<ResourceHandle>();

    protected final static Logger _logger =
            LogDomains.getLogger(NonBlockingDataStructure.class,
LogDomains.RSR_LOGGER);

    public NonBlockingDataStructure(String parameters, int maxSize,
                                    ResourceHandler handler, String
strategyClass) {
        this.maxSize = new AtomicInteger(maxSize);
        this.handler = handler;
        initializeStrategy(strategyClass);
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.log(Level.FINEST, "pool.datastructure.nblockds.init");
        }
    }

    private void initializeStrategy(String strategyClass) {
        //TODO
    }

    @Override
    public void setMaxSize(int i) {
        this.maxSize.set(i);
    }

    @Override
    public int addResource(ResourceAllocator resourceAllocator, int count)
throws PoolingException {
        if(_logger.isLoggable(Level.FINE)){
            _logger.fine("addResource for "+count+" resources.");
        }
        int addedResourceHandles = 0;
        try {
            for (int i = 0; i < count && allResourceHandles.size() <
maxSize.get(); i++) {
                ResourceHandle handle = handler
.createResource(resourceAllocator);
                allResourceHandles.offer(handle);
                freeResourceHandles.offer(handle);
                addedResourceHandles++;
            }
        } catch (Exception e) {
            PoolingException pe = new PoolingException(e.getMessage());
            pe.initCause(e);
            throw pe;
        }
        if(_logger.isLoggable(Level.FINE)){
            _logger.fine("addResource added "+addedResourceHandles+"
resources.");
        }
        return addedResourceHandles;
    }

    @Override
    public ResourceHandle getResource() {
        ResourceHandle resourceHandle = freeResourceHandles.pollFirst();
        if (resourceHandle != null) {
            resourceHandle.setBusy(true);
            if(_logger.isLoggable(Level.FINE)){
                _logger.fine("getResource found resource.");
            }
        }
        return resourceHandle;
    }

    @Override
    public void removeResource(ResourceHandle resourceHandle) {
        if(_logger.isLoggable(Level.FINE)){
            _logger.fine("removeResource called.");
        }
        allResourceHandles.remove(resourceHandle);
        freeResourceHandles.remove(resourceHandle);
        handler.deleteResource(resourceHandle);
    }

    @Override
    public void returnResource(ResourceHandle resourceHandle) {
        if(_logger.isLoggable(Level.FINE)){
            _logger.fine("returnResource called.");
        }
        resourceHandle.setBusy(false);
        freeResourceHandles.offerFirst(resourceHandle);
    }

    @Override
    public int getFreeListSize() {
        return freeResourceHandles.size();
    }

    @Override
    public void removeAll() {
        if(_logger.isLoggable(Level.FINE)){
            _logger.fine("removeAll called.");
        }
        Iterator<ResourceHandle> it = allResourceHandles.iterator();
        while (it.hasNext()) {
            ResourceHandle next = it.next();
            freeResourceHandles.remove(next);
            handler.deleteResource(next);
            it.remove();
        }
    }

    @Override
    public int getResourcesSize() {
        return allResourceHandles.size();
    }

    @Override
    public ArrayList<ResourceHandle> getAllResources() {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}