FAQ
Repository: hive
Updated Branches:
   refs/heads/master 0ec6e8893 -> 11f1e47eb


HIVE-12944: Support SUM(DISTINCT) for partitioning query. (Aihua Xu, reviewed by Szehon Ho)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/11f1e47e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/11f1e47e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/11f1e47e

Branch: refs/heads/master
Commit: 11f1e47ebadc7cba24e1fb9f0dfbfaf7f786d2cb
Parents: 0ec6e88
Author: Aihua Xu <aihuaxu@apache.org>
Authored: Wed Jan 27 11:25:00 2016 -0500
Committer: Aihua Xu <aihuaxu@apache.org>
Committed: Mon Feb 1 10:15:01 2016 -0500

----------------------------------------------------------------------
  .../functions/HiveSqlSumAggFunction.java | 3 +
  .../translator/SqlFunctionConverter.java | 6 +
  .../hive/ql/udf/generic/GenericUDAFCount.java | 3 -
  .../hive/ql/udf/generic/GenericUDAFSum.java | 110 ++++++++++++++-----
  .../queries/clientpositive/windowing_distinct.q | 8 ++
  .../clientnegative/invalid_sum_syntax.q.out | 2 +-
  .../clientpositive/windowing_distinct.q.out | 26 +++++
  7 files changed, 125 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
index 056eaeb..498cd0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
@@ -71,6 +71,9 @@ public class HiveSqlSumAggFunction extends SqlAggFunction {

    //~ Methods ----------------------------------------------------------------

+ public boolean isDistinct() {
+ return isDistinct;
+ }

    @Override
    public <T> T unwrap(Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
index 75c38fa..19aa414 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
@@ -229,6 +229,12 @@ public class SqlFunctionConverter {
                      "TOK_FUNCTIONDI");
                }
              }
+ } else if (op instanceof HiveSqlSumAggFunction) { // case SUM(DISTINCT)
+ HiveSqlSumAggFunction sumFunction = (HiveSqlSumAggFunction) op;
+ if (sumFunction.isDistinct()) {
+ node = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTIONDI,
+ "TOK_FUNCTIONDI");
+ }
            }
            node.addChild((ASTNode) ParseDriver.adaptor.create(HiveParser.Identifier, op.getName()));
          }

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
index f526c43..2825045 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
@@ -17,13 +17,11 @@
   */
  package org.apache.hadoop.hive.ql.udf.generic;

-import org.apache.commons.lang.ArrayUtils;
  import org.apache.hadoop.hive.ql.exec.Description;
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.parse.SemanticException;
  import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.lazy.LazyString;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -31,7 +29,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
  import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;

  /**
   * This class implements the COUNT aggregation function as in SQL.

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
index 0968008..7b1d6e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -24,14 +24,14 @@ import org.apache.hadoop.hive.ql.exec.Description;
  import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
  import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
  import org.apache.hadoop.hive.ql.util.JavaDataModel;
  import org.apache.hadoop.hive.serde2.io.DoubleWritable;
  import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -87,6 +87,17 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      }
    }

+ @Override
+ public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
+ throws SemanticException {
+ TypeInfo[] parameters = info.getParameters();
+
+ GenericUDAFSumEvaluator eval = (GenericUDAFSumEvaluator) getEvaluator(parameters);
+ eval.setSumDistinct(info.isDistinct());
+
+ return eval;
+ }
+
    public static PrimitiveObjectInspector.PrimitiveCategory getReturnType(TypeInfo type) {
      if (type.getCategory() != ObjectInspector.Category.PRIMITIVE) {
        return null;
@@ -111,12 +122,54 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
    }

    /**
+ * The base type for sum operator evaluator
+ *
+ */
+ public static abstract class GenericUDAFSumEvaluator<ResultType> extends GenericUDAFEvaluator {
+ static abstract class SumAgg<T> extends AbstractAggregationBuffer {
+ boolean empty;
+ T sum;
+ Object previousValue = null;
+ }
+
+ protected PrimitiveObjectInspector inputOI;
+ protected ObjectInspector outputOI;
+ protected ResultType result;
+ protected boolean sumDistinct;
+
+ public boolean sumDistinct() {
+ return sumDistinct;
+ }
+
+ public void setSumDistinct(boolean sumDistinct) {
+ this.sumDistinct = sumDistinct;
+ }
+
+ /**
+ * Check if the input object is the same as the previous one for the case of
+ * SUM(DISTINCT).
+ * @param input the input object
+ * @return True if sumDistinct is false or the input is different from the previous object
+ */
+ protected boolean checkDistinct(SumAgg agg, Object input) {
+ if (this.sumDistinct &&
+ ObjectInspectorUtils.compare(input, inputOI, agg.previousValue, outputOI) == 0) {
+ return false;
+ }
+
+ agg.previousValue = ObjectInspectorUtils.copyToStandardObject(
+ input, inputOI, ObjectInspectorCopyOption.JAVA);
+ return true;
+ }
+
+
+ }
+
+ /**
     * GenericUDAFSumHiveDecimal.
     *
     */
- public static class GenericUDAFSumHiveDecimal extends GenericUDAFEvaluator {
- private PrimitiveObjectInspector inputOI;
- private HiveDecimalWritable result;
+ public static class GenericUDAFSumHiveDecimal extends GenericUDAFSumEvaluator<HiveDecimalWritable> {

      @Override
      public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
@@ -124,6 +177,8 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
        super.init(m, parameters);
        result = new HiveDecimalWritable(HiveDecimal.ZERO);
        inputOI = (PrimitiveObjectInspector) parameters[0];
+ outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ ObjectInspectorCopyOption.JAVA);
        // The output precision is 10 greater than the input which should cover at least
        // 10b rows. The scale is the same as the input.
        DecimalTypeInfo outputTypeInfo = null;
@@ -138,9 +193,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {

      /** class for storing decimal sum value. */
      @AggregationType(estimable = false) // hard to know exactly for decimals
- static class SumHiveDecimalAgg extends AbstractAggregationBuffer {
- boolean empty;
- HiveDecimal sum;
+ static class SumHiveDecimalAgg extends SumAgg<HiveDecimal> {
      }

      @Override
@@ -152,7 +205,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {

      @Override
      public void reset(AggregationBuffer agg) throws HiveException {
- SumHiveDecimalAgg bdAgg = (SumHiveDecimalAgg) agg;
+ SumAgg<HiveDecimal> bdAgg = (SumAgg<HiveDecimal>) agg;
        bdAgg.empty = true;
        bdAgg.sum = HiveDecimal.ZERO;
      }
@@ -163,7 +216,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        assert (parameters.length == 1);
        try {
- merge(agg, parameters[0]);
+ if (checkDistinct((SumAgg) agg, parameters[0])) {
+ merge(agg, parameters[0]);
+ }
        } catch (NumberFormatException e) {
          if (!warned) {
            warned = true;
@@ -239,24 +294,21 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
     * GenericUDAFSumDouble.
     *
     */
- public static class GenericUDAFSumDouble extends GenericUDAFEvaluator {
- private PrimitiveObjectInspector inputOI;
- private DoubleWritable result;
-
+ public static class GenericUDAFSumDouble extends GenericUDAFSumEvaluator<DoubleWritable> {
      @Override
      public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        assert (parameters.length == 1);
        super.init(m, parameters);
        result = new DoubleWritable(0);
        inputOI = (PrimitiveObjectInspector) parameters[0];
+ outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ ObjectInspectorCopyOption.JAVA);
        return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
      }

      /** class for storing double sum value. */
      @AggregationType(estimable = true)
- static class SumDoubleAgg extends AbstractAggregationBuffer {
- boolean empty;
- double sum;
+ static class SumDoubleAgg extends SumAgg<Double> {
        @Override
        public int estimate() { return JavaDataModel.PRIMITIVES1 + JavaDataModel.PRIMITIVES2; }
      }
@@ -272,7 +324,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      public void reset(AggregationBuffer agg) throws HiveException {
        SumDoubleAgg myagg = (SumDoubleAgg) agg;
        myagg.empty = true;
- myagg.sum = 0;
+ myagg.sum = 0.0;
      }

      boolean warned = false;
@@ -281,7 +333,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        assert (parameters.length == 1);
        try {
- merge(agg, parameters[0]);
+ if (checkDistinct((SumAgg) agg, parameters[0])) {
+ merge(agg, parameters[0]);
+ }
        } catch (NumberFormatException e) {
          if (!warned) {
            warned = true;
@@ -354,24 +408,21 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
     * GenericUDAFSumLong.
     *
     */
- public static class GenericUDAFSumLong extends GenericUDAFEvaluator {
- private PrimitiveObjectInspector inputOI;
- protected LongWritable result;
-
+ public static class GenericUDAFSumLong extends GenericUDAFSumEvaluator<LongWritable> {
      @Override
      public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        assert (parameters.length == 1);
        super.init(m, parameters);
        result = new LongWritable(0);
        inputOI = (PrimitiveObjectInspector) parameters[0];
+ outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ ObjectInspectorCopyOption.JAVA);
        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
      }

      /** class for storing double sum value. */
      @AggregationType(estimable = true)
- static class SumLongAgg extends AbstractAggregationBuffer {
- boolean empty;
- long sum;
+ static class SumLongAgg extends SumAgg<Long> {
        @Override
        public int estimate() { return JavaDataModel.PRIMITIVES1 + JavaDataModel.PRIMITIVES2; }
      }
@@ -387,7 +438,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      public void reset(AggregationBuffer agg) throws HiveException {
        SumLongAgg myagg = (SumLongAgg) agg;
        myagg.empty = true;
- myagg.sum = 0;
+ myagg.sum = 0L;
      }

      private boolean warned = false;
@@ -396,7 +447,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        assert (parameters.length == 1);
        try {
- merge(agg, parameters[0]);
+ if (checkDistinct((SumAgg) agg, parameters[0])) {
+ merge(agg, parameters[0]);
+ }
        } catch (NumberFormatException e) {
          if (!warned) {
            warned = true;
@@ -460,5 +513,4 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
        };
      }
    }
-
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/test/queries/clientpositive/windowing_distinct.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_distinct.q b/ql/src/test/queries/clientpositive/windowing_distinct.q
index 94f4044..9f6ddfd 100644
--- a/ql/src/test/queries/clientpositive/windowing_distinct.q
+++ b/ql/src/test/queries/clientpositive/windowing_distinct.q
@@ -28,3 +28,11 @@ SELECT COUNT(DISTINCT t) OVER (PARTITION BY index),
         COUNT(DISTINCT dec) OVER (PARTITION BY index),
         COUNT(DISTINCT bin) OVER (PARTITION BY index)
  FROM windowing_distinct;
+
+SELECT SUM(DISTINCT t) OVER (PARTITION BY index),
+ SUM(DISTINCT d) OVER (PARTITION BY index),
+ SUM(DISTINCT s) OVER (PARTITION BY index),
+ SUM(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+ SUM(DISTINCT ts) OVER (PARTITION BY index),
+ SUM(DISTINCT dec) OVER (PARTITION BY index)
+FROM windowing_distinct;

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out b/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out
index 28d65d7..346bca1 100644
--- a/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out
+++ b/ql/src/test/results/clientnegative/invalid_sum_syntax.q.out
@@ -1 +1 @@
-FAILED: SemanticException The specified syntax for UDAF invocation is invalid.
+FAILED: UDFArgumentTypeException Exactly one argument is expected.

http://git-wip-us.apache.org/repos/asf/hive/blob/11f1e47e/ql/src/test/results/clientpositive/windowing_distinct.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_distinct.q.out b/ql/src/test/results/clientpositive/windowing_distinct.q.out
index 50f8ff8..0858f0f 100644
--- a/ql/src/test/results/clientpositive/windowing_distinct.q.out
+++ b/ql/src/test/results/clientpositive/windowing_distinct.q.out
@@ -76,3 +76,29 @@ POSTHOOK: Input: default@windowing_distinct
  2 2 2 2 2 2 2 2
  2 2 2 2 2 2 2 2
  2 2 2 2 2 2 2 2
+PREHOOK: query: SELECT SUM(DISTINCT t) OVER (PARTITION BY index),
+ SUM(DISTINCT d) OVER (PARTITION BY index),
+ SUM(DISTINCT s) OVER (PARTITION BY index),
+ SUM(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+ SUM(DISTINCT ts) OVER (PARTITION BY index),
+ SUM(DISTINCT dec) OVER (PARTITION BY index)
+FROM windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT SUM(DISTINCT t) OVER (PARTITION BY index),
+ SUM(DISTINCT d) OVER (PARTITION BY index),
+ SUM(DISTINCT s) OVER (PARTITION BY index),
+ SUM(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+ SUM(DISTINCT ts) OVER (PARTITION BY index),
+ SUM(DISTINCT dec) OVER (PARTITION BY index)
+FROM windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+54 56.63 0.0 0.0 2.724315837406296E9 57
+54 56.63 0.0 0.0 2.724315837406296E9 57
+54 56.63 0.0 0.0 2.724315837406296E9 57
+235 77.42 0.0 0.0 2.724315837406612E9 69
+235 77.42 0.0 0.0 2.724315837406612E9 69
+235 77.42 0.0 0.0 2.724315837406612E9 69

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 1 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedFeb 1, '16 at 3:26p
activeFeb 1, '16 at 3:26p
posts1
users1
websitehive.apache.org

1 user in discussion

Aihuaxu: 1 post

People

Translate

site design / logo © 2021 Grokbase