Hi

I'm running on Hadoop 0.20.2 and I have a job with the following nature:
* Mapper outputs very large records (50 to 200 MB)
* Reducer (single) merges all those records together
* Map output key is a constant (could be a NullWritable, but currently it's
a LongWritable(1))
* Reducer doesn't care about the keys at all
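
In code, the shape of the job is roughly the following (condensed sketch against the old mapred API; IndexWritable, buildIndex and add are placeholders for my real index Writable and its merge logic):

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    // IndexWritable is a placeholder for my real Writable, which serializes a 50-200 MB index.
    class IndexMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, IndexWritable> {
      private static final LongWritable ONE = new LongWritable(1);   // constant key
      public void map(LongWritable offset, Text line,
                      OutputCollector<LongWritable, IndexWritable> out,
                      Reporter reporter) throws IOException {
        out.collect(ONE, buildIndex(line));   // buildIndex stands in for the real indexing code
      }
    }

    class IndexMergeReducer extends MapReduceBase
        implements Reducer<LongWritable, IndexWritable, NullWritable, IndexWritable> {
      public void reduce(LongWritable ignoredKey, Iterator<IndexWritable> indexes,
                         OutputCollector<NullWritable, IndexWritable> out,
                         Reporter reporter) throws IOException {
        IndexWritable merged = new IndexWritable();
        while (indexes.hasNext()) {
          merged.add(indexes.next());         // merge every per-mapper index into one
        }
        out.collect(NullWritable.get(), merged);
      }
    }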

I'm running into OOMs at the Sort phase. Previously, when I was on Hadoop
0.20.1, I ran into OOMs at the Shuffle phase, but that was due to a bug that
was fixed in MAPREDUCE-1182. The OOMs I'm seeing now are from IFile, line
342 in readNextBlock, where it tries to allocate a byte[] the size of the
key+value output and fails. Inspecting the code, I noticed it depends on
io.file.buffer.size -- it allocates a buffer of that size and attempts to
reuse it. However, if a map output record (key+value) is larger than that, it
allocates another byte[] of the record's size, and that allocation is what
fails.

I'm running with mapred.child.java.opts set to -Xmx4096m, and each machine
in my cluster has 16 GB RAM. It's also the only job running at the moment, so
there's definitely plenty of RAM available.

So I limited the Mappers' output records to less than 64 MB, which is my
io.file.buffer.size, and now the job succeeds, but I don't feel this is the
best way to deal with it. First, because someone could easily configure the
job I write (it's a library, so users can configure it however they want) with
a smaller io.file.buffer.size, or a larger map output, and things would break
again. Second, because I think there must be a better way to handle it.

Searching around, I found several parameters that might be related, but I'm
not sure they'll help much -- among them, controlling the spill percent and
lowering the size of the largest record that can be kept in memory during the
shuffle phase. They feel like voodoo to me, but I may be wrong :).
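
To be concrete, these are the kind of knobs I mean (0.20.x property names; just a sketch of where they would be set -- I haven't verified that any of them avoids the big allocation):

    // Only the tuning properties, not a complete job setup.
    JobConf conf = new JobConf();
    conf.setInt("io.sort.mb", 256);                     // map-side sort buffer size, in MB
    conf.setFloat("io.sort.spill.percent", 0.80f);      // buffer-fill fraction that triggers a spill
    conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.50f);  // reduce heap fraction for buffering map outputs
    conf.setInt("mapred.inmem.merge.threshold", 100);   // in-memory map-output segments before merging to disk
    conf.setFloat("mapred.job.reduce.input.buffer.percent", 0.0f);    // heap fraction for retaining map outputs during the reduce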

It'd be best if I could avoid the shuffle and sort phases entirely -- they are
not needed for this job. I wrote a *NoSorter implements IndexedSorter* in the
hope that it would improve things, but it didn't (it replaces QuickSort and
simply does no sorting).
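
For reference, the NoSorter is essentially this, plugged in with conf.setClass("map.sort.class", NoSorter.class, IndexedSorter.class):

    import org.apache.hadoop.util.IndexedSortable;
    import org.apache.hadoop.util.IndexedSorter;
    import org.apache.hadoop.util.Progressable;

    // A sorter that doesn't sort: the map output stays in collection order.
    public class NoSorter implements IndexedSorter {
      public void sort(IndexedSortable s, int l, int r) {
        // intentionally empty -- no comparisons, no swaps
      }
      public void sort(IndexedSortable s, int l, int r, Progressable rep) {
        if (rep != null) {
          rep.progress();   // report progress, but still do no sorting
        }
      }
    }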

Also, I don't understand why the Reducer's values need to be read into
memory, since they most likely (unless you really insist) don't participate in
the sort and merge phases -- only the keys matter (again, unless you provide a
Comparator that looks at the values). And if the framework must load things
into memory, then loading the keys/values in small chunks of, say, 64 KB would
be better, because the JVM can really struggle to allocate 100 MB of
consecutive byte[].

Another thing that would work great is if Hadoop fed the Reducer as soon as
the Mappers' outputs were ready. This job does not care about sorting or
merging the outputs, so I could gain a lot of performance back if the Reducer
kicked in immediately, and not only after all the Mappers are done.

I couldn't measure it precisely, but of the 50 minutes it took the Reducer
to finish (the Mappers finished in 6 minutes!), more than half was spent in
the copy, shuffle and sort phases, which is insane.

Any help / advice / best-practice instructions are greatly appreciated. If
any of these issues are fixed in a newer Hadoop release, I'd love to hear
about that too. I cannot upgrade from 0.20.2 at the moment, but it'd still be
nice to know they are already addressed.

Shai

  • Harsh J at Apr 14, 2011 at 6:50 pm
    Hello Shai,
    On Fri, Apr 15, 2011 at 12:01 AM, Shai Erera wrote:
    Hi
    I'm running on Hadoop 0.20.2 and I have a job with the following nature:
    * Mapper outputs very large records (50 to 200 MB)
    * Reducer (single) merges all those records together
    * Map output key is a constant (could be a NullWritable, but currently it's
    a LongWritable(1))
    * Reducer doesn't care about the keys at all
    If I understand right, your single reducer's only work is to merge
    your multiple maps' large record emits, and nothing else (it does not
    have 'keys' to worry about), correct?

    Why not do this with a normal FS-using program that opens a single
    output file and merges into it the map-materialized output files of a
    Map-only job?

    --
    Harsh J
  • Shai Erera at Apr 15, 2011 at 12:15 pm
    Thanks for the prompt response Harsh !

    The job is an indexing job. Each Mapper emits a small index and the Reducer
    merges all of those indexes together. The Mappers output the index as a
    Writable which serializes it. I guess I could write the Reducer's function
    as a separate class as you suggest, but then I'll need to write a custom
    OutputFormat that will put those indexes on HDFS or somewhere?

    That complicates matters for me -- currently, when this job is run as part
    of a sequence of jobs, I can guarantee that if the job succeeds, then the
    indexes are successfully merged, and if it fails, the job should be
    restarted. While that can be achieved with a separate FS-using program as
    you suggest, it complicates matters.

    Is my scenario that extreme? Would you say the common scenario for Hadoop
    is jobs that output tiny objects between Mappers and Reducers?

    Would this work much better if I work w/ several Reducers? I'm not sure it
    will because the problem lies, IMO, in Hadoop allocating large consecutive
    chunks of RAM in my case, instead of trying to either stream it or break it
    down to smaller chunks.

    Is there absolutely no way to bypass the shuffle + sort phases? I don't mind
    writing some classes if that's what it takes ...

    Shai
  • Harsh J at Apr 15, 2011 at 4:34 pm
    Hello Shai,
    On Fri, Apr 15, 2011 at 5:45 PM, Shai Erera wrote:
    The job is an indexing job. Each Mapper emits a small index and the Reducer
    merges all of those indexes together. The Mappers output the index as a
    Writable which serializes it. I guess I could write the Reducer's function
    as a separate class as you suggest, but then I'll need to write a custom
    OutputFormat that will put those indexes on HDFS or somewhere?
    I was thinking of a simple Java program that works with HDFS there,
    not a Map/Reduce one (although you can tweak Map-only jobs a bit to
    run a single mapper alone, which can then go ahead and do the same).
    Your Mapper can open one output file, get a list of all the previous job's
    output files, and perform the merge by reading them one by one. This
    would bypass the Reduce phase entirely.
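
    Either way, the core of it is just FileSystem calls -- something along
    these lines (untested sketch; shown as a standalone program, but the same
    body could live inside a single mapper; the merge here is a plain
    byte-level concatenation, so substitute whatever your records actually need):

        import java.io.IOException;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;

        // Merges every file under the previous job's output directory into one HDFS file.
        public class MergeOutputs {
          public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path inDir = new Path(args[0]);      // previous job's output directory
            Path outFile = new Path(args[1]);    // the single merged file
            FSDataOutputStream out = fs.create(outFile, false);
            try {
              for (FileStatus stat : fs.listStatus(inDir)) {
                if (stat.isDir() || stat.getPath().getName().startsWith("_")) {
                  continue;                      // skip subdirectories and _logs etc.
                }
                FSDataInputStream in = fs.open(stat.getPath());
                try {
                  IOUtils.copyBytes(in, out, conf, false);   // false: keep 'out' open
                } finally {
                  in.close();
                }
              }
            } finally {
              out.close();
            }
          }
        }
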
    That complicates matters for me -- currently, when this job is run as part
    of a sequence of jobs, I can guarantee that if the job succeeds, then the
    indexes are successfully merged, and if it fails, the job should be
    restarted. While that can be achieved with a separate FS-using program as
    you suggest, it complicates matters.
    I agree that the suggestion could complicate your workflow a bit,
    although it is doable as a Map-only job, as I mentioned right above
    (which may make it a bit more acceptable?).
    Is my scenario that extreme? Would you say the common scenario for Hadoop
    is jobs that output tiny objects between Mappers and Reducers?

    Would this work much better if I work w/ several Reducers? I'm not sure it
    will because the problem lies, IMO, in Hadoop allocating large consecutive
    chunks of RAM in my case, instead of trying to either stream it or break it
    down to smaller chunks.
    Large outputs are alright, but I wouldn't say they are alright for
    simple merging since it would all go through the sort phase with about
    twice the I/O ultimately. Using multiple reducers can help a bit if
    you do not mind partitioned results at the end.
    Is there absolutely no way to bypass the shuffle + sort phases? I don't mind
    writing some classes if that's what it takes ...
    Shuffle is an essential part of the Map to Reduce transition; it can't
    be 'bypassed', since a Reducer has to fetch all map-outputs to begin
    with. Sort/Group may be made a dummy, as you had done, but can't be
    disabled altogether AFAIK. The latter has been brought up on the lists
    before, if I remember right, but I am not aware of an implementation
    that could do that (just begin reducing merely partitioned,
    unsorted data).

    --
    Harsh J
  • Chris Douglas at Apr 15, 2011 at 6:05 pm

    On Fri, Apr 15, 2011 at 9:34 AM, Harsh J wrote:
    Is there absolutely no way to bypass the shuffle + sort phases? I don't mind
    writing some classes if that's what it takes ...
    Shuffle is an essential part of the Map to Reduce transition; it can't
    be 'bypassed', since a Reducer has to fetch all map-outputs to begin
    with. Sort/Group may be made a dummy, as you had done, but can't be
    disabled altogether AFAIK. The latter has been brought up on the lists
    before, if I remember right, but I am not aware of an implementation
    that could do that (just begin reducing merely partitioned,
    unsorted data).
    The sort also effects the partitioning, so completely disabling the
    sort (as above) will only work with 1 reducer.

    If only grouping is important, then a bijective f(key) that is
    inexpensive to sort is canonical. Though more efficient grouping
    methods are possible, in practice this captures most of the possible
    performance improvement.

    If neither sorting nor grouping are important, then a comparator that
    always asserts that its operands are equal will effect the
    partitioning, but each reducer will receive all its records in one
    iterator. Note also that the key portion of the record will be
    incorrect in the old API.
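
    For illustration, such a comparator can be as small as this (sketch; it
    would be registered with JobConf.setOutputKeyComparatorClass):

        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.RawComparator;

        // Claims every pair of keys is equal: nothing really gets sorted or grouped,
        // and each reducer sees all of its records in one iterator.
        public class EverythingEqualComparator implements RawComparator<LongWritable> {
          public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return 0;
          }
          public int compare(LongWritable a, LongWritable b) {
            return 0;
          }
        }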

    However, as Harsh correctly points out, this doesn't appear to be the
    bottleneck in your job. The data motion for records of tens or
    hundreds of MB is patently inefficient, and OOMs are a regrettable but
    relatively minor consequence. If you can change your job to handle
    metadata backed by a store in HDFS, then your job can merge the
    indices instead of merging GB of record data. In other words, pass a
    reference to the record data, not the data itself.

    If the job neither sorts nor groups, what is the format for the index?
    Instead of a reduce phase, a second, single-map job that concatenates
    the output of the first seems better fit (assuming the goal is a
    single file). -C
  • Shai Erera at Apr 16, 2011 at 3:36 am
    bq. If you can change your job to handle metadata backed by a store in HDFS

    I have two Mappers, one that works with HDFS and one with GPFS. The GPFS one
    does exactly that -- it stores the indexes in GPFS (which all Mappers and
    Reducers see, as a shared location) and outputs just the pointer to that
    location. Then, Hadoop just merges key=LongWritable and value=Text, and
    indeed it works better (the job runs ~170% faster).

    The value (index) is a collection of files, though my Writable writes them
    as a single stream, so in essence I can make them look like a single file. I've
    never worked with HDFS before (only GPFS), and HDFS is a new requirement.
    Can you point out some example code / classes I should use to achieve the
    same trick? Will I need, in my Mapper, to specifically call FileSystem API
    to store the index, or is a Writable enough?

    Shai
  • Chris Douglas at Apr 18, 2011 at 7:02 am
    I don't understand your job, but the Writable interface is just a
    format for record serialization. If your mapper generates
    <URI,offset,length> tuples referencing into data written in HDFS, that
    is sufficient to open the stream in the reducer using the FileSystem
    API. Writing an OutputFormat that interprets that tuple as a range of
    bytes to read from HDFS and write to a single stream should not
    diverge too far from the OutputFormats bundled with Hadoop. You might
    start there.
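
    The core of that OutputFormat would boil down to something like this
    (sketch; the tuple class and how it reaches the RecordWriter are up to you):

        import java.io.IOException;
        import java.io.OutputStream;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        // Copies the byte range a <path, offset, length> tuple points at into 'out'.
        public class RangeCopier {
          public static void copyRange(Configuration conf, String pathStr,
                                       long offset, long length, OutputStream out)
              throws IOException {
            Path path = new Path(pathStr);
            FileSystem fs = path.getFileSystem(conf);
            FSDataInputStream in = fs.open(path);
            try {
              in.seek(offset);                       // jump to the referenced range
              byte[] buf = new byte[64 * 1024];      // stream in small chunks
              long remaining = length;
              while (remaining > 0) {
                int read = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                if (read < 0) {
                  throw new IOException("Unexpected EOF in " + pathStr);
                }
                out.write(buf, 0, read);
                remaining -= read;
              }
            } finally {
              in.close();
            }
          }
        }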

    Again, it's not clear what your goal is or what you mean by "index".
    Are the input records changed before being written by the reduce? Or
    is the purpose of this job only to concatenate index files? -C
  • Shai Erera at Apr 18, 2011 at 10:42 am
    Thanks for your response Chris !

    An index, in this case a Lucene index, is a logical name for a collection of
    files. Each Mapper instance generates such an index from the input it receives,
    and then the Reducer merges all of those indexes together to output a
    single Lucene index.

    I ended up doing the following -- my HDFS Mapper creates an index in-memory
    and then serializes the in-memory index into a single file that is stored on
    HDFS (each Mapper serializes to a different file). I use the FileSystem API to
    achieve that, so hopefully that's the right way to do it. The Mapper outputs a
    Text value which is the file's location on HDFS. The Reducer then interprets
    that value, reads the file using the FileSystem API, and deserializes it into
    an in-memory Lucene index.
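
    In code, the pattern boils down to roughly this (condensed sketch -- in the
    real job the index bytes are streamed rather than buffered in one array,
    and writeIndex/readIndex are just illustrative helper names):

        import java.io.IOException;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.Text;

        // Helpers for the "emit a pointer, not the data" pattern.
        public class IndexPointerIO {
          // Mapper side: write the serialized index to a per-task HDFS file and
          // return its location, to be emitted as the map output value.
          public static Text writeIndex(Configuration conf, Path dir, String taskId,
                                        byte[] serializedIndex) throws IOException {
            Path file = new Path(dir, "index-" + taskId);
            FileSystem fs = file.getFileSystem(conf);
            FSDataOutputStream out = fs.create(file, false);
            try {
              out.write(serializedIndex);
            } finally {
              out.close();
            }
            return new Text(file.toString());
          }

          // Reducer side: read back the file the value points at, so it can be
          // deserialized into an in-memory index and merged.
          public static byte[] readIndex(Configuration conf, Text location) throws IOException {
            Path file = new Path(location.toString());
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = fs.open(file);
            try {
              byte[] buf = new byte[(int) fs.getFileStatus(file).getLen()];
              in.readFully(buf);
              return buf;
            } finally {
              in.close();
            }
          }
        }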

    I still need to write an OutputCommitter which will get rid of those
    serialized-index-files (on HDFS) once the job completes successfully (e.g.
    in cleanupJob).
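
    Something like this is what I have in mind (sketch; "index.tmp.dir" is a
    made-up property for wherever the per-mapper files go, and the committer
    would be set with conf.setOutputCommitter(IndexCleanupCommitter.class)):

        import java.io.IOException;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.mapred.FileOutputCommitter;
        import org.apache.hadoop.mapred.JobConf;
        import org.apache.hadoop.mapred.JobContext;

        // Removes the per-mapper serialized-index files once the job is done.
        public class IndexCleanupCommitter extends FileOutputCommitter {
          @Override
          public void cleanupJob(JobContext context) throws IOException {
            JobConf conf = context.getJobConf();
            String dir = conf.get("index.tmp.dir");   // made-up property: where the mappers wrote their files
            if (dir != null) {
              Path tmpDir = new Path(dir);
              tmpDir.getFileSystem(conf).delete(tmpDir, true);   // recursive delete
            }
            super.cleanupJob(context);                // normal FileOutputCommitter cleanup
          }
        }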

    Am I going in the right direction, or should I stop and rethink the
    approach?

    Shai
  • Chris Douglas at Apr 19, 2011 at 6:47 pm

    On Mon, Apr 18, 2011 at 3:42 AM, Shai Erera wrote:
    I ended up doing the following -- my HDFS Mapper creates an index in-memory
    and then serializes the in-memory index into a single file that is stored on
    HDFS (each Mapper serializes to a different file). I use the FileSystem API to
    achieve that, so hopefully that's the right way to do it. The Mapper outputs a
    Text value which is the file's location on HDFS. The Reducer then interprets
    that value, reads the file using the FileSystem API, and deserializes it into
    an in-memory Lucene index.
    Without knowing the format of a Lucene index, I can't say whether this
    approach makes sense. Instead of handling the cleanup yourself, you
    might consider running the index generation and the concat as separate
    parts of your workflow (as Harsh suggested). -C
