Pig user mailing list, July 2008
Hello,

I'm attempting to run a Pig job on a Hadoop cluster with a 5GB/35
million row input. When run on sample data of 100k rows, I get the
correct results, but when I run it on the whole dataset, some of the
distinct counts are incorrect. The pigfile (field names and input
schema changed slightly to protect the innocent) is below:

register pigutil.jar
raw_data = LOAD '/input/user_sessions.tsv' AS (userid, timestamp, location, duration);
nice_data = FOREACH raw_data GENERATE userid, pigutil.DateFromUnixTimestamp(timestamp) AS date, location, duration;
report = FOREACH (GROUP nice_data BY (date, location)) {
    unique_ids = DISTINCT nice_data.userid;
    GENERATE FLATTEN(group), SUM(nice_data.duration) AS total_duration,
        COUNT(nice_data) AS hits, COUNT(unique_ids) AS unique_users;
}
STORE report INTO '/output/user_statistics';

Some info about the data and the errors:

* ~540 result rows total
* total_duration and hits are calculated properly
* There are generally between 5 and 10 incorrect results for unique_users per run
* In all cases, the number of unique users reported is greater than the correct number
* Each run produces errors for different rows, and no two runs have produced exactly the same incorrect data
* There are a lot of log messages from SpillableMemoryManager in the reduce phase about low-memory handlers being called (both the collection and usage thresholds being exceeded)
* The error rate *seems* to decrease with an increase in memory (and increase with a decrease in memory). We don't have enough data samples to be sure that this is the case. The memory was set using mapred.child.java.opts.

From what I can tell with my (very limited) knowledge of Pig's
codebase, the problem seems to be occurring somewhere in
DistinctDataBag. Perhaps the uniqueness constraint gets lost
somewhere in the spilling logic. I'm not really sure where to go from
here, since the code in DistinctDataBag is rather complex.
Has anyone else had these problems? Is there someone I can work with
who is familiar with the DistinctDataBag code to try to track down
what's causing these errors?

Thanks,
Brandon

  • Alan Gates at Jul 24, 2008 at 10:58 pm
    Brandon,

    I wrote the DistinctDataBag stuff, so I'm reasonably familiar with it.
    I can try to run 35m rows through it and see what happens. But I have a
    few questions for you first.

    1) How many machines are you running this query on?
    2) How much memory are you allocating to your JVM?
    3) What is the cardinality of your grouping keys?
    4) Is there anything unique or special about your data that I should
    know before I try to simulate your query and see if I can reproduce the
    error?

    Alan.

  • Iván de Prado at Jul 25, 2008 at 8:55 am
    I have the same problem in my cluster, with nested DISTINCTs.

    - 4.5 million row dataset
    - 4-node cluster
    - 400 MB of memory per child
    - PARALLEL 64

    SEED = FOREACH GRP {
        LOCATIONS = DISTINCT DATA.location;
        COUNTRIES = DISTINCT DATA.country;
        LANGUAGES = DISTINCT DATA.language;
        ...

        GENERATE
            FLATTEN(group) AS wid,
            COUNT(LOCATIONS) AS locations,
            COUNT(COUNTRIES) AS countries,
            COUNT(LANGUAGES) AS languages,
            COUNT(DATA) AS indexed,
            ...
    } PARALLEL 64;

    Iván

  • Brandon Dimcheff at Jul 25, 2008 at 2:41 pm
    Alan,

    Thanks for your help. I've done a bit more experimentation and have
    discovered a couple more things. I first looked at how COUNT was
    implemented. It looks like COUNT calls size() on the bag, which will
    return mSize. I thought that mSize might be calculated improperly, so
    I added "SUM(unique_ids) AS crazy_userid_sum" to my GENERATE line and
    re-ran the pigfile:

    GENERATE FLATTEN(group), SUM(nice_data.duration) AS total_duration,
    COUNT(nice_data) AS channel_switches, COUNT(unique_ids) AS
    unique_users, SUM(unique_ids) AS crazy_userid_sum;

    It turns out that the SUM generates the correct result in all cases,
    while there are still occasional errors in the COUNT. Since SUM
    requires an iteration over all the elements in the DistinctDataBag,
    this led me to believe that the uniqueness constraint is indeed
    operating correctly, but there is some error in the logic that
    calculates mSize.

    Then I started poking around in DistinctDataBag looking for anything
    that changes mSize that might be incorrect. I noticed that on line 87
    in addAll(), the size of the DataBag that is passed into the method is
    added to the mSize instance variable, and then during the iteration a
    few lines later mSize is being incremented when an element is
    successfully added to mContents. I thought this might be the problem,
    since it seems like elements would be double counted if addAll() was
    called. I commented out line 87, recompiled Pig, and ran it again,
    but there are still errors (though I do think line 87 might be
    incorrect anyway).
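
    Sketched out as a self-contained toy (not the actual Pig source; only
    the names mSize, mContents, add(), and addAll() come from
    DistinctDataBag, everything else is made up just to show the
    arithmetic), the double counting looks like this:

        import java.util.Arrays;
        import java.util.HashSet;
        import java.util.List;
        import java.util.Set;

        class AddAllDoubleCount {
            private long mSize = 0;
            private final Set<String> mContents = new HashSet<String>();

            void add(String t) {
                if (mContents.add(t)) {
                    mSize++;               // one increment per element actually inserted
                }
            }

            void addAll(List<String> b) {
                mSize += b.size();         // counts the whole incoming bag up front (like line 87)...
                for (String t : b) {
                    add(t);                // ...and then add() counts each inserted element again
                }
            }

            public static void main(String[] args) {
                AddAllDoubleCount bag = new AddAllDoubleCount();
                bag.addAll(Arrays.asList("user1", "user2"));
                System.out.println(bag.mSize);   // prints 4, not 2
            }
        }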

    Thanks to my coworker Marshall, I think we may have discovered what
    the actual problem is. The scenario is as follows: We're adding a
    bunch of stuff to the bag, and before we're finished a spill occurs.
    mContents is cleared during the spill (line 157). All add() does is
    check uniqueness against mContents. So now we will get duplicates in
    mContents that are already on disk and an inflated mSize. Now, the
    reason why SUM works is because the iterator is smart and enforces
    uniqueness as it reads the records back in. We think this occurs at
    the beginning of addToQueue, around lines 363-369. mMergeTree is a
    TreeSet, so it'll enforce uniqueness and the call to addToQueue is
    aborted if there's already a matching record in mMergeTree.
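
    The whole failure mode we're describing boils down to a toy like the
    following (again not Pig code; "spilled" stands in for the spill files
    and iterate() for the merge iterator, so only the behaviour is meant
    to match):

        import java.util.HashSet;
        import java.util.Set;

        class SpillingDistinctCount {
            private final Set<String> mContents = new HashSet<String>();  // in-memory portion
            private final Set<String> spilled = new HashSet<String>();    // stands in for the spill files
            private long mSize = 0;

            void add(String t) {
                if (mContents.add(t)) {   // uniqueness is only checked against memory...
                    mSize++;              // ...so a tuple already spilled gets counted again
                }
            }

            void spill() {                // roughly what happens around line 157
                spilled.addAll(mContents);
                mContents.clear();        // memory forgets what is already on disk
            }

            long size() {                 // what COUNT sees
                return mSize;
            }

            long iterate() {              // what SUM sees: the merge re-deduplicates
                Set<String> merged = new HashSet<String>(spilled);
                merged.addAll(mContents);
                return merged.size();
            }

            public static void main(String[] args) {
                SpillingDistinctCount bag = new SpillingDistinctCount();
                bag.add("user1");
                bag.add("user2");
                bag.spill();
                bag.add("user1");                   // duplicate slips past the in-memory check
                System.out.println(bag.size());     // 3 -- inflated, like our COUNT
                System.out.println(bag.iterate());  // 2 -- correct, like our SUM
            }
        }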

    Do you think our assessment is correct? If so, it seems that the
    calculation of mSize needs to be significantly more complex than it is
    now. It looks to me like the entire bag will need to be iterated in
    order to reliably calculate the size. Do you have any ideas about how
    to implement this in a less expensive way? I'd be happy to take a
    stab at it, but I don't want to do anything particularly silly if you
    have a better idea.

    Thanks again,
    Brandon

  • Alan Gates at Jul 25, 2008 at 3:21 pm
    Wow, nice job debugging. I don't see a way around this other than
    having DistinctDataBag reimplement size() by running through the
    iterator and actually counting. The one optimization you could do is to
    have it check whether a spill has happened yet, and if not just
    return mSize.
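
    Something along these lines, perhaps (just a sketch of a method inside
    DistinctDataBag, not tested; I'm writing mSpillFiles for whatever field
    the bag actually uses to track its spill files, and the iterator is the
    existing de-duplicating one):

        public long size() {
            if (mSpillFiles == null || mSpillFiles.isEmpty()) {
                return mSize;                  // fast path: nothing spilled, mSize is still exact
            }
            long count = 0;
            Iterator<Tuple> it = iterator();   // this iterator already enforces uniqueness
            while (it.hasNext()) {
                it.next();
                count++;
            }
            return count;
        }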

    Alan.

  • Brandon Dimcheff at Jul 25, 2008 at 5:49 pm
    We were thinking that we might be able to just iterate the first time
    size() is called, save the result to mSize, and use it from then on.
    The cached size would probably need to be invalidated if new records
    were added to the bag. Also, perhaps I should file a JIRA issue and
    move this discussion to the dev list? We're working on a patch and
    some tests for it, so I'll keep you posted...
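
    Roughly what we have in mind (a sketch only; mCachedSize and the reset
    in add() are invented here, the rest would reuse the bag's existing
    de-duplicating iterator):

        private long mCachedSize = -1;            // -1 means "not computed since the last add"

        public void add(Tuple t) {
            mCachedSize = -1;                     // a new record invalidates the cached count
            // (the existing add()/spill bookkeeping would stay as it is)
        }

        public long size() {
            if (mCachedSize < 0) {
                long count = 0;
                Iterator<Tuple> it = iterator();  // enforces uniqueness while reading spills back
                while (it.hasNext()) {
                    it.next();
                    count++;
                }
                mCachedSize = count;
            }
            return mCachedSize;
        }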

    - Brandon
