large reduce group sizes
Hi all,



I am facing a problem with aggregations where reduce groups are
extremely large.



It's a very common usage scenario - for example, someone might want the
equivalent of 'select count(distinct e.field2) from events e group by e.field1'.
The natural thing to do is emit e.field1 as the map key and do the
distinct and count in the reduce.



Unfortunately, the values in the reduce phase all have to be pulled into
memory, and we end up running out of memory for large groups. It would
be great if the values iterator were able to seamlessly pull in data
from disk - especially since the data is coming from a persistent store.
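
For concreteness, the naive reducer looks roughly like this (just a sketch
against the old org.apache.hadoop.mapred API - the class and variable names are
made up); the in-memory set is exactly what blows up for large groups:

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // The map emits (field1, field2); this reducer materializes every field2 of a
    // group in a HashSet before counting, so memory grows with the group size.
    public class NaiveDistinctCountReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, LongWritable> {

      public void reduce(Text field1, Iterator<Text> field2Values,
                         OutputCollector<Text, LongWritable> out, Reporter reporter)
          throws IOException {
        Set<String> distinct = new HashSet<String>();
        while (field2Values.hasNext()) {
          distinct.add(field2Values.next().toString());  // this is what exhausts memory
        }
        out.collect(field1, new LongWritable(distinct.size()));
      }
    }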



I was wondering if other people have faced this problem - and what they
have done. (There are some solutions that have been suggested to me - like first
doing a group by on field1 plus a hash of field2 to reduce the group size - but
they are a pain to implement.) And how difficult would it be to have an
iterator iterate over on-disk - rather than in-memory - values?
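
For reference, the hash-splitting workaround mentioned above amounts to roughly
the following (a sketch with made-up names, not code we actually have): the
first job keys on field1 plus a bucket derived from field2, so no single reduce
group gets too large, and a second job sums the per-bucket distinct counts for
each field1. Since a given field2 always hashes to the same bucket, the
per-bucket distinct counts add up correctly.

    public class HashSplitKey {
      static final int BUCKETS = 128;  // illustrative bucket count

      // Key emitted by the first job's map for a (field1, field2) event.
      static String stage1Key(String field1, String field2) {
        int bucket = (field2.hashCode() & Integer.MAX_VALUE) % BUCKETS;
        return field1 + "\t" + bucket;
      }
      // Job 1 reduce: count distinct field2 within each (field1, bucket) group.
      // Job 2: map each (field1, bucket) count back to field1 and sum the counts.
    }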



Thx,



Joydeep


  • Runping Qi at Oct 11, 2007 at 9:05 pm
    The values passed to reduce come from a disk-backed iterator.
    The problematic part is computing the distinct count.
    You have to keep the unique values in memory, or you have to use some other
    tricks.
    One such trick is sampling. Another is to write the values out to disk,
    do a merge sort, and then read the sorted values back in sequentially.
    It would be nice if somebody could contribute a patch.
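
    To illustrate the second trick (my own sketch, not existing Hadoop code): once
    the values for a key have been merge-sorted on disk, the distinct count takes
    only constant memory, since you just watch for transitions while streaming the
    sorted values back:

        import java.util.Iterator;

        public class SortedDistinctCounter {
          // Counts distinct values in an already-sorted stream with O(1) memory.
          static long countDistinct(Iterator<String> sortedValues) {
            long distinct = 0;
            String previous = null;
            while (sortedValues.hasNext()) {
              String current = sortedValues.next();
              if (previous == null || !current.equals(previous)) {
                distinct++;  // first occurrence of a new value
              }
              previous = current;
            }
            return distinct;
          }
        }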

    Runping


  • Joydeep Sen Sarma at Oct 11, 2007 at 9:30 pm
    Great! I didn't realize that the iterator was disk-based.

    The merge-sort approach sounds very doable - I'll give it a shot. Do you see this
    becoming an option in the mapred job (optionally sorting the values)?


  • Ted Dunning at Oct 11, 2007 at 9:39 pm
    First off, as in my previous mail, you don't need a special mechanism to do
    the unique count. Just add another MR step (which is, of course, sorting
    its little heart out).

    Secondly, you can definitely force the reduce iterator to give you values in
    order. The method is not very obvious, but there was a long thread on it a
    week or so ago.

    The summary outline is that you have to introduce the data you want to sort
    on into the key. Then you have to define both a sort order AND a partition
    function. The only problem is that you will only see ONE of the keys, so you
    have to duplicate any data you want in the value.

    It isn't hard. But it also definitely isn't obvious.

    I can't wait to get Pig out there so we don't have to know all of this.
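
    In the meantime, the wiring looks roughly like this against the old
    org.apache.hadoop.mapred API (a sketch - TextPair would be a composite
    (field1, field2) key you write yourself, and the partitioner/comparator
    classes are likewise hypothetical ones you supply):

        import org.apache.hadoop.mapred.JobConf;

        public class DistinctCountJob {
          // TextPair, FirstFieldPartitioner, FullKeyComparator and
          // FirstFieldGroupingComparator are classes you would write yourself.
          static void configure(JobConf conf) {
            conf.setMapOutputKeyClass(TextPair.class);                  // key carries field1 AND field2
            conf.setPartitionerClass(FirstFieldPartitioner.class);      // partition on field1 only
            conf.setOutputKeyComparatorClass(FullKeyComparator.class);  // sort on (field1, field2)
            conf.setOutputValueGroupingComparator(FirstFieldGroupingComparator.class); // one reduce call per field1
          }
          // Values then arrive at reduce() sorted by field2, so distinct field2
          // values can be counted by watching for transitions, with no in-memory
          // set. Since only one representative key is seen per group, anything
          // from the key that the reducer needs must also be carried in the value.
        }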


  • Joydeep Sen Sarma at Oct 11, 2007 at 9:55 pm
    Yeah - I am doing it with two MR jobs right now.

    Understood the second solution. Is this what Pig uses internally? (Being lazy -
    I should just look at the code.)

    (One of the issues is that the optimal implementation requires
    anticipating the group size. That's easy to do in custom code, but hard to do
    automatically - you would have to maintain approximate counts of distinct
    values for each dimension.)


  • Ted Dunning at Oct 11, 2007 at 10:32 pm
    Why do you need to know the group size?

    Did I miss a transition in exactly what you are talking about?
    On 10/11/07 2:57 PM, "Joydeep Sen Sarma" wrote:

    Yeah - I am doing it with two MR jobs right now.

    ...

    (One of the issues is that the optimal implementation requires
    anticipating the group size. That's easy to do in custom code, but hard to do
    automatically - you would have to maintain approximate counts of distinct
    values for each dimension.)
  • Ted Dunning at Oct 11, 2007 at 9:33 pm
    When dealing with this, I often do a second reduce to count the unique lines
    output by a counting program such as you describe.

    For a common case of users and content items, the first MR produces <user,
    item, count> records. Then similar counting programs give <user, count>,
    <item, count>, <user, unique-item-count> and <item, unique-user-count> records in
    four steps that are each much faster than the original scan. Generating all
    four kinds of outputs in one step would be pretty easy as well, but that
    would require special output code to separate them. The four steps are fast
    enough compared to the first step that I don't much care. Combining uniques
    and totals tends to be more useful when you are dropping the data into a
    statistics table in a database.

    In detail:

    MR1:
    map: <user, item> => <<user, item>, 1>
    combine and reduce: standard key counter

    MR2:
    map: <user, item, count> => <user, count>
    combine and reduce: standard key counter

    MR3:
    map: <user, item, count> => <user, 1>
    combine and reduce: standard key counter

    MR4:
    map: <user, item, count> => <item, count>
    combine and reduce: standard key counter

    MR5:
    map: <user, item, count> => <item, 1>
    combine and reduce: standard key counter

    Note the similarity between 2&3 and 4&5. These four could be combined as
    mentioned above into two steps:

    MR2:
    map: <user, item, count> => <<TOTAL, user>, count>, <<UNIQUE, user>, 1>
    combine and reduce: standard key counter

    MR3:
    map: <user, item, count> => <<TOTAL, item>, count>, <<UNIQUE, item>, 1>
    combine and reduce: standard key counter
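
    The "standard key counter" here is just a reducer (also usable as the
    combiner) that sums the partial counts for each key - a sketch against the
    old org.apache.hadoop.mapred API, with a made-up class name:

        import java.io.IOException;
        import java.util.Iterator;

        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.MapReduceBase;
        import org.apache.hadoop.mapred.OutputCollector;
        import org.apache.hadoop.mapred.Reducer;
        import org.apache.hadoop.mapred.Reporter;

        public class KeyCounter extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {

          public void reduce(Text key, Iterator<LongWritable> counts,
                             OutputCollector<Text, LongWritable> out, Reporter reporter)
              throws IOException {
            long sum = 0;
            while (counts.hasNext()) {
              sum += counts.next().get();  // add up the partial counts for this key
            }
            out.collect(key, new LongWritable(sum));
          }
        }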


