I'm wondering if it's possible to get a sort-merge bucketized join to work
when outering to a partitioned table.
My goal is to be able to do an outer join from a table with 10 million rows
to one with 8 billion rows.
The specific problem I'm having now is that while Hive generates an SMJ
query plan when outering to a table with one (bucketized) partition,
it doesn't seem possible to get an SMJ query plan if the
large/outer/streamed table has a second partition.

Is this a limitation of the feature or is it possible to achieve an SMJ to a
partitioned table?



DETAILS


--
-- Environment
--

Hadoop 0.20.2
Hive derived from svn ~8/20/10


--
-- Useful links
--

-- sorted merge join (last updated 3/4/10)
https://issues.apache.org/jira/browse/HIVE-1194

-- join operators
https://issues.apache.org/jira/browse/HIVE-741


--
-- Steps
--

The following steps reproduce the issue:

--
-- Generate test data
--

hadoop fs -put /hadoop/conf /test_data_raw
/hadoop/bin/hadoop jar /hadoop/hadoop-*-examples.jar grep /test_data_raw /test_data_freq '\w+'
-- Cleanup/review data
hadoop fs -rmr /test_data_freq/_logs
hadoop fs -cat /test_data_freq/*

--
-- Create an external table to the source data
--

CREATE EXTERNAL TABLE v_new_data
( cnt STRING,
word STRING
)
COMMENT 'Use this to pull data easily into Hive (managed) tables'
ROW FORMAT
DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/test_data_freq';

select * from v_new_data;

--
-- Minor side issue: CLUSTERED BY alone does not have the same effect on the
-- table description as CLUSTERED BY combined with SORTED BY.
--

This says: "Cluster By is a short-cut for both Distribute By and Sort By"
http://wiki.apache.org/hadoop/Hive/LanguageManual/SortBy

However, the following shows a different table description when CLUSTERED BY
is used without SORTED BY. (The wiki quote refers to the query-level CLUSTER BY
clause; the table DDL counterpart is CLUSTERED BY ... SORTED BY.)
The differences include "sortCols" and "SORTBUCKETCOLSPREFIX", which seem very
relevant for SMJ, so SORTED BY will be used.


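-- Session settings for the sorted-merge bucket map join: the "enforce" flags
-- make INSERTs honor each table's bucket/sort spec; the input format and
-- optimizer flags enable the bucket map join and its sorted-merge variant.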
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

DROP TABLE d_test;
CREATE TABLE d_test
( cnt STRING,
word STRING
)
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;

DROP TABLE d_test2;
CREATE TABLE d_test2
( cnt STRING,
word STRING
)
PARTITIONED BY(job STRING)
CLUSTERED BY(word) INTO 4 BUCKETS;


hive> describe extended d_test;
OK
cnt string
word string
job string

Detailed Table Information Table(tableName:d_test, dbName:default,
owner:root, createTime:1283277354, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null),
FieldSchema(name:word, type:string, comment:null)],
location:hdfs://pos01n:54310/user/hive/warehouse/d_test,
inputFormat:org.apache.hadoop.mapred.TextInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
parameters:{serialization.format=1}), bucketCols:[word],
sortCols:[Order(col:word, order:1)], parameters:{}),
partitionKeys:[FieldSchema(name:job, type:string, comment:null)],
parameters:{SORTBUCKETCOLSPREFIX=TRUE, transient_lastDdlTime=1283277354},
viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.055 seconds
hive> describe extended d_test2;
OK
cnt string
word string
job string

Detailed Table Information Table(tableName:d_test2, dbName:default,
owner:root, createTime:1283277354, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null),
FieldSchema(name:word, type:string, comment:null)],
location:hdfs://pos01n:54310/user/hive/warehouse/d_test2,
inputFormat:org.apache.hadoop.mapred.TextInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
parameters:{serialization.format=1}), bucketCols:[word], sortCols:[],
parameters:{}), partitionKeys:[FieldSchema(name:job, type:string,
comment:null)], parameters:{transient_lastDdlTime=1283277354},
viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.051 seconds

DROP TABLE d_test2;

--
-- Populate the RIGHT/OUTER TABLE
--

FROM v_new_data
INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache_file')
SELECT
cnt,
word
WHERE
word != 'apache'
and word != 'file';


--
-- Create the LEFT/INNER table
--

CREATE TABLE j_test
( cnt STRING,
word STRING
)
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;

FROM v_new_data
INSERT OVERWRITE TABLE j_test PARTITION(job='from')
SELECT
cnt,
word
WHERE
word = 'apache'
or word = 'file'
or word = 'name'
or word = 'property'
;


--
-- Create a table to hold the results
--
DROP TABLE r_test;

CREATE TABLE r_test
(
cnt STRING,
word STRING
)
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;

--
-- Perform the sort-merge bucketized outer join
--

-- GOOD: This shows "Sorted Merge Bucket Map Join Operator"
EXPLAIN
INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );

-- This works
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );
--
file NULL
property 134
apache NULL
name 136


--
-- Add a partition
--

FROM v_new_data
INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache')
SELECT
cnt,
word
WHERE
word != 'apache';


-- BAD: This does not show "Sorted Merge Bucket Map Join Operator", only
-- "Common Join Operator".
EXPLAIN
INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );

NOTE: The reduce-side join will succeed, but only with the small data set.
The bad (reduce-side) join, combined with a larger data set (e.g. 45M rows,
4GB of data, in a table with 2 bucketized partitions), NEVER even completes a
single mapper.

Of course, I'd appreciate any help that can be provided.
I'm impressed with Hive so far and hope to use it to replace an RDBMS for
"real" ETL, but that's only possible if I can get this sort of operation
(outering from 10M rows to 10B rows) to work.

Thanks for the work that's been done to Hive so far and for any help that
anyone can offer me in this situation.


  • Yongqiang he at Sep 1, 2010 at 3:22 pm
    You are right, Hive's SMJ does not support joining across multiple partitions.
    For your case, is it possible to use other solutions while still using
    SMJ? For example, creating external tables and unioning the join results.
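    A minimal sketch of that workaround, using the tables from the repro. This
    is an untested illustration resting on assumptions: it assumes an external
    table may carry CLUSTERED BY/SORTED BY metadata over a single partition
    directory (the paths follow the warehouse location shown in the describe
    output above), and note that unioning per-partition LEFT OUTER JOIN results
    is not equivalent to one outer join against all partitions: a word matched
    in only one partition still yields a NULL row from the other branch, so the
    NULL rows would need post-filtering.

    -- Expose each partition of d_test as its own unpartitioned, bucketed table.
    CREATE EXTERNAL TABLE d_test_p1
    ( cnt STRING,
    word STRING
    )
    CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS
    LOCATION '/user/hive/warehouse/d_test/job=no_apache_file';

    CREATE EXTERNAL TABLE d_test_p2
    ( cnt STRING,
    word STRING
    )
    CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS
    LOCATION '/user/hive/warehouse/d_test/job=no_apache';

    -- Run the SMJ against each single-partition table, then union the results.
    SELECT u.word, u.cnt
    FROM (
    SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
    FROM j_test a LEFT OUTER JOIN d_test_p1 b ON ( a.word = b.word )
    UNION ALL
    SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
    FROM j_test a LEFT OUTER JOIN d_test_p2 b ON ( a.word = b.word )
    ) u;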
    On Tue, Aug 31, 2010 at 8:53 PM, phil young wrote:
    [snip]

  • Phil young at Sep 1, 2010 at 6:01 pm
    That might work, depending on how well the outer join to an external table
    works, and as long as I don't have to hand-code the query each time.

    I need to be able to do the following:
    Table A will have a different set of data each batch.
    Table B will be the big/historic table, e.g. d_test in the example above.
    I need to outer join the data in A to B, get the results, assign unique
    numbers to the NULL matches in B, and then append the results to B.

    The problem is how to add data to B. Currently this is only possible by
    replacing the table (ludicrously time consuming), adding a partition, or
    using HBase, right?
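    For the "adding a partition" option, a rough, hypothetical sketch of a
    batch append using tables shaped like j_test/d_test from the repro; the
    partition name 'batch_001' is made up, and the unique-number assignment is
    omitted since Hive at this point has no built-in sequence generator:

    -- Append only the rows of the new batch (A) that found no match in the
    -- historic table (B) as a fresh partition of B, so B is never rewritten.
    -- Caveat: this reads and writes d_test in one statement; staging the
    -- result in a temporary table first may be safer.
    set hive.enforce.bucketing=true;
    set hive.enforce.sorting=true;

    INSERT OVERWRITE TABLE d_test PARTITION(job='batch_001')
    SELECT a.cnt, a.word
    FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word )
    WHERE b.word IS NULL;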
    I'm about to investigate HBase. Can Hive do an efficient outer join to an
    HBase table?

    I see that Hadoop had sort-merge joins to multiple tables a while ago.
    If I need to write the map-reduce code to do the join I'll focus on that.

    It seems like Hive (and maybe Hive/HBase) should be able to do this, since
    it is ***the most fundamental*** ETL operation!
    ...and Hive is supposed to do "ETL".

    Currently this is being done in an RDBMS.
    If this can be done efficiently in Hadoop/Hive/HBase, then great, our
    company will continue with that "ecosystem".
    Sadly, if this can't be done very shortly, our company is going to end up
    using another tool.

    I appreciate the help. Personally, I hope that Hive can handle this ETL.




    On Wed, Sep 1, 2010 at 11:22 AM, yongqiang he wrote:

    [snip]
