On Wednesday, February 19, 2014 10:43:11 AM UTC-8, Marcel Kornacker wrote:
On Wed, Feb 19, 2014 at 10:15 AM, [email protected] wrote:
Hi Marcel,
I ran into a similar problem and am wondering how to do a merge join in
Impala. I have two tables, foo (419 million rows) and bar (96 million rows),
both partitioned by event_date, on our 5-node cluster. If I run the
following query, it is very slow.
Could you send the plan for this query?
More info about those tables: foo and bar are in Parquet format. Both of
them have multiple rows per id.
Here is the explain plan:
PLAN FRAGMENT 0
  PARTITION: UNPARTITIONED

  7:AGGREGATE (merge finalize)
  |  output: SUM(SUM(amount))
  |
  6:EXCHANGE

PLAN FRAGMENT 1
  PARTITION: HASH_PARTITIONED: f.id

  STREAM DATA SINK
    EXCHANGE ID: 6
    UNPARTITIONED

  3:AGGREGATE
  |  output: SUM(amount)
  |
  2:HASH JOIN
  |  join op: INNER JOIN (PARTITIONED)
  |  hash predicates: f.id = b.id
  |  other predicates: f.event_date >= b.event_date, f.event_date <= date_add(b.event_date, 21)

PLAN FRAGMENT 2
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 5
    HASH_PARTITIONED: b.id

  1:SCAN HDFS
     table=default.bar #partitions=10/18 size=3.63GB

PLAN FRAGMENT 3
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 4
    HASH_PARTITIONED: f.id

  0:SCAN HDFS
     table=default.foo #partitions=11/18 size=13.70GB
I know that the condition on event_date isn't used as a hash predicate in
the partitioned join. The date partitioning is also not taken into account.
Is there a way to improve this join in Impala? Or should I favor the big
wide table instead?
I'm not sure what you mean by that.
When I say "the partitioned table is also not taken ...", I actually mean
that the Impala daemon does not use the constraint f.event_date between
b.event_date and date_add(b.event_date, 21) in the join. For a row in foo,
the join node will probably probe all bar rows with the same id. Since some
bar rows for some ids can never satisfy this constraint, the actual join
input would be smaller if the date partitioning and this constraint were
exploited.
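One workaround, sketched under the assumption that event_date is the
partition key on both tables: the join range implies a redundant bound on
b.event_date (f.event_date <= date_add(b.event_date, 21) together with
f.event_date >= '2013-01-01' gives b.event_date >= date_sub('2013-01-01', 21)),
so adding that bound by hand is safe and should let Impala prune bar's
partitions as well:

select
  sum(amount)
from
  foo f
  inner join [SHUFFLE] bar b
  on f.id = b.id
  and f.event_date between b.event_date and date_add(b.event_date, 21)
  and f.event_date >= '2013-01-01'
  -- redundant, but derivable from the two conditions above;
  -- gives the planner something to prune bar's partitions with
  and b.event_date >= date_sub('2013-01-01', 21)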
I actually created a joined table with all foo and bar columns using Hive;
it has 1.99 billion rows in total, and for each row in foo there are on
average 4-5 rows in bar satisfying the join conditions. Querying this big
wide joined table is pretty fast. In a DW we usually join dimension tables
with a fact table; if both the dimension and the fact tables are very
large, is preparing a joined table the only way in Impala?
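For reference, that pre-join can be materialized with a CTAS in Hive or
Impala; this is only a minimal sketch (the table name foo_bar_joined and
the column list are hypothetical, and older Impala releases spell the
format PARQUETFILE):

create table foo_bar_joined stored as parquet as
select
  f.id,
  f.event_date,
  amount  -- plus whatever other foo/bar columns later queries need
from
  foo f
  inner join bar b
  on f.id = b.id
  and f.event_date between b.event_date and date_add(b.event_date, 21)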
I believe my query would be efficient if it used a merge join.
I'm using Impala 1.2.3.
I am also worried about Impala's performance when equi-joining two big
tables like that, which is common in a DW (joining a large fact table with
a large dimension table). I haven't had a chance to try that.
select
sum(amount)
from
foo f
inner join [SHUFFLE] bar b
on f.id = b.id
and f.event_date between b.event_date and date_add(b.event_date, 21)
and f.event_date >= '2013-01-01' and b.event_date >= '2013-01-01'
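(On the equi-join worry above: Impala also accepts a [BROADCAST] hint in
place of [SHUFFLE]. A sketch, assuming the smaller side fits in memory on
each node, in which case broadcasting it avoids shuffling the big table:

select
  sum(amount)
from
  foo f
  inner join [BROADCAST] bar b  -- replicate bar to every node instead of shuffling foo
  on f.id = b.id
  and f.event_date between b.event_date and date_add(b.event_date, 21)
  and f.event_date >= '2013-01-01' and b.event_date >= '2013-01-01'

With two genuinely large tables, though, [SHUFFLE] as used above is usually
the right choice.)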