Pig user mailing list, May 2013 (Grokbase archive)
Hi there,

I have a huge input on an HDFS and I would like to use Pig to calculate
several unique metrics. To help explain the problem more easily, I assume
the input file has the following schema:

userId:chararray, dimensionA_key:chararray, dimensionB_key:chararray,
dimensionC_key:chararray, activity:chararray, ...

Each record represents an activity performed by that userId.

Based on the value in the activity field, this activity record will be
mapped to 1 or more categories. There are about 10 categories in total.

Now I need to count the number of unique users for different dimension
combinations (i.e. A, B, C, A+B, A+C, B+C, A+B+C) for each activity
category.

What would be the best practice for performing such a calculation?

I have tried several ways. Although I can get the results I want, it takes
a very long time (i.e. days). What I found is that most of the time is spent
in the map phase. It looks like the script tries to load the huge input file
every time it calculates one unique count. Is there a way to improve this
behavior?

I also tried something similar to below, but it looks like it reaches the
memory cap for a single reducer and gets stuck at the last reducer step.

source = load ... as (userId:chararray, dimensionA_key:chararray,
    dimensionB_key:chararray, dimensionC_key:chararray,
    activity:chararray, ...);
a = group source by (dimensionA_key, dimensionB_key);
b = foreach a {
     userId1 = udf.newUserIdForCategory1(userId, activity);
     -- this udf returns the original user id if the activity should be
     -- mapped to Category1 and None otherwise
     userId2 = udf.newUserIdForCategory2(userId, activity);
     userId3 = udf.newUserIdForCategory3(userId, activity);
     ...
     userId10 = udf.newUserIdForCategory10(userId, activity);
     generate FLATTEN(group), COUNT(userId1), COUNT(userId2),
          COUNT(userId3), ..., COUNT(userId10);
}
store b ...;

Thanks.

T.E.


  • Jonathan Coveney at May 6, 2013 at 2:57 pm
    Are you familiar with the CUBE keyword that was relatively recently added?
    This sounds like a perfect use case for it. Furthermore, how are you
    splitting on activity? There is a SPLIT operator which is perfect for this,
    as you can have a different relation for each one.

    What I would do is use SPLIT to break it down into activities, then write
    a macro that gives you the counts you want and apply it to each of the
    split relations.
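
    A minimal sketch of that approach (the relation names, field names, category
    conditions, and output paths below are assumed for illustration, not taken
    from the thread): SPLIT the source once, define a macro that computes
    distinct users per (dimA, dimB), and apply the macro to each branch.

    -- hypothetical schema: userId, dimA, dimB, dimC, activity
    DEFINE count_unique_users(rel) RETURNS counts {
        projected = FOREACH $rel GENERATE dimA, dimB, userId;
        deduped   = DISTINCT projected;
        grouped   = GROUP deduped BY (dimA, dimB);
        $counts   = FOREACH grouped GENERATE FLATTEN(group) AS (dimA, dimB),
                        COUNT(deduped) AS unique_users;
    };

    source = LOAD 'input' AS (userId:chararray, dimA:chararray, dimB:chararray,
                              dimC:chararray, activity:chararray);

    -- SPLIT conditions may overlap, so one record can feed several categories
    SPLIT source INTO
        category1 IF (activity == 'abc' OR activity == 'def'),
        category2 IF (activity == 'abc');

    category1_counts = count_unique_users(category1);
    category2_counts = count_unique_users(category2);

    STORE category1_counts INTO 'out/category1';
    STORE category2_counts INTO 'out/category2';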


  • Alan Gates at May 6, 2013 at 3:18 pm
    In the script you gave I'd be surprised if it's spending time in the map phase, as the map should be very simple. It's the reduce phase I'd expect to be very expensive, because your mapping UDF prevents Pig from using the algebraic nature of COUNT (that is, it has to ship all of the records to the reducers, not just the record counts). If your file is large this will be expensive. What happens if you switch your script to:

    A = load ...
    B = foreach A generate dimA, dimB, udf.newUserIdForCategory1(userId, activity) as userId1, ...
    C = group B by dimA, dimB
    D = foreach C generate flatten(group), COUNT(userId1), ...

    When you said it was taking a long time in the map phase, were you trying something like the above? If so, I'd check how long your UDF is taking. Unless you're reading tons of data on a very small cluster, the above should be very fast. It definitely should not reread the input for each UDF.

    Other things to check:
    What's your parallel count set at? That is, how many reducers are you running?
    How many waves of maps does this create? That is, what's the number of maps this produces divided by the number of slots you get on your cluster to run it?
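
    As a point of reference, reducer parallelism in Pig can be set script-wide
    with default_parallel or per operator with the PARALLEL clause (the aliases
    and numbers below are placeholders):

    SET default_parallel 100;                     -- script-wide default number of reducers
    C = GROUP B BY (dimA, dimB) PARALLEL 200;     -- override for a particularly heavy GROUP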

    Alan.
  • Thomas Edison at May 6, 2013 at 3:41 pm
    Thanks for the reply.

    @Jonathan,
    I haven't worked with CUBE before. I will try to learn it. Thanks for the
    tip.
    Currently, to split by activity, I use something like this.

    new_relation = FILTER relation BY activity == 'abc' or activity == 'def';

    In some cases it is a one-to-one mapping, but not always. To my
    understanding, the SPLIT keyword does exactly the same thing as what I'm
    doing, correct?
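
    For reference, SPLIT is essentially shorthand for a set of FILTERs over the
    same relation; its conditions may overlap, so a record can land in more than
    one output relation, or in none. A rough sketch using the same hypothetical
    conditions as above:

    SPLIT source INTO
        activity_1 IF (activity == 'abc' OR activity == 'def'),
        activity_2 IF (activity == 'abc');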

    @Alan,
    I haven't tried your method. I didn't come up with the UDF way until I saw
    my old script was taking too much time in the map phase, scanning the
    source multiple times. I will try your method. I also attached my old
    code at the end, just in case.

    I set my reducer count at about 90% of my reducer cap. I think this is what is
    recommended.

    It takes about 10-15 waves.

    My old script:
    source = load ...;

    activity_1 = FILTER source BY activity == 'abc' OR activity == 'def';
    A_1 = foreach activity_1 generate dimA, dimB, userId;
    B_1 = distinct A_1;
    C_1 = group B_1 by (dimA, dimB);
    D_1 = foreach C_1 generate FLATTEN(group), COUNT(B_1);
    STORE...

    -- repeat for activity_1, but for other dimension combinations;


    activity_2 = FILTER source BY activity == 'abc';
    -- repeat whatever activity_1 has been done

    -- repeat other activities.

    Thanks.

    T.E.

  • Thomas Edison at May 6, 2013 at 5:03 pm
    @Alan
    I just tried your method as shown below. The script gets stuck at the last
    reducer even for a relatively small subset of the data and fewer
    combinations. I suspect it's an out-of-memory issue. If I remember
    correctly, using a nested foreach to calculate the unique counts is not a
    very good idea. Any suggestions? Thanks.

    T.E.

    source = load ...;
    A = foreach source generate dimA, dimB, userId,
         udf.getActivity1UserId(userId, activity) as activity1_userId,
         udf.getActivity2UserId(userId, activity) as activity2_userId,
         udf.getActivity3UserId(userId, activity) as activity3_userId,
         ...
         udf.getActivity10UserId(userId, activity) as activity10_userId;

    B = group A by (dimA, dimB);

    C = foreach B {
         unique_activity1 = distinct A.activity1_userId;
         unique_activity2 = distinct A.activity2_userId;
         unique_activity3 = distinct A.activity3_userId;
         ...
         unique_activity10 = distinct A.activity10_userId;
         generate FLATTEN(group), COUNT(unique_activity1), COUNT(unique_activity2),
              COUNT(unique_activity3), ..., COUNT(unique_activity10);
    }

    STORE C...;
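
    For what it's worth, a common way around that last-reducer memory cap (a
    sketch with assumed field names, similar in spirit to the earlier
    per-category script) is to keep the DISTINCT out of the nested foreach, so
    deduplication of (dimA, dimB, userId) runs at full reducer parallelism and
    only the algebraic COUNT is done per group:

    source  = LOAD 'input' AS (userId:chararray, dimA:chararray, dimB:chararray,
                               dimC:chararray, activity:chararray);
    cat1    = FILTER source BY activity == 'abc' OR activity == 'def';
    proj1   = FOREACH cat1 GENERATE dimA, dimB, userId;
    dedup1  = DISTINCT proj1 PARALLEL 200;      -- dedupe is spread across many reducers
    grp1    = GROUP dedup1 BY (dimA, dimB) PARALLEL 50;
    counts1 = FOREACH grp1 GENERATE FLATTEN(group) AS (dimA, dimB),
                  COUNT(dedup1) AS unique_users;
    STORE counts1 INTO 'out/category1';         -- repeat (or wrap in a macro) per category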

  • Prasanth J at May 9, 2013 at 3:50 am
    Hi Thomas

    It looks like you are trying to count unique users for multiple combinations of dimensions. If so, then you can make use of the CUBE operator. Since counting unique users is not an algebraic measure (it is a holistic measure), it may result in a very slow reducer. There is a JIRA which addresses this issue: https://issues.apache.org/jira/browse/PIG-2831. If this is what you are looking for, then I will look into this JIRA as soon as possible.
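
    To make that concrete, a minimal sketch of the CUBE operator (available
    since Pig 0.11; the field names and category filter are assumed, not from
    the thread): one statement produces a group for every combination of
    dimA/dimB/dimC, with NULL in the dimensions that are rolled up, and the
    distinct-user count is then computed per group, which is the holistic step
    that can make the reducers slow.

    cat1   = FILTER source BY activity == 'abc' OR activity == 'def';
    cubed  = CUBE cat1 BY CUBE(dimA, dimB, dimC);
    counts = FOREACH cubed {
        users = DISTINCT cube.userId;           -- holistic: all ids for a group meet in one reducer
        GENERATE FLATTEN(group) AS (dimA, dimB, dimC), COUNT(users) AS unique_users;
    };
    STORE counts INTO 'out/cube_counts';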

    Thanks
    -- Prasanth
