FAQ
Sometimes in reasonably large jobs, the last phase of the reduce will get
stuck (leaving the reduce fixed at 66%) in GC mode forever, at which point
the logs will fill with lines like

2008-12-15 20:37:13.864 INFO [Low Memory Detector]
org.apache.pig.impl.util.SpillableMemoryManager - low memory handler
called (Collection threshold exceeded) init = 5439488(5312K) used =
357957576(349567K) committed = 357957632(349568K) max =
357957632(349568K)

Sometimes the Pig process will stop reporting progress and will be killed by
Hadoop after 10 minutes (this even happened at 30 minutes after I modified the
Hadoop conf), but sometimes it will go on indefinitely, apparently reporting
progress but never actually making any because it's sitting in GC.

I'm running Java in the Hadoop conf with -Xmx512M, so a reasonable heap
size, and have set 4 maps and 4 reduces per machine (some machines have 4 GB
of RAM, some have 8 GB). Would it be beneficial to configure Hadoop to use,
say, a gig of RAM per task but run fewer tasks per machine? I'm concerned
that while that might get me through this job, it wouldn't get me through
one 10x its size; it feels like there's something more fundamental that I
may be doing wrong.
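
For reference, the relevant knobs live in hadoop-site.xml; a sketch using the
stock property names from the 0.18/0.19 line (the values below are only
illustrative):

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx1024m</value>        <!-- heap for each map/reduce task JVM -->
</property>
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>2</value>                <!-- fewer concurrent maps per node -->
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>2</value>                <!-- fewer concurrent reduces per node -->
</property>
<property>
  <name>mapred.task.timeout</name>
  <value>1800000</value>          <!-- ms before a non-reporting task is killed -->
</property>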

I would be interested in general in people's best practices for
optimization, and specifically on any suggestions as to how to fix or work
around this.

Thanks in advance,
Kevin

  • Alan Gates at Dec 16, 2008 at 5:13 pm
    A couple of thoughts. One, at Yahoo we run with 1G of memory for our
    maps and reduces. Even though this doesn't avoid spilling, it can help
    avoid the spilling-for-no-benefit problem that you're seeing.
    Two, the biggest issue here is usually an uneven distribution of keys
    to reducers, because the key data itself is not evenly distributed.
    One workaround for that, if you're doing something that can be broken
    up, is to pick additional key(s) that are reasonably independent of
    the first keys, add those to your group by, and then group a second
    time by just the first keys after the data has been reduced.
    For example:

    A = load 'myfile';
    B = group A by $0;
    C = foreach B generate group, COUNT(A);

    becomes

    A = load 'myfile';
    B = group A by ($0, $1);                    -- group by the original key plus an independent second key
    C = foreach B generate flatten(group), COUNT(A);
    D = group C by $0;                          -- regroup the partial counts by just the original key
    E = foreach D generate group, SUM(C.$2);    -- sum the partial counts

    Alan.
  • Ted Dunning at Dec 16, 2008 at 7:54 pm
    Sadly, this advice almost always coincides with cases where a combiner makes
    sense. There are cases where Pig can't figure this out, but it is getting
    better and better at it.

    --
    Ted Dunning, CTO
    DeepDyve
    4600 Bohannon Drive, Suite 220
    Menlo Park, CA 94025
    www.deepdyve.com
    650-324-0110, ext. 738
    858-414-0013 (m)
  • Kevin Weil at Dec 16, 2008 at 7:56 pm
    Alan,

    Thank you, that's a clever trick, and we are using it in some places. It
    will work in most cases, but it seems to break down if the reduce
    involves COUNT with DISTINCT on what ends up as C.$2 in your script
    below. In that case, if the distinct values overlap across the groups you
    add, the SUM will double-count things. For example, if your dataset looks
    like

    A = { (firefox, url_1, id_1), (firefox, url_1, id_2), (firefox, url_2,
    id_2), (ie, url_1, id_1), (ie, url_3, id_3) }

    and you want to compute counts of unique ids by browser. The non-clever way
    to do it is to load, group by $0, and do a DISTINCT and COUNT in a nested
    foreach. This will give you

    C1 = { (firefox, 2), (ie, 2) }
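
    As a minimal sketch (assuming the relation is loaded with a
    (browser, url, id) schema; the load path and aliases are only
    illustrative), that non-clever version looks like:

    A = load 'myfile' as (browser, url, id);
    B = group A by browser;
    C1 = foreach B {
        ids = distinct A.id;           -- dedupe the ids within each browser's bag
        generate group, COUNT(ids);    -- yields (browser, number of unique ids)
    };

    The whole bag for a popular browser still has to be materialized in a
    single reducer here, which is exactly what blows up the heap.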

    but if you group by browser and url, then distinct-count that, you'll get

    C2 = { (firefox, url_1, 2), (firefox, url_2, 1), (ie, url_1, 1), (ie, url_3,
    1) }

    and then doing the sum will give you

    E2 = { (firefox, 3), (ie, 2) }.

    I suppose doing a distinct but not a count in the C2 calculation will do
    some of the work in a distributed fashion. But if the keys are uneven, I'm
    not sure it saves that much, because then you have to combine bags to get E2
    correctly. Are there any analogues of your suggestion for the case
    involving distinct?
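
    One sketch of a distinct-first variant (same assumed (browser, url, id)
    schema) is to project down to the pairs being counted, DISTINCT them as a
    separate, fully parallel step, and only then group and count, so the
    final reduce per browser only sees already-deduplicated rows:

    A = load 'myfile' as (browser, url, id);
    P = foreach A generate browser, id;      -- url doesn't matter for unique ids per browser
    D = distinct P;                          -- distinct (browser, id) pairs
    G = group D by browser;
    E = foreach G generate group, COUNT(D);  -- unique ids per browser

    The final group still sends every distinct id of a hot browser to one
    reducer, but as small deduplicated rows rather than full bags.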

    Thanks,
    Kevin

  • Alan Gates at Dec 17, 2008 at 7:04 pm
    There aren't any more tricks beyond the one you describe. It may
    still work, however. The biggest factor in a long-running reduce is
    a key with many values, since Pig tries to instantiate all of those
    values in memory, so doing the distinct up front will help with that.
    FWIW, changing Pig to use the combiner in cases with distinct is high
    on our list of plans and something we hope to get to soon. That
    should remove the need to double-group in these cases.

    Alan.
