Author: vikram
Date: Wed Apr 1 21:45:34 2015
New Revision: 1670779

URL: http://svn.apache.org/r1670779
Log:
HIVE-10001: SMB join in reduce side (Vikram Dixit K, reviewed by Gunther Hagleitner)

Modified:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
     hive/trunk/ql/src/test/queries/clientpositive/tez_join.q
     hive/trunk/ql/src/test/queries/clientpositive/tez_smb_1.q
     hive/trunk/ql/src/test/queries/clientpositive/tez_smb_main.q
     hive/trunk/ql/src/test/results/clientpositive/tez/tez_join.q.out
     hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out
     hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Wed Apr 1 21:45:34 2015
@@ -501,12 +501,13 @@ public class CommonMergeJoinOperator ext
      if (parent == null) {
        throw new HiveException("No valid parents.");
      }
- Map<Integer, DummyStoreOperator> dummyOps = parent.getTagToOperatorTree();
+ Map<Integer, DummyStoreOperator> dummyOps =
+ ((TezContext) (MapredContext.get())).getDummyOpsMap();
      for (Entry<Integer, DummyStoreOperator> connectOp : dummyOps.entrySet()) {
        if (connectOp.getValue().getChildOperators() == null
- || connectOp.getValue().getChildOperators().isEmpty()) {
- parentOperators.add(connectOp.getKey(), connectOp.getValue());
- connectOp.getValue().getChildOperators().add(this);
+ || connectOp.getValue().getChildOperators().isEmpty()) {
+ parentOperators.add(connectOp.getKey(), connectOp.getValue());
+ connectOp.getValue().getChildOperators().add(this);
        }
      }
      super.initializeLocalWork(hconf);
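
This hunk replaces the old parent-tree walk (getTagToOperatorTree, deleted from MapOperator and Operator below) with a lookup on the Tez execution context. A rough sketch of both sides of that handoff, using only names that appear elsewhere in this commit; the surrounding method bodies are abbreviated:

    // Producer: MapRecordProcessor / ReduceRecordProcessor publish the dummy store
    // operators of the merged (small table) work items, keyed by join tag.
    Map<Integer, DummyStoreOperator> connectOps = new TreeMap<Integer, DummyStoreOperator>();
    // ... one entry per merged work item ...
    ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps);

    // Consumer: CommonMergeJoinOperator.initializeLocalWork() (the hunk above) reads the
    // same map back and wires each dummy store operator in as one of its parents.
    Map<Integer, DummyStoreOperator> dummyOps =
        ((TezContext) (MapredContext.get())).getDummyOpsMap();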

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Apr 1 21:45:34 2015
@@ -637,11 +637,6 @@ public class MapOperator extends Operato
      return null;
    }

- @Override
- public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
- return MapRecordProcessor.getConnectOps();
- }
-
    public void initializeContexts() {
      Path fpath = getExecContext().getCurrentInputPath();
      String nominalPath = getNominalPath(fpath);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Apr 1 21:45:34 2015
@@ -1350,12 +1350,4 @@ public abstract class Operator<T extends
        return childOperators;
      }
    }
-
- public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
- if ((parentOperators == null) || (parentOperators.size() == 0)) {
- return null;
- }
- Map<Integer, DummyStoreOperator> dummyOps = parentOperators.get(0).getTagToOperatorTree();
- return dummyOps;
- }
  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Apr 1 21:45:34 2015
@@ -208,6 +208,8 @@ public final class Utilities {
    public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
    public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
    public static final String HIVE_ADDED_JARS = "hive.added.jars";
+ public static String MAPNAME = "Map ";
+ public static String REDUCENAME = "Reducer ";

    /**
     * ReduceField:
@@ -239,6 +241,7 @@ public final class Utilities {

    private static ThreadLocal<Map<Path, BaseWork>> gWorkMap =
        new ThreadLocal<Map<Path, BaseWork>>() {
+ @Override
      protected Map<Path, BaseWork> initialValue() {
        return new HashMap<Path, BaseWork>();
      }
@@ -304,12 +307,13 @@ public final class Utilities {
    public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir,
        boolean useCache) {
      for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) {
- setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache);
+ String prefix = baseWork.getName();
+ setBaseWork(conf, baseWork, mrScratchDir, prefix + MERGE_PLAN_NAME, useCache);
        String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
        if (prefixes == null) {
- prefixes = baseWork.getName();
+ prefixes = prefix;
        } else {
- prefixes = prefixes + "," + baseWork.getName();
+ prefixes = prefixes + "," + prefix;
        }
        conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes);
      }
@@ -429,7 +433,13 @@ public final class Utilities {
                  + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
            }
          } else if (name.contains(MERGE_PLAN_NAME)) {
- gWork = deserializePlan(in, MapWork.class, conf);
+ if (name.startsWith(MAPNAME)) {
+ gWork = deserializePlan(in, MapWork.class, conf);
+ } else if (name.startsWith(REDUCENAME)) {
+ gWork = deserializePlan(in, ReduceWork.class, conf);
+ } else {
+ throw new RuntimeException("Unknown work type: " + name);
+ }
          }
          gWorkMap.get().put(path, gWork);
        } else if (LOG.isDebugEnabled()) {
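
Taken together, the Utilities changes let a merged plan file carry either vertex type: setMergeWork writes each merged plan under its work name ("Map N" or "Reducer N", the names GenTezUtils now builds from the new MAPNAME/REDUCENAME constants), and getBaseWork picks the plan class from that prefix. A condensed sketch of the round trip (fragments of the two methods above, not standalone code):

    // Write side (setMergeWork): one plan per merged work item, keyed by its vertex name.
    String prefix = baseWork.getName();                    // "Map 1", "Reducer 3", ...
    setBaseWork(conf, baseWork, mrScratchDir, prefix + MERGE_PLAN_NAME, useCache);

    // Read side (getBaseWork): the prefix decides which plan class to deserialize.
    if (name.startsWith(Utilities.MAPNAME)) {              // "Map "
      gWork = deserializePlan(in, MapWork.class, conf);
    } else if (name.startsWith(Utilities.REDUCENAME)) {    // "Reducer "
      gWork = deserializePlan(in, ReduceWork.class, conf);
    } else {
      throw new RuntimeException("Unknown work type: " + name);
    }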

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Apr 1 21:45:34 2015
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.exec.te
  import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
  import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
  import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
  import org.apache.hadoop.hive.ql.plan.MapWork;
  import org.apache.hadoop.hive.ql.plan.OperatorDesc;
  import org.apache.hadoop.hive.serde2.Deserializer;
@@ -82,7 +83,7 @@ public class MapRecordProcessor extends
    private boolean abort = false;
    protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
    private MapWork mapWork;
- List<MapWork> mergeWorkList = null;
+ List<BaseWork> mergeWorkList = null;
    List<String> cacheKeys;
    ObjectCache cache;

@@ -108,28 +109,7 @@ public class MapRecordProcessor extends
        });
      Utilities.setMapWork(jconf, mapWork);

- String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
- if (prefixes != null) {
- mergeWorkList = new ArrayList<MapWork>();
-
- for (final String prefix : prefixes.split(",")) {
- if (prefix == null || prefix.isEmpty()) {
- continue;
- }
-
- key = queryId + prefix;
- cacheKeys.add(key);
-
- mergeWorkList.add(
- (MapWork) cache.retrieve(key,
- new Callable<Object>() {
- @Override
- public Object call() {
- return Utilities.getMergeWork(jconf, prefix);
- }
- }));
- }
- }
+ mergeWorkList = getMergeWorkList(jconf, key, queryId, cache, cacheKeys);
    }

    @Override
@@ -174,7 +154,8 @@ public class MapRecordProcessor extends
        connectOps.clear();
        if (mergeWorkList != null) {
          MapOperator mergeMapOp = null;
- for (MapWork mergeMapWork : mergeWorkList) {
+ for (BaseWork mergeWork : mergeWorkList) {
+ MapWork mergeMapWork = (MapWork) mergeWork;
            if (mergeMapWork.getVectorMode()) {
              mergeMapOp = new VectorMapOperator();
            } else {
@@ -199,6 +180,8 @@ public class MapRecordProcessor extends
          }
        }

+ ((TezContext) (MapredContext.get())).setDummyOpsMap(connectOps);
+
        // initialize map operator
        mapOp.setConf(mapWork);
        l4j.info("Main input name is " + mapWork.getName());
@@ -356,10 +339,6 @@ public class MapRecordProcessor extends
      }
    }

- public static Map<Integer, DummyStoreOperator> getConnectOps() {
- return connectOps;
- }
-
    private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
      // there should be only one MRInput
      MRInputLegacy theMRInput = null;
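
mergeWorkList widens from List<MapWork> to List<BaseWork> because the inlined plan-loading code moves into the shared RecordProcessor.getMergeWorkList helper (next file), which also serves the reduce side; each processor simply casts its entries back. Roughly, the per-item handling on the two sides looks like:

    // Map side (this file): every merged work item is a MapWork.
    for (BaseWork mergeWork : mergeWorkList) {
      MapWork mergeMapWork = (MapWork) mergeWork;
      // ... build a (Vector)MapOperator for it and record its DummyStoreOperator in
      //     connectOps, which is then published via setDummyOpsMap(connectOps) ...
    }

    // Reduce side (ReduceRecordProcessor, further below): every item is a ReduceWork.
    for (BaseWork mergeWork : mergeWorkList) {
      ReduceWork mergeReduceWork = (ReduceWork) mergeWork;
      // ... record its tag and its DummyStoreOperator in connectOps ...
    }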

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Apr 1 21:45:34 2015
@@ -20,8 +20,13 @@ import com.google.common.base.Preconditi
  import com.google.common.collect.Maps;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
  import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -32,9 +37,12 @@ import org.apache.tez.runtime.api.Proces
  import java.lang.management.ManagementFactory;
  import java.lang.management.MemoryMXBean;
  import java.net.URLClassLoader;
+import java.util.ArrayList;
  import java.util.Arrays;
+import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
+import java.util.concurrent.Callable;

  /**
   * Process input from tez LogicalInput and write output
@@ -110,4 +118,32 @@ public abstract class RecordProcessor {
        outMap.put(entry.getKey(), collector);
      }
    }
+
+ public List<BaseWork> getMergeWorkList(final JobConf jconf, String key, String queryId,
+ ObjectCache cache, List<String> cacheKeys) throws HiveException {
+ String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+ if (prefixes != null) {
+ List<BaseWork> mergeWorkList = new ArrayList<BaseWork>();
+
+ for (final String prefix : prefixes.split(",")) {
+ if (prefix == null || prefix.isEmpty()) {
+ continue;
+ }
+
+ key = queryId + prefix;
+ cacheKeys.add(key);
+
+ mergeWorkList.add((BaseWork) cache.retrieve(key, new Callable<Object>() {
+ @Override
+ public Object call() {
+ return Utilities.getMergeWork(jconf, prefix);
+ }
+ }));
+ }
+
+ return mergeWorkList;
+ } else {
+ return null;
+ }
+ }
  }
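
The hoisted helper reads DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, fetches one BaseWork per prefix through the object cache, and appends every cache key to the caller-supplied cacheKeys list so close() can release them later; it returns null when the vertex has no merged work. Both processors call it from their constructors, roughly:

    // MapRecordProcessor (map-side SMB), after the main MapWork is loaded:
    mergeWorkList = getMergeWorkList(jconf, key, queryId, cache, cacheKeys);

    // ReduceRecordProcessor (the new reduce-side SMB path), after the main ReduceWork:
    mergeWorkList = getMergeWorkList(jconf, cacheKey, queryId, cache, cacheKeys);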

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Apr 1 21:45:34 2015
@@ -18,16 +18,18 @@
  package org.apache.hadoop.hive.ql.exec.tez;

  import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
+import java.util.TreeMap;
  import java.util.concurrent.Callable;

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
  import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
  import org.apache.hadoop.hive.ql.exec.MapredContext;
  import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -38,7 +40,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
  import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
  import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
  import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
  import org.apache.hadoop.hive.ql.plan.ReduceWork;
  import org.apache.hadoop.hive.ql.plan.TableDesc;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -64,84 +66,124 @@ public class ReduceRecordProcessor exte

    public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);

- private ReduceWork redWork;
+ private ReduceWork reduceWork;
+
+ List<BaseWork> mergeWorkList = null;
+ List<String> cacheKeys;
+
+ private final Map<Integer, DummyStoreOperator> connectOps =
+ new TreeMap<Integer, DummyStoreOperator>();
+ private final Map<Integer, ReduceWork> tagToReducerMap = new HashMap<Integer, ReduceWork>();

    private Operator<?> reducer;

    private ReduceRecordSource[] sources;

- private final byte position = 0;
+ private byte bigTablePosition = 0;

    private boolean abort;

- @Override
- void init(final JobConf jconf, ProcessorContext processorContext,
- MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
- super.init(jconf, processorContext, mrReporter, inputs, outputs);
+ public ReduceRecordProcessor(final JobConf jconf) throws Exception {

      ObjectCache cache = ObjectCacheFactory.getCache(jconf);

      String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
      cacheKey = queryId + REDUCE_PLAN_KEY;
- redWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
+ cacheKeys = new ArrayList<String>();
+ cacheKeys.add(cacheKey);
+ reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
          @Override
          public Object call() {
            return Utilities.getReduceWork(jconf);
- }
- });
- Utilities.setReduceWork(jconf, redWork);
+ }
+ });

- reducer = redWork.getReducer();
- reducer.getParentOperators().clear();
- reducer.setParentOperators(null); // clear out any parents as reducer is the root
+ Utilities.setReduceWork(jconf, reduceWork);
+ mergeWorkList = getMergeWorkList(jconf, cacheKey, queryId, cache, cacheKeys);
+ }

- int numTags = redWork.getTagToValueDesc().size();
+ @Override
+ void init(JobConf jconf, ProcessorContext processorContext,
+ MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+ Map<String, LogicalOutput> outputs) throws Exception {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+ super.init(jconf, processorContext, mrReporter, inputs, outputs);

- ObjectInspector[] ois = new ObjectInspector[numTags];
- sources = new ReduceRecordSource[numTags];
+ MapredContext.init(false, new JobConf(jconf));
+ List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
+ if (shuffleInputs != null) {
+ l4j.info("Waiting for ShuffleInputs to become ready");
+ processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs));
+ }

- for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
- TableDesc keyTableDesc = redWork.getKeyDesc();
- TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag);
+ connectOps.clear();
+ ReduceWork redWork = reduceWork;
+ tagToReducerMap.put(redWork.getTag(), redWork);
+ if (mergeWorkList != null) {
+ for (BaseWork mergeWork : mergeWorkList) {
+ ReduceWork mergeReduceWork = (ReduceWork) mergeWork;
+ reducer = mergeReduceWork.getReducer();
+ DummyStoreOperator dummyStoreOp = getJoinParentOp(reducer);
+ connectOps.put(mergeReduceWork.getTag(), dummyStoreOp);
+ tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork);
+ }

- // make the reader ready for prime time
- Input input = inputs.get(redWork.getTagToInput().get(tag));
- input.start();
- processorContext.waitForAnyInputReady(Collections.singleton(input));
- KeyValuesReader reader = (KeyValuesReader) input.getReader();
-
- // now we can setup the record source
- sources[tag] = new ReduceRecordSource();
- sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc,
- reader, tag == position, (byte) tag,
- redWork.getAllScratchColumnVectorTypeMaps());
- ois[tag] = sources[tag].getObjectInspector();
+ bigTablePosition = (byte) reduceWork.getTag();
+ ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps);
      }

- MapredContext.init(false, new JobConf(jconf));
+ ObjectInspector[] mainWorkOIs = null;
      ((TezContext) MapredContext.get()).setInputs(inputs);
      ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
- ((TezContext) MapredContext.get()).setRecordSources(sources);
+ int numTags = reduceWork.getTagToValueDesc().size();
+ reducer = reduceWork.getReducer();
+ if (numTags > 1) {
+ sources = new ReduceRecordSource[numTags];
+ mainWorkOIs = new ObjectInspector[numTags];
+ initializeMultipleSources(reduceWork, numTags, mainWorkOIs, sources);
+ ((TezContext) MapredContext.get()).setRecordSources(sources);
+ reducer.initialize(jconf, mainWorkOIs);
+ } else {
+ numTags = tagToReducerMap.keySet().size();
+ sources = new ReduceRecordSource[numTags];
+ mainWorkOIs = new ObjectInspector[numTags];
+ for (int i : tagToReducerMap.keySet()) {
+ redWork = tagToReducerMap.get(i);
+ reducer = redWork.getReducer();
+ initializeSourceForTag(redWork, i, mainWorkOIs, sources,
+ redWork.getTagToValueDesc().get(0), redWork.getTagToInput().get(0));
+ reducer.initializeLocalWork(jconf);
+ }
+ reducer = reduceWork.getReducer();
+ ((TezContext) MapredContext.get()).setRecordSources(sources);
+ reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[bigTablePosition] });
+ for (int i : tagToReducerMap.keySet()) {
+ if (i == bigTablePosition) {
+ continue;
+ }
+ redWork = tagToReducerMap.get(i);
+ reducer = redWork.getReducer();
+ reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] });
+ }
+ }

+ reducer = reduceWork.getReducer();
      // initialize reduce operator tree
      try {
        l4j.info(reducer.dump(0));
- reducer.initialize(jconf, ois);

        // Initialization isn't finished until all parents of all operators
        // are initialized. For broadcast joins that means initializing the
        // dummy parent operators as well.
        List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
        if (dummyOps != null) {
- for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ for (HashTableDummyOperator dummyOp : dummyOps) {
            dummyOp.initialize(jconf, null);
          }
        }

        // set output collector for any reduce sink operators in the pipeline.
- List<Operator<? extends OperatorDesc>> children = new LinkedList<Operator<? extends OperatorDesc>>();
+ List<Operator<?>> children = new LinkedList<Operator<?>>();
        children.add(reducer);
        if (dummyOps != null) {
          children.addAll(dummyOps);
@@ -165,13 +207,36 @@ public class ReduceRecordProcessor exte
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
    }

+ private void initializeMultipleSources(ReduceWork redWork, int numTags, ObjectInspector[] ois,
+ ReduceRecordSource[] sources) throws Exception {
+ for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
+ if (redWork.getTagToValueDesc().get(tag) == null) {
+ continue;
+ }
+ initializeSourceForTag(redWork, tag, ois, sources, redWork.getTagToValueDesc().get(tag),
+ redWork.getTagToInput().get(tag));
+ }
+ }
+
+ private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois,
+ ReduceRecordSource[] sources, TableDesc valueTableDesc, String inputName)
+ throws Exception {
+ reducer = redWork.getReducer();
+ reducer.getParentOperators().clear();
+ reducer.setParentOperators(null); // clear out any parents as reducer is the root
+
+ TableDesc keyTableDesc = redWork.getKeyDesc();
+ KeyValuesReader reader = (KeyValuesReader) inputs.get(inputName).getReader();
+
+ sources[tag] = new ReduceRecordSource();
+ sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), keyTableDesc,
+ valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
+ redWork.getAllScratchColumnVectorTypeMaps());
+ ois[tag] = sources[tag].getObjectInspector();
+ }
+
    @Override
    void run() throws Exception {
- List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
- if (shuffleInputs != null) {
- l4j.info("Waiting for ShuffleInputs to become ready");
- processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs));
- }

      for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
        l4j.info("Starting Output: " + outputEntry.getKey());
@@ -180,22 +245,26 @@ public class ReduceRecordProcessor exte
      }

      // run the operator pipeline
- while (sources[position].pushRecord()) {}
+ while (sources[bigTablePosition].pushRecord()) {
+ }
    }

    /**
     * Get the inputs that should be streamed through reduce plan.
+ *
     * @param inputs
     * @return
+ * @throws Exception
     */
- private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
- //the reduce plan inputs have tags, add all inputs that have tags
- Map<Integer, String> tagToinput = redWork.getTagToInput();
+ private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) throws Exception {
+ // the reduce plan inputs have tags, add all inputs that have tags
+ Map<Integer, String> tagToinput = reduceWork.getTagToInput();
      ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
- for(String inpStr : tagToinput.values()){
+ for (String inpStr : tagToinput.values()) {
        if (inputs.get(inpStr) == null) {
          throw new AssertionError("Cound not find input: " + inpStr);
        }
+ inputs.get(inpStr).start();
        shuffleInputs.add(inputs.get(inpStr));
      }
      return shuffleInputs;
@@ -203,8 +272,10 @@ public class ReduceRecordProcessor exte

    @Override
    void close(){
- if (cache != null) {
- cache.release(cacheKey);
+ if (cache != null && cacheKeys != null) {
+ for (String key : cacheKeys) {
+ cache.release(key);
+ }
      }

      try {
@@ -213,13 +284,18 @@ public class ReduceRecordProcessor exte
        }

        reducer.close(abort);
+ if (mergeWorkList != null) {
+ for (BaseWork redWork : mergeWorkList) {
+ ((ReduceWork) redWork).getReducer().close(abort);
+ }
+ }

        // Need to close the dummyOps as well. The operator pipeline
        // is not considered "closed/done" unless all operators are
        // done. For broadcast joins that includes the dummy parents.
- List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
+ List<HashTableDummyOperator> dummyOps = reduceWork.getDummyOps();
        if (dummyOps != null) {
- for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ for (Operator<?> dummyOp : dummyOps) {
            dummyOp.close(abort);
          }
        }
@@ -230,8 +306,8 @@ public class ReduceRecordProcessor exte
        if (!abort) {
          // signal new failure to map-reduce
          l4j.error("Hit error while closing operators - failing tree");
- throw new RuntimeException("Hive Runtime Error while closing operators: "
- + e.getMessage(), e);
+ throw new RuntimeException(
+ "Hive Runtime Error while closing operators: " + e.getMessage(), e);
        }
      } finally {
        Utilities.clearWorkMap();
@@ -239,4 +315,19 @@ public class ReduceRecordProcessor exte
      }
    }

+ private DummyStoreOperator getJoinParentOp(Operator<?> mergeReduceOp) {
+ for (Operator<?> childOp : mergeReduceOp.getChildOperators()) {
+ if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) {
+ if (childOp instanceof DummyStoreOperator) {
+ return (DummyStoreOperator) childOp;
+ } else {
+ throw new IllegalStateException("Was expecting dummy store operator but found: "
+ + childOp);
+ }
+ } else {
+ return getJoinParentOp(childOp);
+ }
+ }
+ throw new IllegalStateException("Expecting a DummyStoreOperator found op: " + mergeReduceOp);
+ }
  }
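
The net effect of the rewrite: plan loading moves into a constructor (TezProcessor now passes the JobConf), and init() builds one ReduceRecordSource per join tag, treating the main ReduceWork as the big table and each merged ReduceWork as a small table whose terminal DummyStoreOperator is exposed through connectOps. A compressed sketch of that initialization order, with error handling, vectorization and the multi-tag (non-SMB) branch omitted:

    // Register every input by tag; the main work's tag is the big table position.
    tagToReducerMap.put(reduceWork.getTag(), reduceWork);
    for (BaseWork mergeWork : mergeWorkList) {
      ReduceWork mergeReduceWork = (ReduceWork) mergeWork;
      connectOps.put(mergeReduceWork.getTag(), getJoinParentOp(mergeReduceWork.getReducer()));
      tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork);
    }
    bigTablePosition = (byte) reduceWork.getTag();
    ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps);

    // One record source per tag, then the big-table reducer is initialized first and
    // each small-table reducer afterwards, against its own object inspector.
    for (int tag : tagToReducerMap.keySet()) {
      ReduceWork work = tagToReducerMap.get(tag);
      initializeSourceForTag(work, tag, mainWorkOIs, sources,
          work.getTagToValueDesc().get(0), work.getTagToInput().get(0));
      work.getReducer().initializeLocalWork(jconf);
    }
    reduceWork.getReducer().initialize(jconf,
        new ObjectInspector[] { mainWorkOIs[bigTablePosition] });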

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Apr 1 21:45:34 2015
@@ -68,7 +68,7 @@ public class ReduceRecordSource implemen

    private boolean abort = false;

- private static Deserializer inputKeyDeserializer;
+ private Deserializer inputKeyDeserializer;

    // Input value serde needs to be an array to support different SerDe
    // for different tags
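
inputKeyDeserializer loses its static modifier because reduce-side SMB now creates one ReduceRecordSource per join tag within the same task, and a static field would let the last-initialized source clobber the key deserializer of the others. Conceptually (a simplified fragment of the per-tag setup shown in ReduceRecordProcessor above):

    // One source per tag; each instance now owns its own key deserializer.
    sources[tag] = new ReduceRecordSource();
    sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), keyTableDesc,
        valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
        redWork.getAllScratchColumnVectorTypeMaps());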

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Wed Apr 1 21:45:34 2015
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.t

  import java.util.Map;

+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
  import org.apache.hadoop.hive.ql.exec.MapredContext;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.tez.runtime.api.LogicalInput;
@@ -39,6 +40,8 @@ public class TezContext extends MapredCo

    private RecordSource[] sources;

+ private Map<Integer, DummyStoreOperator> dummyOpsMap;
+
    public TezContext(boolean isMap, JobConf jobConf) {
      super(isMap, jobConf);
    }
@@ -80,4 +83,12 @@ public class TezContext extends MapredCo
    public void setRecordSources(RecordSource[] sources) {
      this.sources = sources;
    }
+
+ public void setDummyOpsMap(Map<Integer, DummyStoreOperator> dummyOpsMap) {
+ this.dummyOpsMap = dummyOpsMap;
+ }
+
+ public Map<Integer, DummyStoreOperator> getDummyOpsMap() {
+ return dummyOpsMap;
+ }
  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Apr 1 21:45:34 2015
@@ -131,7 +131,7 @@ public class TezProcessor extends Abstra
        if (isMap) {
          rproc = new MapRecordProcessor(jobConf);
        } else {
- rproc = new ReduceRecordProcessor();
+ rproc = new ReduceRecordProcessor(jobConf);
        }

        initializeAndRunProcessor(inputs, outputs);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Wed Apr 1 21:45:34 2015
@@ -163,7 +163,7 @@ public class ConvertJoinMapJoin implemen
      // map join operator by default has no bucket cols and num of reduce sinks
      // reduced by 1
      mapJoinOp
- .setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks()));
+.setOpTraits(new OpTraits(null, -1, null));
      mapJoinOp.setStatistics(joinOp.getStatistics());
      // propagate this change till the next RS
      for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
@@ -178,8 +178,7 @@ public class ConvertJoinMapJoin implemen
        TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
      // we cannot convert to bucket map join, we cannot convert to
      // map join either based on the size. Check if we can convert to SMB join.
- if ((context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
- || (joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
+ if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
        convertJoinSMBJoin(joinOp, context, 0, 0, false);
        return null;
      }
@@ -254,9 +253,9 @@ public class ConvertJoinMapJoin implemen
      CommonMergeJoinOperator mergeJoinOp =
          (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
              mapJoinConversionPos, mapJoinDesc), joinOp.getSchema());
- int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks();
- OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp
- .getOpTraits().getSortCols(), numReduceSinks);
+ OpTraits opTraits =
+ new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
+ .getSortCols());
      mergeJoinOp.setOpTraits(opTraits);
      mergeJoinOp.setStatistics(joinOp.getStatistics());

@@ -321,8 +320,7 @@ public class ConvertJoinMapJoin implemen
      if (currentOp instanceof ReduceSinkOperator) {
        return;
      }
- currentOp.setOpTraits(new OpTraits(null, -1, null,
- currentOp.getOpTraits().getNumReduceSinks()));
+ currentOp.setOpTraits(new OpTraits(null, -1, null));
      for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
        if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
          break;
@@ -345,7 +343,7 @@ public class ConvertJoinMapJoin implemen

      // we can set the traits for this join operator
      OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
- tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks());
+ tezBucketJoinProcCtx.getNumBuckets(), null);
      mapJoinOp.setOpTraits(opTraits);
      mapJoinOp.setStatistics(joinOp.getStatistics());
      setNumberOfBucketsOnChildren(mapJoinOp);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java Wed Apr 1 21:45:34 2015
@@ -70,12 +70,11 @@ public class MergeJoinProc implements No
        context.opMergeJoinWorkMap.put(mergeJoinOp, mergeWork);
      }

+ mergeWork.addMergedWork(null, parentWork, context.leafOperatorToFollowingWork);
      mergeWork.setMergeJoinOperator(mergeJoinOp);
- mergeWork.addMergedWork(null, parentWork);
      tezWork.setVertexType(mergeWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);

      for (BaseWork grandParentWork : tezWork.getParents(parentWork)) {
- parentWork.setName(grandParentWork.getName());
        TezEdgeProperty edgeProp = tezWork.getEdgeProperty(grandParentWork, parentWork);
        tezWork.disconnect(grandParentWork, parentWork);
        tezWork.connect(grandParentWork, mergeWork, edgeProp);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Wed Apr 1 21:45:34 2015
@@ -179,13 +179,44 @@ public class ReduceSinkMapJoinProc imple
        parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));

        numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0];
- Operator<?> rootOp = OperatorUtils.findSingleOperatorUpstream(mapJoinOp.getParentOperators()
- .get(joinConf.getPosBigTable()), TableScanOperator.class);
-
- if (rootOp instanceof TableScanOperator) { // we will run in mapper
- edgeType = EdgeType.CUSTOM_EDGE;
- } else { // we will run in reducer
- edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
+ /*
+ * Here, we can be in one of 4 states.
+ *
+ * 1. If map join work is null implies that we have not yet traversed the big table side. We
+ * just need to see if we can find a reduce sink operator in the big table side. This would
+ * imply a reduce side operation.
+ *
+ * 2. If we don't find a reducesink in 1 it has to be the case that it is a map side operation.
+ *
+ * 3. If we have already created a work item for the big table side, we need to see if we can
+ * find a table scan operator in the big table side. This would imply a map side operation.
+ *
+ * 4. If we don't find a table scan operator, it has to be a reduce side operation.
+ */
+ if (mapJoinWork == null) {
+ Operator<?> rootOp =
+ OperatorUtils.findSingleOperatorUpstream(
+ mapJoinOp.getParentOperators().get(joinConf.getPosBigTable()),
+ ReduceSinkOperator.class);
+ if (rootOp == null) {
+ // likely we found a table scan operator
+ edgeType = EdgeType.CUSTOM_EDGE;
+ } else {
+ // we have found a reduce sink
+ edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
+ }
+ } else {
+ Operator<?> rootOp =
+ OperatorUtils.findSingleOperatorUpstream(
+ mapJoinOp.getParentOperators().get(joinConf.getPosBigTable()),
+ TableScanOperator.class);
+ if (rootOp != null) {
+ // likely we found a table scan operator
+ edgeType = EdgeType.CUSTOM_EDGE;
+ } else {
+ // we have found a reduce sink
+ edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
+ }
        }
      }
      TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
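
The four cases in the new comment reduce to two probes of the big table branch: before its work has been generated, the presence of a ReduceSinkOperator upstream marks a reduce-side join; once the work exists, the presence of a TableScanOperator upstream marks a map-side join. A condensed restatement of the branch above (a sketch, not the verbatim method body):

    // Decide whether the big table side of a bucket map join runs in a mapper
    // (CUSTOM_EDGE) or in a reducer (CUSTOM_SIMPLE_EDGE).
    Operator<?> bigTableParent =
        mapJoinOp.getParentOperators().get(joinConf.getPosBigTable());
    boolean runsInMapper;
    if (mapJoinWork == null) {
      // Big table side not traversed yet: a reduce sink upstream means reduce side.
      runsInMapper = OperatorUtils.findSingleOperatorUpstream(
          bigTableParent, ReduceSinkOperator.class) == null;
    } else {
      // Big table work already created: a table scan upstream means map side.
      runsInMapper = OperatorUtils.findSingleOperatorUpstream(
          bigTableParent, TableScanOperator.class) != null;
    }
    edgeType = runsInMapper ? EdgeType.CUSTOM_EDGE : EdgeType.CUSTOM_SIMPLE_EDGE;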

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Wed Apr 1 21:45:34 2015
@@ -50,19 +50,19 @@ import org.apache.hadoop.hive.ql.plan.Op
   * 1. Bucketing columns.
   * 2. Table
   * 3. Pruned partitions
- *
+ *
   * Bucketing columns refer to not to the bucketing columns from the table object but instead
   * to the dynamic 'bucketing' done by operators such as reduce sinks and group-bys.
   * All the operators have a translation from their input names to the output names corresponding
   * to the bucketing column. The colExprMap that is a part of every operator is used in this
   * transformation.
- *
+ *
   * The table object is used for the base-case in map-reduce when deciding to perform a bucket
   * map join. This object is used in the BucketMapJoinProc to find if number of files for the
   * table correspond to the number of buckets specified in the meta data.
- *
+ *
   * The pruned partition information has the same purpose as the table object at the moment.
- *
+ *
   * The traits of sorted-ness etc. can be populated as well for future optimizations to make use of.
   */

@@ -106,13 +106,11 @@ public class OpTraitsRulesProcFactory {
        List<List<String>> listBucketCols = new ArrayList<List<String>>();
        listBucketCols.add(bucketCols);
        int numBuckets = -1;
- int numReduceSinks = 1;
        OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getTraits();
        if (parentOpTraits != null) {
          numBuckets = parentOpTraits.getNumBuckets();
- numReduceSinks += parentOpTraits.getNumReduceSinks();
        }
- OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols, numReduceSinks);
+ OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols);
        rs.setOpTraits(opTraits);
        return null;
      }
@@ -132,8 +130,8 @@ public class OpTraitsRulesProcFactory {
          // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
          if (!partitions.isEmpty()) {
            for (Partition p : partitions) {
- List<String> fileNames =
- AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(),
+ List<String> fileNames =
+ AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(),
                      pGraphContext);
              // The number of files for the table should be same as number of
              // buckets.
@@ -146,8 +144,8 @@ public class OpTraitsRulesProcFactory {
          }
        } else {

- List<String> fileNames =
- AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(),
+ List<String> fileNames =
+ AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(),
                  pGraphContext);
          Integer num = new Integer(tbl.getNumBuckets());

@@ -188,7 +186,7 @@ public class OpTraitsRulesProcFactory {
          sortedColsList.add(sortCols);
        }
        // num reduce sinks hardcoded to 0 because TS has no parents
- OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList, 0);
+ OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList);
        ts.setOpTraits(opTraits);
        return null;
      }
@@ -213,13 +211,8 @@ public class OpTraitsRulesProcFactory {
        }

        List<List<String>> listBucketCols = new ArrayList<List<String>>();
- int numReduceSinks = 0;
- OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits();
- if (parentOpTraits != null) {
- numReduceSinks = parentOpTraits.getNumReduceSinks();
- }
        listBucketCols.add(gbyKeys);
- OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, numReduceSinks);
+ OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols);
        gbyOp.setOpTraits(opTraits);
        return null;
      }
@@ -255,7 +248,7 @@ public class OpTraitsRulesProcFactory {
      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
          Object... nodeOutputs) throws SemanticException {
        SelectOperator selOp = (SelectOperator) nd;
- List<List<String>> parentBucketColNames =
+ List<List<String>> parentBucketColNames =
            selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();

        List<List<String>> listBucketCols = null;
@@ -264,7 +257,7 @@ public class OpTraitsRulesProcFactory {
          if (parentBucketColNames != null) {
            listBucketCols = getConvertedColNames(parentBucketColNames, selOp);
          }
- List<List<String>> parentSortColNames =
+ List<List<String>> parentSortColNames =
              selOp.getParentOperators().get(0).getOpTraits().getSortCols();
          if (parentSortColNames != null) {
            listSortCols = getConvertedColNames(parentSortColNames, selOp);
@@ -272,13 +265,11 @@ public class OpTraitsRulesProcFactory {
        }

        int numBuckets = -1;
- int numReduceSinks = 0;
        OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits();
        if (parentOpTraits != null) {
          numBuckets = parentOpTraits.getNumBuckets();
- numReduceSinks = parentOpTraits.getNumReduceSinks();
        }
- OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, numReduceSinks);
+ OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols);
        selOp.setOpTraits(opTraits);
        return null;
      }
@@ -307,13 +298,10 @@ public class OpTraitsRulesProcFactory {
          OpTraits parentOpTraits = rsOp.getOpTraits();
          bucketColsList.add(getOutputColNames(joinOp, parentOpTraits.getBucketColNames(), pos));
          sortColsList.add(getOutputColNames(joinOp, parentOpTraits.getSortCols(), pos));
- if (parentOpTraits.getNumReduceSinks() > numReduceSinks) {
- numReduceSinks = parentOpTraits.getNumReduceSinks();
- }
          pos++;
        }

- joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks));
+ joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList));
        return null;
      }

@@ -366,17 +354,7 @@ public class OpTraitsRulesProcFactory {
          Object... nodeOutputs) throws SemanticException {
        @SuppressWarnings("unchecked")
        Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>) nd;
-
- int numReduceSinks = 0;
- for (Operator<?> parentOp : operator.getParentOperators()) {
- if (parentOp.getOpTraits() == null) {
- continue;
- }
- if (parentOp.getOpTraits().getNumReduceSinks() > numReduceSinks) {
- numReduceSinks = parentOp.getOpTraits().getNumReduceSinks();
- }
- }
- OpTraits opTraits = new OpTraits(null, -1, null, numReduceSinks);
+ OpTraits opTraits = new OpTraits(null, -1, null);
        operator.setOpTraits(opTraits);
        return null;
      }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Wed Apr 1 21:45:34 2015
@@ -103,7 +103,7 @@ public class SparkMapJoinOptimizer imple
      }

      // we can set the traits for this join operator
- OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, joinOp.getOpTraits().getNumReduceSinks());
+ OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null);
      mapJoinOp.setOpTraits(opTraits);
      mapJoinOp.setStatistics(joinOp.getStatistics());
      setNumberOfBucketsOnChildren(mapJoinOp);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Wed Apr 1 21:45:34 2015
@@ -105,7 +105,7 @@ public class GenTezUtils {
      float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
      long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);

- ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
+ ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + (++sequenceNumber));
      LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
      reduceWork.setReducer(root);
      reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
@@ -180,7 +180,7 @@ public class GenTezUtils {
    public MapWork createMapWork(GenTezProcContext context, Operator<?> root,
        TezWork tezWork, PrunedPartitionList partitions) throws SemanticException {
      assert root.getParentOperators().isEmpty();
- MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
+ MapWork mapWork = new MapWork(Utilities.MAPNAME + (++sequenceNumber));
      LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);

      // map work starts with table scan operators

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Wed Apr 1 21:45:34 2015
@@ -166,7 +166,7 @@ public class GenTezWork implements NodeP
        }
        // connect the work correctly.
        work.addSortCols(root.getOpTraits().getSortCols().get(0));
- mergeJoinWork.addMergedWork(work, null);
+ mergeJoinWork.addMergedWork(work, null, context.leafOperatorToFollowingWork);
        Operator<? extends OperatorDesc> parentOp =
            getParentFromStack(context.currentMergeJoinOperator, stack);
        int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
@@ -268,6 +268,7 @@ public class GenTezWork implements NodeP
        if (LOG.isDebugEnabled()) {
          LOG.debug("Removing " + parent + " as parent from " + root);
        }
+ context.leafOperatorToFollowingWork.remove(parent);
        context.leafOperatorToFollowingWork.put(parent, work);
        root.removeParent(parent);
      }
@@ -326,7 +327,7 @@ public class GenTezWork implements NodeP
          MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork;
          CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
          work.setTag(mergeJoinOp.getTagForOperator(operator));
- mergeJoinWork.addMergedWork(null, work);
+ mergeJoinWork.addMergedWork(null, work, context.leafOperatorToFollowingWork);
          tezWork.setVertexType(mergeJoinWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
          for (BaseWork parentWork : tezWork.getParents(work)) {
            TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
@@ -399,7 +400,7 @@ public class GenTezWork implements NodeP
      return null;
    }

- private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs)
+ private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs)
        throws SemanticException {
      int index = 0;
      for (BaseWork baseWork : tezWork.getChildren(unionWork)) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java Wed Apr 1 21:45:34 2015
@@ -21,11 +21,13 @@ package org.apache.hadoop.hive.ql.plan;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
+import java.util.Map.Entry;
  import java.util.Set;

  import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
  import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
  import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
  import org.apache.hadoop.mapred.JobConf;

  public class MergeJoinWork extends BaseWork {
@@ -65,7 +67,8 @@ public class MergeJoinWork extends BaseW
      this.mergeJoinOp = mergeJoinOp;
    }

- public void addMergedWork(BaseWork work, BaseWork connectWork) {
+ public void addMergedWork(BaseWork work, BaseWork connectWork,
+ Map<Operator<?>, BaseWork> leafOperatorToFollowingWork) {
      if (work != null) {
        if ((bigTableWork != null) && (bigTableWork != work)) {
          assert false;
@@ -76,6 +79,39 @@ public class MergeJoinWork extends BaseW

      if (connectWork != null) {
        this.mergeWorkList.add(connectWork);
+ if ((connectWork instanceof ReduceWork) && (bigTableWork != null)) {
+ /*
+ * For tez to route data from an up-stream vertex correctly to the following vertex, the
+ * output name in the reduce sink needs to be setup appropriately. In the case of reduce
+ * side merge work, we need to ensure that the parent work that provides data to this merge
+ * work is setup to point to the right vertex name - the main work name.
+ *
+ * In this case, if the big table work has already been created, we can hook up the merge
+ * work items for the small table correctly.
+ */
+ setReduceSinkOutputName(connectWork, leafOperatorToFollowingWork, bigTableWork.getName());
+ }
+ }
+
+ if (work != null) {
+ /*
+ * Same reason as above. This is the case when we have the main work item after the merge work
+ * has been created for the small table side.
+ */
+ for (BaseWork mergeWork : mergeWorkList) {
+ if (mergeWork instanceof ReduceWork) {
+ setReduceSinkOutputName(mergeWork, leafOperatorToFollowingWork, work.getName());
+ }
+ }
+ }
+ }
+
+ private void setReduceSinkOutputName(BaseWork mergeWork,
+ Map<Operator<?>, BaseWork> leafOperatorToFollowingWork, String name) {
+ for (Entry<Operator<?>, BaseWork> entry : leafOperatorToFollowingWork.entrySet()) {
+ if (entry.getValue() == mergeWork) {
+ ((ReduceSinkOperator) entry.getKey()).getConf().setOutputName(name);
+ }
      }
    }
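
addMergedWork now also repairs Tez routing in both arrival orders: when a small-table ReduceWork is merged after the big-table work already exists, its feeding reduce sinks are repointed immediately; when the big-table work shows up later, every previously merged ReduceWork is repointed at that moment. setReduceSinkOutputName does the repointing by scanning the leafOperatorToFollowingWork map (passed through from GenTezWork and MergeJoinProc), so that, in effect:

    // A reduce sink that fed the now-merged small-table vertex must emit its shuffle
    // output under the surviving (big table) vertex name instead.
    for (Entry<Operator<?>, BaseWork> entry : leafOperatorToFollowingWork.entrySet()) {
      if (entry.getValue() == mergeWork) {
        ((ReduceSinkOperator) entry.getKey()).getConf().setOutputName(bigTableWork.getName());
      }
    }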


Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java Wed Apr 1 21:45:34 2015
@@ -25,14 +25,11 @@ public class OpTraits {
    List<List<String>> bucketColNames;
    List<List<String>> sortColNames;
    int numBuckets;
- int numReduceSinks;

- public OpTraits(List<List<String>> bucketColNames, int numBuckets,
- List<List<String>> sortColNames, int numReduceSinks) {
+ public OpTraits(List<List<String>> bucketColNames, int numBuckets, List<List<String>> sortColNames) {
      this.bucketColNames = bucketColNames;
      this.numBuckets = numBuckets;
      this.sortColNames = sortColNames;
- this.numReduceSinks = numReduceSinks;
    }

    public List<List<String>> getBucketColNames() {
@@ -58,12 +55,4 @@ public class OpTraits {
    public List<List<String>> getSortCols() {
      return sortColNames;
    }
-
- public void setNumReduceSinks(int numReduceSinks) {
- this.numReduceSinks = numReduceSinks;
- }
-
- public int getNumReduceSinks() {
- return this.numReduceSinks;
- }
  }
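
With the numReduceSinks counter gone, OpTraits goes back to carrying only bucketing and sorting metadata, and every call site in this commit (ConvertJoinMapJoin, OpTraitsRulesProcFactory, SparkMapJoinOptimizer) drops the last constructor argument. At a typical call site the change looks roughly like this (variable names illustrative):

    // Before: the trailing argument threaded a count of upstream reduce sinks.
    //   op.setOpTraits(new OpTraits(bucketColNames, numBuckets, sortColNames, numReduceSinks));
    // After: traits describe bucketing/sorting only.
    op.setOpTraits(new OpTraits(bucketColNames, numBuckets, sortColNames));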

Modified: hive/trunk/ql/src/test/queries/clientpositive/tez_join.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/tez_join.q?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/tez_join.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/tez_join.q Wed Apr 1 21:45:34 2015
@@ -24,20 +24,3 @@ join
  (select rt2.id from
  (select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2
  where vt1.id=vt2.id;
-
-explain
-select vt1.id from
-(select rt1.id from
-(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1
-join
-(select rt2.id from
-(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2
-where vt1.id=vt2.id;
-
-select vt1.id from
-(select rt1.id from
-(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1
-join
-(select rt2.id from
-(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2
-where vt1.id=vt2.id;

Modified: hive/trunk/ql/src/test/queries/clientpositive/tez_smb_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/tez_smb_1.q?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/tez_smb_1.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/tez_smb_1.q Wed Apr 1 21:45:34 2015
@@ -33,3 +33,21 @@ set hive.auto.convert.join.noconditional
  explain
  select count(*) from tab s1 join tab s3 on s1.key=s3.key;

+set hive.auto.convert.join=false;
+
+explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;

Modified: hive/trunk/ql/src/test/queries/clientpositive/tez_smb_main.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/tez_smb_main.q?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/tez_smb_main.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/tez_smb_main.q Wed Apr 1 21:45:34 2015
@@ -82,3 +82,20 @@ UNION ALL
  select s2.key as key, s2.value as value from tab s2
  ) a join tab_part b on (a.key = b.key);

+explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+

Modified: hive/trunk/ql/src/test/results/clientpositive/tez/tez_join.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/tez/tez_join.q.out?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/tez/tez_join.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/tez/tez_join.q.out Wed Apr 1 21:45:34 2015
@@ -48,9 +48,7 @@ STAGE PLANS:
    Stage: Stage-1
      Tez
        Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
- Reducer 5 <- Map 4 (SIMPLE_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
  #### A masked pattern was here ####
        Vertices:
          Map 1
@@ -69,7 +67,7 @@ STAGE PLANS:
                          key expressions: _col0 (type: string), _col1 (type: string)
                          sort order: ++
                          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Map 4
+ Map 3
              Map Operator Tree:
                  TableScan
                    alias: t2
@@ -91,204 +89,33 @@ STAGE PLANS:
                  expressions: KEY.reducesinkkey0 (type: string)
                  outputColumnNames: _col0
                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reducer 3
- Reduce Operator Tree:
- Merge Join Operator
- condition map:
- Inner Join 0 to 1
- keys:
- 0 _col0 (type: string)
- 1 _col0 (type: string)
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Filter Operator
- predicate: (_col0 = _col1) (type: boolean)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Select Operator
- expressions: _col0 (type: string)
- outputColumnNames: _col0
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- File Output Operator
- compressed: false
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Reducer 5
              Reduce Operator Tree:
                Select Operator
                  expressions: KEY.reducesinkkey0 (type: string)
                  outputColumnNames: _col0
                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-
- Stage: Stage-0
- Fetch Operator
- limit: -1
- Processor Tree:
- ListSink
-
-PREHOOK: query: select vt1.id from
-(select rt1.id from
-(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1
-join
-(select rt2.id from
-(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2
-where vt1.id=vt2.id
-PREHOOK: type: QUERY
-PREHOOK: Input: default@t1
-PREHOOK: Input: default@t2
-#### A masked pattern was here ####
-POSTHOOK: query: select vt1.id from
-(select rt1.id from
-(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1
-join
-(select rt2.id from
-(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2
-where vt1.id=vt2.id
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@t1
-POSTHOOK: Input: default@t2
-#### A masked pattern was here ####
-PREHOOK: query: explain
-select vt1.id from
-(select rt1.id from
-(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1
-join
-(select rt2.id from
-(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2
-where vt1.id=vt2.id
-PREHOOK: type: QUERY
-POSTHOOK: query: explain
-select vt1.id from
-(select rt1.id from
-(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1
-join
-(select rt2.id from
-(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2
-where vt1.id=vt2.id
-POSTHOOK: type: QUERY
-STAGE DEPENDENCIES:
- Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
- Stage: Stage-1
- Tez
- Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
- Reducer 5 <- Map 4 (SIMPLE_EDGE)
-#### A masked pattern was here ####
- Vertices:
- Map 1
- Map Operator Tree:
- TableScan
- alias: t1
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0, _col1
                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                    Filter Operator
- predicate: id is not null (type: boolean)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Group By Operator
- aggregations: count()
- keys: id (type: string), od (type: string)
- mode: hash
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- value expressions: _col2 (type: bigint)
- Map 4
- Map Operator Tree:
- TableScan
- alias: t2
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Filter Operator
- predicate: id is not null (type: boolean)
+ predicate: (_col0 = _col1) (type: boolean)
                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Group By Operator
- aggregations: count()
- keys: id (type: string), od (type: string)
- mode: hash
- outputColumnNames: _col0, _col1, _col2
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
                        Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ File Output Operator
+ compressed: false
                          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- value expressions: _col2 (type: bigint)
- Reducer 2
- Reduce Operator Tree:
- Group By Operator
- aggregations: count(VALUE._col0)
- keys: KEY._col0 (type: string), KEY._col1 (type: string)
- mode: mergepartial
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Select Operator
- expressions: _col0 (type: string)
- outputColumnNames: _col0
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reducer 3
- Reduce Operator Tree:
- Merge Join Operator
- condition map:
- Inner Join 0 to 1
- keys:
- 0 _col0 (type: string)
- 1 _col0 (type: string)
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Filter Operator
- predicate: (_col0 = _col1) (type: boolean)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Select Operator
- expressions: _col0 (type: string)
- outputColumnNames: _col0
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- File Output Operator
- compressed: false
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Reducer 5
- Reduce Operator Tree:
- Group By Operator
- aggregations: count(VALUE._col0)
- keys: KEY._col0 (type: string), KEY._col1 (type: string)
- mode: mergepartial
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Select Operator
- expressions: _col0 (type: string)
- outputColumnNames: _col0
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

    Stage: Stage-0
      Fetch Operator
@@ -298,10 +125,10 @@ STAGE PLANS:

  PREHOOK: query: select vt1.id from
  (select rt1.id from
-(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1
+(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1
  join
  (select rt2.id from
-(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2
+(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2
  where vt1.id=vt2.id
  PREHOOK: type: QUERY
  PREHOOK: Input: default@t1
@@ -309,10 +136,10 @@ PREHOOK: Input: default@t2
  #### A masked pattern was here ####
  POSTHOOK: query: select vt1.id from
  (select rt1.id from
-(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1
+(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1
  join
  (select rt2.id from
-(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2
+(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2
  where vt1.id=vt2.id
  POSTHOOK: type: QUERY
  POSTHOOK: Input: default@t1

Modified: hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_1.q.out Wed Apr 1 21:45:34 2015
@@ -177,3 +177,146 @@ STAGE PLANS:
        Processor Tree:
          ListSink

+PREHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+ Edges:
+ Reducer 4 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+ Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: t1
+ Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: string)
+ sort order: ++
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: t2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: string)
+ sort order: ++
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reducer 4
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (_col0 = _col1) (type: boolean)
+ Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 5
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+480

Modified: hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out?rev=1670779&r1=1670778&r2=1670779&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/tez/tez_smb_main.q.out Wed Apr 1 21:45:34 2015
@@ -1194,3 +1194,155 @@ STAGE PLANS:
        Processor Tree:
          ListSink

+PREHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+ Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 2 (CUSTOM_SIMPLE_EDGE)
+ Reducer 5 <- Reducer 4 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: t1
+ Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: string)
+ sort order: ++
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: t2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: string)
+ sort order: ++
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reducer 2
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Reducer 4
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ outputColumnNames: _col0, _col1
+ input vertices:
+ 0 Reducer 2
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (_col0 = _col1) (type: boolean)
+ Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 5
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+480
