I think this is similar to the 'merge' join issue not being automatically
supported.

If we have done a GROUP in the past, this data should have been mapped, then
handed off to the reducers and stored on those nodes.

They should be nicely tiled and ready for a tiled merge join with 1/Nth of
the data on each of your nodes (ignoring replicas of course).

Now if COGROUP comes along, and has the previous GROUP as part of the
expression, it should be able to use the previous data already reduced and
on disk as the source of one of the relations.

The other relation may need to be map reduced first of course.

I tried to come up with a simple pig script to EXPLAIN the problem. I think
this one will do it:
foo = LOAD 'group_test1.csv' USING PigStorage(',') AS (col_a:int,
col_b:int);
bar = LOAD 'group_test2.csv' USING PigStorage(',') AS (col_a:int,
col_b:int);
foo_grouped = GROUP foo BY col_a;
both_grouped = GROUP foo_grouped BY $0 , bar BY col_a;
STORE foo_grouped INTO 'foo_grouped';
STORE both_grouped INTO 'both_grouped';

It first does a global rearrange to form foo_grouped, but then does it again
to form both_grouped, which of course seems like no fun.
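As an aside, I think a manual rewrite along these lines would only shuffle foo's
data once in this toy case (an untested sketch; the inner shape of the cogrouped
bag changes, so downstream code would need a small adjustment, and the output
paths are just placeholders):

cg = COGROUP foo BY col_a, bar BY col_a;  -- one shuffle of foo and bar together
fg = FOREACH cg GENERATE group, foo;      -- recovers the equivalent of foo_grouped
STORE fg INTO 'foo_grouped_alt';
STORE cg INTO 'both_grouped_alt';

(The EXPLAIN below is still for the original two-step version.)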

…. here is the EXPLAIN (assuming I'm reading it right).


#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
foo_grouped: (Name: LOStore Schema:
group#42:int,foo#43:bag{#57:tuple(col_a#15:int,col_b#16:int)})
---foo_grouped: (Name: LOSplitOutput Schema:
group#42:int,foo#43:bag{#57:tuple(col_a#15:int,col_b#16:int)})
(Name: Constant Type: boolean Uid: 41)

---foo_grouped: (Name: LOSplit Schema:
group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
---foo_grouped: (Name: LOCogroup Schema:
group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
col_a:(Name: Project Type: int Uid: 15 Input: 0 Column: 0)

---foo: (Name: LOForEach Schema: col_a#15:int,col_b#16:int)
(Name: LOGenerate[false,false] Schema:
col_a#15:int,col_b#16:int)ColumnPrune:InputUids=[16,
15]ColumnPrune:OutputUids=[16, 15]
(Name: Cast Type: int Uid: 15)
---col_a:(Name: Project Type: bytearray Uid: 15
Input: 0 Column: (*))
(Name: Cast Type: int Uid: 16)
---col_b:(Name: Project Type: bytearray Uid: 16
Input: 1 Column: (*))
---(Name: LOInnerLoad[0] Schema: col_a#15:bytearray)

---(Name: LOInnerLoad[1] Schema: col_b#16:bytearray)
---foo: (Name: LOLoad Schema:
col_a#15:bytearray,col_b#16:bytearray)RequiredFields:null

both_grouped: (Name: LOStore Schema:
group#47:int,foo_grouped#48:bag{#60:tuple(group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})},bar#50:bag{#61:tuple(col_a#17:int,col_b#18:int)})
---both_grouped: (Name: LOCogroup Schema:
group#47:int,foo_grouped#48:bag{#60:tuple(group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})},bar#50:bag{#61:tuple(col_a#17:int,col_b#18:int)})
group:(Name: Project Type: int Uid: 45 Input: 0 Column: 0)
col_a:(Name: Project Type: int Uid: 17 Input: 1 Column: 0)

---foo_grouped: (Name: LOSplitOutput Schema:
group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})
(Name: Constant Type: boolean Uid: 44)

---foo_grouped: (Name: LOSplit Schema:
group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
---foo_grouped: (Name: LOCogroup Schema:
group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
col_a:(Name: Project Type: int Uid: 15 Input: 0 Column:
0)
---foo: (Name: LOForEach Schema: col_a#15:int,col_b#16:int)
(Name: LOGenerate[false,false] Schema:
col_a#15:int,col_b#16:int)ColumnPrune:InputUids=[16,
15]ColumnPrune:OutputUids=[16, 15]
(Name: Cast Type: int Uid: 15)
---col_a:(Name: Project Type: bytearray Uid: 15
Input: 0 Column: (*))
(Name: Cast Type: int Uid: 16)
---col_b:(Name: Project Type: bytearray Uid: 16
Input: 1 Column: (*))
---(Name: LOInnerLoad[0] Schema:
col_a#15:bytearray)
---(Name: LOInnerLoad[1] Schema:
col_b#16:bytearray)
---foo: (Name: LOLoad Schema:
col_a#15:bytearray,col_b#16:bytearray)RequiredFields:null
---bar: (Name: LOForEach Schema: col_a#17:int,col_b#18:int)
(Name: LOGenerate[false,false] Schema:
col_a#17:int,col_b#18:int)ColumnPrune:InputUids=[17,
18]ColumnPrune:OutputUids=[17, 18]
(Name: Cast Type: int Uid: 17)
---col_a:(Name: Project Type: bytearray Uid: 17 Input: 0
Column: (*))
(Name: Cast Type: int Uid: 18)
---col_b:(Name: Project Type: bytearray Uid: 18 Input: 1
Column: (*))
---(Name: LOInnerLoad[0] Schema: col_a#17:bytearray)

---(Name: LOInnerLoad[1] Schema: col_b#18:bytearray)
---bar: (Name: LOLoad Schema:
col_a#17:bytearray,col_b#18:bytearray)RequiredFields:null

#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
foo_grouped:
Store(file:///Users/burton/projects/foo/foo_grouped:org.apache.pig.builtin.PigStorage)
- scope-15
---foo_grouped: Filter[bag] - scope-13
Constant(true) - scope-14

---foo_grouped: Split - scope-12

---foo_grouped: Package[tuple]{int} - scope-9

---foo_grouped: Global Rearrange[tuple] - scope-8

---foo_grouped: Local Rearrange[tuple]{int}(false) - scope-10
Project[int][0] - scope-11

---foo: New For Each(false,false)[bag] - scope-7

Cast[int] - scope-2
---Project[bytearray][0] - scope-1
Cast[int] - scope-5
---Project[bytearray][1] - scope-4
---foo:
Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) -
scope-0

both_grouped:
Store(file:///Users/burton/projects/foo/both_grouped:org.apache.pig.builtin.PigStorage)
- scope-32
---both_grouped: Package[tuple]{int} - scope-27

---both_grouped: Global Rearrange[tuple] - scope-26

---both_grouped: Local Rearrange[tuple]{int}(false) - scope-28
Project[int][0] - scope-29

---foo_grouped: Filter[bag] - scope-16
Constant(true) - scope-17

---foo_grouped: Split - scope-12

---foo_grouped: Package[tuple]{int} - scope-9

---foo_grouped: Global Rearrange[tuple] - scope-8

---foo_grouped: Local
Rearrange[tuple]{int}(false) - scope-10
Project[int][0] - scope-11

---foo: New For Each(false,false)[bag] -
scope-7
Cast[int] - scope-2
---Project[bytearray][0] - scope-1
Cast[int] - scope-5
---Project[bytearray][1] - scope-4
---foo:
Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) -
scope-0
---both_grouped: Local Rearrange[tuple]{int}(false) - scope-30
Project[int][0] - scope-31

---bar: New For Each(false,false)[bag] - scope-25

Cast[int] - scope-20
---Project[bytearray][0] - scope-19
Cast[int] - scope-23
---Project[bytearray][1] - scope-22
---bar:
Load(file:///Users/burton/projects/foo/group_test2.csv:PigStorage(',')) -
scope-18

2011-08-30 13:10:30,404 [main] INFO
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler -
File concatenation threshold: 100 optimistic? false
2011-08-30 13:10:30,492 [main] INFO
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
- MR plan size before optimization: 3
2011-08-30 13:10:30,493 [main] INFO
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
- Merged 1 map-only splittees.
2011-08-30 13:10:30,495 [main] INFO
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
- Merged 1 out of total 3 MR operators.
2011-08-30 13:10:30,495 [main] INFO
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
- MR plan size after optimization: 2
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-33
Map Plan
foo_grouped: Local Rearrange[tuple]{int}(false) - scope-10
Project[int][0] - scope-11

---foo: New For Each(false,false)[bag] - scope-7

Cast[int] - scope-2
---Project[bytearray][0] - scope-1
Cast[int] - scope-5
---Project[bytearray][1] - scope-4
---foo:
Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) -
scope-0--------
Reduce Plan
Split - scope-42
foo_grouped:
Store(file:///Users/burton/projects/foo/foo_grouped:org.apache.pig.builtin.PigStorage)
- scope-15
Store(file:/tmp/temp1306726464/tmp-52335994:org.apache.pig.impl.io.InterStorage)
- scope-34
---foo_grouped: Package[tuple]{int} - scope-9--------
Global sort: false
----------------

MapReduce node scope-40
Map Plan
Union[tuple] - scope-41
---both_grouped: Local Rearrange[tuple]{int}(false) - scope-28
Project[int][0] - scope-29
---Load(file:/tmp/temp1306726464/tmp-52335994:org.apache.pig.impl.io.InterStorage)
- scope-37
---both_grouped: Local Rearrange[tuple]{int}(false) - scope-30
Project[int][0] - scope-31

---bar: New For Each(false,false)[bag] - scope-25

Cast[int] - scope-20
---Project[bytearray][0] - scope-19
Cast[int] - scope-23
---Project[bytearray][1] - scope-22
---bar:
Load(file:///Users/burton/projects/foo/group_test2.csv:PigStorage(',')) -
scope-18--------
Reduce Plan
both_grouped:
Store(file:///Users/burton/projects/foo/both_grouped:org.apache.pig.builtin.PigStorage)
- scope-32
---both_grouped: Package[tuple]{int} - scope-27--------
Global sort: false
----------------


--

Founder/CEO Spinn3r.com

Location: *San Francisco, CA*
Skype: *burtonator*

Skype-in: *(415) 871-0687*


  • Dmitriy Ryaboy at Aug 30, 2011 at 9:24 pm
    If I understand correctly, you are objecting to the reshuffle of foo_grouped
    based on col_a when it was just shuffled based on col_a.

    I've considered this. We currently don't take existing partitioning into
    account; that would be handy. But we need to evaluate the cost/benefit here.

    This only works when parallelism is the same for both group operators.
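(For illustration, a minimal sketch of what pinning the parallelism would look
like at the script level; the 32 is arbitrary:)

foo_grouped  = GROUP foo BY col_a PARALLEL 32;
both_grouped = GROUP foo_grouped BY $0, bar BY col_a PARALLEL 32;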

    The number of MR jobs doesn't get reduced; the number of required mappers
    gets reduced to only those needed to read the second relation, and the
    reducers have to pull from the previously generated data as side files.

    As Hadoop does not specify locality policies for reducers (afaik), the
    sidefiles would likely need to be read over the network. This might be
    faster than what happens now because you avoid some IO and the sort, but I
    am not totally convinced it materially affects total runtime for non-trivial
    examples.

    Do you have any cheap ways to estimate how much savings that gets us in
    these kinds of scenarios?

    D
  • Kevin Burton at Aug 30, 2011 at 10:28 pm

    On Tue, Aug 30, 2011 at 2:24 PM, Dmitriy Ryaboy wrote:

If I understand correctly, you are objecting to the reshuffle of foo_grouped
based on col_a when it was just shuffled based on col_a.

I've considered this. We currently don't take existing partitioning into
account; that would be handy. But we need to evaluate the cost/benefit here.

Yes, exactly. I think the cost/benefit would be clear. You don't have to
read the data at the source or transfer it over the network again. However,
you do have to write it to disk on the remote end, and there may be
situations where the reduce output doesn't need to be written to disk…

This only works when parallelism is the same for both group operators.

I would assume this is the default, but I agree. All our jobs run with the
same parallelism.

As Hadoop does not specify locality policies for reducers (afaik), the
sidefiles would likely need to be read over the network. This might be
faster than what happens now because you avoid some IO and the sort, but I
am not totally convinced it materially affects total runtime for non-trivial
examples.

If the side files are (or were) written to HDFS they would be replicated, and
the locality would then become irrelevant.

    Your mappers would always have the side files.

With only one elided GROUP BY I think this actually might be MORE data than
required. For example, if you are on machines A, B, and C, and the first MR
job ran on A and the partitioned/reduced data was there, and then during the
second MR job you ended up on B, you would need to read the data from A to
B… This would require network bandwidth.

    If you had at least three operations, two of which re-used existing data,
    then the network bandwidth wouldn't be wasted. Another upside to writing to
HDFS is that you don't have to re-run the job if one of the nodes dies.

    Do you have any cheap ways to estimate how much savings that gets us in
    these kinds of scenarios?
    I think it would be obvious. Maybe I'm missing something.

For multiple GROUPs (more than 2 or 3) the savings would be significant. All
the file IO, CPU, and network IO is removed and you just reuse one of the
partitioned relations.

    In my app… this would yield a 5-10x speedup…

    Kevin

  • Dmitriy Ryaboy at Aug 30, 2011 at 11:19 pm
    I am not sure I am following your argument about side files.

    We store the output of the first group-by to HDFS. Let's say we have 100
    reducers in that case, so we create 100 files (each replicated 3x).

    We now read in the second relation and partition it into 100 bins on the
    mappers. We then start 100 reducers (arbitrarily selected among our nodes).
    Each node needs to fetch its map outputs (this is always the case). In this
    optimized workflow, it also needs to fetch one of the 100 files we created
    in the previous job. This will most likely have to go over a network in any
    decent-sized cluster, as we only have 3/n chance of a local read here (n
    being the number of nodes in your cluster). The amount of data we have to
    read off a remote disk and move to the reducer is mostly the same in both
this and the regular scenario. The regular scenario might be a bit worse in the
    case when its mappers do not get started local to the data.

    The regular scenario also loses out on the fact that it does a fair bit of
    extra IO -- it reads the previously generated chunk of data, writes it back
    to disk, and only then ships it. So there is a double IO cost.

    Your proposed optimization also gets some savings from the fact that we
    don't have to perform a sort of the output of the first job, and we can
    simply do a merge join in each reducer.

    Despite all this, my gut feeling is that the 5-10x speedup number is way too
    optimistic. Is that a guess, or backed by an experiment?

    We should probably open a jira, try a quick prototype experiment, and then
    if the prototype is promising, spec out how this can actually be
    implemented.

    D
  • Kevin Burton at Aug 31, 2011 at 12:22 am

    On Tue, Aug 30, 2011 at 4:19 PM, Dmitriy Ryaboy wrote:

    I am not sure I am following your argument about side files.

    We store the output of the first group-by to HDFS. Let's say we have 100
    reducers in that case, so we create 100 files (each replicated 3x).

    We now read in the second relation and partition it into 100 bins on the
    mappers. We then start 100 reducers (arbitrarily selected among our nodes).
    Each node needs to fetch its map outputs (this is always the case). In this
    optimized workflow, it also needs to fetch one of the 100 files we created
    in the previous job. This will most likely have to go over a network in any
    decent-sized cluster, as we only have 3/n chance of a local read here (n
    being the number of nodes in your cluster). The amount of data we have to
    read off a remote disk and move to the reducer is mostly the same in both
this and the regular scenario. The regular scenario might be a bit worse in the
    case when its mappers do not get started local to the data.

Ah… yes. This is a good point. This seems to be a flaw in the mapred
scheduler, but I imagine you would have to hint it ahead of time that you
will need access to these files so that the map jobs line up with where the
chunks are written.

It is further complicated by the fact that the only 'pure' machine to read
the data from is the original box; if this machine fails you may need to
read blocks from other machines, and these might not line up with the same
blocks from the current job or even be on the same hosts.

    The regular scenario also loses out on the fact that it does a fair bit of
    extra IO -- it reads the previously generated chunk of data, writes it back
    to disk, and only then ships it. So there is a double IO cost.

I also think it is impacted by the CPU … in our situation it's CPU
intensive.

    Your proposed optimization also gets some savings from the fact that we
    don't have to perform a sort of the output of the first job, and we can
    simply do a merge join in each reducer.

Yes. I think this is the main speedup as I see it.

    So there are really two options (this is the extra work involved).

    Option 1 (the current option)

    - perform all the IO on the original file
    - sort all the data
    - send all the data over the network to the reducers

    Option 2:
    - read data from potentially the incorrect hosts over the network during
    the reduce phase.

… Option 2 still seems better. Other than HDFS replication, the network
shouldn't use as much bandwidth during this phase; it will still 'slow us
down', but I don't think as much as option 1.

    Despite all this, my gut feeling is that the 5-10x speedup number is way too
    optimistic. Is that a guess, or backed by an experiment?
    Mea culpa … I should have explained a bit more.

    The 5-10x speedup is because we perform this step repeatedly.

We first do an initial group… then a number of recurring cogroups… usually
5-10 times, but this is just for our initial prototype. It could be much more
in production.
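Roughly, the pipeline looks like this (a schematic sketch with made-up
relation names, not our real script):

events  = LOAD 'events.csv' USING PigStorage(',') AS (user_id:long, val:int);
grouped = GROUP events BY user_id;                    -- first shuffle of events
day1    = LOAD 'day1.csv' USING PigStorage(',') AS (user_id:long, val:int);
c1      = COGROUP grouped BY group, day1 BY user_id;  -- reshuffles grouped
day2    = LOAD 'day2.csv' USING PigStorage(',') AS (user_id:long, val:int);
c2      = COGROUP grouped BY group, day2 BY user_id;  -- reshuffles grouped again

Every one of those COGROUPs re-partitions the already-grouped data, which is
where the repeated cost comes from.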

    We should probably open a jira, try a quick prototype experiment, and then
    if the prototype is promising, spec out how this can actually be
    implemented.
    I think a modification of my original .pig script (but maybe with a
    significant amount of data) could be used to help prove out the performance
    difference.

    Kevin

  • Dmitriy Ryaboy at Aug 31, 2011 at 12:38 am
    So the approach to doing this would be as follows:

    * test the benefit in a realistic scenario, perhaps using some quick custom
    scripts using streaming or plain MR

    Assuming the benefits are substantial:

    * introduce a copartitioned join type, which is a map-side join which works
    when:
    -- all relations being joined produce the same number of splits
-- there is a guarantee that all instances of a join key are in the same split
-- there is a guarantee that the same key will be found in the same split index for all relations

    * introduce a copartitioned merge join type, which adds another constraint:
    -- records in each split of each relation are sorted on the join key

    (we may want to only do copartitioned merge.. though the copartitioned impl
    may be useful in certain cases as well).

    At that point, we can manually test the speedup in the pig context by
    forcing these joins on materialized intermediate data.
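For instance (a rough, untested sketch; the paths are placeholders, and it
assumes both inputs are stored sorted on the key and that the storage
functions satisfy Pig's merge join requirements):

a = LOAD 'foo_grouped_sorted' AS (key:int, vals:bag{t:tuple(col_a:int, col_b:int)});
b = LOAD 'bar_sorted' USING PigStorage(',') AS (key:int, col_b:int);
j = JOIN a BY key, b BY key USING 'merge';  -- map-side merge join, no extra sort/shuffle
STORE j INTO 'merge_join_test';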

    The next step would be doing this automatically by tracing partitioning
    functions, keys, and parallelism factors for all relations, and deriving the
    fact that the join applies.

    This is going to be a lot of work, someone's going to have to be really
    motivated to do it. We've had good luck with GSoC students for that sort of
    thing :-).

    D
  • Kevin Burton at Aug 31, 2011 at 1:09 am


    This is going to be a lot of work, someone's going to have to be really
    motivated to do it. We've had good luck with GSoC students for that sort of
    thing :-).
Because they're the only ones for whom fixing Pig bugs isn't a distraction
from their main job :-P


  • Kevin Burton at Aug 31, 2011 at 1:15 am

    On Tue, Aug 30, 2011 at 5:37 PM, Dmitriy Ryaboy wrote:

    So the approach to doing this would be as follows:

    * test the benefit in a realistic scenario, perhaps using some quick custom
    scripts using streaming or plain MR

I might actually have to do this, as the current solution might be impossible
to work with… if I have to abandon Pig (at least for this project) and
develop it in plain MR, I could potentially implement both forms.

  • Kevin Burton at Aug 31, 2011 at 11:00 pm
    I was wrong about this:

    - perform all the IO on the original file
    - sort all the data
    - send all the data over the network to the reducers

    Option 2:
    - read data from potentially the incorrect hosts over the network during
    the reduce phase.

    I think these are the real steps:

    - read all the blocks off disk on the source nodes and send them to mappers
    - sort the data / rewriting them to disk during the sort if necessary
    - send all the data to the reducers over the network
    - write the data to disk on the reducers

    vs

    - read all the data off disk from the previous reduction phase
- potentially (with a high probability) send it over the network during the
group by …

It seems that we could set up a simpler benchmark by measuring each phase
individually and then benchmarking the performance advantage.
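Something like this split would let each phase be timed on its own (a rough,
untested sketch; paths are placeholders):

-- script 1: pay the first shuffle and materialize the grouped data
foo         = LOAD 'group_test1.csv' USING PigStorage(',') AS (col_a:int, col_b:int);
foo_grouped = GROUP foo BY col_a;
STORE foo_grouped INTO 'foo_grouped';

-- script 2 (timed separately): today's behavior, which reshuffles the stored data
bar          = LOAD 'group_test2.csv' USING PigStorage(',') AS (col_a:int, col_b:int);
fg           = LOAD 'foo_grouped' AS (key:int, foo:bag{t:tuple(col_a:int, col_b:int)});
both_grouped = GROUP fg BY key, bar BY col_a;
STORE both_grouped INTO 'both_grouped';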


    --

    Founder/CEO Spinn3r.com

    Location: *San Francisco, CA*
    Skype: *burtonator*

    Skype-in: *(415) 871-0687*
