users@grizzly.java.net

RE: Grizzly in Hadoop

From: Devaraj Das <ddas_at_yahoo-inc.com>
Date: Tue, 12 Jun 2007 20:16:47 +0530

Here is the core of the "service(Request, Response)" adapter method:

        response.setHeader(MAP_OUTPUT_LENGTH, Long.toString(<file_length>));

        int len = mapOutputIn.read(<from_a_file>);
        while (len > 0) {
          try {
            chunk.recycle();
            chunk.append(buffer, 0, len);
            response.doWrite(chunk);
          } catch (IOException ie) {
            isInputException = false;
            throw ie;
          }
          totalRead += len;
          if (totalRead == partLength) break;
          len = mapOutputIn.read(<from_a_file>);
        }

By the way, we don't set content-length and instead expect the server to do
chunked encoding. In some cases we could be serving very large files, on the
order of 4-5 GB, and the content-length header is expected to be an 'int' (at
least in Jetty). So we work around this problem by defining our own custom
HTTP header that carries a "long".
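
A minimal sketch of the custom length header idea described above, to show why a "long" is needed. The header name string is hypothetical (the message only shows the constant MAP_OUTPUT_LENGTH, not its value), and the class and method names are made up for illustration.

```java
public class MapOutputLengthHeader {
    // Hypothetical value: the original message never shows what the constant holds.
    static final String MAP_OUTPUT_LENGTH = "Map-Output-Length";

    // Server side: render the file length as a decimal string for the header.
    static String encode(long fileLength) {
        return Long.toString(fileLength);
    }

    // Client side: parse the header value back into a long.
    static long decode(String headerValue) {
        if (headerValue == null) {
            throw new IllegalArgumentException("missing " + MAP_OUTPUT_LENGTH + " header");
        }
        return Long.parseLong(headerValue.trim());
    }

    public static void main(String[] args) {
        long fiveGigs = 5L * 1024 * 1024 * 1024;
        // A 5 GB length does not fit in an int, but survives the string round trip.
        System.out.println("exceeds int range: " + (fiveGigs > Integer.MAX_VALUE));
        System.out.println("round trip ok: " + (decode(encode(fiveGigs)) == fiveGigs));
    }
}
```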

Here is the core of the client code:

      URLConnection connection = path.openConnection();
      if (timeout > 0) {
        connection.setConnectTimeout(timeout);
        connection.setReadTimeout(timeout);
      }
      InputStream input = connection.getInputStream();
      try {
        // get the content length from a custom http header
        long length =
          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));

        OutputStream output = fileSys.create(localFilename);
        try {
          int len = input.read(buffer);
          while (len > 0) {
            totalBytes += len;
            output.write(buffer, 0, len);
            if (currentThread.isInterrupted()) {
              throw new InterruptedException();
            }
            if (totalBytes == length) break;
            len = input.read(buffer);
          }
        } finally {
          output.close();
        }
      } finally {
        input.close();
      }
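
For reference, a self-contained sketch of the same copy loop using only java.io, so it can be run without Hadoop or a server. It is not the TaskTracker code: the class and method names are hypothetical, and unlike the excerpt above it treats an early end of stream as an error instead of returning quietly, which is an added assumption about the desired behaviour.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class FetchCopy {
    // Copies exactly 'expected' bytes from in to out and returns the count.
    // Throws EOFException if the stream ends before 'expected' bytes arrive.
    static long copyExactly(InputStream in, OutputStream out, long expected, int bufferSize)
            throws IOException, InterruptedException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        while (total < expected) {
            // Never ask for more than what is still outstanding.
            int want = (int) Math.min(buffer.length, expected - total);
            int len = in.read(buffer, 0, want);
            if (len < 0) {
                throw new EOFException("got " + total + " of " + expected + " bytes");
            }
            out.write(buffer, 0, len);
            total += len;
            // Same cooperative cancellation check as in the client excerpt.
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        byte[] data = new byte[60 * 1024]; // roughly the ~60KB file size from the benchmark
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copyExactly(new ByteArrayInputStream(data), sink, data.length, 8192);
        System.out.println("copied " + copied + " bytes");
    }
}
```

Comparing the byte count against the advertised length makes a truncated transfer visible on the client, which may help when correlating client failures with "Broken pipe" errors on the server.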

I am seeing exceptions of the following form on the server (on quite a few
nodes). This seems to be contributing to the performance degradation now. Any
clue on this? Thanks much for the help.

2007-06-12 14:25:28,968 WARN org.apache.hadoop.mapred.TaskTracker:
getMapOutput(task_0001_m_077205_1,971) failed :
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcher.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:29)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:104)
        at sun.nio.ch.IOUtil.write(IOUtil.java:75)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:334)
        at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:88)
        at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:57)
        at com.sun.grizzly.http.SocketChannelOutputBuffer.flushChannel(SocketChannelOutputBuffer.java:138)
        at com.sun.grizzly.http.SocketChannelOutputBuffer.realWriteBytes(SocketChannelOutputBuffer.java:125)
        at com.sun.grizzly.util.buf.ByteChunk.append(ByteChunk.java:331)
        at com.sun.grizzly.tcp.http11.InternalOutputBuffer$OutputStreamOutputBuffer.doWrite(InternalOutputBuffer.java:856)
        at com.sun.grizzly.tcp.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java:136)
        at com.sun.grizzly.tcp.http11.InternalOutputBuffer.doWrite(InternalOutputBuffer.java:614)
        at com.sun.grizzly.tcp.Response.doWrite(Response.java:587)
        at org.apache.hadoop.mapred.TaskTracker$MapOutput.service(TaskTracker.java:1944)
        at com.sun.grizzly.http.DefaultProcessorTask.invokeAdapter(DefaultProcessorTask.java:597)
        at com.sun.grizzly.http.DefaultProcessorTask.doProcess(DefaultProcessorTask.java:528)
        at com.sun.grizzly.http.DefaultProcessorTask.process(DefaultProcessorTask.java:772)
        at com.sun.grizzly.http.SelectorThread$3.execute(SelectorThread.java:745)
        at com.sun.grizzly.DefaultProtocolChain.executeProtocolFilter(DefaultProtocolChain.java:74)


-----Original Message-----
From: Devaraj Das [mailto:ddas_at_yahoo-inc.com]
Sent: Tuesday, June 12, 2007 7:45 PM
To: users_at_grizzly.dev.java.net; 'Scott Oaks'
Cc: 'Owen O'Malley'; 'Sameer Paranjpye'
Subject: RE: Grizzly in Hadoop

Hi Jeanfrancois,
I will get back to you in detail in some time from now. But for now, I just
want to update you that setting the keepalivetimeout to 0 seems to have
improved performance. Thanks for that tip. Now, do you think it would help
us significantly if we started using the persistent connections feature
(transferring a batch of files at a time)?
Also, will using multiple selectorReadThreads help the performance (since we
have multiple CPUs on our machines)?
I will send the source code of the adapter shortly...
Thanks,
Devaraj.

-----Original Message-----
From: Jeanfrancois.Arcand_at_Sun.COM [mailto:Jeanfrancois.Arcand_at_Sun.COM]
Sent: Tuesday, June 12, 2007 6:25 PM
To: users_at_grizzly.dev.java.net; Scott Oaks
Subject: Re: Grizzly in Hadoop

Hi,

Devaraj Das wrote:
> Hi,
>
> We are considering using Grizzly (1.5) in Hadoop (an open source
> framework that has the MapReduce and Distributed File System
> implementations). The main reason for using it is to optimize a
> framework phase called "shuffle".
> In this phase we move lots of data across the network.

Cool :-) I know Hadoop as your Web 2.0 stack is also considering using it
:-)

>
> We are currently using HTTP for moving the data (actually files) and
> we use Jetty5. Now we are thinking of moving to Grizzly (to have NIO
> and all its niceness). But initial experiments with our benchmark
> showed that with Grizzly the performance of the shuffle phase is
> nearly the same as we have with Jetty5. This is not what we initially
> expected and hence would like to get feedback on where we might be
> going wrong.
>
> Hadoop is designed to run on large clusters of 100s of nodes
> (currently it can run stably/reliably in a 1K node cluster). From the
> Grizzly point of view, what needs to be known is that each node has an
> HTTP server. Both Jetty5 and Grizzly provide the ability to have
> multiple handlers to service the incoming requests.
>
> There are 2 clients on each node, and each client has a configurable
> number of fetcher threads. The fetcher code is written using the
> java.net.URLConnection API.
> Every node has both the server and the clients. The threads basically
> hit the HTTP server asking for specific files. They all do this at
> once (with some randomness in the order of hosts, maybe).
>
> The benchmark that I tested with is a sort for ~5TB of data with a
> cluster of 500 nodes. On the hardware side, I used a cluster that has
> 4 dualcore processors in each machine. The machines are partitioned
> into racks with a gigabit ethernet within the rack and 100Mbps across
> the racks. There are roughly 78000 independent files spread across
> these 500 nodes each of size ~60KB that the client pulls (and again we
> have two such clients per node).
> So you can imagine you have a massive all-all communication happening.
> The configuration for the server and client is as follows:
> Grizzly configuration for port 9999
> maxThreads: 100
> minThreads: 1
> ByteBuffer size: 8192
> useDirectByteBuffer: false
> useByteBufferView: false
> maxHttpHeaderSize: 8192
> maxKeepAliveRequests: 256
> keepAliveTimeoutInSeconds: 10
> Static File Cache enabled: true
> Stream Algorithm : com.sun.grizzly.http.algorithms.NoParsingAlgorithm
> Pipeline : com.sun.grizzly.http.LinkedListPipeline
> Round Robin Selector Algorithm enabled: false
> Round Robin Selector pool size: 0
> recycleTasks: true
> Asynchronous Request Processing enabled: false
>
> I also tried some configs with multiple selectorReadThreads but it
> didn't make much difference.

keepAliveTimeoutInSeconds: 10 seems a little low to me... What is the reason
for such a low number? Closing idle connections faster?


>
> The client has 30 fetcher threads and the way it is designed is that
> only one fetch from any given host would happen at any point of time.
> So if a server host, h1, has 'n' files that we should pull, we do that
> one at a time (as opposed to multiple threads hitting that server to
> fetch multiple files in parallel).
>
> Also, we don't use the features of HTTP1.1 persistent connections or
> pipelining. We fetch exactly one file and close the connection to the
> server.

That answers my previous question :-) In that case I would recommend setting
keepAliveTimeoutInSeconds=0, as I'm sure the performance will improve
(no call to the keep-alive subsystem).

>
> With the above setup, the performance I see is not different from what
> I see with Jetty5.

Could it be that the benchmark itself is not able to generate more load?


> I see a lot of Read timeouts on the client side (and we have a
> backoff (for the server host) on the client implementation whenever we
> fail to fetch a file). I also saw some exceptions of the form on the
> server:

You seem to hit the epoll problem on Linux. I know there is a way to avoid
using epoll (a property). I will ping the NIO team and let you know.
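
The property is not named in this message. One candidate, stated here as an assumption and not confirmed anywhere in the thread, is the standard java.nio.channels.spi.SelectorProvider system property, which SelectorProvider.provider() consults at startup. On Sun JDKs for Linux of that era it could reportedly be set to sun.nio.ch.PollSelectorProvider to get a poll-based selector instead of epoll. The small sketch below only prints which provider the running JVM picked, so the effect of such a setting can be checked; the class name is hypothetical.

```java
import java.nio.channels.spi.SelectorProvider;

public class ShowSelectorProvider {
    // Returns the class name of the system-wide default selector provider.
    static String activeProvider() {
        return SelectorProvider.provider().getClass().getName();
    }

    public static void main(String[] args) {
        // Run once as-is, then again with (assumed, JDK-dependent):
        //   -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider
        // and compare the printed class names.
        System.out.println("selector provider: " + activeProvider());
    }
}
```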

Also, what exactly is your Adapter implementation doing? Can you share the
code? If I can have access to your setup, I would like to see if using
Grizzly 1.0.15 makes a difference (just to make sure we don't have a bug in
1.5... as far as I can tell, 1.5 is as fast as 1.0 on my benchmark).

Thanks,

--Jeanfrancois


>
> Jun 11, 2007 5:04:51 PM com.sun.grizzly.Controller doSelect
> SEVERE: doSelect exception
> java.io.IOException: Operation not permitted
> at sun.nio.ch.EPollArrayWrapper.epollCtl(Native Method)
> at sun.nio.ch.EPollArrayWrapper.updateRegistrations(EPollArrayWrapper.java:202)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:183)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> at com.sun.grizzly.TCPSelectorHandler.select(TCPSelectorHandler.java:277)
> at com.sun.grizzly.Controller.doSelect(Controller.java:218)
> at com.sun.grizzly.Controller.start(Controller.java:451)
> at com.sun.grizzly.http.SelectorThread.startListener(SelectorThread.java:1158)
> at com.sun.grizzly.http.SelectorThread.startEndpoint(SelectorThread.java:1121)
> at com.sun.grizzly.http.SelectorThread.run(SelectorThread.java:1099)
>
> Are we missing something in the configuration or something else?
>
> Thanks for the help.
>
> Regards,
> Devaraj.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>
