Grokbase Groups Pig dev March 2010
Hmm, okay, I read the documentation further and it appears that this has
already been discussed previously
(here: <http://wiki.apache.org/pig/PigTypesFunctionalSpec>). There
seems to be a question of what the right thing to do is. It seems clear to me,
though: when an operation like '*' is applied, it is clearly an item-wise
operation to be applied to each member of the bag. If a function is an
aggregate (SUM), then it operates across the entire bag.

When a COGROUP occurs, just do what SQL does: perform a cross join when an
aggregate is applied across several bags, and do so automatically, so we don't
have to type out the separate FLATTENs:

grouped = COGROUP employee BY name, bonuses BY name;
flattened = FOREACH grouped GENERATE group, FLATTEN(employee), FLATTEN(bonuses);
grouped_again = GROUP flattened BY group;
total_compensation = FOREACH grouped_again GENERATE group,
    SUM(employee::salary * bonuses::multiplier);

So this should do the same:

grouped = COGROUP employee BY name, bonuses BY name;
total_compensation = FOREACH grouped GENERATE group,
    SUM(employee::salary * bonuses::multiplier);


automatically, because that can only have one meaning.
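The proposed semantics can be sketched outside Pig. Here is a minimal Python simulation (function name and data are hypothetical, not anything in Pig) of what SUM(employee::salary * bonuses::multiplier) would mean over the implicit cross join of two cogrouped bags:

```python
from itertools import product

def sum_over_cross(bag_a, bag_b):
    """Sum a * b over the cross product of two cogrouped bags --
    the implicit FLATTEN + GROUP + SUM described above."""
    return sum(a * b for a, b in product(bag_a, bag_b))

# One COGROUP group (e.g. name == "alice"): two salary rows, two multipliers.
salaries = [100, 200]        # the employee::salary bag
multipliers = [1.5, 2.0]     # the bonuses::multiplier bag

total = sum_over_cross(salaries, multipliers)
# 100*1.5 + 100*2.0 + 200*1.5 + 200*2.0
```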

Alternatively, if it is desired to stay with a low-level language, the
confusion around UDFs that take bags and UDFs that operate on members of bags
can be resolved if we do two things:

1.) Allow UDFs to become first-class citizens, so that we can pass UDFs to
other UDFs.
2.) Introduce map() and reduce() operators over bags.

These two things allow us more freedom and follow the map-reduce paradigm more
closely.

grouped = COGROUP employee BY name, bonuses BY name;
total_compensation = FOREACH grouped GENERATE group,
reduce(SUM,map(*,employee::salary,bonuses::multiplier));


Actually, this may deserve separate keywords. Because map and reduce operate
on single bags, whereas Pig introduces the concept of co-grouping, we should
have *comap* and *coreduce* that take functions and operate on the multiple
bags that result from a *cogroup*.

grouped = COGROUP employee BY name, bonuses BY name;
total_compensation = FOREACH grouped GENERATE group,
REDUCE(SUM,COMAP(*, employee::salary,bonuses::multiplier));
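A rough Python model of the proposed comap/coreduce pair (these are the proposed operators, not existing Pig features; reading COMAP over multiple bags as a cross product is one plausible interpretation):

```python
import operator
from functools import reduce as fold
from itertools import product

def comap(fn, *bags):
    """Apply fn item-wise across the cross product of several cogrouped bags."""
    return [fn(*items) for items in product(*bags)]

def coreduce(fn, bag):
    """Fold a bag down to a single value (an aggregate such as SUM)."""
    return fold(fn, bag)

salaries = [100, 200]
multipliers = [1.5, 2.0]

# REDUCE(SUM, COMAP(*, employee::salary, bonuses::multiplier))
total = coreduce(operator.add, comap(operator.mul, salaries, multipliers))
```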


This allows us to write efficiently, on one line, what would otherwise be
several aliases and unnecessary FLATTENed cross products.

A second thing I see is the recommendation to implement looping constructs.
May I suggest, as a follow-up to the above, that we beef up UDFs as
first-class citizens and add the ability to create UDF functions in Pig Latin
that can recurse.

The reason I think this is a better way to loop than *for(;;)*, *while(){}*,
and *do{}while()* statements is that recursive calls are functional and more
easily optimizable than imperative programming. The
PigJournal <http://wiki.apache.org/pig/PigJournal> has an entry for all of
these constructs and functions under the heading "Extending Pig to Include
Branching, Looping, and Functions", but because the map-reduce paradigm is
inherently functional, I think staying functional is a better way to approach
this improvement. So the minimal set of additional features needed is
functions and branching, and we would get loops as a side effect of those
improvements.

In order for the optimizations to be available to the Pig Latin interpreter,
the functions and branching *must* be implemented within the Pig system. If
they are externalized, or implemented as UDFs in some other language, then the
opportunities for optimizing the execution vanish.


Anyways, a couple of cents on a rainy day.



On Wed, Mar 10, 2010 at 10:15 AM, hc busy wrote:

An additional thought... we can define UDFs like

ADD(bag{(int,int)}), DIVIDE(bag{(int,int)}), MULTIPLY(bag{(int,int)}),
SQRT(bag{(float)})..

basically vectorize most of the common arithmetic operations, but then the
language has to support it by converting

bag.a + bag.b

to

ADD(bag.(a,b))
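A Python sketch of the rewrite described above (the projection helper and the vectorized ADD are illustrative stand-ins, not Pig builtins):

```python
def project(bag, *fields):
    """Simulate bag.(a, b): project the named fields, paired per tuple."""
    return [tuple(t[f] for f in fields) for t in bag]

def ADD(pairs):
    """Vectorized add over a bag of (int, int) tuples."""
    return [a + b for a, b in pairs]

bag = [{"a": 1, "b": 10}, {"a": 2, "b": 20}]

# The language would rewrite bag.a + bag.b into ADD(bag.(a, b)):
result = ADD(project(bag, "a", "b"))
```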

I guess there are some difficulties, for instance:

SQRT(bag.a)+bag.b

How would this work? Because SQRT(bag.a) returns a bag, how would we
convert it to the correct per-tuple operation? It's almost like we want to
convert an expression

SUM(SQRT(bag.a),bag.b)

into a function F such that

SUM(SQRT(bag.a),bag.b) = F(bag.a,bag.b)

and then F is computed by iterating over each tuple of the bag:

FOREACH ... GENERATE ..., F(bag.(a,b));
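In Python terms, a sketch of that fused F, under the assumption that both operations collapse into a single pass per tuple (F and the field names are hypothetical):

```python
import math

def F(bag):
    """Fused per-tuple form of SQRT(bag.a) + bag.b: one pass over the bag."""
    return [math.sqrt(t["a"]) + t["b"] for t in bag]

bag = [{"a": 4, "b": 1}, {"a": 9, "b": 2}]

# SUM(SQRT(bag.a), bag.b) becomes an aggregate over F(bag.(a, b)):
total = sum(F(bag))
```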





On Wed, Mar 10, 2010 at 9:31 AM, hc busy wrote:


So, pig team, what is the right way to accomplish this?


On Tue, Mar 9, 2010 at 10:50 PM, Mridul Muralidharan <
mridulm@yahoo-inc.com> wrote:
On Tuesday 09 March 2010 04:13 AM, hc busy wrote:

okay. Here's the bag that I have:

{group: (a: int, b: chararray, c: chararray, d: int),
 TABLE: {number1: int, number2: int}}



and I want to do this

grunt> CALCULATE = FOREACH TABLE_group GENERATE group,
    SUM(TABLE.number1 / TABLE.number2);

TABLE.number1 actually gives you the bag of number1 values found in TABLE -
but I am never really sure of the semantics in these situations, since I am
slightly nervous that it is implementation dependent ... my understanding is
that what you are attempting should not work, but I could be wrong.

I do know that TABLE.(number1, number2) will consistently project and
pair up the fields: so to 'fix' this, you can write your own DIVIDE_SUM,
which does something like this:

grunt> CALCULATE = FOREACH TABLE_group GENERATE group,
    DIVIDE_SUM(TABLE.(number1, number2));

And the DIVIDE_SUM UDF implementation takes in a bag with tuples of schema
(numerator, denominator) and returns:

result == sum ( foreach tuple ( tuple.numerator / tuple.denominator ) );
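A Python sketch of the DIVIDE_SUM contract Mridul describes (the actual UDF would be written in Java against Pig's EvalFunc API; this only models its semantics):

```python
def divide_sum(bag):
    """DIVIDE_SUM: bag of (numerator, denominator) tuples -> sum of ratios."""
    return sum(num / den for num, den in bag)

# TABLE.(number1, number2) projected into paired tuples:
pairs = [(10, 2), (9, 3)]
result = divide_sum(pairs)   # 10/2 + 9/3
```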


Obviously, this is not as 'elegant' as your initial code and is definitely
more cumbersome ... so clarifying this behavior with someone from the Pig
team before you attempt this would definitely be better.


Regards,
Mridul


grunt> DUMP CALCULATE;

2010-03-08 14:02:41,055 [main] ERROR org.apache.pig.tools.grunt.Grunt -
ERROR 1039: Incompatible types in Multiplication Operator
left hand side: bag  right hand side: bag



It seems useful that I may want to calculate an aggregate of some arithmetic
operation on the members of a bag. Any suggestions?

... Looking at the documentation it looks like I want to do something
like

SUM(TABLE.(number1 / number2))

but that doesn't work either :-(


  • Dmitriy Ryaboy at Mar 12, 2010 at 10:15 pm
    hc,
    Good stuff. I was thinking along very similar lines with regards to allowing
    mapping a function over a bag. I suspect a MAP can actually be written as a
    udf. We'd just have to pass the name of the function to be mapped and call
    InstantiateFuncFromSpec on it.
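    A Python sketch of that idea: the MAP-style UDF is handed the *name* of the function to apply and resolves it at runtime. The registry lookup here merely stands in for what InstantiateFuncFromSpec does for Pig UDFs; everything in this sketch is hypothetical, not Pig API.

```python
# Hypothetical function registry; in Pig this resolution would be done by
# InstantiateFuncFromSpec from the function's spec string.
REGISTRY = {"SQRT": lambda x: x ** 0.5, "NEGATE": lambda x: -x}

def map_udf(func_name, bag):
    """Apply the named function to every member of the bag."""
    fn = REGISTRY[func_name]     # stand-in for InstantiateFuncFromSpec
    return [fn(item) for item in bag]

result = map_udf("SQRT", [4, 16])
```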

    We may want a different name for it, as "map" and "reduce" are associated
    with the Hadoop map and reduce stages when talking about Pig, and at some
    point Pig may want to allow users to explicitly set up map and reduce jobs
    -- as opposed to mapping functions to members of bags.

    -D

  • Hc busy at Mar 16, 2010 at 6:49 pm
    Dmitriy, great minds think alike ;-)

    It would still serve us well to be able to write reducers in general and
    apply them dynamically under a different guise. In addition, what I was
    pointing out is that mapping and reducing over multiple bags simultaneously
    is inherently a *different* type of operation than map and reduce on a
    single bag, which is why we may want to define *coreduction* and
    *comapping*. I guess it's kind of hard to imagine a use case for this kind
    of thing, but one thing I've seen that was useful is windowing functions
    in SQL. If I want, for instance,
    A = {i: int, bag1: {a: int, b: chararray}};
    B = {j: int, bag2: {b: int, c: map}};
    C = COGROUP A BY i, B BY j;
    and after a cogroup, I want to operate on the pair of tuples from the two
    bags such that a is minimal and b is maximal:
    D = FOREACH C {
        T1 = FOREACH bag1 GENERATE MIN(a);
        T2 = FOREACH bag2 GENERATE MAX(b);
        T3 = FILTER bag1 BY a == T1;
        T4 = FILTER bag2 BY b == T2;
        GENERATE FLATTEN(T3), FLATTEN(T4);
    }
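The nested block above can be modeled in Python (a sketch only; field names follow the bag1/bag2 schemas given earlier, and ties simply produce multiple pairs):

```python
from itertools import product

def min_max_pairs(bag1, bag2):
    """Within one cogroup group: pair the bag1 tuple(s) with minimal a
    against the bag2 tuple(s) with maximal b."""
    t1 = min(t["a"] for t in bag1)               # T1 = MIN(a)
    t2 = max(t["b"] for t in bag2)               # T2 = MAX(b)
    t3 = [t for t in bag1 if t["a"] == t1]       # FILTER bag1 BY a == T1
    t4 = [t for t in bag2 if t["b"] == t2]       # FILTER bag2 BY b == T2
    return list(product(t3, t4))                 # FLATTEN(T3), FLATTEN(T4)

bag1 = [{"a": 1, "b": "x"}, {"a": 3, "b": "y"}]
bag2 = [{"b": 5, "c": {}}, {"b": 9, "c": {}}]
pairs = min_max_pairs(bag1, bag2)
```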

    or, in general, the i-th item from each bag. Another example is if we are
    performing a second-price auction on advertisements. The exact function we
    need is to "retrieve the bidder (bidder bag) corresponding to the second
    highest bid for this search term (term bag), possibly with additional
    restrictions (restriction bag)". These types of co-reduction are actually
    just reductions on bags resulting from co-grouping. But there are
    non-linear-time co-maps and co-reduces that need to be considered as well.
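The second-price lookup described here is, in Python terms, a small reduction over a (bidder, bid) bag (a sketch with made-up data; ties and the restriction bag are ignored):

```python
def second_price_winner(bids):
    """Return the bidder whose bid is the second highest in the bag,
    or None if there are fewer than two bids."""
    ranked = sorted(bids, key=lambda t: t[1], reverse=True)
    return ranked[1][0] if len(ranked) > 1 else None

bids = [("ann", 10), ("bob", 7), ("eve", 9)]
winner = second_price_winner(bids)
```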


    I guess you have a good point regarding keywords: the FOREACH keyword is
    kind of like a map, but FOREACH does not nest. I tried using that in the
    following query:


    grunt> CALCULATE = FOREACH TABLE_group GENERATE group,
        AVG(TABLE.number1 / TABLE.number2);

    expanding it to:

    grunt> CALCULATE = FOREACH TABLE_group {
        B = FOREACH TABLE GENERATE number1/number2 AS ratio;
        GENERATE group, AVG(B.ratio) AS result;
    }
    2010-03-12 15:16:40,812 [main] ERROR org.apache.pig.tools.grunt.Grunt -
    ERROR 1000: Error during parsing. Encountered " "foreach" "FOREACH "" at
    line 2, column 18.
    Was expecting one of:
        "filter" ...
        "order" ...
        "arrange" ...
        "distinct" ...
        "limit" ...
    So FOREACH could become a function taking two parameters: the first a bag,
    and the second a function that takes each tuple of the bag as its
    parameter. Although keeping it as a keyword and simply allowing nesting is
    good enough for me.

    Oh... I see, so when FOREACH is invoked with algebraics like AVG it behaves
    like a reduction. But having both map and reduce in the same keyword seems
    confusing. I mean, I guess I understand that we want to conform to the SQL
    standard, and that's the semantics of SQL's SELECT statement.

    The problem with creating functions is that it forces us to create
    variable scopes. A for-loop from 1 to 100 looks like this:

    function loop(index:int):int {
        if (index == 100) return 0;
        else {
            // do some work,
            loop(index + 1);
        }
    }
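The same loop is directly expressible as tail recursion with an accumulator (a Python sketch of the pattern, not Pig Latin; here the "work" is just summing the index):

```python
def loop(index=0, acc=0):
    """The for-loop from 0 to 100 written as a recursive function."""
    if index == 100:
        return acc
    # do some work: here, accumulate the index
    return loop(index + 1, acc + index)

total = loop()   # sum of 0..99
```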

    The variable index is not going to be present in the same space as the
    aliases. And, if the "some work" involves map-reduction, then several
    decisions have to be made about:

    a.) how we deal with accessing the index variable during map-reduction, and
    b.) how we handle updates to index during map-reduction.

    Certainly a.) is simple enough if index is treated as constant in every
    context except during execution of the loop on the Pig server. But if we
    want to allow index to be mutable, then things get really complicated; in
    that case, we are no longer in the same paradigm as map-reduction.


Discussion Overview
group: dev @ pig.apache.org
categories: pig, hadoop
posted: Mar 12, '10 at 10:01p
active: Mar 16, '10 at 6:49p
posts: 3
users: 2 (hc busy: 2 posts, Dmitriy Ryaboy: 1 post)
