FAQ
Author: kevinwilfong
Date: Wed Dec 19 01:20:56 2012
New Revision: 1423731

URL: http://svn.apache.org/viewvc?rev=1423731&view=rev
Log:
HIVE-3633. sort-merge join does not work with sub-queries. (njain via kevinwilfong)

Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java
hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_14.q.out
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java?rev=1423731&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java Wed Dec 19 01:20:56 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+
+/**
+ * For SortMerge joins, this is a dummy operator, which stores the row for the
+ * small table before it reaches the sort merge join operator.
+ *
+ * Consider a query like:
+ *
+ * select * from
+ * (subq1 --> has a filter)
+ * join
+ * (subq2 --> has a filter)
+ * on some key
+ *
+ * Let us assume that subq1 is the small table (either specified by the user or inferred
+ * automatically). Since there can be multiple buckets/partitions for the table corresponding
+ * to subq1 given a file in subq2, a priority queue is present in SMBMapJoinOperator to scan the
+ * various buckets and fetch the least row (corresponding to the join key). The tree corresponding
+ * to subq1 needs to be evaluated in order to compute the join key (since the select list for the
+ * join key can move across different object inspectors).
+ *
+ * Therefore the following operator tree is created:
+ *
+ * TableScan (subq1) --> Select --> Filter --> DummyStore
+ * \
+ * \ SMBJoin
+ * /
+ * /
+ * TableScan (subq2) --> Select --> Filter
+ *
+ * In order to fetch the row with the least join key from the small table, the row from subq1
+ * is partially processed, and stored in DummyStore. For the actual processing of the join,
+ * SMBJoin (child of DummyStore) is processed for the transformed row. Note that in the absence of
+ * support for joins for sub-queries, this was not needed, since all transformations were done
+ * after SMBJoin, or for the small tables, nothing could have been present between TableScan and
+ * SMBJoin.
+ */
+public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Serializable {
+
+ private transient InspectableObject result;
+
+ public DummyStoreOperator() {
+ super();
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ outputObjInspector = inputObjInspectors[0];
+ result = new InspectableObject(null, outputObjInspector);
+ initializeChildren(hconf);
+ }
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ // Store the row
+ result.o = row;
+ }
+
+ @Override
+ public void reset() {
+ result = new InspectableObject(null, result.oi);
+ }
+
+ public InspectableObject getResult() {
+ return result;
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.FORWARD;
+ }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Dec 19 01:20:56 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -91,6 +92,8 @@ public final class OperatorFactory {
HashTableDummyOperator.class));
opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
HashTableSinkOperator.class));
+ opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class,
+ DummyStoreOperator.class));
}

public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Wed Dec 19 01:20:56 2012
@@ -142,7 +142,7 @@ public class SMBMapJoinOperator extends
super.initializeLocalWork(hconf);
}

- public void initializeMapredLocalWork(MapJoinDesc conf, Configuration hconf,
+ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf,
MapredLocalWork localWork, Log l4j) throws HiveException {
if (localWork == null || localWorkInited) {
return;
@@ -154,7 +154,13 @@ public class SMBMapJoinOperator extends
// create map local operators
Map<String,FetchWork> aliasToFetchWork = localWork.getAliasToFetchWork();
Map<String, Operator<? extends OperatorDesc>> aliasToWork = localWork.getAliasToWork();
+ Map<String, DummyStoreOperator> aliasToSinkWork = conf.getAliasToSink();

+ // The operator tree till the sink operator needs to be processed while
+ // fetching the next row to fetch from the priority queue (possibly containing
+ // multiple files in the small table given a file in the big table). The remaining
+ // tree will be processed while processing the join.
+ // Look at comments in DummyStoreOperator for additional explanation.
for (Map.Entry<String, FetchWork> entry : aliasToFetchWork.entrySet()) {
String alias = entry.getKey();
FetchWork fetchWork = entry.getValue();
@@ -167,7 +173,9 @@ public class SMBMapJoinOperator extends
forwardOp.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()});
fetchOp.clearFetchContext();

- MergeQueue mergeQueue = new MergeQueue(alias, fetchWork, jobClone);
+ DummyStoreOperator sinkOp = aliasToSinkWork.get(alias);
+
+ MergeQueue mergeQueue = new MergeQueue(alias, fetchWork, jobClone, forwardOp, sinkOp);

aliasToMergeQueue.put(alias, mergeQueue);
l4j.info("fetch operators for " + alias + " initialized");
@@ -515,15 +523,20 @@ public class SMBMapJoinOperator extends
String table = tagToAlias.get(tag);
MergeQueue mergeQueue = aliasToMergeQueue.get(table);

- Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
- .get(table);
+ // The operator tree till the sink operator has already been processed while
+ // fetching the next row to fetch from the priority queue (possibly containing
+ // multiple files in the small table given a file in the big table). Now, process
+ // the remaining tree. Look at comments in DummyStoreOperator for additional
+ // explanation.
+ Operator<? extends OperatorDesc> forwardOp =
+ conf.getAliasToSink().get(table).getChildOperators().get(0);
try {
InspectableObject row = mergeQueue.getNextRow();
if (row == null) {
fetchDone[tag] = true;
return;
}
- forwardOp.process(row.o, 0);
+ forwardOp.process(row.o, tag);
// check if any operator had a fatal error or early exit during
// execution
if (forwardOp.getDone()) {
@@ -624,15 +637,21 @@ public class SMBMapJoinOperator extends
transient FetchOperator[] segments;
transient List<ExprNodeEvaluator> keyFields;
transient List<ObjectInspector> keyFieldOIs;
+ transient Operator<? extends OperatorDesc> forwardOp;
+ transient DummyStoreOperator sinkOp;

// index of FetchOperator which is providing smallest one
transient Integer currentMinSegment;
transient ObjectPair<List<Object>, InspectableObject>[] keys;

- public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf) {
+ public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf,
+ Operator<? extends OperatorDesc> forwardOp,
+ DummyStoreOperator sinkOp) {
this.alias = alias;
this.fetchWork = fetchWork;
this.jobConf = jobConf;
+ this.forwardOp = forwardOp;
+ this.sinkOp = sinkOp;
}

// paths = bucket files of small table for current bucket file of big table
@@ -684,6 +703,7 @@ public class SMBMapJoinOperator extends
}
}

+ @Override
protected boolean lessThan(Object a, Object b) {
return compareKeys(keys[(Integer) a].getFirst(), keys[(Integer)b].getFirst()) < 0;
}
@@ -730,20 +750,31 @@ public class SMBMapJoinOperator extends
// return true if current min segment(FetchOperator) has next row
private boolean next(Integer current) throws IOException, HiveException {
if (keyFields == null) {
- // joinKeys/joinKeysOI are initialized after making merge queue, so setup lazily at runtime
byte tag = tagForAlias(alias);
+ // joinKeys/joinKeysOI are initialized after making merge queue, so setup lazily at runtime
keyFields = joinKeys.get(tag);
keyFieldOIs = joinKeysObjectInspectors.get(tag);
}
InspectableObject nextRow = segments[current].getNextRow();
- if (nextRow != null) {
+ while (nextRow != null) {
+ sinkOp.reset();
if (keys[current] == null) {
keys[current] = new ObjectPair<List<Object>, InspectableObject>();
}
- // todo this should be changed to be evaluated lazily, especially for single segment case
- keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs));
- keys[current].setSecond(nextRow);
- return true;
+
+ // Pass the row though the operator tree. It is guaranteed that not more than 1 row can
+ // be produced from a input row.
+ forwardOp.process(nextRow.o, 0);
+ nextRow = sinkOp.getResult();
+
+ // It is possible that the row got absorbed in the operator tree.
+ if (nextRow.o != null) {
+ // todo this should be changed to be evaluated lazily, especially for single segment case
+ keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs));
+ keys[current].setSecond(nextRow);
+ return true;
+ }
+ nextRow = segments[current].getNextRow();
}
keys[current] = null;
return false;

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1423731&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Wed Dec 19 01:20:56 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+/**
+ * this transformation does bucket map join optimization.
+ */
+abstract public class AbstractBucketJoinProc implements NodeProcessor {
+
+ private static final Log LOG = LogFactory.getLog(AbstractBucketJoinProc.class.getName());
+
+ public AbstractBucketJoinProc() {
+ }
+
+ @Override
+ abstract public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException;
+
+ public List<String> toColumns(List<ExprNodeDesc> keys) {
+ List<String> columns = new ArrayList<String>();
+ for (ExprNodeDesc key : keys) {
+ if (!(key instanceof ExprNodeColumnDesc)) {
+ return null;
+ }
+ columns.add(((ExprNodeColumnDesc) key).getColumn());
+ }
+ return columns;
+ }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Wed Dec 19 01:20:56 2012
@@ -59,15 +59,15 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

/**
- *this transformation does bucket map join optimization.
+ * this transformation does bucket map join optimization.
*/
public class BucketMapJoinOptimizer implements Transform {

@@ -82,21 +82,21 @@ public class BucketMapJoinOptimizer impl

Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx =
- new BucketMapjoinOptProcCtx(pctx.getConf());
+ new BucketMapjoinOptProcCtx(pctx.getConf());

// process map joins with no reducers pattern
opRules.put(new RuleRegExp("R1",
- MapJoinOperator.getOperatorName() + "%"),
- getBucketMapjoinProc(pctx));
+ MapJoinOperator.getOperatorName() + "%"),
+ getBucketMapjoinProc(pctx));
opRules.put(new RuleRegExp("R2",
- ReduceSinkOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName()),
- getBucketMapjoinRejectProc(pctx));
+ ReduceSinkOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName()),
+ getBucketMapjoinRejectProc(pctx));
opRules.put(new RuleRegExp(new String("R3"),
- UnionOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
- getBucketMapjoinRejectProc(pctx));
+ UnionOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
+ getBucketMapjoinRejectProc(pctx));
opRules.put(new RuleRegExp(new String("R4"),
- MapJoinOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
- getBucketMapjoinRejectProc(pctx));
+ MapJoinOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
+ getBucketMapjoinRejectProc(pctx));

// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
@@ -113,7 +113,7 @@ public class BucketMapJoinOptimizer impl
}

private NodeProcessor getBucketMapjoinRejectProc(ParseContext pctx) {
- return new NodeProcessor () {
+ return new NodeProcessor() {
@Override
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx, Object... nodeOutputs)
@@ -141,7 +141,7 @@ public class BucketMapJoinOptimizer impl
};
}

- class BucketMapjoinOptProc implements NodeProcessor {
+ class BucketMapjoinOptProc extends AbstractBucketJoinProc implements NodeProcessor {

protected ParseContext pGraphContext;

@@ -156,12 +156,12 @@ public class BucketMapJoinOptimizer impl
BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
HiveConf conf = context.getConf();

- if(context.getListOfRejectedMapjoins().contains(mapJoinOp)) {
+ if (context.getListOfRejectedMapjoins().contains(mapJoinOp)) {
return false;
}

QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext().get(mapJoinOp);
- if(joinCxt == null) {
+ if (joinCxt == null) {
return false;
}

@@ -170,19 +170,27 @@ public class BucketMapJoinOptimizer impl
String[] left = joinCxt.getLeftAliases();
List<String> mapAlias = joinCxt.getMapAliases();
String baseBigAlias = null;
- for(String s : left) {
- if(s != null && !joinAliases.contains(s)) {
- joinAliases.add(s);
- if(!mapAlias.contains(s)) {
- baseBigAlias = s;
+
+ for (String s : left) {
+ if (s != null) {
+ String subQueryAlias = QB.getAppendedAliasFromId(joinCxt.getId(), s);
+ if (!joinAliases.contains(subQueryAlias)) {
+ joinAliases.add(subQueryAlias);
+ if(!mapAlias.contains(s)) {
+ baseBigAlias = subQueryAlias;
+ }
}
}
}
- for(String s : srcs) {
- if(s != null && !joinAliases.contains(s)) {
- joinAliases.add(s);
- if(!mapAlias.contains(s)) {
- baseBigAlias = s;
+
+ for (String s : srcs) {
+ if (s != null) {
+ String subQueryAlias = QB.getAppendedAliasFromId(joinCxt.getId(), s);
+ if (!joinAliases.contains(subQueryAlias)) {
+ joinAliases.add(subQueryAlias);
+ if(!mapAlias.contains(s)) {
+ baseBigAlias = subQueryAlias;
+ }
}
}
}
@@ -194,7 +202,7 @@ public class BucketMapJoinOptimizer impl
new LinkedHashMap<String, List<List<String>>>();

Map<String, Operator<? extends OperatorDesc>> topOps =
- this.pGraphContext.getTopOps();
+ this.pGraphContext.getTopOps();
Map<TableScanOperator, Table> topToTable = this.pGraphContext.getTopToTable();

// (partition to bucket file names) and (partition to bucket number) for
@@ -206,26 +214,60 @@ public class BucketMapJoinOptimizer impl
boolean bigTablePartitioned = true;
for (int index = 0; index < joinAliases.size(); index++) {
String alias = joinAliases.get(index);
- TableScanOperator tso = (TableScanOperator) topOps.get(alias);
- if (tso == null) {
+ Operator<? extends OperatorDesc> topOp = joinCxt.getAliasToOpInfo().get(alias);
+ if (topOp == null) {
return false;
}
List<String> keys = toColumns(mjDesc.getKeys().get((byte) index));
if (keys == null || keys.isEmpty()) {
return false;
}
+ int oldKeySize = keys.size();
+ TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, keys);
+ if (tso == null) {
+ return false;
+ }
+
+ // For nested sub-queries, the alias mapping is not maintained in QB currently.
+ if (topOps.containsValue(tso)) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry : topOps.entrySet()) {
+ if (topOpEntry.getValue() == tso) {
+ String newAlias = topOpEntry.getKey();
+ joinAliases.set(index, newAlias);
+ if (baseBigAlias.equals(alias)) {
+ baseBigAlias = newAlias;
+ }
+ alias = newAlias;
+ break;
+ }
+ }
+ }
+ else {
+ // Ideally, this should never happen, and this should be an assert.
+ return false;
+ }
+
+ // The join keys cannot be transformed in the sub-query currently.
+ // TableAccessAnalyzer.genRootTableScan will only return the base table scan
+ // if the join keys are constants or a column. Even a simple cast of the join keys
+ // will result in a null table scan operator. In case of constant join keys, they would
+ // be removed, and the size before and after the genRootTableScan will be different.
+ if (keys.size() != oldKeySize) {
+ return false;
+ }
if (orders == null) {
orders = new Integer[keys.size()];
}

Table tbl = topToTable.get(tso);
- if(tbl.isPartitioned()) {
+ if (tbl.isPartitioned()) {
PrunedPartitionList prunedParts;
try {
prunedParts = pGraphContext.getOpToPartList().get(tso);
if (prunedParts == null) {
- prunedParts = PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso), pGraphContext.getConf(), alias,
- pGraphContext.getPrunedPartitions());
+ prunedParts = PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso),
+ pGraphContext.getConf(), alias,
+ pGraphContext.getPrunedPartitions());
pGraphContext.getOpToPartList().put(tso, prunedParts);
}
} catch (HiveException e) {
@@ -238,7 +280,7 @@ public class BucketMapJoinOptimizer impl
// construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
if (partitions.isEmpty()) {
if (!alias.equals(baseBigAlias)) {
- aliasToPartitionBucketNumberMapping.put(alias, Arrays.<Integer>asList());
+ aliasToPartitionBucketNumberMapping.put(alias, Arrays.<Integer> asList());
aliasToPartitionBucketFileNamesMapping.put(alias, new ArrayList<List<String>>());
}
} else {
@@ -253,10 +295,10 @@ public class BucketMapJoinOptimizer impl
int bucketCount = p.getBucketCount();
if (fileNames.size() != bucketCount) {
String msg = "The number of buckets for table " +
- tbl.getTableName() + " partition " + p.getName() + " is " +
- p.getBucketCount() + ", whereas the number of files is " + fileNames.size();
+ tbl.getTableName() + " partition " + p.getName() + " is " +
+ p.getBucketCount() + ", whereas the number of files is " + fileNames.size();
throw new SemanticException(
- ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
+ ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
}
if (alias.equals(baseBigAlias)) {
bigTblPartsToBucketFileNames.put(p, fileNames);
@@ -280,10 +322,10 @@ public class BucketMapJoinOptimizer impl
// The number of files for the table should be same as number of buckets.
if (fileNames.size() != num) {
String msg = "The number of buckets for table " +
- tbl.getTableName() + " is " + tbl.getNumBuckets() +
- ", whereas the number of files is " + fileNames.size();
+ tbl.getTableName() + " is " + tbl.getNumBuckets() +
+ ", whereas the number of files is " + fileNames.size();
throw new SemanticException(
- ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
+ ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
}
if (alias.equals(baseBigAlias)) {
bigTblPartsToBucketFileNames.put(null, fileNames);
@@ -308,10 +350,10 @@ public class BucketMapJoinOptimizer impl
MapJoinDesc desc = mapJoinOp.getConf();

Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
- new LinkedHashMap<String, Map<String, List<String>>>();
+ new LinkedHashMap<String, Map<String, List<String>>>();

- //sort bucket names for the big table
- for(List<String> partBucketNames : bigTblPartsToBucketFileNames.values()) {
+ // sort bucket names for the big table
+ for (List<String> partBucketNames : bigTblPartsToBucketFileNames.values()) {
Collections.sort(partBucketNames);
}

@@ -333,7 +375,7 @@ public class BucketMapJoinOptimizer impl

// for each bucket file in big table, get the corresponding bucket file
// name in the small table.
- //more than 1 partition in the big table, do the mapping for each partition
+ // more than 1 partition in the big table, do the mapping for each partition
Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames =
bigTblPartsToBucketFileNames.entrySet().iterator();
Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
@@ -376,17 +418,6 @@ public class BucketMapJoinOptimizer impl
return null;
}

- private List<String> toColumns(List<ExprNodeDesc> keys) {
- List<String> columns = new ArrayList<String>();
- for (ExprNodeDesc key : keys) {
- if (!(key instanceof ExprNodeColumnDesc)) {
- return null;
- }
- columns.add(((ExprNodeColumnDesc) key).getColumn());
- }
- return columns;
- }
-
// convert partition to partition spec string
private Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
Map<String, List<String>> converted = new HashMap<String, List<String>>();
@@ -406,7 +437,7 @@ public class BucketMapJoinOptimizer impl

for (int bindex = 0; bindex < bigTblBucketNameList.size(); bindex++) {
ArrayList<String> resultFileNames = new ArrayList<String>();
- for (int sindex = 0 ; sindex < smallTblBucketNums.size(); sindex++) {
+ for (int sindex = 0; sindex < smallTblBucketNums.size(); sindex++) {
int smallTblBucketNum = smallTblBucketNums.get(sindex);
List<String> smallTblFileNames = smallTblFilesList.get(sindex);
if (bigTblBucketNum >= smallTblBucketNum) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java Wed Dec 19 01:20:56 2012
@@ -30,7 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -50,14 +50,12 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;

//try to replace a bucket map join with a sorted merge map join
public class SortedMergeBucketMapJoinOptimizer implements Transform {
@@ -104,7 +102,7 @@ public class SortedMergeBucketMapJoinOpt
};
}

- class SortedMergeBucketMapjoinProc implements NodeProcessor {
+ class SortedMergeBucketMapjoinProc extends AbstractBucketJoinProc implements NodeProcessor {
private ParseContext pGraphContext;

public SortedMergeBucketMapjoinProc(ParseContext pctx) {
@@ -134,7 +132,9 @@ public class SortedMergeBucketMapJoinOpt
return false;
}
String[] srcs = joinCxt.getBaseSrc();
- int pos = 0;
+ for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
+ srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
+ }

// All the tables/partitions columns should be sorted in the same order
// For example, if tables A and B are being joined on columns c1, c2 and c3
@@ -142,15 +142,14 @@ public class SortedMergeBucketMapJoinOpt
// c1, c2 and c3 are sorted in the same order.
List<Order> sortColumnsFirstTable = new ArrayList<Order>();

- for (String src : srcs) {
+ for (int pos = 0; pos < srcs.length; pos++) {
tableSorted = tableSorted
&& isTableSorted(this.pGraphContext,
mapJoinOp,
joinCxt,
- src,
pos,
- sortColumnsFirstTable);
- pos++;
+ sortColumnsFirstTable,
+ srcs);
}
if (!tableSorted) {
//this is a mapjoin but not suit for a sort merge bucket map join. check outer joins
@@ -196,13 +195,55 @@ public class SortedMergeBucketMapJoinOpt
this.pGraphContext.getListMapJoinOpsNoReducer().add(indexInListMapJoinNoReducer, smbJop);
}

+ Map<String, DummyStoreOperator> aliasToSink =
+ new HashMap<String, DummyStoreOperator>();
+ // For all parents (other than the big table), insert a dummy store operator
+ /* Consider a query like:
+ *
+ * select * from
+ * (subq1 --> has a filter)
+ * join
+ * (subq2 --> has a filter)
+ * on some key
+ *
+ * Let us assume that subq1 is the small table (either specified by the user or inferred
+ * automatically). The following operator tree will be created:
+ *
+ * TableScan (subq1) --> Select --> Filter --> DummyStore
+ * \
+ * \ SMBJoin
+ * /
+ * /
+ * TableScan (subq2) --> Select --> Filter
+ */
List<? extends Operator> parentOperators = mapJoinOp.getParentOperators();
for (int i = 0; i < parentOperators.size(); i++) {
Operator par = parentOperators.get(i);
int index = par.getChildOperators().indexOf(mapJoinOp);
par.getChildOperators().remove(index);
- par.getChildOperators().add(index, smbJop);
+ if (i == smbJoinDesc.getPosBigTable()) {
+ par.getChildOperators().add(index, smbJop);
+ }
+ else {
+ DummyStoreOperator dummyStoreOp = new DummyStoreOperator();
+ par.getChildOperators().add(index, dummyStoreOp);
+
+ List<Operator<? extends OperatorDesc>> childrenOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ childrenOps.add(smbJop);
+ dummyStoreOp.setChildOperators(childrenOps);
+
+ List<Operator<? extends OperatorDesc>> parentOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ parentOps.add(par);
+ dummyStoreOp.setParentOperators(parentOps);
+
+ aliasToSink.put(srcs[i], dummyStoreOp);
+ smbJop.getParentOperators().remove(i);
+ smbJop.getParentOperators().add(i, dummyStoreOp);
+ }
}
+ smbJoinDesc.setAliasToSink(aliasToSink);
List<? extends Operator> childOps = mapJoinOp.getChildOperators();
for (int i = 0; i < childOps.size(); i++) {
Operator child = childOps.get(i);
@@ -229,40 +270,74 @@ public class SortedMergeBucketMapJoinOpt
private boolean isTableSorted(ParseContext pctx,
MapJoinOperator op,
QBJoinTree joinTree,
- String alias,
int pos,
- List<Order> sortColumnsFirstTable)
+ List<Order> sortColumnsFirstTable,
+ String[] aliases)
throws SemanticException {
-
- Map<String, Operator<? extends OperatorDesc>> topOps = this.pGraphContext
- .getTopOps();
+ String alias = aliases[pos];
Map<TableScanOperator, Table> topToTable = this.pGraphContext
.getTopToTable();
- TableScanOperator tso = (TableScanOperator) topOps.get(alias);
+
+ /*
+ * Consider a query like:
+ *
+ * select -- mapjoin(subq1) -- * from
+ * (select a.key, a.value from tbl1 a) subq1
+ * join
+ * (select a.key, a.value from tbl2 a) subq2
+ * on subq1.key = subq2.key;
+ *
+ * aliasToOpInfo contains the SelectOperator for subq1 and subq2.
+ * We need to traverse the tree (using TableAccessAnalyzer) to get to the base
+ * table. If the object being map-joined is a base table, then aliasToOpInfo
+ * contains the TableScanOperator, and TableAccessAnalyzer is a no-op.
+ */
+ Operator<? extends OperatorDesc> topOp = joinTree.getAliasToOpInfo().get(alias);
+ if (topOp == null) {
+ return false;
+ }
+ List<String> joinCols = toColumns(op.getConf().getKeys().get((byte) pos));
+ if (joinCols == null || joinCols.isEmpty()) {
+ return false;
+ }
+ TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, joinCols);
if (tso == null) {
return false;
}

- List<ExprNodeDesc> keys = op.getConf().getKeys().get((byte) pos);
- // get all join columns from join keys stored in MapJoinDesc
- List<String> joinCols = new ArrayList<String>();
- List<ExprNodeDesc> joinKeys = new ArrayList<ExprNodeDesc>();
- joinKeys.addAll(keys);
- while (joinKeys.size() > 0) {
- ExprNodeDesc node = joinKeys.remove(0);
- if (node instanceof ExprNodeColumnDesc) {
- joinCols.addAll(node.getCols());
- } else if (node instanceof ExprNodeGenericFuncDesc) {
- ExprNodeGenericFuncDesc udfNode = ((ExprNodeGenericFuncDesc) node);
- GenericUDF udf = udfNode.getGenericUDF();
- if (!FunctionRegistry.isDeterministic(udf)) {
- return false;
+ // For nested sub-queries, the alias mapping is not maintained in QB currently.
+ /*
+ * Consider a query like:
+ *
+ * select count(*) from
+ * (
+ * select key, count(*) from
+ * (
+ * select --mapjoin(a)-- a.key as key, a.value as val1, b.value as val2
+ * from tbl1 a join tbl2 b on a.key = b.key
+ * ) subq1
+ * group by key
+ * ) subq2;
+ *
+ * The table alias should be subq2:subq1:a which needs to be fetched from topOps.
+ */
+ if (pGraphContext.getTopOps().containsValue(tso)) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry :
+ this.pGraphContext.getTopOps().entrySet()) {
+ if (topOpEntry.getValue() == tso) {
+ alias = topOpEntry.getKey();
+ aliases[pos] = alias;
+ break;
}
- joinKeys.addAll(0, udfNode.getChildExprs());
}
}
+ else {
+ // Ideally, this should never happen, and this should be an assert.
+ return false;
+ }

Table tbl = topToTable.get(tso);
+
if (tbl.isPartitioned()) {
PrunedPartitionList prunedParts = null;
try {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Wed Dec 19 01:20:56 2012
@@ -76,7 +76,20 @@ public class QB {
}
qbp = new QBParseInfo(alias, isSubQ);
qbm = new QBMetaData();
- id = (outer_id == null ? alias : outer_id + ":" + alias);
+ id = getAppendedAliasFromId(outer_id, alias);
+ }
+
+ // For sub-queries, the id. and alias should be appended since same aliases can be re-used
+ // within different sub-queries.
+ // For a query like:
+ // select ...
+ // (select * from T1 a where ...) subq1
+ // join
+ // (select * from T2 a where ...) subq2
+ // ..
+ // the alias is modified to subq1:a and subq2:a from a, to identify the right sub-query.
+ public static String getAppendedAliasFromId(String outer_id, String alias) {
+ return (outer_id == null ? alias : outer_id + ":" + alias);
}

public QBParseInfo getParseInfo() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java Wed Dec 19 01:20:56 2012
@@ -22,8 +22,12 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;

+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
/**
* Internal representation of the join tree.
*
@@ -39,6 +43,11 @@ public class QBJoinTree implements Seria
private JoinCond[] joinCond;
private boolean noOuterJoin;
private boolean noSemiJoin;
+ private Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo;
+
+ // The subquery identifier from QB.
+ // It is of the form topSubQuery:innerSubQuery:....:innerMostSubQuery
+ private String id;

// keeps track of the right-hand-side table name of the left-semi-join, and
// its list of join keys
@@ -74,6 +83,7 @@ public class QBJoinTree implements Seria
noOuterJoin = true;
noSemiJoin = true;
rhsSemijoin = new HashMap<String, ArrayList<ASTNode>>();
+ aliasToOpInfo = new HashMap<String, Operator<? extends OperatorDesc>>();
}

/**
@@ -320,4 +330,20 @@ public class QBJoinTree implements Seria
public void setFilterMap(int[][] filterMap) {
this.filterMap = filterMap;
}
+
+ public Map<String, Operator<? extends OperatorDesc>> getAliasToOpInfo() {
+ return aliasToOpInfo;
+ }
+
+ public void setAliasToOpInfo(Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo) {
+ this.aliasToOpInfo = aliasToOpInfo;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Dec 19 01:20:56 2012
@@ -5630,7 +5630,7 @@ public class SemanticAnalyzer extends Ba
}

private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
- HashMap<String, Operator> map) throws SemanticException {
+ Map<String, Operator> map) throws SemanticException {
QBJoinTree leftChild = joinTree.getJoinSrc();
Operator joinSrcOp = null;
if (leftChild != null) {
@@ -5829,7 +5829,7 @@ public class SemanticAnalyzer extends Ba
}
}

- private Operator genJoinPlan(QB qb, HashMap<String, Operator> map)
+ private Operator genJoinPlan(QB qb, Map<String, Operator> map)
throws SemanticException {
QBJoinTree joinTree = qb.getQbJoinTree();
Operator joinOp = genJoinOperator(qb, joinTree, map);
@@ -5841,7 +5841,7 @@ public class SemanticAnalyzer extends Ba
* source operators. This procedure traverses the query tree recursively,
*/
private void pushJoinFilters(QB qb, QBJoinTree joinTree,
- HashMap<String, Operator> map) throws SemanticException {
+ Map<String, Operator> map) throws SemanticException {
if (joinTree.getJoinSrc() != null) {
pushJoinFilters(qb, joinTree.getJoinSrc(), map);
}
@@ -5881,7 +5881,15 @@ public class SemanticAnalyzer extends Ba
return cols;
}

- private QBJoinTree genUniqueJoinTree(QB qb, ASTNode joinParseTree)
+ // The join alias is modified before being inserted for consumption by sort-merge
+ // join queries. If the join is part of a sub-query the alias is modified to include
+ // the sub-query alias.
+ private String getModifiedAlias(QB qb, String alias) {
+ return QB.getAppendedAliasFromId(qb.getId(), alias);
+ }
+
+ private QBJoinTree genUniqueJoinTree(QB qb, ASTNode joinParseTree,
+ Map<String, Operator> aliasToOpInfo)
throws SemanticException {
QBJoinTree joinTree = new QBJoinTree();
joinTree.setNoOuterJoin(false);
@@ -5920,6 +5928,9 @@ public class SemanticAnalyzer extends Ba
} else {
rightAliases.add(alias);
}
+ joinTree.getAliasToOpInfo().put(
+ getModifiedAlias(qb, alias), aliasToOpInfo.get(alias));
+ joinTree.setId(qb.getId());
baseSrc.add(alias);

preserved.add(lastPreserved);
@@ -5977,7 +5988,8 @@ public class SemanticAnalyzer extends Ba
return joinTree;
}

- private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree)
+ private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree,
+ Map<String, Operator> aliasToOpInfo)
throws SemanticException {
QBJoinTree joinTree = new QBJoinTree();
JoinCond[] condn = new JoinCond[1];
@@ -6024,8 +6036,11 @@ public class SemanticAnalyzer extends Ba
String[] children = new String[2];
children[0] = alias;
joinTree.setBaseSrc(children);
+ joinTree.setId(qb.getId());
+ joinTree.getAliasToOpInfo().put(
+ getModifiedAlias(qb, alias), aliasToOpInfo.get(alias));
} else if (isJoinToken(left)) {
- QBJoinTree leftTree = genJoinTree(qb, left);
+ QBJoinTree leftTree = genJoinTree(qb, left, aliasToOpInfo);
joinTree.setJoinSrc(leftTree);
String[] leftChildAliases = leftTree.getLeftAliases();
String leftAliases[] = new String[leftChildAliases.length + 1];
@@ -6054,6 +6069,10 @@ public class SemanticAnalyzer extends Ba
}
children[1] = alias;
joinTree.setBaseSrc(children);
+ aliasToOpInfo.get(alias);
+ joinTree.setId(qb.getId());
+ joinTree.getAliasToOpInfo().put(
+ getModifiedAlias(qb, alias), aliasToOpInfo.get(alias));
// remember rhs table for semijoin
if (joinTree.getNoSemiJoin() == false) {
joinTree.addRHSSemijoin(alias);
@@ -6158,6 +6177,7 @@ public class SemanticAnalyzer extends Ba
rightAliases[i + trgtRightAliases.length] = nodeRightAliases[i];
}
target.setRightAliases(rightAliases);
+ target.getAliasToOpInfo().putAll(node.getAliasToOpInfo());

String[] nodeBaseSrc = node.getBaseSrc();
String[] trgtBaseSrc = target.getBaseSrc();
@@ -7477,7 +7497,7 @@ public class SemanticAnalyzer extends Ba
public Operator genPlan(QB qb) throws SemanticException {

// First generate all the opInfos for the elements in the from clause
- HashMap<String, Operator> aliasToOpInfo = new HashMap<String, Operator>();
+ Map<String, Operator> aliasToOpInfo = new HashMap<String, Operator>();

// Recurse over the subqueries to fill the subquery part of the plan
for (String alias : qb.getSubqAliases()) {
@@ -7503,10 +7523,10 @@ public class SemanticAnalyzer extends Ba
ASTNode joinExpr = qb.getParseInfo().getJoinExpr();

if (joinExpr.getToken().getType() == HiveParser.TOK_UNIQUEJOIN) {
- QBJoinTree joinTree = genUniqueJoinTree(qb, joinExpr);
+ QBJoinTree joinTree = genUniqueJoinTree(qb, joinExpr, aliasToOpInfo);
qb.setQbJoinTree(joinTree);
} else {
- QBJoinTree joinTree = genJoinTree(qb, joinExpr);
+ QBJoinTree joinTree = genJoinTree(qb, joinExpr, aliasToOpInfo);
qb.setQbJoinTree(joinTree);
mergeJoinTree(qb);
}
@@ -7542,7 +7562,7 @@ public class SemanticAnalyzer extends Ba
* @throws SemanticException
*/

- void genLateralViewPlans(HashMap<String, Operator> aliasToOpInfo, QB qb)
+ void genLateralViewPlans(Map<String, Operator> aliasToOpInfo, QB qb)
throws SemanticException {
Map<String, ArrayList<ASTNode>> aliasToLateralViews = qb.getParseInfo()
.getAliasToLateralViews();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java Wed Dec 19 01:20:56 2012
@@ -226,10 +226,9 @@ public class TableAccessAnalyzer {
* names on that table that map to the keys used for the input
* operator (which is currently only a join or group by).
*/
- private static TableScanOperator genRootTableScan(
+ public static TableScanOperator genRootTableScan(
Operator<? extends OperatorDesc> op, List<String> keyNames) {

- boolean complexTree = false;
Operator<? extends OperatorDesc> currOp = op;
List<String> currColNames = keyNames;
List<Operator<? extends OperatorDesc>> parentOps = null;
@@ -238,26 +237,24 @@ public class TableAccessAnalyzer {
// along the way that changes the rows from the table through
// joins or aggregations. Only allowed operators are selects
// and filters.
- while (!complexTree) {
+ while (true) {
parentOps = currOp.getParentOperators();
if (parentOps == null) {
- break;
+ return (TableScanOperator) currOp;
}

if (parentOps.size() > 1 ||
!(currOp.columnNamesRowResolvedCanBeObtained())) {
- complexTree = true;
+ return null;
} else {
// Generate the map of the input->output column name for the keys
// we are about
if (!TableAccessAnalyzer.genColNameMap(currOp, currColNames)) {
- complexTree = true;
+ return null;
}
currOp = parentOps.get(0);
}
}
-
- return complexTree? null: (TableScanOperator) currOp;
}

/*

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java?rev=1423731&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java Wed Dec 19 01:20:56 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+
+/**
+ * Dummy Store Desc. This is only used by sort-merge joins to store the
+ * result for the small table (sub-query) being scanned.
+ */
+@Explain(displayName = "Dummy Store")
+public class DummyStoreDesc extends AbstractOperatorDesc {
+ private static final long serialVersionUID = 1L;
+
+ public DummyStoreDesc() {
+ }
+
+ @Override
+ public DummyStoreDesc clone() {
+ return new DummyStoreDesc();
+ }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java Wed Dec 19 01:20:56 2012
@@ -20,16 +20,20 @@ package org.apache.hadoop.hive.ql.plan;

import java.io.Serializable;
import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;

@Explain(displayName = "Sorted Merge Bucket Map Join Operator")
public class SMBJoinDesc extends MapJoinDesc implements Serializable {

private static final long serialVersionUID = 1L;
-
+
private MapredLocalWork localWork;
-
- //keep a mapping from tag to the fetch operator alias
+
+ //keep a mapping from tag to the fetch operator alias
private HashMap<Byte, String> tagToAlias;
+ private Map<String, DummyStoreOperator> aliasToSink;

public SMBJoinDesc(MapJoinDesc conf) {
super(conf);
@@ -53,5 +57,12 @@ public class SMBJoinDesc extends MapJoin
public void setTagToAlias(HashMap<Byte, String> tagToAlias) {
this.tagToAlias = tagToAlias;
}
-
+
+ public Map<String, DummyStoreOperator> getAliasToSink() {
+ return aliasToSink;
+ }
+
+ public void setAliasToSink(Map<String, DummyStoreOperator> aliasToSink) {
+ this.aliasToSink = aliasToSink;
+ }
}

Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q?rev=1423731&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q Wed Dec 19 01:20:56 2012
@@ -0,0 +1,280 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 1;
+
+CREATE TABLE tbl1(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE tbl2(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+insert overwrite table tbl1
+select * from src where key < 10;
+
+insert overwrite table tbl2
+select * from src where key < 10;
+
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+
+-- The mapjoin is being performed as part of sub-query. It should be converted to a sort-merge join
+explain
+select count(*) from (
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1;
+
+select count(*) from (
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1;
+
+-- The mapjoin is being performed as part of sub-query. It should be converted to a sort-merge join
+-- Add a order by at the end to make the results deterministic.
+explain
+select key, count(*) from
+(
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1
+group by key
+order by key;
+
+select key, count(*) from
+(
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1
+group by key
+order by key;
+
+-- The mapjoin is being performed as part of more than one sub-query. It should be converted to a sort-merge join
+explain
+select count(*) from
+(
+ select key, count(*) from
+ (
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+ ) subq1
+ group by key
+) subq2;
+
+select count(*) from
+(
+ select key, count(*) from
+ (
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+ ) subq1
+ group by key
+) subq2;
+
+-- A join is being performed across different sub-queries, where a mapjoin is being performed in each of them.
+-- Each sub-query should be converted to a sort-merge join.
+explain
+select src1.key, src1.cnt1, src2.cnt1 from
+(
+ select key, count(*) as cnt1 from
+ (
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+ ) subq1 group by key
+) src1
+join
+(
+ select key, count(*) as cnt1 from
+ (
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+ ) subq2 group by key
+) src2
+on src1.key = src2.key
+order by src1.key, src1.cnt1, src2.cnt1;
+
+select src1.key, src1.cnt1, src2.cnt1 from
+(
+ select key, count(*) as cnt1 from
+ (
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+ ) subq1 group by key
+) src1
+join
+(
+ select key, count(*) as cnt1 from
+ (
+ select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+ ) subq2 group by key
+) src2
+on src1.key = src2.key
+order by src1.key, src1.cnt1, src2.cnt1;
+
+-- The subquery itself is being map-joined. Since the sub-query only contains selects and filters, it should
+-- be converted to a sort-merge join.
+explain
+select /*+mapjoin(subq1)*/ count(*) from
+ (select a.key as key, a.value as value from tbl1 a where key < 6) subq1
+ join
+ (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+ on subq1.key = subq2.key;
+
+select /*+mapjoin(subq1)*/ count(*) from
+ (select a.key as key, a.value as value from tbl1 a where key < 6) subq1
+ join
+ (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+ on subq1.key = subq2.key;
+
+-- The subquery itself is being map-joined. Since the sub-query only contains selects and filters, it should
+-- be converted to a sort-merge join, although there is more than one level of sub-query
+explain
+select /*+mapjoin(subq2)*/ count(*) from
+ (
+ select * from
+ (
+ select a.key as key, a.value as value from tbl1 a where key < 8
+ ) subq1
+ where key < 6
+ ) subq2
+ join tbl2 b
+ on subq2.key = b.key;
+
+select /*+mapjoin(subq2)*/ count(*) from
+ (
+ select * from
+ (
+ select a.key as key, a.value as value from tbl1 a where key < 8
+ ) subq1
+ where key < 6
+ ) subq2
+ join tbl2 b
+ on subq2.key = b.key;
+
+-- Both the big table and the small table are nested sub-queries i.e more then 1 level of sub-query.
+-- The join should be converted to a sort-merge join
+explain
+select /*+mapjoin(subq2)*/ count(*) from
+ (
+ select * from
+ (
+ select a.key as key, a.value as value from tbl1 a where key < 8
+ ) subq1
+ where key < 6
+ ) subq2
+ join
+ (
+ select * from
+ (
+ select a.key as key, a.value as value from tbl1 a where key < 8
+ ) subq3
+ where key < 6
+ ) subq4
+ on subq2.key = subq4.key;
+
+select /*+mapjoin(subq2)*/ count(*) from
+ (
+ select * from
+ (
+ select a.key as key, a.value as value from tbl1 a where key < 8
+ ) subq1
+ where key < 6
+ ) subq2
+ join
+ (
+ select * from
+ (
+ select a.key as key, a.value as value from tbl1 a where key < 8
+ ) subq3
+ where key < 6
+ ) subq4
+ on subq2.key = subq4.key;
+
+-- The subquery itself is being map-joined. Since the sub-query only contains selects and filters and the join key
+-- is not getting modified, it should be converted to a sort-merge join. Note that the sub-query modifies one
+-- item, but that is not part of the join key.
+explain
+select /*+mapjoin(subq1)*/ count(*) from
+ (select a.key as key, concat(a.value, a.value) as value from tbl1 a where key < 8) subq1
+ join
+ (select a.key as key, concat(a.value, a.value) as value from tbl2 a where key < 8) subq2
+ on subq1.key = subq2.key;
+
+select /*+mapjoin(subq1)*/ count(*) from
+ (select a.key as key, concat(a.value, a.value) as value from tbl1 a where key < 8) subq1
+ join
+ (select a.key as key, concat(a.value, a.value) as value from tbl2 a where key < 8) subq2
+ on subq1.key = subq2.key;
+
+-- Since the join key is modified by the sub-query, neither sort-merge join not bucketized map-side
+-- join should be performed
+explain
+select /*+mapjoin(subq1)*/ count(*) from
+ (select a.key +1 as key, concat(a.value, a.value) as value from tbl1 a) subq1
+ join
+ (select a.key +1 as key, concat(a.value, a.value) as value from tbl2 a) subq2
+ on subq1.key = subq2.key;
+
+select /*+mapjoin(subq1)*/ count(*) from
+ (select a.key +1 as key, concat(a.value, a.value) as value from tbl1 a) subq1
+ join
+ (select a.key +1 as key, concat(a.value, a.value) as value from tbl2 a) subq2
+ on subq1.key = subq2.key;
+
+-- The small table is a sub-query and the big table is not.
+-- It should be converted to a sort-merge join.
+explain
+select /*+mapjoin(subq1)*/ count(*) from
+ (select a.key as key, a.value as value from tbl1 a where key < 6) subq1
+ join tbl2 a on subq1.key = a.key;
+
+select /*+mapjoin(subq1)*/ count(*) from
+ (select a.key as key, a.value as value from tbl1 a where key < 6) subq1
+ join tbl2 a on subq1.key = a.key;
+
+-- The big table is a sub-query and the small table is not.
+-- It should be converted to a sort-merge join.
+explain
+select /*+mapjoin(a)*/ count(*) from
+ (select a.key as key, a.value as value from tbl1 a where key < 6) subq1
+ join tbl2 a on subq1.key = a.key;
+
+select /*+mapjoin(a)*/ count(*) from
+ (select a.key as key, a.value as value from tbl1 a where key < 6) subq1
+ join tbl2 a on subq1.key = a.key;
+
+-- There are more than 2 inputs to the join, all of them being sub-queries.
+-- It should be converted to to a sort-merge join
+explain
+select /*+mapjoin(subq1, subq2)*/ count(*) from
+ (select a.key as key, a.value as value from tbl1 a where key < 6) subq1
+ join
+ (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+ on (subq1.key = subq2.key)
+ join
+ (select a.key as key, a.value as value from tbl2 a where key < 6) subq3
+ on (subq1.key = subq3.key);
+
+select /*+mapjoin(subq1, subq2)*/ count(*) from
+ (select a.key as key, a.value as value from tbl1 a where key < 6) subq1
+ join
+ (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+ on subq1.key = subq2.key
+ join
+ (select a.key as key, a.value as value from tbl2 a where key < 6) subq3
+ on (subq1.key = subq3.key);
+
+-- The mapjoin is being performed on a nested sub-query, and an aggregation is performed after that.
+-- The join should be converted to a sort-merge join
+explain
+select count(*) from (
+ select /*+mapjoin(subq2)*/ subq2.key as key, subq2.value as value1, b.value as value2 from
+ (
+ select * from
+ (
+ select a.key as key, a.value as value from tbl1 a where key < 8
+ ) subq1
+ where key < 6
+ ) subq2
+join tbl2 b
+on subq2.key = b.key) a;
+
+select count(*) from (
+ select /*+mapjoin(subq2)*/ subq2.key as key, subq2.value as value1, b.value as value2 from
+ (
+ select * from
+ (
+ select a.key as key, a.value as value from tbl1 a where key < 8
+ ) subq1
+ where key < 6
+ ) subq2
+join tbl2 b
+on subq2.key = b.key) a;

Search Discussions

Discussion Posts

Follow ups

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 2 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedDec 19, '12 at 1:21a
activeDec 19, '12 at 1:21a
posts2
users1
websitehive.apache.org

1 user in discussion

Kevinwilfong: 2 posts

People

Translate

site design / logo © 2021 Grokbase