Grokbase Groups Hive commits May 2016
FAQ
Repository: hive
Updated Branches:
   refs/heads/master e1e68b29a -> 15bdce43d


HIVE-13453: Support ORDER BY and windowing clause in partitioning clause with distinct function (Reviewed by Yongzhi Chen)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/15bdce43
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/15bdce43
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/15bdce43

Branch: refs/heads/master
Commit: 15bdce43db4624a63be1f648e46d1f2baa1c67de
Parents: e1e68b2
Author: Aihua Xu <aihuaxu@apache.org>
Authored: Fri May 6 11:00:20 2016 -0400
Committer: Aihua Xu <aihuaxu@apache.org>
Committed: Sat May 28 20:36:59 2016 -0400

----------------------------------------------------------------------
  .../hadoop/hive/ql/exec/FunctionRegistry.java | 2 +-
  .../apache/hadoop/hive/ql/exec/Registry.java | 8 +-
  .../hadoop/hive/ql/parse/WindowingSpec.java | 14 --
  .../hive/ql/plan/ptf/WindowFunctionDef.java | 2 +-
  .../hive/ql/udf/generic/GenericUDAFAverage.java | 68 ++++++++--
  .../hive/ql/udf/generic/GenericUDAFCount.java | 57 +++++---
  .../udf/generic/GenericUDAFParameterInfo.java | 7 +
  .../hive/ql/udf/generic/GenericUDAFSum.java | 134 +++++++++++++------
  .../generic/SimpleGenericUDAFParameterInfo.java | 9 +-
  .../hive/ql/udf/ptf/WindowingTableFunction.java | 9 +-
  .../queries/clientpositive/windowing_distinct.q | 18 +++
  .../clientpositive/windowing_distinct.q.out | 66 +++++++++
  .../objectinspector/ObjectInspectorUtils.java | 38 ++++++
  13 files changed, 333 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index fa90242..8217ad3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -902,7 +902,7 @@ public final class FunctionRegistry {

      GenericUDAFParameterInfo paramInfo =
          new SimpleGenericUDAFParameterInfo(
- args, isDistinct, isAllColumns);
+ args, false, isDistinct, isAllColumns);

      GenericUDAFEvaluator udafEvaluator;
      if (udafResolver instanceof GenericUDAFResolver2) {

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
index 891514b..86df74d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
@@ -395,7 +395,7 @@ public class Registry {
     */
    @SuppressWarnings("deprecation")
    public GenericUDAFEvaluator getGenericUDAFEvaluator(String name,
- List<ObjectInspector> argumentOIs, boolean isDistinct,
+ List<ObjectInspector> argumentOIs, boolean isWindowing, boolean isDistinct,
        boolean isAllColumns) throws SemanticException {

      GenericUDAFResolver udafResolver = getGenericUDAFResolver(name);
@@ -413,7 +413,7 @@ public class Registry {

      GenericUDAFParameterInfo paramInfo =
          new SimpleGenericUDAFParameterInfo(
- args, isDistinct, isAllColumns);
+ args, isWindowing, isDistinct, isAllColumns);
      if (udafResolver instanceof GenericUDAFResolver2) {
        udafEvaluator =
            ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
@@ -433,14 +433,14 @@ public class Registry {
      }
      if (!functionName.equals(FunctionRegistry.LEAD_FUNC_NAME) &&
          !functionName.equals(FunctionRegistry.LAG_FUNC_NAME)) {
- return getGenericUDAFEvaluator(functionName, argumentOIs, isDistinct, isAllColumns);
+ return getGenericUDAFEvaluator(functionName, argumentOIs, true, isDistinct, isAllColumns);
      }

      // this must be lead/lag UDAF
      ObjectInspector args[] = new ObjectInspector[argumentOIs.size()];
      GenericUDAFResolver udafResolver = info.getGenericUDAFResolver();
      GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(
- argumentOIs.toArray(args), isDistinct, isAllColumns);
+ argumentOIs.toArray(args), true, isDistinct, isAllColumns);
      return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
    }


http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
index 5ce7200..ef5186a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
@@ -124,9 +124,6 @@ public class WindowingSpec {
        WindowFunctionSpec wFn = (WindowFunctionSpec) expr;
        WindowSpec wdwSpec = wFn.getWindowSpec();

- // 0. Precheck supported syntax
- precheckSyntax(wFn, wdwSpec);
-
        // 1. For Wdw Specs that refer to Window Defns, inherit missing components
        if ( wdwSpec != null ) {
          ArrayList<String> sources = new ArrayList<String>();
@@ -153,14 +150,6 @@ public class WindowingSpec {
      }
    }

- private void precheckSyntax(WindowFunctionSpec wFn, WindowSpec wdwSpec) throws SemanticException {
- if (wdwSpec != null ) {
- if (wFn.isDistinct && (wdwSpec.windowFrame != null || wdwSpec.getOrder() != null) ) {
- throw new SemanticException("Function with DISTINCT cannot work with partition ORDER BY or windowing clause.");
- }
- }
- }
-
    private void fillInWindowSpec(String sourceId, WindowSpec dest, ArrayList<String> visited)
        throws SemanticException
    {
@@ -509,9 +498,6 @@ public class WindowingSpec {
        if ( getOrder() == null ) {
          OrderSpec order = new OrderSpec();
          order.prefixBy(getPartition());
- if (wFn.isDistinct) {
- order.addExpressions(wFn.getArgs());
- }
          setOrder(order);
        }
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
index ed6c671..84ac614 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
@@ -124,4 +124,4 @@ public class WindowFunctionDef extends WindowExpressionDef {
      this.pivotResult = pivotResult;
    }

-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
index 3c1ce26..6799978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -18,6 +18,7 @@
  package org.apache.hadoop.hive.ql.udf.generic;

  import java.util.ArrayList;
+import java.util.HashSet;

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.StructField;
  import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
@@ -106,6 +108,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
      AbstractGenericUDAFAverageEvaluator eval =
          (AbstractGenericUDAFAverageEvaluator) getEvaluator(paramInfo.getParameters());
      eval.avgDistinct = paramInfo.isDistinct();
+ eval.isWindowing = paramInfo.isWindowing();
      return eval;
    }

@@ -115,7 +118,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
      public void doReset(AverageAggregationBuffer<Double> aggregation) throws HiveException {
        aggregation.count = 0;
        aggregation.sum = new Double(0);
- aggregation.previousValue = null;
+ aggregation.uniqueObjects = new HashSet<ObjectInspectorObject>();
      }

      @Override
@@ -145,6 +148,12 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
      }

      @Override
+ protected void doMergeAdd(Double sum,
+ ObjectInspectorObject obj) {
+ sum += PrimitiveObjectInspectorUtils.getDouble(obj.getValues()[0], copiedOI);
+ }
+
+ @Override
      protected void doTerminatePartial(AverageAggregationBuffer<Double> aggregation) {
        if(partialResult[1] == null) {
          partialResult[1] = new DoubleWritable(0);
@@ -172,6 +181,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {

      @Override
      public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (isWindowingDistinct()) {
+ return null;
+ }

        return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>(this, wFrameDef) {

@@ -212,6 +225,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
      public void doReset(AverageAggregationBuffer<HiveDecimal> aggregation) throws HiveException {
        aggregation.count = 0;
        aggregation.sum = HiveDecimal.ZERO;
+ aggregation.uniqueObjects = new HashSet<ObjectInspectorObject>();
      }

      @Override
@@ -263,6 +277,14 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
        }
      }

+
+ @Override
+ protected void doMergeAdd(
+ HiveDecimal sum,
+ ObjectInspectorObject obj) {
+ sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(obj.getValues()[0], copiedOI));
+ }
+
      @Override
      protected void doTerminatePartial(AverageAggregationBuffer<HiveDecimal> aggregation) {
        if(partialResult[1] == null && aggregation.sum != null) {
@@ -296,6 +318,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {

      @Override
      public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (isWindowingDistinct()) {
+ return null;
+ }

        return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>(
            this, wFrameDef) {
@@ -333,18 +359,18 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
    }

    private static class AverageAggregationBuffer<TYPE> implements AggregationBuffer {
- private Object previousValue;
+ private HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows.
      private long count;
      private TYPE sum;
    };

    @SuppressWarnings("unchecked")
    public static abstract class AbstractGenericUDAFAverageEvaluator<TYPE> extends GenericUDAFEvaluator {
+ protected boolean isWindowing;
      protected boolean avgDistinct;
-
      // For PARTIAL1 and COMPLETE
      protected transient PrimitiveObjectInspector inputOI;
- protected transient ObjectInspector copiedOI;
+ protected transient PrimitiveObjectInspector copiedOI;
      // For PARTIAL2 and FINAL
      private transient StructObjectInspector soi;
      private transient StructField countField;
@@ -363,6 +389,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
          PrimitiveObjectInspector inputOI, Object parameter);
      protected abstract void doMerge(AverageAggregationBuffer<TYPE> aggregation, Long partialCount,
          ObjectInspector sumFieldOI, Object partialSum);
+ protected abstract void doMergeAdd(TYPE sum, ObjectInspectorObject obj);
      protected abstract void doTerminatePartial(AverageAggregationBuffer<TYPE> aggregation);
      protected abstract Object doTerminate(AverageAggregationBuffer<TYPE> aggregation);
      protected abstract void doReset(AverageAggregationBuffer<TYPE> aggregation) throws HiveException;
@@ -376,7 +403,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
        // init input
        if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
          inputOI = (PrimitiveObjectInspector) parameters[0];
- copiedOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ copiedOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
              ObjectInspectorCopyOption.JAVA);
        } else {
          soi = (StructObjectInspector) parameters[0];
@@ -410,6 +437,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
        }
      }

+ protected boolean isWindowingDistinct() {
+ return isWindowing && avgDistinct;
+ }
+
      @AggregationType(estimable = true)
      static class AverageAgg extends AbstractAggregationBuffer {
        long count;
@@ -432,12 +463,15 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
          AverageAggregationBuffer<TYPE> averageAggregation = (AverageAggregationBuffer<TYPE>) aggregation;
          try {
            // Skip the same value if avgDistinct is true
- if (this.avgDistinct &&
- ObjectInspectorUtils.compare(parameter, inputOI, averageAggregation.previousValue, copiedOI) == 0) {
- return;
+ if (isWindowingDistinct()) {
+ ObjectInspectorObject obj = new ObjectInspectorObject(
+ ObjectInspectorUtils.copyToStandardObject(parameter, inputOI, ObjectInspectorCopyOption.JAVA),
+ copiedOI);
+ if (averageAggregation.uniqueObjects.contains(obj)) {
+ return;
+ }
+ averageAggregation.uniqueObjects.add(obj);
            }
- averageAggregation.previousValue = ObjectInspectorUtils.copyToStandardObject(
- parameter, inputOI, ObjectInspectorCopyOption.JAVA);

            doIterate(averageAggregation, inputOI, parameter);
          } catch (NumberFormatException e) {
@@ -451,6 +485,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {

      @Override
      public Object terminatePartial(AggregationBuffer aggregation) throws HiveException {
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ }
+
        doTerminatePartial((AverageAggregationBuffer<TYPE>) aggregation);
        return partialResult;
      }
@@ -459,9 +497,13 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
      public void merge(AggregationBuffer aggregation, Object partial)
          throws HiveException {
        if (partial != null) {
- doMerge((AverageAggregationBuffer<TYPE>)aggregation,
- countFieldOI.get(soi.getStructFieldData(partial, countField)),
- sumFieldOI, soi.getStructFieldData(partial, sumField));
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ doMerge((AverageAggregationBuffer<TYPE>)aggregation,
+ countFieldOI.get(soi.getStructFieldData(partial, countField)),
+ sumFieldOI, soi.getStructFieldData(partial, sumField));
+ }
        }
      }


http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
index 2825045..d1d0131 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
@@ -17,6 +17,8 @@
   */
  package org.apache.hadoop.hive.ql.udf.generic;

+import java.util.HashSet;
+
  import org.apache.hadoop.hive.ql.exec.Description;
  import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -25,6 +27,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -70,6 +73,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
      }

      GenericUDAFCountEvaluator countEvaluator = new GenericUDAFCountEvaluator();
+ countEvaluator.setWindowing(paramInfo.isWindowing());
      countEvaluator.setCountAllColumns(paramInfo.isAllColumns());
      countEvaluator.setCountDistinct(paramInfo.isDistinct());

@@ -81,6 +85,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
     *
     */
    public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
+ private boolean isWindowing = false;
      private boolean countAllColumns = false;
      private boolean countDistinct = false;
      private LongObjectInspector partialCountAggOI;
@@ -99,9 +104,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
              ObjectInspectorCopyOption.JAVA);
        }
        result = new LongWritable(0);
+
        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
      }

+ public void setWindowing(boolean isWindowing) {
+ this.isWindowing = isWindowing;
+ }
+
      private void setCountAllColumns(boolean countAllCols) {
        countAllColumns = countAllCols;
      }
@@ -110,10 +120,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
        this.countDistinct = countDistinct;
      }

+ private boolean isWindowingDistinct() {
+ return isWindowing && countDistinct;
+ }
+
      /** class for storing count value. */
      @AggregationType(estimable = true)
      static class CountAgg extends AbstractAggregationBuffer {
- Object[] prevColumns = null; // Column values from previous row. Used to compare with current row for the case of COUNT(DISTINCT).
+ HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows
        long value;
        @Override
        public int estimate() { return JavaDataModel.PRIMITIVES2; }
@@ -128,8 +142,8 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {

      @Override
      public void reset(AggregationBuffer agg) throws HiveException {
- ((CountAgg) agg).prevColumns = null;
        ((CountAgg) agg).value = 0;
+ ((CountAgg) agg).uniqueObjects = new HashSet<ObjectInspectorObject>();
      }

      @Override
@@ -151,19 +165,16 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
            }
          }

- // Skip the counting if the values are the same for COUNT(DISTINCT) case
- if (countThisRow && countDistinct) {
- Object[] prevColumns = ((CountAgg) agg).prevColumns;
- if (prevColumns == null) {
- ((CountAgg) agg).prevColumns = new Object[parameters.length];
- } else if (ObjectInspectorUtils.compare(parameters, inputOI, prevColumns, outputOI) == 0) {
- countThisRow = false;
- }
-
- // We need to keep a copy of values from previous row.
- if (countThisRow) {
- ((CountAgg) agg).prevColumns = ObjectInspectorUtils.copyToStandardObject(
- parameters, inputOI, ObjectInspectorCopyOption.JAVA);
+ // Skip the counting if the values are the same for windowing COUNT(DISTINCT) case
+ if (countThisRow && isWindowingDistinct()) {
+ HashSet<ObjectInspectorObject> uniqueObjs = ((CountAgg) agg).uniqueObjects;
+ ObjectInspectorObject obj = new ObjectInspectorObject(
+ ObjectInspectorUtils.copyToStandardObject(parameters, inputOI, ObjectInspectorCopyOption.JAVA),
+ outputOI);
+ if (!uniqueObjs.contains(obj)) {
+ uniqueObjs.add(obj);
+ } else {
+ countThisRow = false;
            }
          }

@@ -177,8 +188,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
      public void merge(AggregationBuffer agg, Object partial)
        throws HiveException {
        if (partial != null) {
- long p = partialCountAggOI.get(partial);
- ((CountAgg) agg).value += p;
+ CountAgg countAgg = (CountAgg) agg;
+
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ long p = partialCountAggOI.get(partial);
+ countAgg.value += p;
+ }
        }
      }

@@ -190,7 +207,11 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {

      @Override
      public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ return terminate(agg);
+ }
      }
    }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
index 6a62d7c..675d9f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
@@ -67,6 +67,13 @@ public interface GenericUDAFParameterInfo {
    boolean isDistinct();

    /**
+ * Indicates whether the UDAF invocation originated from a windowing
+ * function call.
+ * @return <tt>true</tt> if the UDAF invocation was from a windowing function
+ * call.
+ */
+ boolean isWindowing();
+ /**
     * Returns <tt>true</tt> if the UDAF invocation was done via the wildcard
     * syntax <tt>FUNCTION(*)</tt>. Note that this is provided for informational
     * purposes only and the function implementation is not expected to ensure

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
index 7b1d6e5..f53554c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -17,6 +17,8 @@
   */
  package org.apache.hadoop.hive.ql.udf.generic;

+import java.util.HashSet;
+
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -39,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
  import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.util.StringUtils;

  /**
@@ -93,6 +97,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      TypeInfo[] parameters = info.getParameters();

      GenericUDAFSumEvaluator eval = (GenericUDAFSumEvaluator) getEvaluator(parameters);
+ eval.setWindowing(info.isWindowing());
      eval.setSumDistinct(info.isDistinct());

      return eval;
@@ -125,44 +130,69 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
     * The base type for sum operator evaluator
     *
     */
- public static abstract class GenericUDAFSumEvaluator<ResultType> extends GenericUDAFEvaluator {
+ public static abstract class GenericUDAFSumEvaluator<ResultType extends Writable> extends GenericUDAFEvaluator {
      static abstract class SumAgg<T> extends AbstractAggregationBuffer {
        boolean empty;
        T sum;
- Object previousValue = null;
+ HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows.
      }

      protected PrimitiveObjectInspector inputOI;
- protected ObjectInspector outputOI;
+ protected PrimitiveObjectInspector outputOI;
      protected ResultType result;
+ protected boolean isWindowing;
      protected boolean sumDistinct;

- public boolean sumDistinct() {
- return sumDistinct;
+ public void setWindowing(boolean isWindowing) {
+ this.isWindowing = isWindowing;
      }

      public void setSumDistinct(boolean sumDistinct) {
        this.sumDistinct = sumDistinct;
      }

+ protected boolean isWindowingDistinct() {
+ return isWindowing && sumDistinct;
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ return terminate(agg);
+ }
+ }
+
      /**
- * Check if the input object is the same as the previous one for the case of
- * SUM(DISTINCT).
+ * Check if the input object is eligible to contribute to the sum: a null
+ * input, or a value already seen in the SUM(DISTINCT) case, is
+ * skipped.
       * @param input the input object
- * @return True if sumDistinct is false or the input is different from the previous object
+ * @return True if the input is non-null and, in the SUM(DISTINCT) case, has not been seen before in this aggregation
       */
- protected boolean checkDistinct(SumAgg agg, Object input) {
- if (this.sumDistinct &&
- ObjectInspectorUtils.compare(input, inputOI, agg.previousValue, outputOI) == 0) {
+ protected boolean isEligibleValue(SumAgg agg, Object input) {
+ if (input == null) {
          return false;
        }

- agg.previousValue = ObjectInspectorUtils.copyToStandardObject(
- input, inputOI, ObjectInspectorCopyOption.JAVA);
- return true;
- }
+ if (isWindowingDistinct()) {
+ HashSet<ObjectInspectorObject> uniqueObjs = agg.uniqueObjects;
+ ObjectInspectorObject obj = input instanceof ObjectInspectorObject ?
+ (ObjectInspectorObject)input :
+ new ObjectInspectorObject(
+ ObjectInspectorUtils.copyToStandardObject(input, inputOI, ObjectInspectorCopyOption.JAVA),
+ outputOI);
+ if (!uniqueObjs.contains(obj)) {
+ uniqueObjs.add(obj);
+ return true;
+ }

+ return false;
+ }

+ return true;
+ }
    }

    /**
@@ -177,7 +207,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
        super.init(m, parameters);
        result = new HiveDecimalWritable(HiveDecimal.ZERO);
        inputOI = (PrimitiveObjectInspector) parameters[0];
- outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ outputOI = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(inputOI,
            ObjectInspectorCopyOption.JAVA);
        // The output precision is 10 greater than the input which should cover at least
        // 10b rows. The scale is the same as the input.
@@ -208,6 +238,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
        SumAgg<HiveDecimal> bdAgg = (SumAgg<HiveDecimal>) agg;
        bdAgg.empty = true;
        bdAgg.sum = HiveDecimal.ZERO;
+ bdAgg.uniqueObjects = new HashSet<ObjectInspectorObject>();
      }

      boolean warned = false;
@@ -216,8 +247,10 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        assert (parameters.length == 1);
        try {
- if (checkDistinct((SumAgg) agg, parameters[0])) {
- merge(agg, parameters[0]);
+ if (isEligibleValue((SumHiveDecimalAgg) agg, parameters[0])) {
+ ((SumHiveDecimalAgg)agg).empty = false;
+ ((SumHiveDecimalAgg)agg).sum = ((SumHiveDecimalAgg)agg).sum.add(
+ PrimitiveObjectInspectorUtils.getHiveDecimal(parameters[0], inputOI));
          }
        } catch (NumberFormatException e) {
          if (!warned) {
@@ -232,11 +265,6 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      }

      @Override
- public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
- }
-
- @Override
      public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (partial != null) {
          SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) agg;
@@ -245,7 +273,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
          }

          myagg.empty = false;
- myagg.sum = myagg.sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(partial, inputOI));
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ myagg.sum = myagg.sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(partial, inputOI));
+ }
        }
      }

@@ -261,6 +293,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {

      @Override
      public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (sumDistinct) {
+ return null;
+ }
+
        return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>(
            this, wFrameDef) {

@@ -301,7 +338,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
        super.init(m, parameters);
        result = new DoubleWritable(0);
        inputOI = (PrimitiveObjectInspector) parameters[0];
- outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ outputOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
            ObjectInspectorCopyOption.JAVA);
        return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
      }
@@ -325,6 +362,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
        SumDoubleAgg myagg = (SumDoubleAgg) agg;
        myagg.empty = true;
        myagg.sum = 0.0;
+ myagg.uniqueObjects = new HashSet<ObjectInspectorObject>();
      }

      boolean warned = false;
@@ -333,8 +371,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        assert (parameters.length == 1);
        try {
- if (checkDistinct((SumAgg) agg, parameters[0])) {
- merge(agg, parameters[0]);
+ if (isEligibleValue((SumDoubleAgg) agg, parameters[0])) {
+ ((SumDoubleAgg)agg).empty = false;
+ ((SumDoubleAgg)agg).sum += PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI);
          }
        } catch (NumberFormatException e) {
          if (!warned) {
@@ -349,16 +388,15 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      }

      @Override
- public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
- }
-
- @Override
      public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (partial != null) {
          SumDoubleAgg myagg = (SumDoubleAgg) agg;
          myagg.empty = false;
- myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
+ }
        }
      }

@@ -374,6 +412,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {

      @Override
      public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (sumDistinct) {
+ return null;
+ }
+
        return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>(this,
            wFrameDef) {

@@ -415,7 +458,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
        super.init(m, parameters);
        result = new LongWritable(0);
        inputOI = (PrimitiveObjectInspector) parameters[0];
- outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+ outputOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
            ObjectInspectorCopyOption.JAVA);
        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
      }
@@ -439,6 +482,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
        SumLongAgg myagg = (SumLongAgg) agg;
        myagg.empty = true;
        myagg.sum = 0L;
+ myagg.uniqueObjects = new HashSet<ObjectInspectorObject>();
      }

      private boolean warned = false;
@@ -447,8 +491,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        assert (parameters.length == 1);
        try {
- if (checkDistinct((SumAgg) agg, parameters[0])) {
- merge(agg, parameters[0]);
+ if (isEligibleValue((SumLongAgg) agg, parameters[0])) {
+ ((SumLongAgg)agg).empty = false;
+ ((SumLongAgg)agg).sum += PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI);
          }
        } catch (NumberFormatException e) {
          if (!warned) {
@@ -460,16 +505,15 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
      }

      @Override
- public Object terminatePartial(AggregationBuffer agg) throws HiveException {
- return terminate(agg);
- }
-
- @Override
      public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (partial != null) {
          SumLongAgg myagg = (SumLongAgg) agg;
- myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
          myagg.empty = false;
+ if (isWindowingDistinct()) {
+ throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
+ } else {
+ myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
+ }
        }
      }

@@ -485,6 +529,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {

      @Override
      public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
+ // Don't use streaming for distinct cases
+ if (isWindowingDistinct()) {
+ return null;
+ }
+
        return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>(this,
            wFrameDef) {

@@ -509,7 +558,6 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
            SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
            return myagg.empty ? null : new Long(myagg.sum);
          }
-
        };
      }
    }

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
index 1a1b570..728964d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
@@ -29,12 +29,14 @@ public class SimpleGenericUDAFParameterInfo implements GenericUDAFParameterInfo
  {

    private final ObjectInspector[] parameters;
+ private final boolean isWindowing;
    private final boolean distinct;
    private final boolean allColumns;

- public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean distinct,
+ public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct,
        boolean allColumns) {
      this.parameters = params;
+ this.isWindowing = isWindowing;
      this.distinct = distinct;
      this.allColumns = allColumns;
    }
@@ -63,4 +65,9 @@ public class SimpleGenericUDAFParameterInfo implements GenericUDAFParameterInfo
    public boolean isAllColumns() {
      return allColumns;
    }
+
+ @Override
+ public boolean isWindowing() {
+ return isWindowing;
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 858b47a..b89c14e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef;
  import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer;
  import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;
  import org.apache.hadoop.hive.serde2.SerDe;
  import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -392,8 +393,6 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
      }

      streamingState.rollingPart.append(row);
- row = streamingState.rollingPart
- .getAt(streamingState.rollingPart.size() - 1);

      WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();

@@ -408,7 +407,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
          }
        }

- if (fnEval instanceof ISupportStreamingModeForWindowing) {
+ if (fnEval != null &&
+ fnEval instanceof ISupportStreamingModeForWindowing) {
          fnEval.aggregate(streamingState.aggBuffers[i], streamingState.funcArgs[i]);
          Object out = ((ISupportStreamingModeForWindowing) fnEval)
              .getNextResult(streamingState.aggBuffers[i]);
@@ -472,7 +472,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
        GenericUDAFEvaluator fnEval = wFn.getWFnEval();

        int numRowsRemaining = wFn.getWindowFrame().getEnd().getRelativeOffset();
- if (fnEval instanceof ISupportStreamingModeForWindowing) {
+ if (fnEval != null &&
+ fnEval instanceof ISupportStreamingModeForWindowing) {
          fnEval.terminate(streamingState.aggBuffers[i]);

          WindowingFunctionInfoHelper wFnInfo = getWindowingFunctionInfoHelper(wFn.getName());

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/test/queries/clientpositive/windowing_distinct.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_distinct.q b/ql/src/test/queries/clientpositive/windowing_distinct.q
index bb192a7..6b49978 100644
--- a/ql/src/test/queries/clientpositive/windowing_distinct.q
+++ b/ql/src/test/queries/clientpositive/windowing_distinct.q
@@ -44,3 +44,21 @@ SELECT AVG(DISTINCT t) OVER (PARTITION BY index),
         AVG(DISTINCT ts) OVER (PARTITION BY index),
         AVG(DISTINCT dec) OVER (PARTITION BY index)
  FROM windowing_distinct;
+
+-- count
+select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct;
+
+-- sum
+select index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct;
+
+-- avg
+select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct;

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/test/results/clientpositive/windowing_distinct.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_distinct.q.out b/ql/src/test/results/clientpositive/windowing_distinct.q.out
index 074a594..86d1cdd 100644
--- a/ql/src/test/results/clientpositive/windowing_distinct.q.out
+++ b/ql/src/test/results/clientpositive/windowing_distinct.q.out
@@ -128,3 +128,69 @@ POSTHOOK: Input: default@windowing_distinct
  117.5 38.71 NULL NULL 1.362157918703306E9 34.5000
  117.5 38.71 NULL NULL 1.362157918703306E9 34.5000
  117.5 38.71 NULL NULL 1.362157918703306E9 34.5000
+PREHOOK: query: -- count
+select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- count
+select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ count(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1 26.43 0 0 2 1
+1 26.43 1 1 1 2
+1 96.91 1 1 0 2
+2 13.01 0 0 1 2
+2 74.72 1 1 1 2
+2 74.72 2 2 0 2
+PREHOOK: query: -- sum
+select index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- sum
+select index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ sum(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1 26.43 NULL NULL 123.34000396728516 26.43000030517578
+1 26.43 26.43000030517578 26.43000030517578 96.91000366210938 123.34000396728516
+1 96.91 26.43000030517578 26.43000030517578 NULL 123.34000396728516
+2 13.01 NULL NULL 74.72000122070312 87.73000144958496
+2 74.72 13.010000228881836 13.010000228881836 74.72000122070312 87.73000144958496
+2 74.72 87.73000144958496 87.73000144958496 NULL 87.73000144958496
+PREHOOK: query: -- avg
+select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- avg
+select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding),
+ avg(distinct f) over (partition by index order by f rows between 1 following and 2 following),
+ avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1 26.43 NULL NULL 61.67000198364258 26.43000030517578
+1 26.43 26.43000030517578 26.43000030517578 96.91000366210938 61.67000198364258
+1 96.91 26.43000030517578 26.43000030517578 NULL 61.67000198364258
+2 13.01 NULL NULL 74.72000122070312 43.86500072479248
+2 74.72 13.010000228881836 13.010000228881836 74.72000122070312 43.86500072479248
+2 74.72 43.86500072479248 43.86500072479248 NULL 43.86500072479248

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index c58e8ed..1ac72c6 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -117,6 +117,44 @@ public final class ObjectInspectorUtils {
    }

    /**
+ * This class can be used to wrap Hive objects and put in HashMap or HashSet.
+ * The objects will be compared using ObjectInspectors.
+ *
+ */
+ public static class ObjectInspectorObject {
+ private final Object[] objects;
+ private final ObjectInspector[] oi;
+
+ public ObjectInspectorObject(Object object, ObjectInspector oi) {
+ this.objects = new Object[] { object };
+ this.oi = new ObjectInspector[] { oi };
+ }
+
+ public ObjectInspectorObject(Object[] objects, ObjectInspector[] oi) {
+ this.objects = objects;
+ this.oi = oi;
+ }
+
+ public Object[] getValues() {
+ return objects;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || obj.getClass() != this.getClass()) { return false; }
+
+ ObjectInspectorObject comparedObject = (ObjectInspectorObject)obj;
+ return ObjectInspectorUtils.compare(objects, oi, comparedObject.objects, comparedObject.oi) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return ObjectInspectorUtils.getBucketHashCode(objects, oi);
+ }
+ }
+
+ /**
     * Calculates the hash code for array of Objects that contains writables. This is used
     * to work around the buggy Hadoop DoubleWritable hashCode implementation. This should
     * only be used for process-local hash codes; don't replace stored hash codes like bucketing.

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 1 | next ›
Discussion Overview
group: commits
categories: hive, hadoop
posted: May 29, '16 at 12:37a
active: May 29, '16 at 12:37a
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Aihuaxu: 1 post

People

Translate

site design / logo © 2021 Grokbase