dev@jax-ws.java.net

Re: XMLStreamReader recycling <was> [Fwd: CVS update [mr-21]: /jax-ws-sources/jaxws-ri/rt/src/com/sun/xml/ws/message/stream/]

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Tue, 16 Jan 2007 18:32:40 +0100

Hi.

Could you please review the XMLStreamReaderFactory I'm proposing? I made changes
so that it can be reused for FastInfoset's XMLStreamReader.
For now, I have made Zephyr's factory implementation able to
work as a common pool for all possible implementations of XMLStreamReader.
In that case, it seems the name could be changed...

WBR,
Alexey.


Paul Sandoz wrote:
> Kohsuke Kawaguchi wrote:
>> Paul Sandoz wrote:
>>> This recycling is related to the same issue for the FI StAX parsers.
>>>
>>> I have not looked deeply at the code but it looks like the right
>>> place to support the recycling of XML *and* FI XMLStreamReader
>>> instances.
>>
>> Yeah, we can do that. The easiest way is to have FI's XMLStreamReader
>> implement RecycleAware. If needed we can move this interface to stax-ex
>> to do this.
>>
>
> A FI XMLStreamReader instance automatically resets itself when an
> InputStream is set so it is not strictly necessary to implement this
> interface.
>
>
>>> I am wondering if it is possible to also recycle stream buffers i.e.
>>> when the message is no longer used.
>>
>> I think we should first check if it's indeed costly.
>
> Agreed, profiling for cases where non-security header blocks are
> present should tell us more.
>
> The reason I have a hunch is that XML StAX parser creation
> showed up in the profiler mainly because it allocated a whole bunch of
> memory that could be reused. The same will happen with the
> streambuffer for headers.
>
> Paul.
>
>> So far, it doesn't seem like this is a big enough problem. Pooling
>> has its own overhead and problems, too.
>>
>> Finding out when a Message is no longer in use is harder than finding
>> that out for XMLStreamReader, so doing this is more work. We need
>> benefits that justify this work.
>>
>


package com.sun.xml.ws.api.streaming;

import com.sun.istack.NotNull;
import com.sun.istack.Nullable;
import com.sun.xml.ws.streaming.XMLReaderException;
import java.util.HashMap;
import java.util.Map;
import org.xml.sax.InputSource;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;

/**
 * Factory for {_at_link XMLStreamReader}.
 *
 * <p>
 * This wraps {_at_link XMLInputFactory} and allows us to reuse {_at_link XMLStreamReader} instances
 * when appropriate.
 *
 * @author Kohsuke Kawaguchi
 */
public abstract class XMLStreamReaderFactory {
    
    /**
     * Singleton instance.
     */
    private static volatile @NotNull XMLStreamReaderFactory theInstance;
    
    static {
        XMLInputFactory xif = XMLInputFactory.newInstance();
        xif.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, true);
        
        XMLStreamReaderFactory f=null;
        
        // this system property can be used to disable the pooling altogether,
        // in case someone hits an issue with pooling in the production system.
        if(!Boolean.getBoolean(XMLStreamReaderFactory.class.getName()+".noPool"))
            f = Zephyr.newInstance(xif);
        if(f==null)
            f = new Default(xif);
        
        theInstance = f;
    }
    
    /**
     * Overrides the singleton {_at_link XMLStreamReaderFactory} instance that
     * the JAX-WS RI uses.
     */
    public static void set(XMLStreamReaderFactory f) {
        if(f==null) throw new IllegalArgumentException();
        theInstance = f;
    }
    
    public static XMLStreamReaderFactory get() {
        return theInstance;
    }
    
    public static XMLStreamReader create(InputSource source, boolean rejectDTDs) {
        try {
            // Char stream available?
            if (source.getCharacterStream() != null) {
                return get().doCreate(source.getSystemId(), source.getCharacterStream(), rejectDTDs);
            }
            
            // Byte stream available?
            if (source.getByteStream() != null) {
                return get().doCreate(source.getSystemId(), source.getByteStream(), rejectDTDs);
            }
            
            // Otherwise, open URI
            return get().doCreate(source.getSystemId(), new URL(source.getSystemId()).openStream(),rejectDTDs);
        } catch (IOException e) {
            throw new XMLReaderException("stax.cantCreate",e);
        }
    }
    
    public static XMLStreamReader create(@Nullable String systemId, InputStream in, boolean rejectDTDs) {
        return get().doCreate(systemId,in,rejectDTDs);
    }
    
    public static XMLStreamReader create(@Nullable String systemId, Reader reader, boolean rejectDTDs) {
        return get().doCreate(systemId,reader,rejectDTDs);
    }
    
    /**
     * Should be invoked when the code finished using an {_at_link XMLStreamReader}.
     *
     * <p>
     * If the recycled instance implements {_at_link RecycleAware},
     * {_at_link RecycleAware#onRecycled()} will be invoked to let the instance
     * know that it's being recycled.
     *
     * <p>
     * It is not a hard requirement to call this method on every {_at_link XMLStreamReader}
     * instance. Not doing so just reduces the performance by throwing away
     * possibly reusable instances. So the caller should always consider the effort
     * it takes to recycle vs the possible performance gain by doing so.
     *
     * <p>
     * This method may be invked by multiple threads concurrently.
     *
     * @param r
     * The {_at_link XMLStreamReader} instance that the caller finished using.
     * This could be any {_at_link XMLStreamReader} implementation, not just
     * the ones that were created from this factory. So the implementation
     * of this class needs to be aware of that.
     */
    public static void recycle(XMLStreamReader r) {
        get().doRecycle(r);
    }
    
    // implementations
    
    public abstract XMLStreamReader doCreate(String systemId, InputStream in, boolean rejectDTDs);
    
    public abstract XMLStreamReader doCreate(String systemId, Reader reader, boolean rejectDTDs);
    
    public abstract XMLStreamReader doCreate(Class<? extends XMLStreamReader> implClass);
    
    public abstract void doRecycle(XMLStreamReader r);
    
    /**
     * Interface that can be implemented by {_at_link XMLStreamReader} to
     * be notified when it's recycled.
     *
     * <p>
     * This provides a filtering {_at_link XMLStreamReader} an opportunity to
     * recycle its inner {_at_link XMLStreamReader}.
     */
    public interface RecycleAware {
        void onRecycled();
    }
    
    /**
     * {_at_link XMLStreamReaderFactory} implementation for SJSXP/JAXP RI.
     */
    public static final class Zephyr extends XMLStreamReaderFactory {
        private final XMLInputFactory xif;
        
        private final ThreadLocal<XMLStreamReader> pool = new ThreadLocal<XMLStreamReader>();
        
        private final ThreadLocal<Map<Class, XMLStreamReader>> customPool = new ThreadLocal<Map<Class, XMLStreamReader>>();
        
        /**
         * Sun StAX impl <code>XMLReaderImpl.setInputSource()</code> method via reflection.
         */
        private final Method setInputSourceMethod;
        
        /**
         * Sun StAX impl <code>XMLReaderImpl.reset()</code> method via reflection.
         */
        private final Method resetMethod;
        
        /**
         * The Sun StAX impl's {_at_link XMLStreamReader} implementation clas.
         */
        private final Class zephyrClass;
        
        /**
         * Creates {_at_link Zephyr} instance if the given {_at_link XMLInputFactory} is the one
         * from Zephyr.
         */
        public static @Nullable
                XMLStreamReaderFactory newInstance(XMLInputFactory xif) {
            // check if this is from Zephyr
            try {
                Class<?> clazz = xif.createXMLStreamReader(new StringReader("<foo/>")).getClass();
                
                if(!clazz.getName().startsWith("com.sun.xml.stream."))
                    return null; // nope
                
                return new Zephyr(xif,clazz);
            } catch (NoSuchMethodException e) {
                return null; // this factory is not for zephyr
            } catch (XMLStreamException e) {
                return null; // impossible to fail to parse <foo/>, but anyway
            }
        }
        
        public Zephyr(XMLInputFactory xif, Class clazz) throws NoSuchMethodException {
            zephyrClass = clazz;
            setInputSourceMethod = clazz.getMethod("setInputSource", InputSource.class);
            resetMethod = clazz.getMethod("reset");
            
            try {
                // Turn OFF internal factory caching in Zephyr.
                // Santiago told me that this makes it thread-safe.
                xif.setProperty("reuse-instance", false);
            } catch (IllegalArgumentException e) {
                // falls through
            }
            this.xif = xif;
        }
        
        /**
         * Fetchs an instance from the pool if available, otherwise null.
         */
        private @Nullable XMLStreamReader fetch() {
            XMLStreamReader sr = pool.get();
            if(sr==null) return null;
            pool.set(null);
            return sr;
        }
        
        private @Nullable XMLStreamReader fetch(Class implClass) {
            if (zephyrClass.equals(implClass)) return fetch();
            
            Map<Class, XMLStreamReader> implMap = getCustomPoolImplMap();
            
            XMLStreamReader sr = implMap.get(implClass);
            if(sr==null) return null;
            implMap.put(implClass, null);
            
            return sr;
        }
        
        public void doRecycle(XMLStreamReader r) {
            if(zephyrClass.isInstance(r)) {
                pool.set(r);
            } else {
                offerToCustomPool(r);
            }
            
            if(r instanceof RecycleAware)
                ((RecycleAware)r).onRecycled();
        }
        
        public XMLStreamReader doCreate(String systemId, InputStream in, boolean rejectDTDs) {
            try {
                XMLStreamReader xsr = fetch();
                if(xsr==null)
                    return xif.createXMLStreamReader(systemId,in);
                
                // try re-using this instance.
                InputSource is = new InputSource(systemId);
                is.setByteStream(in);
                reuse(xsr,is);
                return xsr;
            } catch (IllegalAccessException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            } catch (InvocationTargetException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            } catch (XMLStreamException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            }
        }
        
        public XMLStreamReader doCreate(String systemId, Reader in, boolean rejectDTDs) {
            try {
                XMLStreamReader xsr = fetch();
                if(xsr==null)
                    return xif.createXMLStreamReader(systemId,in);
                
                // try re-using this instance.
                InputSource is = new InputSource(systemId);
                is.setCharacterStream(in);
                reuse(xsr,is);
                return xsr;
            } catch (IllegalAccessException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            } catch (InvocationTargetException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            } catch (XMLStreamException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            }
        }
        
        public XMLStreamReader doCreate(Class<? extends XMLStreamReader> implClass) {
            XMLStreamReader xsr = fetch(implClass);
            if(xsr != null) {
                return xsr;
            }

            try {
                return implClass.newInstance();
            } catch (IllegalAccessException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            } catch (InstantiationException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            }
        }
        
        private void offerToCustomPool(XMLStreamReader xsr) {
            Map<Class, XMLStreamReader> implMap = getCustomPoolImplMap();
            
            implMap.put(xsr.getClass(), xsr);
        }
        
        private Map<Class, XMLStreamReader> getCustomPoolImplMap() {
            Map<Class, XMLStreamReader> implMap = customPool.get();
            if (implMap == null) {
                implMap = new HashMap<Class, XMLStreamReader>(2);
                customPool.set(implMap);
            }
            
            return implMap;
        }
        
        private void reuse(XMLStreamReader xsr, InputSource in) throws IllegalAccessException, InvocationTargetException {
            resetMethod.invoke(xsr);
            setInputSourceMethod.invoke(xsr,in);
        }
    }
    
    /**
     * Default {_at_link XMLStreamReaderFactory} implementation
     * that can work with any {_at_link XMLInputFactory}.
     *
     * <p>
     * {_at_link XMLInputFactory} is not required to be thread-safe, so the
     * create method on this implementation is synchronized.
     */
    public static final class Default extends NoLock {
        public Default(XMLInputFactory xif) {
            super(xif);
        }
        
        public synchronized XMLStreamReader doCreate(String systemId, InputStream in, boolean rejectDTDs) {
            return super.doCreate(systemId, in, rejectDTDs);
        }
        
        public synchronized XMLStreamReader doCreate(String systemId, Reader in, boolean rejectDTDs) {
            return super.doCreate(systemId, in, rejectDTDs);
        }
        
        public synchronized XMLStreamReader doCreate(Class<? extends XMLStreamReader> implClass) {
            return super.doCreate(implClass);
        }
    }
    
    /**
     * Similar to {_at_link Default} but doesn't do any synchronization.
     *
     * <p>
     * This is useful when you know your {_at_link XMLInputFactory} is thread-safe by itself.
     */
    public static class NoLock extends XMLStreamReaderFactory {
        private final XMLInputFactory xif;
        
        public NoLock(XMLInputFactory xif) {
            this.xif = xif;
        }
        
        public XMLStreamReader doCreate(String systemId, InputStream in, boolean rejectDTDs) {
            try {
                return xif.createXMLStreamReader(systemId,in);
            } catch (XMLStreamException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            }
        }
        
        public XMLStreamReader doCreate(String systemId, Reader in, boolean rejectDTDs) {
            try {
                return xif.createXMLStreamReader(systemId,in);
            } catch (XMLStreamException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            }
        }
        
        public XMLStreamReader doCreate(Class<? extends XMLStreamReader> implClass) {
            try {
                return implClass.newInstance();
            } catch (IllegalAccessException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            } catch (InstantiationException e) {
                throw new XMLReaderException("stax.cantCreate",e);
            }
        }
        
        public void doRecycle(XMLStreamReader r) {
            // there's no way to recycle with the default StAX API.
        }
    }
}