@Alan
I just tried your method as shown below. The script is stuck at the last
reducer even for a relatively small subset of the data and fewer
combinations. I suspect it's an out-of-memory issue. If I remember
correctly, using a nested foreach to calculate the unique counts is not a
very good idea. Any suggestions? Thanks.

T.E.

source = load ...;
A = foreach source generate dimA, dimB, userId,
     udf.getActivity1UserId(userId, activity) as activity1_userId,
     udf.getActivity2UserId(userId, activity) as activity2_userId,
     udf.getActivity3UserId(userId, activity) as activity3_userId,
     ...
     udf.getActivity10UserId(userId, activity) as activity10_userId;

B = group A by (dimA, dimB);

C = foreach B {
     unique_activity1 = distinct A.activity1_userId;
     unique_activity2 = distinct A.activity2_userId;
     unique_activity3 = distinct A.activity3_userId;
     ...
     unique_activity10 = distinct A.activity10_userId;
     generate FLATTEN(group), COUNT(unique_activity1), COUNT(unique_activity2),
          COUNT(unique_activity3), ..., COUNT(unique_activity10);
}

STORE C...;

On Mon, May 6, 2013 at 8:41 AM, Thomas Edison wrote:

Thanks for the reply.

@Jonathan,
I haven't worked with CUBE before. I will try to learn it. Thanks for
the tip.
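
For reference, a rough sketch of what CUBE (available in Pig 0.11+) looks
like with the schema from the original question - only an illustration,
since counting unique users per group would still need something like a
nested DISTINCT:

cubed = CUBE source BY CUBE(dimensionA_key, dimensionB_key, dimensionC_key);
-- 'group' holds the dimension values (null for rolled-up dimensions),
-- 'cube' is the bag of matching input records
result = FOREACH cubed GENERATE FLATTEN(group), COUNT(cube);
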
Currently, to split the activity, I use something like this.

new_relation = FILTER relation BY activity == 'abc' or activity == 'def';

In some cases, it is a one-to-one mapping, but not always. To my
understanding, the SPLIT keyword does exactly the same thing as the way I'm
doing it, correct?
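
For comparison, the same filters written with SPLIT would look roughly like
this, using the activity values from the snippet above; as with separate
FILTER statements, a record that matches more than one condition goes to
every matching output:

SPLIT relation INTO
     activity_1 IF (activity == 'abc' OR activity == 'def'),
     activity_2 IF (activity == 'abc');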

@Alan,
I haven't tried your method yet. I didn't come up with the UDF approach
until I saw that my old script was taking too much time in the map phase -
scanning the source multiple times. I will try your method. I have also
attached my old code at the end, just in case.

I set my reducer count at about 90% of my reducer cap. I think this is
what is recommended.

It takes about 10-15 waves.

My old script:
source = load ...;

activity_1 = FILTER source BY activity == 'abc' OR activity == 'def';
A_1 = foreach activity_1 generate dimA, dimB, userId;
B_1 = distinct A_1;
C_1 = group B_1 by (dimA, dimB);
D_1 = foreach C_1 generate FLATTEN(group), COUNT(B_1);
STORE...

-- repeat for activity_1, but for other dimension combinations;


activity_2 = FILTER source BY activity == 'abc';
-- repeat whatever was done for activity_1

-- repeat other activities.

Thanks.

T.E.

On Mon, May 6, 2013 at 8:12 AM, Alan Gates wrote:

In the script you gave, I'd be surprised if it's spending time in the map
phase, as the map should be very simple. It's the reduce phase I'd expect
to be very expensive, because your mapping UDF prevents Pig from using the
algebraic nature of COUNT (that is, it has to ship all of the records to
the reduce phase, not just the number of records). If your file is large
this will be expensive. What happens if you switch your script to:

A = load ...
B = foreach A generate dimA, dimB, udf.newUserIdForCategory1(userId,
activity) as userId1, ...
C = group B by (dimA, dimB)
D = foreach C generate flatten(group), COUNT(B.userId1), ...

When you said it was taking a long time in the map phase, were you trying
something like the above? If so, I'd check how long your UDF is taking.
Unless you're reading tons of data on a very small cluster, the above
should be very fast. It definitely should not reread the input for each
UDF.

Other things to check:
What's your parallel count set at? That is, how many reducers are you
running?
How many waves of maps does this create? That is, what's the number of
maps this produces divided by the number of slots you get on your cluster
to run it?
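
For reference, a minimal sketch of the two usual ways to set the reducer
count in a Pig script (the value 90 is purely illustrative, and the aliases
are from the sketch above):

SET default_parallel 90;                     -- script-wide default number of reducers
C = group B by (dimA, dimB) PARALLEL 90;     -- per-statement override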

Alan.
On May 5, 2013, at 8:11 PM, Thomas Edison wrote:

Hi there,

I have a huge input on HDFS and I would like to use Pig to calculate
several unique metrics. To help explain the problem more easily, I assume
the input file has the following schema:

userId:chararray, dimensionA_key:chararray, dimensionB_key:chararray,
dimensionC_key:chararray, activity:chararray, ...

Each record represents an activity performed by that userId.

Based on the value in the activity field, each activity record will be
mapped to one or more categories. There are about 10 categories in total.

Now I need to count the number of unique users for different dimension
combinations (i.e. A, B, C, A+B, A+C, B+C, A+B+C) for each activity
category.

What would be the best practices to perform such calculation?

I have tried several ways. Although I can get the results I want, it takes
a very long time (i.e. days). What I found is that most of the time is spent
in the map phase. It looks like the script tries to load the huge input file
every time it calculates one unique count. Is there a way to improve this
behavior?

I also tried something similar to the script below, but it looks like it
reaches the memory cap for a single reducer and just gets stuck at the last
reducer step.

source = load ... as (userId:chararray, dimensionA_key:chararray,
dimensionB_key:chararray, dimensionC_key:chararray,
activity:chararray, ...);
a = group source by (dimensionA_key, dimensionB_key);
b = foreach a {
userId1 = udf.newUserIdForCategory1(userId, activity);
-- this udf returns the original user id if the activity should be
-- mapped to Category1 and None otherwise
userId2 = udf.newUserIdForCategory2(userId, activity);
userId3 = udf.newUserIdForCategory3(userId, activity);
...
userId10 = udf.newUserIdForCategory10(userId, activity);
generate FLATTEN(group), COUNT(userId1), COUNT(userId2),
COUNT(userId3), ..., COUNT(userId10);
}
store b ...;

Thanks.

T.E.
