users@grizzly.java.net

Re: Grizzly in Hadoop

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Tue, 12 Jun 2007 12:56:13 -0400

Salut,

Devaraj Das wrote:
> Here is the core of the "service(Request, Response)" adapter method
>
> response.setHeader(MAP_OUTPUT_LENGTH, Long.toString(<file_length>));
>
> int len = mapOutputIn.read(<from_a_file>);
> while (len > 0) {
>   try {
>     chunk.recycle();
>     chunk.append(buffer, 0, len);
>     response.doWrite(chunk);
>   } catch (IOException ie) {
>     isInputException = false;
>     throw ie;
>   }
>   totalRead += len;
>   if (totalRead == partLength) break;
>   len = mapOutputIn.read(<from_a_file>);
> }
>
> By the way, we don't set content-length and expect the server to do chunked
> encoding (in some cases we could be serving very large files, on the order of
> 4-5 GB), and the content-length header expects an 'int' (at least in
> Jetty). So we work around this problem by defining our own custom HTTP
> header that is a "long".

I can add an API for you if you want to use long (easy to do). Just file
an issue and I will make it for 1.5.2 :-) This is not the first time
this requirement has come up.
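
To illustrate why an int-based length is not enough here, below is a minimal
sketch of reading a long-valued length header on the receiving side. The class
and method names (LengthHeader, parse, exceedsInt) are hypothetical and are not
part of Grizzly, Jetty, or Hadoop; the sketch only assumes the header value
arrives as a String, as it does from URLConnection.getHeaderField.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of Grizzly or Hadoop. */
final class LengthHeader {
    private LengthHeader() {}

    /** Parses a length header as a long; empty if missing, malformed, or negative. */
    static OptionalLong parse(String headerValue) {
        if (headerValue == null) {
            return OptionalLong.empty();
        }
        try {
            long length = Long.parseLong(headerValue.trim());
            return length >= 0 ? OptionalLong.of(length) : OptionalLong.empty();
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    /** True when the length cannot be represented by an int-based length API. */
    static boolean exceedsInt(long length) {
        return length > Integer.MAX_VALUE;
    }
}
```

A 5 GB file (5368709120 bytes) parses fine as a long, while exceedsInt reports
that it would not fit an int-based content-length setter.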


>
> Here is the core of the client code
>
> URLConnection connection = path.openConnection();
> if (timeout > 0) {
>   connection.setConnectTimeout(timeout);
>   connection.setReadTimeout(timeout);
> }
> InputStream input = connection.getInputStream();
> try {
>   // get the content length from a custom http header
>   long length =
>     Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
>
>   OutputStream output = fileSys.create(localFilename);
>   try {
>     int len = input.read(buffer);
>     while (len > 0) {
>       totalBytes += len;
>       output.write(buffer, 0, len);
>       if (currentThread.isInterrupted()) {
>         throw new InterruptedException();
>       }
>       if (totalBytes == length) break;
>       len = input.read(buffer);
>     }
>   } finally {
>     output.close();
>   }
> } finally {
>   input.close();
> }
>
> I am seeing exceptions of the following form on the server (on quite a few
> nodes). This seems to be contributing to the performance degradation now.
> Any clue on this? Thanks much for the help.
>
> 2007-06-12 14:25:28,968 WARN org.apache.hadoop.mapred.TaskTracker: getMapOutput(task_0001_m_077205_1,971) failed :
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcher.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:29)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:104)
> at sun.nio.ch.IOUtil.write(IOUtil.java:75)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:334)
> at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:88)
> at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:57)
> at com.sun.grizzly.http.SocketChannelOutputBuffer.flushChannel(SocketChannelOutputBuffer.java:138)
> at com.sun.grizzly.http.SocketChannelOutputBuffer.realWriteBytes(SocketChannelOutputBuffer.java:125)
> at com.sun.grizzly.util.buf.ByteChunk.append(ByteChunk.java:331)
> at com.sun.grizzly.tcp.http11.InternalOutputBuffer$OutputStreamOutputBuffer.doWrite(InternalOutputBuffer.java:856)
> at com.sun.grizzly.tcp.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java:136)
> at com.sun.grizzly.tcp.http11.InternalOutputBuffer.doWrite(InternalOutputBuffer.java:614)
> at com.sun.grizzly.tcp.Response.doWrite(Response.java:587)
> at org.apache.hadoop.mapred.TaskTracker$MapOutput.service(TaskTracker.java:1944)
> at com.sun.grizzly.http.DefaultProcessorTask.invokeAdapter(DefaultProcessorTask.java:597)
> at com.sun.grizzly.http.DefaultProcessorTask.doProcess(DefaultProcessorTask.java:528)
> at com.sun.grizzly.http.DefaultProcessorTask.process(DefaultProcessorTask.java:772)
> at com.sun.grizzly.http.SelectorThread$3.execute(SelectorThread.java:745)
> at com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:74)

This exception just means the client is closing the connection before
the server has a chance to write the response. Are you seeing an
exception on the client side?
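
To make the distinction concrete, here is a minimal sketch of a copy loop that
reports write-side failures (such as "Broken pipe" when the peer has already
closed) separately from read-side failures on the file. It uses plain
java.io streams rather than the Grizzly Response/ByteChunk API, and the names
MapOutputCopy, ClientWriteException, and copy are hypothetical, not taken from
Hadoop or Grizzly. It also bounds each read to the bytes still owed, so the
running total cannot step past partLength.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical sketch; names are illustrative, not Hadoop or Grizzly API. */
final class MapOutputCopy {
    private MapOutputCopy() {}

    /** Signals that the write side (the remote client) failed. */
    static final class ClientWriteException extends IOException {
        ClientWriteException(IOException cause) {
            super("write to client failed: " + cause.getMessage(), cause);
        }
    }

    /**
     * Copies up to partLength bytes from in to out and returns the number copied.
     * Read failures propagate unchanged; write failures are wrapped in
     * ClientWriteException. The buffer is assumed to be non-empty.
     */
    static long copy(InputStream in, OutputStream out, long partLength, byte[] buffer)
            throws IOException {
        long total = 0;
        while (total < partLength) {
            // Never request more than the bytes still owed, so total cannot overshoot.
            int want = (int) Math.min(buffer.length, partLength - total);
            int len = in.read(buffer, 0, want); // a read error stays a plain IOException
            if (len < 0) {
                break; // end of input before partLength bytes were available
            }
            try {
                out.write(buffer, 0, len);
            } catch (IOException e) {
                // Typically the peer closed the connection, e.g. "Broken pipe".
                throw new ClientWriteException(e);
            }
            total += len;
        }
        return total;
    }
}
```

With this shape, a server-side log can tell "the client went away" apart from
"the map output file could not be read" by checking the exception type.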

Thanks!

-- Jeanfrancois




>
> -----Original Message-----
> From: Devaraj Das [mailto:ddas_at_yahoo-inc.com]
> Sent: Tuesday, June 12, 2007 7:45 PM
> To: users_at_grizzly.dev.java.net; 'Scott Oaks'
> Cc: 'Owen O'Malley'; 'Sameer Paranjpye'
> Subject: RE: Grizzly in Hadoop
>
> Hi Jeanfrancois,
> I will get back to you in detail in some time from now. But for now, I just
> want to update you that setting the keepalivetimeout to 0 seems to have
> improved performance. Thanks for that tip. Now, do you think that if we start
> using the persistent connections feature (transferring a batch of files at a
> time), it would help us significantly?
> Also, will using multiple selectorReadThreads help the performance (since we
> have multiple CPUs on our machines)?
> I will send the source code of the adapter shortly...
> Thanks,
> Devaraj.
>
> -----Original Message-----
> From: Jeanfrancois.Arcand_at_Sun.COM [mailto:Jeanfrancois.Arcand_at_Sun.COM]
> Sent: Tuesday, June 12, 2007 6:25 PM
> To: users_at_grizzly.dev.java.net; Scott Oaks
> Subject: Re: Grizzly in Hadoop
>
> Hi,
>
> Devaraj Das wrote:
>> Hi,
>>
>> We are considering using Grizzly (1.5) in Hadoop (an open source
>> framework that has the MapReduce and Distributed File System
>> implementations). The main reason for using it is to optimize a
>> framework phase called "shuffle".
>> In this phase we move lots of data across the network.
>
> Cool :-) I know Hadoop as your Web 2.0 stack is also considering using it
> :-)
>
>> We are currently using HTTP for moving the data (actually files) and
>> we use Jetty5. Now we are thinking of moving to Grizzly (to have NIO
>> and all its niceness). But initial experiments with our benchmark
>> showed that with Grizzly the performance of the shuffle phase is
>> nearly the same as we have with Jetty5. This is not what we initially
>> expected and hence would like to get feedback on where we might be
>> going wrong.
>> Hadoop is designed to run on large clusters of 100s of nodes
>> (currently it runs reliably on a 1K-node cluster). From the
>> Grizzly point of view, what needs to be known is that each node has an
>> HTTP server. Both Jetty5 and Grizzly provide the ability to have
>> multiple handlers to service the incoming requests.
>>
>> There are 2 clients on each node, and each client has a configurable
>> number of fetcher threads. The fetcher code is written using the
>> java.net.URLConnection API.
>> Every node has both the server and the clients. The threads basically
>> hit the HTTP server asking for specific files. They all do this at
>> once (with some randomness in the order of hosts, maybe).
>>
>> The benchmark that I tested with is a sort for ~5TB of data with a
>> cluster of 500 nodes. On the hardware side, I used a cluster that has
>> 4 dualcore processors in each machine. The machines are partitioned
>> into racks with a gigabit ethernet within the rack and 100Mbps across
>> the racks. There are roughly 78000 independent files spread across
>> these 500 nodes, each of size ~60KB, that the client pulls (and again we
>> have two such clients per node).
>> So you can imagine you have a massive all-all communication happening.
>> The configuration for the server and client is as follows:
>> Grizzly configuration for port 9999
>> maxThreads: 100
>> minThreads: 1
>> ByteBuffer size: 8192
>> useDirectByteBuffer: false
>> useByteBufferView: false
>> maxHttpHeaderSize: 8192
>> maxKeepAliveRequests: 256
>> keepAliveTimeoutInSeconds: 10
>> Static File Cache enabled: true
>> Stream Algorithm :
>> com.sun.grizzly.http.algorithms.NoParsingAlgorithm
>> Pipeline : com.sun.grizzly.http.LinkedListPipeline
>> Round Robin Selector Algorithm enabled: false
>> Round Robin Selector pool size: 0
>> recycleTasks: true
>> Asynchronous Request Processing enabled: false
>>
>> I also tried some configs with multiple selectorReadThreads, but that
>> didn't make much difference.
>
> keepAliveTimeoutInSeconds: 10 seems a little low to me. What is the reason
> for such a low number? Closing idle connections faster?
>
>
>> The client has 30 fetcher threads and the way it is designed is that
>> only one fetch from any given host would happen at any point of time.
>> So if a server host, h1, has 'n' files that we should pull, we do that
>> one at a time (as opposed to multiple threads hitting that server to
>> fetch multiple files in parallel).
>>
>> Also, we don't use the features of HTTP1.1 persistent connections or
>> pipelining. We fetch exactly one file and close the connection to the
>> server.
>
> That answers my previous question :-) I would recommend setting
> keepAliveTimeoutInSeconds=0 then, as I'm sure the performance will improve
> (no call to the keep-alive subsystem).
>
>> With the above setup, the performance I see is not different from what
>> I see with Jetty5.
>
> Could it be that the benchmark itself is not able to generate more load?
>
>
>> I see a lot of Read timeouts on the client side (and we have a
>> backoff (for the server host) on the client implementation whenever we
>> fail to fetch a file). I also saw some exceptions of the form on the
>> server:
>
> You seem to hit the epoll problem on Linux. I know there is a way to avoid
> using epoll (a property). I will ping the NIO team and let you know.
>
> Also, what exactly is your Adapter implementation doing? Can you share the
> code? If I can have access to your setup, I would like to see if using
> Grizzly 1.0.15 makes a difference (just to make sure we don't have a bug in
> 1.5; as far as I can tell, 1.5 is as fast as 1.0 on my benchmark).
>
> Thanks,
>
> --Jeanfrancois
>
>
>> Jun 11, 2007 5:04:51 PM com.sun.grizzly.Controller doSelect
>> SEVERE: doSelect exception
>> java.io.IOException: Operation not permitted
>> at sun.nio.ch.EPollArrayWrapper.epollCtl(Native Method)
>> at sun.nio.ch.EPollArrayWrapper.updateRegistrations(EPollArrayWrapper.java:202)
>> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:183)
>> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>> at com.sun.grizzly.TCPSelectorHandler.select(TCPSelectorHandler.java:277)
>> at com.sun.grizzly.Controller.doSelect(Controller.java:218)
>> at com.sun.grizzly.Controller.start(Controller.java:451)
>> at com.sun.grizzly.http.SelectorThread.startListener(SelectorThread.java:1158)
>> at com.sun.grizzly.http.SelectorThread.startEndpoint(SelectorThread.java:1121)
>> at com.sun.grizzly.http.SelectorThread.run(SelectorThread.java:1099)
>> Are we missing something in the configuration or something else?
>>
>> Thanks for the help.
>>
>> Regards,
>> Devaraj.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>