The current Hadoop implementation shuffles map output directly to disk, and
those disk files are later fetched by the target nodes that are responsible
for running reduce() on the intermediate data.

However, this requires roughly 2x more IO than strictly necessary.

If the data were instead shuffled DIRECTLY to the target host, this IO
overhead would be removed.

I believe that any benefits from writing locally (compressing, combining)
and then doing a transfer can be had by simply allocating a buffer (say
250-500MB per map task) and then transferring the data directly. I don't
think the savings will be 100% on par with first writing locally, but
remember it's already 2x faster by not having to write to disk... so any
advantage from first shuffling to the local disk would have to be worth
more than a 100% speedup to win.
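
As a rough sketch of what I mean, in Java (all names and sizes here are
illustrative, not existing Hadoop APIs):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.Socket;

    // Direct-push idea: buffer serialized map output per target reducer and
    // flush it straight over the network when the buffer fills, instead of
    // spilling to local disk first. Assumes records smaller than the buffer.
    class DirectShuffleSender {
        private static final int BUFFER_SIZE = 256 * 1024 * 1024; // ~250MB per map task
        private final byte[] buffer = new byte[BUFFER_SIZE];
        private int used = 0;
        private final String reducerHost;
        private final int reducerPort;

        DirectShuffleSender(String reducerHost, int reducerPort) {
            this.reducerHost = reducerHost;
            this.reducerPort = reducerPort;
        }

        // Append one serialized record; push to the reducer when full.
        void emit(byte[] record) throws IOException {
            if (used + record.length > buffer.length) {
                flush();
            }
            System.arraycopy(record, 0, buffer, used, record.length);
            used += record.length;
        }

        // Send the buffered records directly to the target host. (A real
        // implementation would keep one connection open across flushes.)
        void flush() throws IOException {
            try (Socket s = new Socket(reducerHost, reducerPort);
                 OutputStream out = s.getOutputStream()) {
                out.write(buffer, 0, used);
            }
            used = 0;
        }
    }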

However, writing data to the local disk first could in theory have some
practical advantages under certain loads. I just don't think they matter in
practice, and that direct shuffling is superior.

Anyone have any thoughts here?


  • Todd Lipcon at Dec 21, 2011 at 12:54 am
    The advantage of the "pull" based shuffle is fault tolerance - if you
    shuffle to the reducer and then the reducer dies, you have to rerun
    *all* of the earlier maps in the "push" model.

    The advantage of writing to disk is of course that you can have more
    intermediate output than fits in RAM.

    In practice, for short jobs, the output might stay entirely in buffer
    cache and never actually hit disk (RHEL by default configures the
    writeback period to 30 seconds when there isn't page cache pressure).

    One possible optimization I hope to look into next year is to change
    the map output code to push the data to the local TT, which would have
    configurable in-memory buffers. Only once those overflow would they
    flush to disk. Compared to just using buffer cache, this has the
    advantage that it won't _ever_ writeback unless it has to for space
    consumption reasons, and is more predictable to manage. My guess is we
    could squeeze some performance here but not tons.
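
    A minimal sketch of that buffer-then-spill behavior, in Java
    (hypothetical names, not existing Hadoop code):

        import java.io.ByteArrayOutputStream;
        import java.io.FileOutputStream;
        import java.io.IOException;

        // Hold map output in a fixed in-memory buffer; nothing touches disk
        // unless the buffer overflows, unlike relying on kernel writeback.
        class SpillableOutputBuffer {
            private final int capacity;
            private final String spillPath;
            private final ByteArrayOutputStream memory = new ByteArrayOutputStream();
            private boolean spilled = false;

            SpillableOutputBuffer(int capacity, String spillPath) {
                this.capacity = capacity;
                this.spillPath = spillPath;
            }

            void write(byte[] data) throws IOException {
                if (!spilled && memory.size() + data.length > capacity) {
                    // Overflow: dump what is buffered, then append to disk.
                    try (FileOutputStream out = new FileOutputStream(spillPath)) {
                        memory.writeTo(out);
                    }
                    memory.reset();
                    spilled = true;
                }
                if (spilled) {
                    try (FileOutputStream out = new FileOutputStream(spillPath, true)) {
                        out.write(data);
                    }
                } else {
                    memory.write(data);
                }
            }
        }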

    -Todd
    --
    Todd Lipcon
    Software Engineer, Cloudera
  • Kevin Burton at Dec 21, 2011 at 2:00 am

    On Tue, Dec 20, 2011 at 4:53 PM, Todd Lipcon wrote:

    > The advantage of the "pull" based shuffle is fault tolerance - if you
    > shuffle to the reducer and then the reducer dies, you have to rerun
    > *all* of the earlier maps in the "push" model.

    You would have the same situation if you aren't replicating the blocks in
    the mapper.

    In my situation I'm replicating the shuffle data, so it should be a wash.

    The map jobs are just re-run from where the last one failed, since the
    shuffle data has already been written.

    (I should note that I'm working on another MapReduce implementation that
    I'm about to OSS...)

    There are a LOT of problems in the map reduce space that are each worth a
    research paper of their own, and it would be nice to see more published
    in this area.

    > The advantage of writing to disk is of course that you can have more
    > intermediate output than fits in RAM.

    Well, if you're shuffling across the network and you back up due to
    network IO, then your map jobs would just run slower.

    > In practice, for short jobs, the output might stay entirely in buffer
    > cache and never actually hit disk (RHEL by default configures the
    > writeback period to 30 seconds when there isn't page cache pressure).

    Or the writes would just start to block when memory is exhausted.
  • Arun C Murthy at Dec 21, 2011 at 1:00 am

    On Dec 20, 2011, at 3:55 PM, Kevin Burton wrote:

    > The current Hadoop implementation shuffles map output directly to disk,
    > and those disk files are later fetched by the target nodes that are
    > responsible for running reduce() on the intermediate data.
    >
    > However, this requires roughly 2x more IO than strictly necessary.
    >
    > If the data were instead shuffled DIRECTLY to the target host, this IO
    > overhead would be removed.
    We've discussed 'push' v/s 'pull' shuffle multiple times and each time turned away due to complexities in MR1. With MRv2 (YARN) this would be much more doable.

    In any case...

    A single reducer, in typical (well-designed?) applications, processes
    multiple gigabytes of data across thousands of maps.

    So, to really not do any disk i/o during the shuffle you'd need very large amounts of RAM...

    Also, currently, the shuffle is performed by the reduce task. This has two
    major benefits:
    # The 'pull' can only be initiated after the reduce is scheduled. The
    'push' model would be hampered if the reduce hasn't been started.
    # The 'pull' is more resilient to the failure of a single reduce. In the
    push model, it's harder to deal with a reduce failing after a push from
    the map.

    Again, with MR2 we could experiment with push v/s pull where it makes sense (small jobs etc.). I'd love to mentor/help someone interested in putting cycles into it.

    Arun
  • Kevin Burton at Dec 21, 2011 at 2:13 am

    > We've discussed 'push' v/s 'pull' shuffle multiple times and each time
    > turned away due to complexities in MR1. With MRv2 (YARN) this would be
    > much more doable.

    Ah.... gotcha. This is what I expected as well. It would be interesting to
    see a list of changes like this in MR1 vs MR2, to see what could
    POTENTIALLY happen in the future should anyone get spare time.

    > So, to really not do any disk i/o during the shuffle you'd need very
    > large amounts of RAM...

    Why is that? I'm not suggesting buffering it *all*, but sending it
    directly as it is generated.

    I think there should be a SMALL amount of buffer for combining and
    compressing the data, though. Normally something like 250-500MB per
    mapper; but this is when running, say, a 250GB job, so this buffer is
    just to reduce the IO sent to the remote node.

    > Also, currently, the shuffle is performed by the reduce task. This has
    > two major benefits:
    > # The 'pull' can only be initiated after the reduce is scheduled. The
    > 'push' model would be hampered if the reduce hasn't been started.

    I've gone over this problem a number of times. The way I'm handling it is
    that every map attempt is recorded and only successful maps actually have
    their data reduced. You end up having MORE intermediate data if machines
    are crashing, but it's only CHUNK_SIZE per crash, and even in LARGE
    clusters with lots of crashes this won't end up being a significant
    percentage of the data.
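
    In rough Java, that bookkeeping might look something like this (a sketch
    only; all names are hypothetical):

        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.stream.Collectors;

        // Every map attempt pushes its chunks, but the reducer only consumes
        // chunks from attempts recorded as successful, so a crashed attempt
        // costs at most one orphaned chunk of intermediate data.
        class AttemptTracker {
            static class Chunk {
                final int mapTaskId;
                final int attemptId;
                Chunk(int mapTaskId, int attemptId) {
                    this.mapTaskId = mapTaskId;
                    this.attemptId = attemptId;
                }
            }

            // mapTaskId -> the attemptId that completed successfully
            private final Map<Integer, Integer> successful = new HashMap<>();

            void recordSuccess(int mapTaskId, int attemptId) {
                successful.put(mapTaskId, attemptId);
            }

            // Keep only chunks written by successful attempts; chunks from
            // failed attempts are simply never reduced.
            List<Chunk> liveChunks(List<Chunk> allChunks) {
                return allChunks.stream()
                    .filter(c -> Integer.valueOf(c.attemptId)
                                        .equals(successful.get(c.mapTaskId)))
                    .collect(Collectors.toList());
            }
        }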

    > # The 'pull' is more resilient to the failure of a single reduce. In
    > the push model, it's harder to deal with a reduce failing after a push
    > from the map.

    I don't see how this would be the case... I'm replicating all the shuffle
    data... so if a reducer crashes I just start up a new one.

    There IS the problem of whether we replicate the intermediate data from
    the reducer, but this can be a configuration option...

    > Again, with MR2 we could experiment with push v/s pull where it makes
    > sense (small jobs etc.). I'd love to mentor/help someone interested in
    > putting cycles into it.

    I'm going to be doing a ton of work in this area, and I'll publish it if
    I come across anything interesting.
  • Binglin Chang at Dec 21, 2011 at 6:07 am

    > One possible optimization I hope to look into next year is to change
    > the map output code to push the data to the local TT, which would have
    > configurable in-memory buffers.

    Has anyone ever considered a general data transfer service bundled with
    YARN, so that applications other than MR can also benefit from it?
    The data transfer service would look like a real-world mail service, with
    three simple stream-based operations: register, send, and receive.

    Mapper & Reducer: register(LocalMailService, Address(AppId,
    MapperId/ReducerId))
    Mapper: send(LocalMailService, from=Address(AppId, MapperId),
    to=Address(AppId, ReducerId), data=xxx);
    Reducer: recv(LocalMailService, from=Address(AppId, MapperId),
    to=Address(AppId, ReducerId));

    The LocalMailService manages a big (configurable) buffer, so it can cache
    map outputs or dump them to disk if there is no memory.
    The LocalMailService can start transferring data from source to dest once
    both addresses are registered.
    If source & dest are on the same machine, there is no network transfer.
    The "to" address can be multiple (broadcast, or like a mail group), which
    is useful for 1:N data transfers (binary/side-data distribution); the
    service could use P2P for this kind of work (much better than -cacheFile).
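
    As a rough Java sketch, the service's surface might look like this
    (every name below is hypothetical; nothing like this exists in YARN
    today):

        import java.io.InputStream;

        // A mail-style transfer service: tasks register an address, senders
        // push streams, receivers pull them. Buffering, spilling to disk, and
        // the same-machine shortcut are the service's problem, not the app's.
        interface MailService {
            // An address is (applicationId, taskId), e.g. (appId, mapperId).
            final class Address {
                final String appId;
                final String taskId;
                Address(String appId, String taskId) {
                    this.appId = appId;
                    this.taskId = taskId;
                }
            }

            // Each task registers itself with the node-local service.
            void register(Address self);

            // Mapper side: hand the service a stream of bytes for one reducer
            // (or several recipients, for broadcast-style 1:N distribution).
            void send(Address from, Address to, InputStream data);

            // Reducer side: block until the matching send arrives.
            InputStream recv(Address from, Address to);
        }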

    Just an idea, if anyone interested.



  • Kevin Burton at Dec 21, 2011 at 8:33 am
    Twister, another mapred impl, uses a pub/sub based system like
    ActiveMQ... Peregrine, the mapred impl I've been working on, just uses
    HTTP, Netty, and async IO to do the same thing.

    Note that you mention a BUFFER... just buffering the IO is not enough.
    Most jobs will be larger than memory AND you have to transfer the data...
    so use the buffer to compress and combine the data before it is sent.
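
    To make that concrete, here's a minimal combine-then-compress sketch
    using only the JDK (the record format and all names are made up for
    illustration):

        import java.io.ByteArrayOutputStream;
        import java.io.DataOutputStream;
        import java.io.IOException;
        import java.util.Map;
        import java.util.TreeMap;
        import java.util.zip.GZIPOutputStream;

        // Combine repeated keys in the buffer (here: summing counts), then
        // gzip the combined records into a single payload for the wire.
        class CombiningBuffer {
            private final Map<String, Long> counts = new TreeMap<>();

            // Combine step: collapse repeated keys instead of buffering
            // every raw record.
            void emit(String key, long value) {
                counts.merge(key, value, Long::sum);
            }

            // Compress step: one gzip'd payload per flush to the reducer.
            byte[] toCompressedPayload() throws IOException {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (DataOutputStream out =
                         new DataOutputStream(new GZIPOutputStream(bytes))) {
                    for (Map.Entry<String, Long> e : counts.entrySet()) {
                        out.writeUTF(e.getKey());
                        out.writeLong(e.getValue());
                    }
                }
                return bytes.toByteArray();
            }
        }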


    --
    Founder/CEO Spinn3r.com <http://spinn3r.com/>
    Location: San Francisco, CA
    Skype: burtonator
    Skype-in: (415) 871-0687

Discussion Overview
group: mapreduce-user
categories: hadoop
posted: Dec 20, '11 at 11:56p
active: Dec 21, '11 at 8:33a
posts: 7
users: 4
website: hadoop.apache.org...
irc: #hadoop
