Author: xuefu
Date: Wed Mar 11 14:10:26 2015
New Revision: 1665877

URL: http://svn.apache.org/r1665877
Log:
HIVE-9659: 'Error while trying to create table container' occurs during hive query case execution when hive.optimize.skewjoin set to 'true' [Spark Branch] (Rui via Xuefu)

Added:
     hive/branches/spark/ql/src/test/queries/clientpositive/runtime_skewjoin_mapjoin_spark.q
     hive/branches/spark/ql/src/test/results/clientpositive/runtime_skewjoin_mapjoin_spark.q.out
     hive/branches/spark/ql/src/test/results/clientpositive/spark/runtime_skewjoin_mapjoin_spark.q.out
Modified:
     hive/branches/spark/itests/src/test/resources/testconfiguration.properties
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java

Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1665877&r1=1665876&r2=1665877&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Wed Mar 11 14:10:26 2015
@@ -812,6 +812,7 @@ spark.query.files=add_part_multiple.q, \
    rcfile_bigdata.q, \
    reduce_deduplicate_exclude_join.q, \
    router_join_ppr.q, \
+ runtime_skewjoin_mapjoin_spark.q, \
    sample1.q, \
    sample10.q, \
    sample2.q, \

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java?rev=1665877&r1=1665876&r2=1665877&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java Wed Mar 11 14:10:26 2015
@@ -18,6 +18,8 @@

  package org.apache.hadoop.hive.ql.lib;

+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.Task;
  import org.apache.hadoop.hive.ql.parse.SemanticException;

  /**
@@ -58,6 +60,12 @@ public class PreOrderWalker extends Defa
        for (Node n : nd.getChildren()) {
          walk(n);
        }
+    } else if (nd instanceof ConditionalTask) {
+      for (Task n : ((ConditionalTask) nd).getListTasks()) {
+        if (n.getParentTasks() == null || n.getParentTasks().isEmpty()) {
+          walk(n);
+        }
+      }
      }

      opStack.pop();

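The PreOrderWalker change above lets the walker descend into a ConditionalTask's list tasks, which are reachable only through the conditional's task list rather than through ordinary parent/child edges. A minimal, self-contained sketch of that traversal order (TaskNode and CondNode are hypothetical stand-ins for Hive's Task and ConditionalTask, used only to illustrate the walk):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Hive's Task/ConditionalTask, just to show the visiting order.
class TaskNode {
  final String name;
  final List<TaskNode> parents = new ArrayList<>();
  final List<TaskNode> children = new ArrayList<>();

  TaskNode(String name) {
    this.name = name;
  }

  void addChild(TaskNode child) {
    children.add(child);
    child.parents.add(this);
  }
}

// A "conditional" node carries a list of alternative tasks rather than plain children.
class CondNode extends TaskNode {
  final List<TaskNode> listTasks = new ArrayList<>();

  CondNode(String name, TaskNode... alternatives) {
    super(name);
    listTasks.addAll(Arrays.asList(alternatives));
  }
}

public class PreOrderWalkSketch {
  // Visit the node first, then recurse into its children; for a conditional node
  // without ordinary children, recurse into the list tasks that have no parent,
  // since they would otherwise never be reached.
  static void walk(TaskNode nd) {
    System.out.println("visit " + nd.name);
    if (!nd.children.isEmpty()) {
      for (TaskNode n : nd.children) {
        walk(n);
      }
    } else if (nd instanceof CondNode) {
      for (TaskNode n : ((CondNode) nd).listTasks) {
        if (n.parents.isEmpty()) {
          walk(n);
        }
      }
    }
  }

  public static void main(String[] args) {
    TaskNode root = new TaskNode("root");
    CondNode cond = new CondNode("conditional",
        new TaskNode("map-join alternative"), new TaskNode("common-join backup"));
    root.addChild(cond);
    walk(root); // visits root, the conditional, then both alternatives
  }
}

Guarding on an empty parent list keeps alternatives that are already wired into the task graph from being visited a second time through the conditional branch.
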
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java?rev=1665877&r1=1665876&r2=1665877&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java Wed Mar 11 14:10:26 2015
@@ -335,6 +335,7 @@ public class GenSparkSkewJoinProcessor {
      if (child != null) {
        currTask.removeDependentTask(child);
        listTasks.add(child);
+ listWorks.add(child.getWork());
      }
      ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx context =
          new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child);
@@ -413,12 +414,4 @@ public class GenSparkSkewJoinProcessor {
      }
      mapJoinDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
    }
-
-  // check this before calling processSkewJoin
-  public static boolean supportRuntimeSkewJoin(JoinOperator joinOp,
-      Task<? extends Serializable> currTask, HiveConf hiveConf) {
-    List<Task<? extends Serializable>> children = currTask.getChildTasks();
-    return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp)
-        && (children == null || children.size() <= 1);
-  }
  }

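The one-line fix above keeps the conditional resolver's list of works in step with its list of tasks when the no-skew child task is appended. A toy sketch of why two parallel lists like these must be updated together (the classes here are hypothetical miniatures, not Hive's ConditionalTask or ConditionalResolverSkewJoinCtx):

import java.util.ArrayList;
import java.util.List;

public class ParallelListsSketch {

  static final class Work {
    final String plan;
    Work(String plan) {
      this.plan = plan;
    }
  }

  static final class Candidate {
    final String name;
    final Work work;
    Candidate(String name, Work work) {
      this.name = name;
      this.work = work;
    }
  }

  // A conditional task keeps the candidate tasks and the plan fragments they run
  // in two parallel lists; anything added to one must be added to the other.
  private final List<Candidate> listTasks = new ArrayList<>();
  private final List<Work> listWorks = new ArrayList<>();

  void addCandidate(Candidate task) {
    listTasks.add(task);
    // Omitting this call leaves the two lists out of sync, so resolving a
    // candidate back to its work later fails.
    listWorks.add(task.work);
  }

  Work workOf(int candidate) {
    return listWorks.get(candidate);
  }

  public static void main(String[] args) {
    ParallelListsSketch cond = new ParallelListsSketch();
    cond.addCandidate(new Candidate("map-join branch", new Work("map-join plan")));
    cond.addCandidate(new Candidate("no-skew child", new Work("original child plan")));
    System.out.println(cond.workOf(1).plan); // original child plan
  }
}
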
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1665877&r1=1665876&r2=1665877&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Wed Mar 11 14:10:26 2015
@@ -59,6 +59,9 @@ import org.apache.hadoop.hive.ql.plan.Sp

  public class SparkMapJoinResolver implements PhysicalPlanResolver {

+  // prevents a task from being processed multiple times
+  private final Set<Task<? extends Serializable>> visitedTasks = new HashSet<>();
+
    @Override
    public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {

@@ -78,6 +81,15 @@ public class SparkMapJoinResolver implem
      return matchingOps != null && !matchingOps.isEmpty();
    }

+  private boolean containsOp(SparkWork sparkWork, Class<?> clazz) {
+    for (BaseWork work : sparkWork.getAllWorkUnsorted()) {
+      if (containsOp(work, clazz)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
    public static Set<Operator<?>> getOp(BaseWork work, Class<?> clazz) {
      Set<Operator<?>> ops = new HashSet<Operator<?>>();
      if (work instanceof MapWork) {
@@ -172,70 +184,68 @@ public class SparkMapJoinResolver implem

      private void generateLocalWork(SparkTask originalTask) {
        SparkWork originalWork = originalTask.getWork();
- Collection<BaseWork> allBaseWorks = originalWork.getAllWorkUnsorted();
-
- for (BaseWork work : allBaseWorks) {
- if (containsOp(work, SparkHashTableSinkOperator.class) ||
- containsOp(work, MapJoinOperator.class)) {
- work.setMapRedLocalWork(new MapredLocalWork());
- }
- }
-
+ Collection<BaseWork> allBaseWorks = originalWork.getAllWork();
        Context ctx = physicalContext.getContext();

        for (BaseWork work : allBaseWorks) {
- Set<Operator<?>> ops = getOp(work, MapJoinOperator.class);
- if (ops == null || ops.isEmpty()) {
- continue;
- }
- Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
- MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork();
- List<Operator<? extends OperatorDesc>> dummyOps =
- new ArrayList<Operator<? extends OperatorDesc>>(work.getDummyOps());
- bigTableLocalWork.setDummyParentOp(dummyOps);
- bigTableLocalWork.setTmpPath(tmpPath);
-
- // In one work, only one map join operator can be bucketed
- SparkBucketMapJoinContext bucketMJCxt = null;
- for (Operator<? extends OperatorDesc> op: ops) {
- MapJoinOperator mapJoinOp = (MapJoinOperator) op;
- MapJoinDesc mapJoinDesc = mapJoinOp.getConf();
- if (mapJoinDesc.isBucketMapJoin()) {
- bucketMJCxt = new SparkBucketMapJoinContext(mapJoinDesc);
- bucketMJCxt.setBucketMatcherClass(
- org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
- bucketMJCxt.setPosToAliasMap(mapJoinOp.getPosToAliasMap());
- ((MapWork) work).setUseBucketizedHiveInputFormat(true);
- bigTableLocalWork.setBucketMapjoinContext(bucketMJCxt);
- bigTableLocalWork.setInputFileChangeSensitive(true);
- break;
+ if (work.getMapRedLocalWork() == null) {
+ if (containsOp(work, SparkHashTableSinkOperator.class) ||
+ containsOp(work, MapJoinOperator.class)) {
+ work.setMapRedLocalWork(new MapredLocalWork());
            }
- }
-
- for (BaseWork parentWork : originalWork.getParents(work)) {
- Set<Operator<?>> hashTableSinkOps =
- getOp(parentWork, SparkHashTableSinkOperator.class);
- if (hashTableSinkOps == null || hashTableSinkOps.isEmpty()) {
+ Set<Operator<?>> ops = getOp(work, MapJoinOperator.class);
+ if (ops == null || ops.isEmpty()) {
              continue;
            }
- MapredLocalWork parentLocalWork = parentWork.getMapRedLocalWork();
- parentLocalWork.setTmpHDFSPath(tmpPath);
- if (bucketMJCxt != null) {
- // We only need to update the work with the hashtable
- // sink operator with the same mapjoin desc. We can tell
- // that by comparing the bucket file name mapping map
- // instance. They should be exactly the same one due to
- // the way how the bucket mapjoin context is constructed.
- for (Operator<? extends OperatorDesc> op: hashTableSinkOps) {
- SparkHashTableSinkOperator hashTableSinkOp = (SparkHashTableSinkOperator) op;
- SparkHashTableSinkDesc hashTableSinkDesc = hashTableSinkOp.getConf();
- BucketMapJoinContext original = hashTableSinkDesc.getBucketMapjoinContext();
- if (original != null && original.getBucketFileNameMapping()
- == bucketMJCxt.getBucketFileNameMapping()) {
- ((MapWork) parentWork).setUseBucketizedHiveInputFormat(true);
- parentLocalWork.setBucketMapjoinContext(bucketMJCxt);
- parentLocalWork.setInputFileChangeSensitive(true);
- break;
+ Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
+ MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork();
+ List<Operator<? extends OperatorDesc>> dummyOps =
+ new ArrayList<Operator<? extends OperatorDesc>>(work.getDummyOps());
+ bigTableLocalWork.setDummyParentOp(dummyOps);
+ bigTableLocalWork.setTmpPath(tmpPath);
+
+ // In one work, only one map join operator can be bucketed
+ SparkBucketMapJoinContext bucketMJCxt = null;
+ for (Operator<? extends OperatorDesc> op : ops) {
+ MapJoinOperator mapJoinOp = (MapJoinOperator) op;
+ MapJoinDesc mapJoinDesc = mapJoinOp.getConf();
+ if (mapJoinDesc.isBucketMapJoin()) {
+ bucketMJCxt = new SparkBucketMapJoinContext(mapJoinDesc);
+ bucketMJCxt.setBucketMatcherClass(
+ org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+ bucketMJCxt.setPosToAliasMap(mapJoinOp.getPosToAliasMap());
+ ((MapWork) work).setUseBucketizedHiveInputFormat(true);
+ bigTableLocalWork.setBucketMapjoinContext(bucketMJCxt);
+ bigTableLocalWork.setInputFileChangeSensitive(true);
+ break;
+ }
+ }
+
+ for (BaseWork parentWork : originalWork.getParents(work)) {
+ Set<Operator<?>> hashTableSinkOps =
+ getOp(parentWork, SparkHashTableSinkOperator.class);
+ if (hashTableSinkOps == null || hashTableSinkOps.isEmpty()) {
+ continue;
+ }
+ MapredLocalWork parentLocalWork = parentWork.getMapRedLocalWork();
+ parentLocalWork.setTmpHDFSPath(tmpPath);
+ if (bucketMJCxt != null) {
+ // We only need to update the work with the hashtable
+ // sink operator with the same mapjoin desc. We can tell
+ // that by comparing the bucket file name mapping map
+ // instance. They should be exactly the same one due to
+ // the way how the bucket mapjoin context is constructed.
+ for (Operator<? extends OperatorDesc> op : hashTableSinkOps) {
+ SparkHashTableSinkOperator hashTableSinkOp = (SparkHashTableSinkOperator) op;
+ SparkHashTableSinkDesc hashTableSinkDesc = hashTableSinkOp.getConf();
+ BucketMapJoinContext original = hashTableSinkDesc.getBucketMapjoinContext();
+ if (original != null && original.getBucketFileNameMapping()
+ == bucketMJCxt.getBucketFileNameMapping()) {
+ ((MapWork) parentWork).setUseBucketizedHiveInputFormat(true);
+ parentLocalWork.setBucketMapjoinContext(bucketMJCxt);
+ parentLocalWork.setInputFileChangeSensitive(true);
+ break;
+ }
                }
              }
            }
@@ -296,10 +306,12 @@ public class SparkMapJoinResolver implem
            for (Task<? extends Serializable> tsk : taskList) {
              if (tsk instanceof SparkTask) {
                processCurrentTask((SparkTask) tsk, (ConditionalTask) currentTask);
+ visitedTasks.add(tsk);
              }
            }
          } else if (currentTask instanceof SparkTask) {
            processCurrentTask((SparkTask) currentTask, null);
+ visitedTasks.add(currentTask);
          }
        }

@@ -312,32 +324,47 @@ public class SparkMapJoinResolver implem
       * wrapped in its task list.
       */
      private void processCurrentTask(SparkTask sparkTask, ConditionalTask conditionalTask) {
- dependencyGraph.clear();
- sparkWorkMap.clear();
        SparkWork sparkWork = sparkTask.getWork();
-
- // Generate MapredLocalWorks for MJ and HTS
- generateLocalWork(sparkTask);
-
- dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
- Set<BaseWork> leaves = sparkWork.getLeaves();
- for (BaseWork leaf : leaves) {
- moveWork(sparkWork, leaf, sparkWork);
- }
-
- // Now remove all BaseWorks in all the childSparkWorks that we created
- // from the original SparkWork
- for (SparkWork newSparkWork : sparkWorkMap.values()) {
- for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
- sparkWork.remove(work);
+ if (!visitedTasks.contains(sparkTask)) {
+ dependencyGraph.clear();
+ sparkWorkMap.clear();
+
+ // Generate MapredLocalWorks for MJ and HTS
+ generateLocalWork(sparkTask);
+
+ dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
+ Set<BaseWork> leaves = sparkWork.getLeaves();
+ for (BaseWork leaf : leaves) {
+ moveWork(sparkWork, leaf, sparkWork);
+ }
+
+ // Now remove all BaseWorks in all the childSparkWorks that we created
+ // from the original SparkWork
+ for (SparkWork newSparkWork : sparkWorkMap.values()) {
+ for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
+ sparkWork.remove(work);
+ }
          }
- }

- Map<SparkWork, SparkTask> createdTaskMap = new LinkedHashMap<SparkWork, SparkTask>();
+ Map<SparkWork, SparkTask> createdTaskMap = new LinkedHashMap<SparkWork, SparkTask>();

- // Now create SparkTasks from the SparkWorks, also set up dependency
- for (SparkWork work : dependencyGraph.keySet()) {
- createSparkTask(sparkTask, work, createdTaskMap, conditionalTask);
+ // Now create SparkTasks from the SparkWorks, also set up dependency
+ for (SparkWork work : dependencyGraph.keySet()) {
+ createSparkTask(sparkTask, work, createdTaskMap, conditionalTask);
+ }
+ } else if (conditionalTask != null) {
+ // We may need to update the conditional task's list. This happens when a common map join
+ // task exists in the task list and has already been processed. In such a case,
+ // the current task is the map join task and we need to replace it with
+ // its parent, i.e. the small table task.
+ if (sparkTask.getParentTasks() != null && sparkTask.getParentTasks().size() == 1 &&
+ sparkTask.getParentTasks().get(0) instanceof SparkTask) {
+ SparkTask parent = (SparkTask) sparkTask.getParentTasks().get(0);
+ if (containsOp(sparkWork, MapJoinOperator.class) &&
+ containsOp(parent.getWork(), SparkHashTableSinkOperator.class)) {
+ updateConditionalTask(conditionalTask, sparkTask, parent);
+ }
+ }
        }
      }

@@ -382,6 +409,10 @@ public class SparkMapJoinResolver implem
            }
          }
          context.setDirToTaskMap(newbigKeysDirToTaskMap);
+        // update no skew task
+        if (context.getNoSkewTask() != null && context.getNoSkewTask().equals(originalTask)) {
+          context.setNoSkewTask(newTask);
+        }
        }
      }
    }

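Two things in the resolver change above do the heavy lifting: a containsOp overload that scans a whole SparkWork, and a visitedTasks set so that a SparkTask reached both as an ordinary child task and through a ConditionalTask's task list (the plan is now walked pre-order) is split and rewritten only once. A minimal sketch of that process-once idiom, with WorkItem as a hypothetical stand-in for a task and the traversal simplified to a plain stack walk:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class VisitedSetSketch {

  // Hypothetical minimal task type; the real resolver works on Hive's Task/SparkTask.
  static final class WorkItem {
    final String id;
    final List<WorkItem> children;
    WorkItem(String id, List<WorkItem> children) {
      this.id = id;
      this.children = children;
    }
  }

  // Tasks already rewritten; reaching them again on another path is a no-op.
  private final Set<WorkItem> visited = new HashSet<>();

  void resolveAll(WorkItem root) {
    Deque<WorkItem> stack = new ArrayDeque<>();
    stack.push(root);
    while (!stack.isEmpty()) {
      WorkItem current = stack.pop();
      if (!visited.add(current)) {
        continue;         // already processed on an earlier path
      }
      process(current);   // the expensive, non-idempotent part
      for (WorkItem child : current.children) {
        stack.push(child);
      }
    }
  }

  private void process(WorkItem item) {
    System.out.println("processing " + item.id);
  }

  public static void main(String[] args) {
    WorkItem shared = new WorkItem("shared map-join task", List.of());
    WorkItem a = new WorkItem("branch a", List.of(shared));
    WorkItem b = new WorkItem("branch b", List.of(shared));
    new VisitedSetSketch().resolveAll(new WorkItem("root", List.of(a, b)));
    // "shared map-join task" is processed exactly once
  }
}

In the patch itself the guard sits in processCurrentTask: an already-visited task skips the split, and when it appears inside a conditional task only its entry in the conditional's task list is adjusted (replaced by its small-table parent when appropriate).
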
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java?rev=1665877&r1=1665876&r2=1665877&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java Wed Mar 11 14:10:26 2015
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimi
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
  import org.apache.hadoop.hive.ql.exec.JoinOperator;
  import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
  import org.apache.hadoop.hive.ql.exec.Operator;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
  import org.apache.hadoop.hive.ql.lib.NodeProcessor;
  import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
  import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.physical.GenMRSkewJoinProcessor;
  import org.apache.hadoop.hive.ql.optimizer.physical.GenSparkSkewJoinProcessor;
  import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinProcFactory;
  import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Sp
  import org.apache.hadoop.hive.ql.plan.TableDesc;

  import java.io.Serializable;
+import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
  import java.util.Stack;
@@ -58,6 +61,9 @@ import java.util.Stack;
   * Spark-version of SkewJoinProcFactory.
   */
  public class SparkSkewJoinProcFactory {
+  // let's remember the join operators we have processed
+  private static final Set<JoinOperator> visitedJoinOp = new HashSet<JoinOperator>();
+
    private SparkSkewJoinProcFactory() {
      // prevent instantiation
    }
@@ -81,13 +87,12 @@ public class SparkSkewJoinProcFactory {
        JoinOperator op = (JoinOperator) nd;
        ReduceWork reduceWork = context.getReducerToReduceWork().get(op);
        ParseContext parseContext = context.getParseCtx();
-      if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask
-          && reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork)
-          && GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(
-              op, currentTsk, parseContext.getConf())) {
+      if (reduceWork != null && !visitedJoinOp.contains(op) &&
+          supportRuntimeSkewJoin(op, reduceWork, currentTsk, parseContext.getConf())) {
          // first we try to split the task
          splitTask((SparkTask) currentTsk, reduceWork, parseContext);
          GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, reduceWork, parseContext);
+ visitedJoinOp.add(op);
        }
        return null;
      }
@@ -112,8 +117,7 @@ public class SparkSkewJoinProcFactory {
        SparkWork newWork =
            new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID));
        newWork.add(childWork);
-      copyWorkGraph(currentWork, newWork, childWork, true);
-      copyWorkGraph(currentWork, newWork, childWork, false);
+      copyWorkGraph(currentWork, newWork, childWork);
        // remove them from current spark work
        for (BaseWork baseWork : newWork.getAllWorkUnsorted()) {
          currentWork.remove(baseWork);
@@ -196,22 +200,39 @@ public class SparkSkewJoinProcFactory {
    /**
     * Copy a sub-graph from originWork to newWork.
     */
-  private static void copyWorkGraph(SparkWork originWork, SparkWork newWork,
-      BaseWork baseWork, boolean upWards) {
-    if (upWards) {
-      for (BaseWork parent : originWork.getParents(baseWork)) {
-        newWork.add(parent);
-        SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork);
-        newWork.connect(parent, baseWork, edgeProperty);
-        copyWorkGraph(originWork, newWork, parent, true);
-      }
-    } else {
-      for (BaseWork child : originWork.getChildren(baseWork)) {
+  private static void copyWorkGraph(SparkWork originWork, SparkWork newWork, BaseWork baseWork) {
+    for (BaseWork child : originWork.getChildren(baseWork)) {
+      if (!newWork.contains(child)) {
          newWork.add(child);
          SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(baseWork, child);
          newWork.connect(baseWork, child, edgeProperty);
-        copyWorkGraph(originWork, newWork, child, false);
+        copyWorkGraph(originWork, newWork, child);
        }
      }
+    for (BaseWork parent : originWork.getParents(baseWork)) {
+      if (!newWork.contains(parent)) {
+        newWork.add(parent);
+        SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork);
+        newWork.connect(parent, baseWork, edgeProperty);
+        copyWorkGraph(originWork, newWork, parent);
+      }
+    }
+  }
+
+  public static Set<JoinOperator> getVisitedJoinOp() {
+    return visitedJoinOp;
+  }
+
+  private static boolean supportRuntimeSkewJoin(JoinOperator joinOp, ReduceWork reduceWork,
+      Task<? extends Serializable> currTask, HiveConf hiveConf) {
+    if (currTask instanceof SparkTask &&
+        GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp)) {
+      SparkWork sparkWork = ((SparkTask) currTask).getWork();
+      List<Task<? extends Serializable>> children = currTask.getChildTasks();
+      return !joinOp.getConf().isFixedAsSorted() && sparkWork.contains(reduceWork) &&
+          (children == null || children.size() <= 1) &&
+          SparkMapJoinResolver.getOp(reduceWork, CommonJoinOperator.class).size() == 1;
+    }
+    return false;
    }
  }

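copyWorkGraph now copies the whole connected component around a work by recursing over children and parents alike, and the newWork.contains check is what keeps the two-direction recursion from bouncing between a parent and its child forever. A toy sketch of the same shape, with a string-keyed Graph standing in for SparkWork/BaseWork (hypothetical names, not Hive API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CopyGraphSketch {

  // A toy directed graph with parent and child adjacency maps.
  static final class Graph {
    final Map<String, List<String>> children = new HashMap<>();
    final Map<String, List<String>> parents = new HashMap<>();

    void connect(String parent, String child) {
      children.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
      parents.computeIfAbsent(child, k -> new ArrayList<>()).add(parent);
      children.computeIfAbsent(child, k -> new ArrayList<>());
      parents.computeIfAbsent(parent, k -> new ArrayList<>());
    }

    boolean contains(String node) {
      return children.containsKey(node) || parents.containsKey(node);
    }
  }

  // Copy the connected component containing 'start' from origin into target.
  // Recursing in both directions needs the contains() guard, otherwise a parent
  // and its child would keep re-adding each other indefinitely.
  static void copyComponent(Graph origin, Graph target, String start) {
    for (String child : origin.children.getOrDefault(start, List.of())) {
      if (!target.contains(child)) {
        target.connect(start, child);
        copyComponent(origin, target, child);
      }
    }
    for (String parent : origin.parents.getOrDefault(start, List.of())) {
      if (!target.contains(parent)) {
        target.connect(parent, start);
        copyComponent(origin, target, parent);
      }
    }
  }

  public static void main(String[] args) {
    Graph origin = new Graph();
    origin.connect("small-table work", "map-join work");
    origin.connect("map-join work", "reduce work");
    Graph copy = new Graph();
    copyComponent(origin, copy, "map-join work");
    System.out.println(copy.children); // both edges end up in the copy
  }
}

The new private supportRuntimeSkewJoin also folds the checks that used to live in GenSparkSkewJoinProcessor into one place, and the visitedJoinOp set (cleared at the start of each SparkSkewJoinResolver pass below) stops the same join operator from being processed twice now that tasks can be revisited after a split.
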
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java?rev=1665877&r1=1665876&r2=1665877&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java Wed Mar 11 14:10:26 2015
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.lib.Dis
  import org.apache.hadoop.hive.ql.lib.GraphWalker;
  import org.apache.hadoop.hive.ql.lib.Node;
  import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
  import org.apache.hadoop.hive.ql.lib.Rule;
  import org.apache.hadoop.hive.ql.lib.RuleRegExp;
  import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -53,8 +54,10 @@ import org.apache.hadoop.hive.ql.plan.Sp
  public class SparkSkewJoinResolver implements PhysicalPlanResolver {
    @Override
    public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    SparkSkewJoinProcFactory.getVisitedJoinOp().clear();
      Dispatcher disp = new SparkSkewJoinTaskDispatcher(pctx);
-    GraphWalker ogw = new DefaultGraphWalker(disp);
+    // since we may split current task, use a pre-order walker
+    GraphWalker ogw = new PreOrderWalker(disp);
      ArrayList<Node> topNodes = new ArrayList<Node>();
      topNodes.addAll(pctx.getRootTasks());
      ogw.startWalking(topNodes, null);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1665877&r1=1665876&r2=1665877&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Wed Mar 11 14:10:26 2015
@@ -236,6 +236,7 @@ public class SparkWork extends AbstractO
      List<BaseWork> parents = getParents(work);

      for (BaseWork w: children) {
+ edgeProperties.remove(new ImmutablePair<BaseWork, BaseWork>(work, w));
        invertedWorkGraph.get(w).remove(work);
        if (invertedWorkGraph.get(w).size() == 0) {
          roots.add(w);
@@ -243,6 +244,7 @@ public class SparkWork extends AbstractO
      }

      for (BaseWork w: parents) {
+ edgeProperties.remove(new ImmutablePair<BaseWork, BaseWork>(w, work));
        workGraph.get(w).remove(work);
        if (workGraph.get(w).size() == 0) {
          leaves.add(w);

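The SparkWork change makes remove(BaseWork) drop the edge-property entries for every edge touching the removed work, not only the adjacency entries; otherwise stale (parent, child) keys would linger in edgeProperties after a task is split. A toy sketch of that bookkeeping with hypothetical field names (SparkWork itself keys the map with ImmutablePair):

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RemoveNodeSketch {

  final Map<String, List<String>> childrenOf = new HashMap<>();
  final Map<String, List<String>> parentsOf = new HashMap<>();
  // Per-edge attributes, keyed by (parent, child), kept outside the adjacency maps.
  final Map<Map.Entry<String, String>, String> edgeProperties = new HashMap<>();

  void connect(String parent, String child, String property) {
    childrenOf.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
    parentsOf.computeIfAbsent(child, k -> new ArrayList<>()).add(parent);
    edgeProperties.put(new SimpleImmutableEntry<>(parent, child), property);
  }

  // Removing a node must clean up the adjacency maps *and* the edge-property map,
  // or keys referring to the removed node are left dangling.
  void remove(String node) {
    for (String child : childrenOf.getOrDefault(node, List.of())) {
      edgeProperties.remove(new SimpleImmutableEntry<>(node, child));
      parentsOf.get(child).remove(node);
    }
    for (String parent : parentsOf.getOrDefault(node, List.of())) {
      edgeProperties.remove(new SimpleImmutableEntry<>(parent, node));
      childrenOf.get(parent).remove(node);
    }
    childrenOf.remove(node);
    parentsOf.remove(node);
  }

  public static void main(String[] args) {
    RemoveNodeSketch g = new RemoveNodeSketch();
    g.connect("map work", "reduce work", "SHUFFLE");
    g.remove("reduce work");
    System.out.println(g.edgeProperties.isEmpty()); // true: no dangling edge entry
  }
}
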
Added: hive/branches/spark/ql/src/test/queries/clientpositive/runtime_skewjoin_mapjoin_spark.q
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/queries/clientpositive/runtime_skewjoin_mapjoin_spark.q?rev=1665877&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/queries/clientpositive/runtime_skewjoin_mapjoin_spark.q (added)
+++ hive/branches/spark/ql/src/test/queries/clientpositive/runtime_skewjoin_mapjoin_spark.q Wed Mar 11 14:10:26 2015
@@ -0,0 +1,24 @@
+set hive.optimize.skewjoin = true;
+set hive.skewjoin.key = 4;
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=50;
+
+-- This is mainly intended for spark, to test runtime skew join together with map join
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
+
+EXPLAIN
+SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key;
+
+SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key;

Added: hive/branches/spark/ql/src/test/results/clientpositive/runtime_skewjoin_mapjoin_spark.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/runtime_skewjoin_mapjoin_spark.q.out?rev=1665877&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/runtime_skewjoin_mapjoin_spark.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/runtime_skewjoin_mapjoin_spark.q.out Wed Mar 11 14:10:26 2015
@@ -0,0 +1,669 @@
+PREHOOK: query: -- This is mainly intended for spark, to test runtime skew join together with map join
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T1
+POSTHOOK: query: -- This is mainly intended for spark, to test runtime skew join together with map join
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: EXPLAIN
+SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-18 is a root stage , consists of Stage-27, Stage-28, Stage-2
+ Stage-27 has a backup stage: Stage-2
+ Stage-16 depends on stages: Stage-27
+ Stage-8 depends on stages: Stage-2, Stage-13, Stage-14, Stage-16, Stage-17, Stage-22, Stage-23 , consists of Stage-26, Stage-3
+ Stage-26
+ Stage-7 depends on stages: Stage-26
+ Stage-3 depends on stages: Stage-7
+ Stage-28 has a backup stage: Stage-2
+ Stage-17 depends on stages: Stage-28
+ Stage-2
+ Stage-21 is a root stage , consists of Stage-32, Stage-33, Stage-1
+ Stage-32 has a backup stage: Stage-1
+ Stage-19 depends on stages: Stage-32
+ Stage-10 depends on stages: Stage-1, Stage-19, Stage-20 , consists of Stage-31, Stage-2
+ Stage-31
+ Stage-9 depends on stages: Stage-31
+ Stage-15 depends on stages: Stage-9, Stage-11 , consists of Stage-29, Stage-30, Stage-2
+ Stage-29 has a backup stage: Stage-2
+ Stage-13 depends on stages: Stage-29
+ Stage-30 has a backup stage: Stage-2
+ Stage-14 depends on stages: Stage-30
+ Stage-33 has a backup stage: Stage-1
+ Stage-20 depends on stages: Stage-33
+ Stage-1
+ Stage-24 is a root stage , consists of Stage-34, Stage-35, Stage-2
+ Stage-34 has a backup stage: Stage-2
+ Stage-22 depends on stages: Stage-34
+ Stage-35 has a backup stage: Stage-2
+ Stage-23 depends on stages: Stage-35
+ Stage-37 is a root stage
+ Stage-25 depends on stages: Stage-37
+ Stage-12 depends on stages: Stage-25 , consists of Stage-36, Stage-2
+ Stage-36
+ Stage-11 depends on stages: Stage-36
+ Stage-0 depends on stages: Stage-3
+
+STAGE PLANS:
+ Stage: Stage-18
+ Conditional Operator
+
+ Stage: Stage-27
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME1
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+
+ Stage: Stage-16
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-8
+ Conditional Operator
+
+ Stage: Stage-26
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ 1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ 1
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+
+ Stage: Stage-7
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-3
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-28
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+
+ Stage: Stage-17
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-2
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ TableScan
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ handleSkewJoin: true
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Statistics: Num rows: 302 Data size: 3213 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-21
+ Conditional Operator
+
+ Stage: Stage-32
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ a:src2
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ a:src2
+ TableScan
+ alias: src2
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ HashTable Sink Operator
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+
+ Stage: Stage-19
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-10
+ Conditional Operator
+
+ Stage: Stage-31
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ 1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ 1
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+
+ Stage: Stage-9
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-15
+ Conditional Operator
+
+ Stage: Stage-29
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME1
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+
+ Stage: Stage-13
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-30
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+
+ Stage: Stage-14
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-33
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ a:src1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ a:src1
+ TableScan
+ alias: src1
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ HashTable Sink Operator
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+
+ Stage: Stage-20
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src2
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ TableScan
+ alias: src2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ handleSkewJoin: true
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-24
+ Conditional Operator
+
+ Stage: Stage-34
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME1
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+
+ Stage: Stage-22
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-35
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+
+ Stage: Stage-23
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-37
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ b:t1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ b:t1
+ TableScan
+ alias: t1
+ Statistics: Num rows: 0 Data size: 30 Basic stats: PARTIAL Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ HashTable Sink Operator
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+
+ Stage: Stage-25
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-12
+ Conditional Operator
+
+ Stage: Stage-36
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ 1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ 1
+ TableScan
+ HashTable Sink Operator
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+
+ Stage: Stage-11
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####

Added: hive/branches/spark/ql/src/test/results/clientpositive/spark/runtime_skewjoin_mapjoin_spark.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/runtime_skewjoin_mapjoin_spark.q.out?rev=1665877&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/runtime_skewjoin_mapjoin_spark.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/runtime_skewjoin_mapjoin_spark.q.out Wed Mar 11 14:10:26 2015
@@ -0,0 +1,314 @@
+PREHOOK: query: -- This is mainly intended for spark, to test runtime skew join together with map join
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T1
+POSTHOOK: query: -- This is mainly intended for spark, to test runtime skew join together with map join
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: EXPLAIN
+SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-7 depends on stages: Stage-1 , consists of Stage-10, Stage-9
+ Stage-10
+ Stage-6 depends on stages: Stage-10
+ Stage-9 depends on stages: Stage-6
+ Stage-5 depends on stages: Stage-9
+ Stage-4 depends on stages: Stage-5 , consists of Stage-8, Stage-2
+ Stage-8
+ Stage-3 depends on stages: Stage-8
+ Stage-2 depends on stages: Stage-3
+ Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 5 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Map 5
+ Map Operator Tree:
+ TableScan
+ alias: src2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ handleSkewJoin: true
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-7
+ Conditional Operator
+
+ Stage: Stage-10
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 13
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-6
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 12
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-9
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 7
+ Map Operator Tree:
+ TableScan
+ alias: t1
+ Statistics: Num rows: 0 Data size: 30 Basic stats: PARTIAL Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Spark HashTable Sink Operator
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-5
+ Spark
+ Edges:
+ Reducer 3 <- Map 11 (PARTITION-LEVEL SORT, 2), Map 6 (PARTITION-LEVEL SORT, 2)
+#### A masked pattern was here ####
+ Vertices:
+ Map 11
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ Map 6
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+ outputColumnNames: _col0
+ input vertices:
+ 1 Map 7
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ Local Work:
+ Map Reduce Local Work
+ Reducer 3
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ handleSkewJoin: true
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ Statistics: Num rows: 302 Data size: 3213 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-4
+ Conditional Operator
+
+ Stage: Stage-8
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 10
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 9
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-2
+ Spark
+ Edges:
+ Reducer 4 <- Map 8 (GROUP, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 8
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 4
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT COUNT(*) FROM
+ (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a
+JOIN
+ (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b
+ON a.key=b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+3
