Hi:
Impala seems to support dynamic partitioning very well when inserting data into a
partitioned table. However, if I specify the partition column value statically
when loading data, for example by executing the following SQL statement:

  INSERT OVERWRITE table_name
  PARTITION(dt ="20130729")
  SELECT .....

the performance of the query above is sub-optimal, because the top plan fragment
of the generated query plan uses a constant hash partition function:
  STREAM DATA SINK
  EXCHANGE ID: 4
  HASH_PARTITIONED: '20130729'

This results in heavy data skew: only one plan fragment receives all the
data and the other plan fragments get nothing. Am I missing something, or is there
something I can do to work around this?
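To illustrate why a constant partition expression causes this skew, here is a minimal, self-contained Java model. It is not Impala code; the class name ConstantHashSkew, the method distribute, and the sample rows are hypothetical. It hash-partitions rows across a fixed number of receivers by a key function, roughly the way a HASH_PARTITIONED sink routes rows by its partition expressions. A constant key such as '20130729' produces the same hash for every row, so every row lands on the same receiver.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a hash-partitioned data sink (not Impala's implementation).
public class ConstantHashSkew {

    // Routes each row to receiver floorMod(hash(key), numReceivers) and
    // returns how many rows each receiver gets.
    public static int[] distribute(List<String[]> rows, int numReceivers,
                                   Function<String[], String> key) {
        int[] counts = new int[numReceivers];
        for (String[] row : rows) {
            int target = Math.floorMod(key.apply(row).hashCode(), numReceivers);
            counts[target]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up rows: {gateway id, phone number}.
        List<String[]> rows = Arrays.asList(
            new String[] {"gw1", "8613800000001"},
            new String[] {"gw2", "8613800000002"},
            new String[] {"gw3", "8613800000003"},
            new String[] {"gw4", "8613800000004"});

        // Constant partition expression: every row has the same hash,
        // so a single receiver gets all 4 rows.
        System.out.println(Arrays.toString(distribute(rows, 4, r -> "20130729")));

        // Partitioning on a real column spreads the rows out.
        System.out.println(Arrays.toString(distribute(rows, 4, r -> r[0]))); // prints [1, 1, 1, 1]
    }
}
```

With a constant key, adding more receivers does not help: the work of the whole stage still goes to one of them.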

--
Anty Rao


  • Henry Robinson at Jul 29, 2013 at 11:40 pm
    Hi Anty -

    Can you share your entire query? This doesn't happen in all cases, so I
    want to narrow down exactly when it does happen.

    Thanks,
    Henry

    --
    Henry Robinson
    Software Engineer
    Cloudera
    415-994-6679
  • Anty Rao at Jul 30, 2013 at 12:41 am
    Hi Henry,
    Here is the entire query:
    INSERT OVERWRITE TABLE tableName
    SELECT
    c1 ,
    CASE
    WHEN substr(c2,1,2)<>'86' THEN 3
    WHEN city_id is not null then 1
    ELSE 2
    END as c3,
    c4,
    c2 ,
    sum(c5 + c6),
    sum(case WHEN statistic_code like '2%' then 1 else 0 end) as
    success_count
    FROM dw_wap a
    where url is not null
    GROUP BY
    c1,
    c3,
    c4,
    c2
    ;

    --
    Anty Rao
  • Anty Rao at Jul 30, 2013 at 12:56 am
    Sorry, I pasted the wrong query. Here is the correct one:
    INSERT OVERWRITE TABLE tableName
    PARTITION (dt='20130729')
    SELECT
    c1 ,
    CASE
    WHEN substr(c2,1,2)<>'86' THEN 3
    WHEN city_id is not null then 1
    ELSE 2
    END as c3,
    c4,
    c2 ,
    sum(c5 + c6),
    sum(case WHEN statistic_code like '2%' then 1 else 0 end) as
    success_count
    FROM dw_wap a
    where url is not null
    GROUP BY
    c1,
    c3,
    c4,
    c2
    ;


    --
    Anty Rao
  • Marcel Kornacker at Jul 30, 2013 at 3:18 am
    Could you also send us the full plan output?
  • Anty Rao at Aug 7, 2013 at 3:03 am
    I have to say that there is indeed a bug in the way the value of the
    #numNode field of a plan node is computed. I followed the code path for
    executing my SQL and identified the following two places that cause the bug
    I encountered. I fixed them myself and rebuilt Impala, and the generated
    query plan is finally correct. By the way, the Impala version I'm using is
    1.0.1.

    1. Constructing the exchange node

    The exchange node inherits from PlanNode. When an exchange node is
    constructed, it calls one of PlanNode's constructors:
    protected PlanNode(PlanNodeId id, PlanNode node, String planNodeName) {
      this.id = id;
      this.limit = node.limit;
      this.tupleIds = Lists.newArrayList(node.tupleIds);
      this.rowTupleIds = Lists.newArrayList(node.rowTupleIds);
      this.nullableTupleIds = Sets.newHashSet(node.nullableTupleIds);
      this.conjuncts = Expr.cloneList(node.conjuncts, null);
      this.cardinality = -1;
      this.compactData = node.compactData;
      this.planNodeName = planNodeName;
      // numNode is not copied from node, so it keeps its default value of 0
    }
    Unfortunately, the code above does not copy the field #numNode, so the
    value of #numNode is always 0 for every exchange node. This is the culprit.

    2. Creating the plan fragment for an aggregation node

    #createAggregationFragment in Planner.java creates the plan fragments for an
    aggregation node. Taking the code path for aggregation without DISTINCT as
    an example, it creates a new aggregation node responsible for the merge
    aggregation. The #numNode field of this newly created aggregation node stays
    0 unless finalizer(analyzer) is called on it, but the planner never calls it.


    Together, these two places cause #numNode to be computed incorrectly for the
    plan fragments, which produces the misbehavior I encountered.
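A minimal sketch of the first fix, assuming (as the description above implies) that the copy-style constructor should simply carry the input node's #numNode value over to the new node. SimplePlanNode and its fields are hypothetical simplifications, not Impala's PlanNode; only the copied field mirrors the post.

```java
// Hypothetical, simplified plan node; not Impala's actual PlanNode class.
public class SimplePlanNode {
    final int id;
    final long limit;
    long cardinality;
    int numNode; // hosts the node is expected to run on; 0 if never set

    public SimplePlanNode(int id, long limit, int numNode) {
        this.id = id;
        this.limit = limit;
        this.cardinality = -1;
        this.numNode = numNode;
    }

    // Copy-style constructor, analogous to the one an exchange node uses.
    public SimplePlanNode(int id, SimplePlanNode node) {
        this.id = id;
        this.limit = node.limit;
        this.cardinality = -1;         // recomputed later, as in the original
        this.numNode = node.numNode;   // the assignment the original omits
    }

    public static void main(String[] args) {
        SimplePlanNode scan = new SimplePlanNode(0, -1, 4);
        SimplePlanNode exchange = new SimplePlanNode(4, scan);
        System.out.println(exchange.numNode); // prints 4
    }
}
```

Without the marked assignment, exchange.numNode would stay at Java's default of 0, which matches the "#nodes=0" value logged later in this thread.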



    On Fri, Aug 2, 2013 at 12:31 PM, Anty Rao wrote:

    It should be a bug in computing the value of #numNode in the plan nodes. I
    added some logging statements in Planner and found that only the plan
    fragment that contains the ScanNode has #numNode set properly; in the other
    plan fragments its value is 0.
    After looking through the code, I believe this even more.

    On Fri, Aug 2, 2013 at 10:15 AM, Anty Rao wrote:

    Does Impala 1.0.1 currently work properly with NameNode HA?

    On Thu, Aug 1, 2013 at 10:56 PM, Marcel Kornacker wrote:

    The number of nodes is determined by HdfsTable.getNumNodes().
    Something appears to get screwed up when you load the table's
    metadata. You should insert more logging statements in HdfsTable.java
    and find out what's going on there.
    On Wed, Jul 31, 2013 at 9:12 PM, Anty Rao wrote:
    I added the logging statement and re-ran the query. The logged message is
    very strange, not what you expected:
    #partitions=1 #nodes=0



    On Thu, Aug 1, 2013 at 9:47 AM, Marcel Kornacker <marcel@cloudera.com> wrote:
    In Planner.java, at line 288
      if (numPartitions <= inputFragment.getNumNodes()) return inputFragment;
    could you insert a logging statement right before it:
      LOG.info("#partitions=" + Integer.toString(numPartitions) + " #nodes="
          + Integer.toString(inputFragment.getNumNodes()));

    and then let me know what the values are. You should see #partitions=1
    and #nodes=4 or 5 (but apparently you're not).
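The effect of the check quoted above can be modeled in a few lines. This is a hypothetical sketch (needsRepartition is not an Impala method): when #nodes is wrongly 0, the comparison 1 <= 0 fails, so the early return is skipped and the planner presumably adds another exchange. With a single static partition, that exchange is hash-partitioned on the constant partition value, which is consistent with the HASH_PARTITIONED: '20130728' data sink in the plan posted in this thread.

```java
// Hypothetical model of the repartitioning check quoted from Planner.java:
//   if (numPartitions <= inputFragment.getNumNodes()) return inputFragment;
public class RepartitionCheck {

    // True when the early return above is skipped, i.e. the planner would add
    // an extra exchange instead of writing directly from the input fragment.
    public static boolean needsRepartition(int numPartitions, int numNodes) {
        return numPartitions > numNodes;
    }

    public static void main(String[] args) {
        // Expected values: one static partition, 4 or 5 nodes.
        System.out.println(needsRepartition(1, 4)); // prints false
        // Observed values: #nodes=0 because numNode was never set.
        System.out.println(needsRepartition(1, 0)); // prints true
    }
}
```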
    On Wed, Jul 31, 2013 at 6:33 PM, Anty Rao wrote:
    I built my own binaries from source.


    On Wed, Jul 31, 2013 at 5:18 AM, Marcel Kornacker <marcel@cloudera.com> wrote:
    On Mon, Jul 29, 2013 at 8:31 PM, Anty Rao <ant.rao@gmail.com> wrote:
    Actually there are 5 nodes, but only 4 of them have data co-located on them.
    Did you install from a package or did you build your own binaries? The
    reason I'm asking is that what you're seeing is unexpected (the planner
    doesn't repartition if the expected number of partitions is less than the
    number of nodes) and some more diagnostic output would help.

    On Tue, Jul 30, 2013 at 11:27 AM, Marcel Kornacker <marcel@cloudera.com> wrote:
    How many nodes is this running on?

    On Mon, Jul 29, 2013 at 8:23 PM, Anty Rao <ant.rao@gmail.com> wrote:
    Hi Marcel,
    Here is the full plan:
    PARTITION: HASH_PARTITIONED: '20130728'

      WRITE TO HDFS table=default.st_wap_bass_impala
        overwrite=true
        partitions: '20130728'

      4:EXCHANGE
        tuple ids: 1

    PLAN FRAGMENT 1
      PARTITION: HASH_PARTITIONED: <slot 7>, <slot 8>, <slot 9>, <slot 10>

      STREAM DATA SINK
        EXCHANGE ID: 4
        HASH_PARTITIONED: '20130728'

      3:AGGREGATE
        output: SUM(<slot 11>), SUM(<slot 12>)
        group by: <slot 7>, <slot 8>, <slot 9>, <slot 10>
        tuple ids: 1
      2:EXCHANGE
        tuple ids: 1

    PLAN FRAGMENT 2
      PARTITION: RANDOM

      STREAM DATA SINK
        EXCHANGE ID: 2
        HASH_PARTITIONED: <slot 7>, <slot 8>, <slot 9>, <slot 10>

      1:AGGREGATE
        output: SUM(uplink_traffic + downlink_traffic), SUM(CASE WHEN
          statistic_code LIKE '2%' THEN 1 ELSE 0 END)
        group by: wapgw_id, CASE WHEN substr(product_no, 1, 2) != '86' THEN 3
          WHEN city_id IS NOT NULL THEN 1 ELSE 2 END,
          lower(substr(normalize_url(url), 1, 50)), product_no
        tuple ids: 1
      0:SCAN HDFS
        table=default.dw_wap #partitions=1 size=54.63GB
        predicates: url IS NOT NULL
        tuple ids: 0


    --
    Anty Rao

Discussion Overview
group: impala-user
categories: hadoop
posted: Jul 29, '13 at 3:41p
active: Aug 7, '13 at 3:03a
posts: 6
users: 3
website: cloudera.com
irc: #hadoop


site design / logo © 2021 Grokbase