I'm outputting a small amount of secondary summary information from a map
task that I want to use in the reduce phase of the job. This information is
keyed on a custom input split index.

Each map task outputs this summary information (less than a hundred bytes
each). Note that the summary information isn't ready until the completion
of the map task.

Each reduce task needs to read this information (for all input splits) to
complete its task.

What is the best way to pass this information to the Reduce stage? I'm
working in Java using cdhb2. Ideas I had include:

1. Output this data to MapContext.getWorkOutputPath(). However, that data
is not available anywhere in the reduce stage.
2. Output this data to "mapred.output.dir". The problem here is that the
map task writes immediately to this, so failed jobs and speculative
execution could cause collision issues.
3. Output this data as in (1) and then use Mapper.cleanup() to copy these
files to "mapred.output.dir". Could work but I'm still a little concerned
about collision/race issues as I'm not clear about when a Map task becomes
"the" committed map task for that split.
4. Use an external system to hold this information and then just call that
system from both phases. This is basically an alternative of #3 and has the
same issues.

Are there suggested approaches for doing this?

It seems like (1) might make the most sense if there is a defined way to
stream secondary outputs from all the mappers within the Reducer.setup()
method.
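
To illustrate, here is roughly what I'm hoping is possible on the reduce
side (just a sketch; the "summary.dir" key, the types, and the record
format are placeholders I made up):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SummaryReducer extends Reducer<Text, Text, Text, Text> {
      @Override
      protected void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        // Placeholder: the directory where all map-side summary files land.
        Path summaryDir = new Path(conf.get("summary.dir"));
        FileSystem fs = summaryDir.getFileSystem(conf);
        // Read every mapper's summary file before reduce() runs.
        for (FileStatus status : fs.listStatus(summaryDir)) {
          FSDataInputStream in = fs.open(status.getPath());
          try {
            // Parse the <split index, summary> record and cache it in a field.
          } finally {
            in.close();
          }
        }
      }
    }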

Thanks for any ideas.

Jacques


  • Harsh J at Feb 14, 2011 at 2:55 am
    With just HDFS, IMO the good approach would be (2). See this FAQ on
    task-specific HDFS output directories you can use:
    http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F.
    It'd also be much easier to use the MultipleOutputs class (or other
    such utilities) for writing the extra data, as they also prefix -m- or
    -r- in the filenames, based on the task type.
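    Something like this is what I have in mind (an untested sketch; the
    named output "summary", the types, and the assumption that your build
    carries the new-API MultipleOutputs under
    org.apache.hadoop.mapreduce.lib.output are mine):

        import java.io.IOException;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

        // In the driver, register the named output first, e.g.:
        //   MultipleOutputs.addNamedOutput(job, "summary",
        //       TextOutputFormat.class, IntWritable.class, Text.class);
        public class SummaryMapper extends Mapper<LongWritable, Text, Text, Text> {
          private MultipleOutputs<Text, Text> mos;
          private int splitIndex; // however you derive your custom split index

          @Override
          protected void setup(Context context) {
            mos = new MultipleOutputs<Text, Text>(context);
          }

          // map() emits the normal shuffled output as usual.

          @Override
          protected void cleanup(Context context)
              throws IOException, InterruptedException {
            // The summary isn't ready until the map completes, so write it here.
            mos.write("summary", new IntWritable(splitIndex), buildSummary());
            mos.close();
          }

          private Text buildSummary() {
            return new Text(); // placeholder for your summary record
          }
        }
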
    --
    Harsh J
    www.harshj.com
  • Jacques at Feb 14, 2011 at 3:36 am
    It was my understanding, based on the FAQ and my personal experience, that
    using the MultipleOutputs class, or just relying on the OutputCommitter,
    only works for the final phase of the job (e.g. the reduce phase in a
    map+reduce job, or the map phase in the case of reducer=NONE). In the case
    I'm talking about, I want the map output to be committed and available to
    the reducers. If I understand the intricacies of MapReduce, the map output
    of a full map+reduce job is never put onto HDFS but is rather streamed
    directly from the mapper to the requesting reducers. To use (2)
    effectively, I only want to commit the secondary output to HDFS if the map
    task completes successfully.

    This seems to either require:
    a) Assuming that the first time Mapper.cleanup() is called for a particular
    split, it is the definitive call for that split (and thus committing the
    secondary information at that point)
    b) Or, always committing the map output to directories named for the task
    attempt and then hooking in a delete of the output of those map tasks
    which weren't committed (see the sketch below)
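
    For (b), the map-side write I have in mind looks roughly like this,
    inside the Mapper (a sketch; "summary.dir" is a made-up key, and
    deleting the files of uncommitted attempts would still need to be
    hooked in somewhere):

        @Override
        protected void cleanup(Context context) throws IOException {
          Configuration conf = context.getConfiguration();
          Path summaryDir = new Path(conf.get("summary.dir"));
          // Name the file after this attempt so speculative or retried
          // attempts can never collide with each other.
          Path attemptFile = new Path(summaryDir,
              "summary-" + context.getTaskAttemptID().toString());
          FileSystem fs = attemptFile.getFileSystem(conf);
          FSDataOutputStream out = fs.create(attemptFile, false);
          try {
            // write the <split index, summary> record
          } finally {
            out.close();
          }
        }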

    Am I missing something and/or over-complicating things?

    Thanks for your help
    Jacques
  • Harsh J at Feb 14, 2011 at 4:12 am
    From my experience, writing data is possible using MO on both the Map and
    Reduce sides of a single MR job. All data written to the MO name on the
    map side is committed just as it would be if the job were a map-only job
    (there's no difference, since a map task does not wait for reduce tasks
    to begin; it is quite independent of what the job plan is). Note that MO
    uses direct record writers instead of the MapOutputBuffer class used by
    the default collector in a map+reduce job (which writes to the local
    filesystem for the ReduceTask to pick up and use), so your data should be
    available on the reduce side, given that the framework guarantees that
    the reduce operation never starts until all map tasks have finished
    (which is the case right now).

    --
    Harsh J
    www.harshj.com
  • Chris Douglas at Feb 14, 2011 at 4:05 am
    If these assumptions are correct:

    0) Each map outputs one result, a few hundred bytes
    1) The map output is deterministic, given an input split index
    2) Every reducer must see the result from every map

    Then just output the result N times, where N is the number of
    reducers, using a custom Partitioner that assigns the result to
    (records_seen++ % N), where records_seen is an int field on the
    partitioner.
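
    A sketch of that partitioner (Text types are just for illustration):

        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Partitioner;

        public class RoundRobinPartitioner extends Partitioner<Text, Text> {
          private int recordsSeen = 0;

          @Override
          public int getPartition(Text key, Text value, int numPartitions) {
            // Each map emits its single result N times; successive copies go
            // to successive reducers, so every reducer sees exactly one copy.
            return recordsSeen++ % numPartitions;
          }
        }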

    If (1) does not hold, then write the first stage as a job with a single
    (optional) reduce, and the second stage as a map-only job processing
    the result. -C
  • Jason at Feb 14, 2011 at 4:23 am
    I think this kind of partitioner is a little hackish. A more
    straightforward approach is to emit the extra data N times under special
    keys and write a partitioner that recognizes these keys and dispatches
    them accordingly among partitions 0..N-1.
    Also, if this data needs to be shipped to the reducers upfront, that
    could easily be done using a custom sort comparator.
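
    Roughly like this (the key marker and types are just for illustration;
    normal records fall through to hash partitioning):

        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Partitioner;
        import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

        public class SummaryAwarePartitioner extends Partitioner<Text, Text> {
          // The mapper emits the summary N times under keys like
          // "\0SUMMARY\t0", "\0SUMMARY\t1", ... "\0SUMMARY\t(N-1)".
          private static final String SUMMARY_PREFIX = "\0SUMMARY\t";
          private final HashPartitioner<Text, Text> fallback =
              new HashPartitioner<Text, Text>();

          @Override
          public int getPartition(Text key, Text value, int numPartitions) {
            String k = key.toString();
            if (k.startsWith(SUMMARY_PREFIX)) {
              // Route each copy to the partition number embedded in the key.
              return Integer.parseInt(k.substring(SUMMARY_PREFIX.length()));
            }
            return fallback.getPartition(key, value, numPartitions);
          }
        }

    The leading "\0" also makes these keys sort before normal keys under the
    default Text ordering, so each reducer sees the summary records first; a
    custom comparator could enforce the same thing explicitly.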

    Sent from my iPhone
  • Jacques at Feb 14, 2011 at 4:55 am
    Everybody, thanks for all the help.

    Chris/Jason: assumption (1) is actually incorrect for my situation.
    Nonetheless, I can see how one would basically use a dynamic-typing
    approach, sending the additional data as the first keys for each
    partition. It seems less than elegant but doable.

    The solution Harsh J provides seems more elegant, but I need to spend
    some more time understanding exactly how paths and committing a job
    interact when using MultipleOutputs. My first and second passes have
    failed, but it does indeed seem to commit the output of the map task
    even in a map+reduce job. Like most things, I just need to spend some
    more time digging through the code at a more detailed level. I will come
    back if I have more questions.

    Per the final assumption that Harsh J mentions, that "the framework
    guarantees that the Reduce operation never starts until all Map tasks
    have finished": it seems that, while pre-shuffling is possible to get
    ready for running the reduce, the reduce would always have to wait
    unless a Partitioner could guarantee that a particular partition was
    complete before all maps were completed. Is there talk of this changing
    somehow?

    Thanks again,
    Jacques

  • Chris Douglas at Feb 14, 2011 at 1:44 pm

    On Sun, Feb 13, 2011 at 8:22 PM, Jason wrote:
    I think this kind of partitioner is a little hackish. A more
    straightforward approach is to emit the extra data N times under special
    keys and write a partitioner that recognizes these keys and dispatches
    them accordingly among partitions 0..N-1.
    Also, if this data needs to be shipped to the reducers upfront, that
    could easily be done using a custom sort comparator.
    As listed in the assumptions, I thought each map emits only one datum
    that must be read by every reduce, not one special datum among the
    normal output. Changing the output record type to add the partition
    struck me as overly formal, so the hackish solution seemed appropriate.
    If the summary data complements the record data emitted from the map,
    then composing the job as you describe is standard.

    However, if the map is non-deterministic, then, again, all of the output
    (not just the summary data) from the first stage must go to durable
    storage (i.e. HDFS), or re-executions will yield inconsistent results. I
    haven't set up MO to effect the shuffle in HDFS as Harsh describes, but
    it could be made to work. -C
