FAQ
Author: kevinwilfong
Date: Tue Mar 12 17:50:55 2013
New Revision: 1455650

URL: http://svn.apache.org/r1455650
Log:
HIVE-4096. problem in hive.map.groupby.sorted with distincts. (njain via kevinwilfong)

Added:
     hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q
     hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out
Modified:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1455650&r1=1455649&r2=1455650&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Tue Mar 12 17:50:55 2013
@@ -175,7 +175,9 @@ public class GroupByOptimizer implements
        boolean useMapperSort =
            HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);

- if (useMapperSort && (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
+ // Don't remove the operator for distincts
+ if (useMapperSort && !groupByOp.getConf().isDistinct() &&
+ (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
          convertGroupByMapSideSortedGroupBy(groupByOp, depth);
        }
        else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1455650&r1=1455649&r2=1455650&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Mar 12 17:50:55 2013
@@ -2849,7 +2849,7 @@ public class SemanticAnalyzer extends Ba

      Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
          new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+ false, groupByMemoryUsage, memoryThreshold, null, false, 0, numDistinctUDFs > 0),
          new RowSchema(groupByOutputRowResolver.getColumnInfos()),
          reduceSinkOperatorInfo), groupByOutputRowResolver);
      op.setColumnExprMap(colExprMap);
@@ -3023,11 +3023,13 @@ public class SemanticAnalyzer extends Ba
        reduceValues = ((ReduceSinkDesc) reduceSinkOperatorInfo.getConf()).getValueCols();
      }
      int numDistinctUDFs = 0;
+ boolean containsDistinctAggr = false;
      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
        ASTNode value = entry.getValue();
        String aggName = unescapeIdentifier(value.getChild(0).getText());
        ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
        boolean isDistinct = (value.getType() == HiveParser.TOK_FUNCTIONDI);
+ containsDistinctAggr = containsDistinctAggr || isDistinct;

        // If the function is distinct, partial aggregation has not been done on
        // the client side.
@@ -3129,7 +3131,7 @@ public class SemanticAnalyzer extends Ba
              distPartAgg, groupByMemoryUsage, memoryThreshold,
              groupingSets,
              groupingSetsPresent && groupingSetsNeedAdditionalMRJob,
- groupingSetsPosition),
+ groupingSetsPosition, containsDistinctAggr),
          new RowSchema(groupByOutputRowResolver.getColumnInfos()), reduceSinkOperatorInfo),
          groupByOutputRowResolver);
      op.setColumnExprMap(colExprMap);
@@ -3250,6 +3252,7 @@ public class SemanticAnalyzer extends Ba
          .getAggregationExprsForClause(dest);
      assert (aggregationTrees != null);

+ boolean containsDistinctAggr = false;
      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
        ASTNode value = entry.getValue();
        String aggName = unescapeIdentifier(value.getChild(0).getText());
@@ -3265,6 +3268,7 @@ public class SemanticAnalyzer extends Ba
        }

        boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+ containsDistinctAggr = containsDistinctAggr || isDistinct;
        boolean isAllColumns = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
        Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);

@@ -3293,7 +3297,7 @@ public class SemanticAnalyzer extends Ba
      Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
          new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
              false, groupByMemoryUsage, memoryThreshold,
- groupingSetKeys, groupingSetsPresent, groupingSetsPosition),
+ groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
          new RowSchema(groupByOutputRowResolver.getColumnInfos()),
          inputOperatorInfo), groupByOutputRowResolver);
      op.setColumnExprMap(colExprMap);
@@ -3752,6 +3756,7 @@ public class SemanticAnalyzer extends Ba

      HashMap<String, ASTNode> aggregationTrees = parseInfo
          .getAggregationExprsForClause(dest);
+ boolean containsDistinctAggr = false;
      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
        ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
        ASTNode value = entry.getValue();
@@ -3768,6 +3773,7 @@ public class SemanticAnalyzer extends Ba
        String aggName = unescapeIdentifier(value.getChild(0).getText());

        boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+ containsDistinctAggr = containsDistinctAggr || isDistinct;
        boolean isStar = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
        Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
        GenericUDAFEvaluator genericUDAFEvaluator = genericUDAFEvaluators
@@ -3795,7 +3801,7 @@ public class SemanticAnalyzer extends Ba

      Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
          new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+ false, groupByMemoryUsage, memoryThreshold, null, false, 0, containsDistinctAggr),
          new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
          reduceSinkOperatorInfo2), groupByOutputRowResolver2);
      op.setColumnExprMap(colExprMap);
@@ -5998,7 +6004,7 @@ public class SemanticAnalyzer extends Ba
          .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
      Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
          new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+ false, groupByMemoryUsage, memoryThreshold, null, false, 0, false),
          new RowSchema(groupByOutputRowResolver.getColumnInfos()),
          inputOperatorInfo), groupByOutputRowResolver);


Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1455650&r1=1455649&r2=1455650&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Tue Mar 12 17:50:55 2013
@@ -65,6 +65,7 @@ public class GroupByDesc extends Abstrac
    private ArrayList<java.lang.String> outputColumnNames;
    private float groupByMemoryUsage;
    private float memoryThreshold;
+ transient private boolean isDistinct;

    public GroupByDesc() {
    }
@@ -79,10 +80,11 @@ public class GroupByDesc extends Abstrac
        final float memoryThreshold,
        final List<Integer> listGroupingSets,
        final boolean groupingSetsPresent,
- final int groupingSetsPosition) {
+ final int groupingSetsPosition,
+ final boolean isDistinct) {
      this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
        false, groupByMemoryUsage, memoryThreshold, listGroupingSets,
- groupingSetsPresent, groupingSetsPosition);
+ groupingSetsPresent, groupingSetsPosition, isDistinct);
    }

    public GroupByDesc(
@@ -96,7 +98,8 @@ public class GroupByDesc extends Abstrac
        final float memoryThreshold,
        final List<Integer> listGroupingSets,
        final boolean groupingSetsPresent,
- final int groupingSetsPosition) {
+ final int groupingSetsPosition,
+ final boolean isDistinct) {
      this.mode = mode;
      this.outputColumnNames = outputColumnNames;
      this.keys = keys;
@@ -108,6 +111,7 @@ public class GroupByDesc extends Abstrac
      this.listGroupingSets = listGroupingSets;
      this.groupingSetsPresent = groupingSetsPresent;
      this.groupingSetPosition = groupingSetsPosition;
+ this.isDistinct = isDistinct;
    }

    public Mode getMode() {
@@ -249,4 +253,8 @@ public class GroupByDesc extends Abstrac
    public void setGroupingSetPosition(int groupingSetPosition) {
      this.groupingSetPosition = groupingSetPosition;
    }
+
+ public boolean isDistinct() {
+ return isDistinct;
+ }
  }

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q?rev=1455650&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_8.q Tue Mar 12 17:50:55 2013
@@ -0,0 +1,20 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+
+CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1');
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1';
+
+-- The plan is not converted to a map-side, since although the sorting columns and grouping
+-- columns match, the user is issueing a distinct
+EXPLAIN
+select count(distinct key) from T1;
+select count(distinct key) from T1;
+
+DROP TABLE T1;
\ No newline at end of file

Added: hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out?rev=1455650&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby_sort_8.q.out Tue Mar 12 17:50:55 2013
@@ -0,0 +1,124 @@
+PREHOOK: query: CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@t1
+POSTHOOK: Output: default@t1@ds=1
+PREHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Input: default@t1@ds=1
+PREHOOK: Output: default@t1@ds=1
+POSTHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Input: default@t1@ds=1
+POSTHOOK: Output: default@t1@ds=1
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: -- The plan is not converted to a map-side, since although the sorting columns and grouping
+-- columns match, the user is issueing a distinct
+EXPLAIN
+select count(distinct key) from T1
+PREHOOK: type: QUERY
+POSTHOOK: query: -- The plan is not converted to a map-side, since although the sorting columns and grouping
+-- columns match, the user is issueing a distinct
+EXPLAIN
+select count(distinct key) from T1
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL key))))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ t1
+ TableScan
+ alias: t1
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ outputColumnNames: key
+ Group By Operator
+ aggregations:
+ expr: count(DISTINCT key)
+ bucketGroup: true
+ keys:
+ expr: key
+ type: string
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: string
+ sort order: +
+ tag: -1
+ value expressions:
+ expr: _col1
+ type: bigint
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(DISTINCT KEY._col0:0._col0)
+ bucketGroup: false
+ mode: mergepartial
+ outputColumnNames: _col0
+ Select Operator
+ expressions:
+ expr: _col0
+ type: bigint
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select count(distinct key) from T1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Input: default@t1@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct key) from T1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Input: default@t1@ds=1
+#### A masked pattern was here ####
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+5
+PREHOOK: query: DROP TABLE T1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: DROP TABLE T1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 1 | next ›
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Mar 12, '13 at 5:51p
active: Mar 12, '13 at 5:51p
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Kevinwilfong: 1 post

People

Translate

site design / logo © 2021 Grokbase