Self join problem
Hello,

I'm trying to run the following query, where m1 and m2 have the same data
(>29M rows), on a 3-node hadoop cluster. I'm essentially trying to do a self
join. It ends up running 269 map jobs and 1 reduce job. The map jobs
complete, but the reduce job runs as a single process on one of the hadoop
nodes at 100% cpu utilization and slowly increases in memory consumption.
The reduce job never gets beyond 82% complete despite letting it run for a
day.

I am running on 0.5.0 based on this morning's trunk.

insert overwrite table foo1
select m1.id as id_1, m2.id as id_2, count(1), m1.dt
from m1 join m2 where m1.id <> m2.id and m1.id < m2.id and m1.dt = m2.dt
group by m1.id, m2.id, m1.dt;

Any input would be appreciated.

  • Namit Jain at Nov 4, 2009 at 12:48 am
    Put the join condition in the ON clause:

    insert overwrite table foo1
    select m1.id as id_1, m2.id as id_2, count(1), m1.dt
    from m1 join m2 on m1.dt = m2.dt
    where m1.id <> m2.id and m1.id < m2.id
    group by m1.id, m2.id, m1.dt;



  • Defenestrator at Nov 4, 2009 at 2:46 am
    Hi Namit,

    Thanks for your suggestion.

    I tried changing the query as you had suggested by moving the m1.dt = m2.dt
    to the on clause. It increased the number of reduce jobs to 2. So now
    there are two processes running on two nodes at 100% consuming a lot of
    memory. Is there a reason why hive doesn't spawn more reduce jobs for this
    query?
    On Tue, Nov 3, 2009 at 4:47 PM, Namit Jain wrote:

    Get the join condition in the on condition:



    insert overwrite table foo1

    select m1.id as id_1, m2.id as id_2, count(1), m1.dt

    from m1 join m2 on m1.dt=m2.dt where m1.id <> m2.id and m1.id < m2.idgroup by
    m1.id, m2.id, m1.dt;







    *From:* Defenestrator
    *Sent:* Tuesday, November 03, 2009 4:44 PM
    *To:* hive-user@hadoop.apache.org
    *Subject:* Self join problem



    Hello,



    I'm trying to run the following query where m1 and m2 have the same data
    (>29M rows) on a 3-node hadoop cluster. I'm essentially trying to do a self
    join. It ends up running 269 map jobs and 1 reduce job. The map jobs
    complete but the reduce job just runs on one process on one of the hadoop
    nodes at 100% cpu utilization and just slowly increases in memory
    consumption. The reduce job never goes beyond 82% complete despite letting
    it run for a day.



    I am running on 0.5.0 based on this morning's trunk.



    insert overwrite table foo1

    select m1.id as id_1, m2.id as id_2, count(1), m1.dt

    from m1 join m2 where m1.id <> m2.id and m1.id < m2.id and m1.dt = m2.dt
    group by m1.id, m2.id, m1.dt;



    Any input would be appreciated.
  • Namit Jain at Nov 4, 2009 at 2:54 am
    The number of reducers is inferred from the input data size, but you can always override it by setting mapred.reduce.tasks.
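
    For illustration only (a hedged sketch, not part of the original reply; the value 8 is arbitrary), the override is just a session setting issued before the insert:

    -- illustrative reducer count; tune to the cluster
    set mapred.reduce.tasks=8;
    insert overwrite table foo1
    select m1.id as id_1, m2.id as id_2, count(1), m1.dt
    from m1 join m2 on m1.dt = m2.dt
    where m1.id <> m2.id and m1.id < m2.id
    group by m1.id, m2.id, m1.dt;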



  • Defenestrator at Nov 4, 2009 at 3:45 am
    I was able to increase the number of reduce jobs manually to 32. However,
    it finishes 28 of them, and the other 4 have the same behavior of using 100%
    cpu and consuming a lot of memory. I suspect that it might be an issue
    with the reduce job itself - is there a way to figure out what these jobs
    are doing exactly?

    Thanks.

  • Ryan LeCompte at Nov 4, 2009 at 3:56 am
    I've had a similar issue with a small cluster. Is there any way that
    you can reduce the size of the data being joined on both sides? If you
    search the forums for join issue, you will see the thread for my issue
    and get some tips.

    Thanks,
    Ryan



  • Defenestrator at Nov 5, 2009 at 1:22 am
    Thanks for the pointer, Ryan. I tried to reduce the amount of data (down to
    880k rows) as suggested by the thread. Unfortunately, that still didn't
    help.

    I've poked around some more on this problem. My query is essentially doing
    a cartesian product for each dt, which is what I'm intending to do. And I
    see a reference (http://issues.apache.org/jira/browse/HIVE-556) that
    indicates cartesian products produce sub-optimal plans because they get
    processed by a single reducer. Is there a reason why such plans aren't
    parallelized across multiple reducers?
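
    A hedged way to gauge how much parallelism is even available here (an illustrative sketch, not from the original message; it only reuses the thread's table and column names): with dt as the sole join key, each distinct dt value is routed to a single reducer, so the distinct-key count and the rows per key bound how far the join can spread:

    -- upper bound on the number of reducers that can do useful work
    select count(distinct dt) from m1;

    -- rows landing on each join key; each reducer builds a per-dt cross product
    select dt, count(1) from m1 group by dt;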

    Here's the EXPLAIN of the query:

    ABSTRACT SYNTAX TREE:
    (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF m1) (TOK_TABREF m2) (= (.
    (TOK_TABLE_OR_COL m1) dt) (. (TOK_TABLE_OR_COL m2) dt)))) (TOK_INSERT
    (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (.
    (TOK_TABLE_OR_COL m1) id) id_1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL m2) id)
    id_2) (TOK_SELEXPR (TOK_FUNCTION count 1)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL
    m1) dt))) (TOK_WHERE (and (<> (. (TOK_TABLE_OR_COL m1) id) (.
    (TOK_TABLE_OR_COL m2) id)) (< (. (TOK_TABLE_OR_COL m1) id) (.
    (TOK_TABLE_OR_COL m2) id)))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL m1) id) (.
    (TOK_TABLE_OR_COL m2) id) (. (TOK_TABLE_OR_COL m1) dt))))

    STAGE DEPENDENCIES:
    Stage-1 is a root stage
    Stage-2 depends on stages: Stage-1
    Stage-0 is a root stage

    STAGE PLANS:
    Stage: Stage-1
    Map Reduce
    Alias -> Map Operator Tree:
    m1
    TableScan
    alias: m1
    Reduce Output Operator
    key expressions:
    expr: dt
    type: bigint
    sort order: +
    Map-reduce partition columns:
    expr: dt
    type: bigint
    tag: 0
    value expressions:
    expr: id
    type: int
    expr: dt
    type: bigint
    m2
    TableScan
    alias: m2
    Reduce Output Operator
    key expressions:
    expr: dt
    type: bigint
    sort order: +
    Map-reduce partition columns:
    expr: dt
    type: bigint
    tag: 1
    value expressions:
    expr: id
    type: int
    Reduce Operator Tree:
    Join Operator
    condition map:
    Inner Join 0 to 1
    condition expressions:
    0 {VALUE._col1} {VALUE._col2}
    1 {VALUE._col1}
    outputColumnNames: _col1, _col2, _col4
    Filter Operator
    predicate:
    expr: ((_col1 <> _col4) and (_col1 < _col4))
    type: boolean
    Select Operator
    expressions:
    expr: _col1
    type: int
    expr: _col4
    type: int
    expr: _col2
    type: bigint
    outputColumnNames: _col1, _col4, _col2
    Group By Operator
    aggregations:
    expr: count(1)
    keys:
    expr: _col1
    type: int
    expr: _col4
    type: int
    expr: _col2
    type: bigint
    mode: hash
    outputColumnNames: _col0, _col1, _col2, _col3
    File Output Operator
    compressed: false
    GlobalTableId: 0
    table:
    input format:
    org.apache.hadoop.mapred.SequenceFileInputFormat
    output format:
    org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

    Stage: Stage-2
    Map Reduce
    Alias -> Map Operator Tree:
    hdfs://hippo-0:5555/tmp/hive-defenestrator/609733444/10002
    Reduce Output Operator
    key expressions:
    expr: _col0
    type: int
    expr: _col1
    type: int
    expr: _col2
    type: bigint
    sort order: +++
    Map-reduce partition columns:
    expr: _col0
    type: int
    expr: _col1
    type: int
    expr: _col2
    type: bigint
    tag: -1
    value expressions:
    expr: _col3
    type: bigint
    Reduce Operator Tree:
    Group By Operator
    aggregations:
    expr: count(VALUE._col0)
    keys:
    expr: KEY._col0
    type: int
    expr: KEY._col1
    type: int
    expr: KEY._col2
    type: bigint
    mode: mergepartial
    outputColumnNames: _col0, _col1, _col2, _col3
    Select Operator
    expressions:
    expr: _col0
    type: int
    expr: _col1
    type: int
    expr: _col3
    type: bigint
    expr: _col2
    type: bigint
    outputColumnNames: _col0, _col1, _col2, _col3
    File Output Operator
    compressed: false
    GlobalTableId: 0
    table:
    input format: org.apache.hadoop.mapred.TextInputFormat
    output format:
    org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

    Stage: Stage-0
    Fetch Operator
    limit: -1


    Time taken: 3.026 seconds


  • Ryan LeCompte at Nov 10, 2009 at 11:54 pm
    Any thoughts on this? I've only had luck by reducing the data on each
    side of the join. Is this something Hive might be able to improve in a
    future release of the query-plan optimizer?

    Thanks,
    Ryan



  • Defenestrator at Nov 11, 2009 at 2:36 am
    I would definitely appreciate any insights on this from the list. I tried
    to reduce the query down to something that is easily understood and hive
    still demonstrates a pretty poor join performance behavior on a three-node
    hadoop cluster.

    drop table m1;
    drop table foo1;

    create table m1 (
      mid int,
      aid int,
      dt string);

    LOAD DATA LOCAL INPATH 'm1' OVERWRITE INTO TABLE m1;

    create table foo1 (
      aid_1 int,
      aid_2 int,
      mid bigint,
      dt bigint
    );

    set mapred.reduce.tasks=32;

    insert overwrite table foo1
    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
    from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt
    group by m1.aid, m2.aid, m1.dt;

    Attached is the file I'm using, which has only 100k rows. I've looked at the
    benchmark (
    http://issues.apache.org/jira/secure/attachment/12411185/hive_benchmark_2009-06-18.pdf)
    and hive seems to be able to join much bigger data sets. I also tried
    running the same query on a single-node dbms on my desktop, and it returns
    results in less than 3 minutes, while hive has been running for at least
    20 minutes now.

    Thanks.

  • Ning Zhang at Nov 11, 2009 at 5:04 am
    Can you reattach the data file containing 100k rows? We will take a look at it.

    Ning

  • Defenestrator at Nov 11, 2009 at 7:23 am
    Here it is.
  • Namit Jain at Nov 11, 2009 at 5:07 am
    I think you missed the attachment.


    Which job is taking more time - the join or the group by?

    Can you send the data characteristics for m1 and foo1 - is it possible that there is a large skew on aid and dt which is forcing the data to be sent to a single reducer?
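
    One hedged way to check for that skew (an illustrative sketch added here, using the column names from the repro script above; not part of the original message) is to look for (aid, dt) pairs that carry a disproportionate share of the rows, since each such pair ends up on a single reducer:

    -- top 10 heaviest join keys; a huge leading count indicates skew
    select aid, dt, cnt
    from (
      select aid, dt, count(1) as cnt
      from m1
      group by aid, dt
    ) t
    order by cnt desc
    limit 10;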



    -namit



  • Defenestrator at Nov 11, 2009 at 7:19 am
    Definitely the join portion of the plan. The one reduce job takes over 2
    1/2 hours to complete doing the following:


    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10 rows
    2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100 rows
    2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100 rows
    2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000 rows
    2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000 rows
    2009-11-10 18:14:30,445 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000 rows
    2009-11-10 18:14:30,446 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000 rows
    2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100000 rows
    2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100000 rows
    2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000000 rows
    2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000000 rows
    2009-11-10 18:14:32,384 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 2000000 rows

    ...

    2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 9999000000 rows
    2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 9999000000 rows
    2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000000000 rows
    2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000000000 rows


  • Ning Zhang at Nov 11, 2009 at 6:38 pm
    The problem is in the data loading process: the m1 file is plain-text CSV, and you are loading it into a Hive table created with the default settings, which assume fields are separated by Ctrl-A. If you look at the first 10 rows, all fields are NULL because Hive cannot find a Ctrl-A in any row. So your query is actually doing a cartesian product of 100k x 100k rows (10 billion rows, which matches the counts the join operator was forwarding in your log).

    Since the 'LOAD DATA' command doesn't check the input format, nor does it transform it, you need to specify the format in the CREATE TABLE DDL. Following is a working example. It finishes in my unit test (single machine) in less than 3 mins.

    8<--------
    drop table m1;
    drop table foo1;

    create table m1 (
    mid int,
    aid int,
    dt string)
    row format delimited fields terminated by ','
    stored as textfile;

    LOAD DATA LOCAL INPATH '../data/files/m1' OVERWRITE INTO TABLE m1;

    select * from m1 limit 10;

    create table foo1 (
    aid_1 int,
    aid_2 int,
    mid bigint,
    dt bigint
    );


    insert overwrite table foo1
    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
    from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by m1.aid, m2.aid, m1.dt;
    8<------------
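
    A quick sanity check on the delimiter issue (a hedged addition, assuming the column names from the DDL above; not part of the original reply): with the default Ctrl-A delimiter every field parses as NULL, so counting NULL fields makes the problem visible before rerunning the join:

    -- should return 0 once the table is declared comma-delimited
    select count(1) from m1 where mid is null or aid is null or dt is null;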

  • Ryan LeCompte at Nov 11, 2009 at 6:46 pm
    I have two sequence file tables of 10GB each, and a join between them exhibits the same problem of the final reducer just never finishing.

    Any ideas there? Have you tried joining across such large tables?

    Ryan



    On Nov 11, 2009, at 1:37 PM, Ning Zhang wrote:

    The problem is in the data loading process: the m1 file is in plain-text CSV format and you are loading it into a Hive table with the default settings, which assume fields are separated by Ctrl-A. So if you look at the first 10 rows, all fields are NULL since Hive cannot find a Ctrl-A in any row. So your query is actually doing a cartesian product of 100k x 100k rows.

    Since the 'Load data' command doesn't check the input format, nor does it transform it, you need to specify the input format in the create table DDL. Following is a working example. It finishes in my unit test (single machine) in less than 3 mins.

    8<--------
    drop table m1;
    drop table foo1;

    create table m1 (
    mid int,
    aid int,
    dt string)
    row format delimited fields terminated by ','
    stored as textfile;

    LOAD DATA LOCAL INPATH '../data/files/m1' OVERWRITE INTO TABLE m1;

    select * from m1 limit 10;

    create table foo1 (
    aid_1 int,
    aid_2 int,
    mid bigint,
    dt bigint
    );


    insert overwrite table foo1
    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
    from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by
    m1.aid, m2.aid, m1.dt;
    8<------------
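
    A quick way to confirm whether the delimiter was picked up (just a sketch against the m1 table defined above) is to count rows where a parsed column came out NULL; if the row format is still wrong, the count will be close to the table's full row count:

    select count(1) from m1 where aid is null;
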
    On Nov 10, 2009, at 11:19 PM, Defenestrator wrote:

    Definitely the join portion of the plan. The one reduce job takes
    over 2 1/2 hours to complete doing the following:

    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10 rows
    2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100 rows
    2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100 rows
    2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000 rows
    2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000 rows
    2009-11-10 18:14:30,445 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000 rows
    2009-11-10 18:14:30,446 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000 rows
    2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100000 rows
    2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100000 rows
    2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000000 rows
    2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000000 rows
    2009-11-10 18:14:32,384 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 2000000 rows
    ...
    2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 9999000000 rows
    2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 9999000000 rows
    2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000000000 rows
    2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000000000 rows
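
    For scale, those final counters line up with the diagnosis above: with the 100k-row input effectively collapsing into a single join group after the bad load, 100,000 x 100,000 = 10,000,000,000 output pairs, which is exactly the 10000000000 rows the JoinOperator reports forwarding after 2 1/2 hours.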


    On Tue, Nov 10, 2009 at 9:07 PM, Namit Jain <njain@facebook.com> wrote:
    I think you missed the attachment.


    Which job is taking more time – join or group by?

    Can you send the data characteristics for m1 and foo1 – is it possible that there is a large skew on aid and dt which is forcing the data to be sent to a single reducer?



    -namit



    On 11/10/09 6:35 PM, "Defenestrator" <defenestrationism@gmail.com>
    wrote:

    I would definitely appreciate any insights on this from the list.
    I tried to reduce the query down to something that is easily
    understood and hive still demonstrates a pretty poor join
    performance behavior on a three-node hadoop cluster.

    drop table m1;
    drop table foo1;

    create table m1 (
    mid int,
    aid int,
    dt string);

    LOAD DATA LOCAL INPATH 'm1' OVERWRITE INTO TABLE m1;

    create table foo1 (
    aid_1 int,
    aid_2 int,
    mid bigint,
    dt bigint
    );

    set mapred.reduce.tasks=32;

    insert overwrite table foo1
    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
    from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by
    m1.aid, m2.aid, m1.dt;

    Attached is the file I'm using, which only has 100k rows. I've looked at the benchmark (http://issues.apache.org/jira/secure/attachment/12411185/hive_benchmark_2009-06-18.pdf) and Hive seems to be able to join much bigger data sets. And I tried running the same query on a single-node DBMS on my desktop, and it's able to return results in less than 3 minutes, while Hive has been running for at least 20 minutes now.

    Thanks.

  • Ning Zhang at Nov 11, 2009 at 7:16 pm
    I think it depends on how many rows there are in each table and what the distribution of the join keys is. I suspect that your data are very skewed, so that a lot of rows in table A share the same join key with a lot of rows in table B. That will produce a huge number of rows as the join result. Hive currently has at most 1 reducer for each distinct join key, so that reducer may be very slow. There are some JIRAs created for this problem. I don't know if anyone is actively working on this, but it should be a great research project.

    Another way you may be able to optimize is to rewrite your query to use a semijoin, if you can. Semijoin support was just checked in to trunk, so you will probably need to check out trunk to try it. If you can rewrite the query using a semijoin you can avoid the cartesian product. The basic idea of semijoin is to implement the semantics of IN/EXISTS subqueries, which Hive doesn't support yet. If you have a SQL query like:

    select m1.aid, m1.dt, count(1)
    from m1 m1
    where exists (select null from m1 m2 where m1.aid = m2.aid and m1.dt=m2.dt)
    group by m1.aid, m1.dt;

    you can rewrite it using left semi join in HiveQL as

    select m1.aid, m1.dt, count(1)
    from m1 m1 left semi join m1 m2 on (m1.aid = m2.aid and m1.dt=m2.dt)
    group by m1.aid, m1.dt;

    Note that you cannot 'select' any columns from m2, the right-hand-side table of the left semi join, just as in the above EXISTS subquery you cannot reference inner-query tables from the outer query.

    The benefit of using a semijoin rather than an inner join is that if there are a lot of rows with the same join key in m1 and m2, it returns after the first match, without producing the cartesian product of all matching rows in m1 and m2.

    Of course whether you can rewrite this depends on your application semantics. If you really want all the combinations of the rows from m1 AND m2, semijoin won't help you.

    Ning

  • Defenestrator at Nov 11, 2009 at 8:21 pm
    Thanks for the explanation, Ning. This explains the behavior I'm seeing with the original query I was trying to run, which was doing a cartesian product for each dt (of which there are only 2), with two reducers running very slowly.

    insert overwrite table foo1
    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
    from m1 m1 join m1 m2 on m1.dt = m2.dt where m1.aid <> m2.aid and m1.aid < m2.aid
    group by m1.aid, m2.aid, m1.dt;


    Here's my understanding of the issue; please correct me if I'm wrong. Because there are only two distinct "dt" values, Hive will only allocate a single reducer to do the cartesian product per distinct "dt" value.

    And this problem doesn't necessarily have anything to do with skewed data, right? Suppose I have a very large dataset with an even distribution over x join key values that all produce a lot of join output tuples; Hive will then have x reducers that all run very slowly. And the real solution for this problem is for Hive to have multiple reducers computing the cartesian product for each distinct join key value, correct?
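
    One way to see this directly (just a sketch against the m1 table from the earlier script) is to count the rows behind each join key; with only two distinct dt values each reducer gets roughly half the table, and the join output for a key grows with the square of that count:

    select dt, count(1) from m1 group by dt;
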
  • Ning Zhang at Nov 12, 2009 at 5:22 am
    Your understanding is mostly correct, but the number of reducers is estimated at compile time. It is not a strict 1:1 mapping between distinct join keys and the number of reducers, since the stats are not available at compile time today.

    Your solution is correct in theory, but it is not trivial to partition mapper output with the same key across different reducers. It would probably require adding a "grouping ID" to the sort key before sending rows to the reducers.

    Ning
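
    For what it's worth, the compile-time estimate can also be nudged from the CLI. Assuming the usual knobs are available in this trunk build, lowering the bytes-per-reducer target makes the planner ask for more reducers, and mapred.reduce.tasks pins the count outright, though neither changes the one-reducer-per-join-key behavior itself:

    set hive.exec.reducers.bytes.per.reducer=256000000;
    set hive.exec.reducers.max=64;
    set mapred.reduce.tasks=32;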

  • Defenestrator at Nov 12, 2009 at 6:27 pm
    Ning,

    Can you explain how it comes up with the estimate at compile time, if the number of distinct join keys is known only at runtime? Is this just based on the number of HDFS blocks of the relations being joined?

    Thanks.
    On Wed, Nov 11, 2009 at 9:21 PM, Ning Zhang wrote:

    Your understanding is mostly correct. But the number of reducers are
    estimated at compile time. It is not a strict 1:1 mapping between distinct
    join keys with # of reducers since the stats are not available at compile
    time now.

    Your solution is correct in theory. but it is not trivial to partition
    mapper output with the same key to different reducers. It probably need to
    add a "grouping ID" to the sorting key before sending to the reducers.

    Ning


    On Nov 11, 2009, at 12:21 PM, Defenestrator wrote:

    Thanks for the explanation, Ning. This explains the behavior that I'm
    seeing with the original query that I was trying to run which was doing a
    cartesian product for each dt (of which there are only 2) and two reducers
    were running very slowly.

    insert overwrite table foo1

    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt

    from m1 m1 join m1 m2 on m1.dt = m2.dt where m1.aid <> m2.aid and m1.aid <
    m2.aid group by m1.aid, m2.aid, m1.dt;


    Here's my understanding of the issue, please correct me if I'm incorrect.
    Because there are only two distinct "dt" values, hive will only allocate a
    single reducer to do the cartesian product per distinct "dt" value.

    And this problem doesn't necessarily have anything to do with skewed data,
    right? Suppose I have a very large dataset that has an even distribution of
    x join key values that all produce a lot of join output tuples, hive will
    have x reducers that will all run very slowly. And the solution for this
    problem is really for hive to have multiple reducers computing the cartesian
    product for each distinct join key value, correct?
    On Wed, Nov 11, 2009 at 11:16 AM, Ning Zhang wrote:

    I think it depends on how many rows are there in each table and what the
    distribution of the join keys. I suspect that your data are very skewed so
    that a lot of rows in table A have the same join key with a lot of rows in
    table B. That will produce huge number of rows as the join result. Hive
    currently has at most 1 reducer for each distinct join key. So it may be
    very slow for this reducer. There are some JIRAs created for this problem. I
    don't know if there is someone actively working on this, but it should be a
    great research project.

    Another way you may be able to optimize is to rewrite your query to use
    semijoin if you could. Semijoin is just checked in to trunk so you probably
    need to check out trunk and try it out. If you can rewrite it using semjoin
    you can avoid cartesian product. The basic idea of semijoin is to implement
    the semantics of IN/EXISTS subqueries which Hive doesn't support yet. If you
    have a SQL query like:

    select m1.aid, m1.dt, count(1)
    from m1 m1
    where exists (select null from m1 m2 where m1.aid = m2.aid and
    m1.dt=m2.dt)
    group by m1.aid, m1.dt;

    you can rewrite it using left semi join in HiveQL as

    select m1.aid, m1.dt, count(1)
    from m1 m1 left semi join m1 m2 on (m1.aid = m2.aid and m1.dt=m2.dt)
    group by m1.aid, m1.dt;

    Note that you can not 'select' any columns from m2 in the right-hand-side
    of the table in left semi join, just as in the above exists subquery you
    cannot reference inner query tables from the outer query.

    The benefits of using semijoin rather than inner join is that if there are
    a lot of rows with the same join key in m1 and m2, it will return after the
    first match, without doing a cartesian product of all matching rows in m1
    and m2.

    Of course whether you can rewrite this depends on your application
    semantics. If you really want all the combinations of the rows from m1 AND
    m2, semijoin won't help you.

    Ning

    On Nov 11, 2009, at 10:45 AM, Ryan LeCompte wrote:

    I have two sequence file tables with 10GB each, and it's exhibiting the
    same problem of the final reducer just never finishing.

    Any ideas there? Have you tried joining across such large tables?

    Ryan




    On Nov 11, 2009, at 1:37 PM, Ning Zhang wrote:

    The problem is in the data loading process: the m1 file is a plain text
    CSV format and you are loading it to a Hive table with the default setting,
    which assumes fields are separated by ctl_A. So if you look at the first 10
    rows, all fields are NULL since Hive cannot find ctl+A in a row. So your
    query is actually doing a cartesian product of 100k x 100k rows.

    Since the 'Load data' command doesn't check the input format not does it
    transform the format, you need to specify the input format in the create
    table DDL. Following is a working example. It finishes in my unit test
    (single machine) in less than 3 mins.

    8<--------
    drop table m1;
    drop table foo1;

    create table m1 (
    mid int,
    aid int,
    dt string)
    row format delimited fields terminated by ','
    stored as textfile;

    LOAD DATA LOCAL INPATH '../data/files/m1' OVERWRITE INTO TABLE m1;

    select * from m1 limit 10;

    create table foo1 (
    aid_1 int,
    aid_2 int,
    mid bigint,
    dt bigint
    );


    insert overwrite table foo1
    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
    from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by
    m1.aid, m2.aid, m1.dt;
    8<------------

    On Nov 10, 2009, at 11:19 PM, Defenestrator wrote:

    Definitely the join portion of the plan. The one reduce job takes over 2
    1/2 hours to complete doing the following:


    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10 rows
    2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10 rows
    2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100 rows
    2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100 rows
    2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000 rows
    2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000 rows
    2009-11-10 18:14:30,445 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000 rows
    2009-11-10 18:14:30,446 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000 rows
    2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100000 rows
    2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100000 rows
    2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000000 rows
    2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000000 rows
    2009-11-10 18:14:32,384 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 2000000 rows

    ...

    2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 9999000000 rows
    2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 9999000000 rows
    2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000000000 rows
    2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000000000 rows



    On Tue, Nov 10, 2009 at 9:07 PM, Namit Jain < <njain@facebook.com>
    njain@facebook.com> wrote:
    I think you missed the attachment.


    Which job is taking more time – join or group by ?

    Can you send the data characteristics for m1 and foo1 – is it possible
    that there is a large skew on aid and dt which is forcing the data to be
    send to a single reducer



    -namit



    On 11/10/09 6:35 PM, "Defenestrator" < <defenestrationism@gmail.com>
    defenestrationism@gmail.com> wrote:

    I would definitely appreciate any insights on this from the list. I
    tried to reduce the query down to something that is easily understood and
    hive still demonstrates a pretty poor join performance behavior on a
    three-node hadoop cluster.

    drop table m1;
    drop table foo1;

    create table m1 (
    mid int,
    aid int,
    dt string);

    LOAD DATA LOCAL INPATH 'm1' OVERWRITE INTO TABLE m1;

    create table foo1 (
    aid_1 int,
    aid_2 int,
    mid bigint,
    dt bigint
    );

    set mapred.reduce.tasks=32;

    insert overwrite table foo1
    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
    from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by
    m1.aid, m2.aid, m1.dt;
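
    To see how Hive splits this into stages - which MapReduce job is the join and which is the group by - EXPLAIN on the same statement should be enough (a sketch; nothing beyond the tables above is assumed):

    explain
    insert overwrite table foo1
    select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
    from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt
    group by m1.aid, m2.aid, m1.dt;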


    Attached is the file I'm using; it has only 100k rows. I've looked at the benchmark (http://issues.apache.org/jira/secure/attachment/12411185/hive_benchmark_2009-06-18.pdf) and Hive seems to be able to join much bigger data sets. I also tried running the same query on a single-node DBMS on my desktop, and it returns results in less than 3 minutes, while Hive has now been running for at least 20 minutes.

    Thanks.

    On Tue, Nov 10, 2009 at 3:53 PM, Ryan LeCompte <lecompte@gmail.com> wrote:

    Any thoughts on this? I've only had luck by reducing the amount of data on each side of the join. Is this something Hive might be able to improve in a future release of the query-plan optimizer?

    Thanks,
    Ryan



    On Nov 3, 2009, at 10:55 PM, Ryan LeCompte <lecompte@gmail.com> wrote:

    I've had a similar issue with a small cluster. Is there any way that
    you can reduce the size of the data being joined on both sides? If you
    search the forums for join issue, you will see the thread for my issue and
    get some tips.

    Thanks,
    Ryan



    On Nov 3, 2009, at 10:45 PM, Defenestrator <defenestrationism@gmail.com> wrote:

    I was able to increase the number of reduce jobs manually to 32. However, it finishes 28 of them, and the other 4 show the same behavior of using 100% CPU and consuming a lot of memory. I suspect it might be an issue with the reduce job itself - is there a way to figure out what these jobs are doing exactly?

    Thanks.

    On Tue, Nov 3, 2009 at 6:53 PM, Namit Jain <njain@facebook.com> wrote:

    The number of reducers is inferred from the input data size, but you can always override it by setting mapred.reduce.tasks.
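
    For example (a sketch of session-level settings issued in the Hive CLI before the insert; hive.exec.reducers.bytes.per.reducer is the knob the inference uses):

    -- Pin the reduce count for the following queries in this session:
    set mapred.reduce.tasks=32;
    -- Or keep the automatic inference but make it more aggressive,
    -- e.g. one reducer per ~256 MB of input instead of the default:
    set hive.exec.reducers.bytes.per.reducer=256000000;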




    *From:* Defenestrator [mailto:defenestrationism@gmail.com]
    *Sent:* Tuesday, November 03, 2009 6:46 PM
    *To:* hive-user@hadoop.apache.org
    *Subject:* Re: Self join problem


    Hi Namit,



    Thanks for your suggestion.



    I tried changing the query as you had suggested by moving the m1.dt =
    m2.dt to the on clause. It increased the number of reduce jobs to 2. So
    now there are two processes running on two nodes at 100% consuming a lot of
    memory. Is there a reason why hive doesn't spawn more reduce jobs for this
    query?



    On Tue, Nov 3, 2009 at 4:47 PM, Namit Jain <njain@facebook.com> wrote:

    Put the join condition in the ON clause:


    insert overwrite table foo1
    select m1.id as id_1, m2.id as id_2, count(1), m1.dt
    from m1 join m2 on m1.dt = m2.dt
    where m1.id <> m2.id and m1.id < m2.id
    group by m1.id, m2.id, m1.dt;






