FAQ
Author: hashutosh
Date: Sat Jun 7 16:07:34 2014
New Revision: 1601146

URL: http://svn.apache.org/r1601146
Log:
HIVE-7143 : Add Streaming support in Windowing mode for more UDAFs (min/max, lead/lag, fval/lval) (Harish Butani via Ashutosh Chauhan)

Added:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java
Removed:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEnhancer.java
Modified:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java Sat Jun 7 16:07:34 2014
@@ -28,10 +28,8 @@ import org.apache.hadoop.hive.ql.metadat
  import org.apache.hadoop.hive.ql.parse.SemanticException;
  import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
  import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
-import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef;
  import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
  import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum.GenericUDAFSumDouble.SumDoubleAgg;
  import org.apache.hadoop.hive.ql.util.JavaDataModel;
  import org.apache.hadoop.hive.serde2.io.DoubleWritable;
  import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -164,26 +162,12 @@ public class GenericUDAFAverage extends
        BoundaryDef start = wFrmDef.getStart();
        BoundaryDef end = wFrmDef.getEnd();

- /*
- * Currently we are not handling dynamic sized windows implied by range based windows.
- */
- if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
- return null;
- }
-
- /*
- * Windows that are unbounded following don't benefit from Streaming.
- */
- if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
- return null;
- }
-
- return new GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>(this,
+ return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>(this,
            start.getAmt(), end.getAmt()) {

          @Override
          protected DoubleWritable getNextResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>.SumAvgStreamingState ss)
              throws HiveException {
            AverageAggregationBuffer<Double> myagg = (AverageAggregationBuffer<Double>) ss.wrappedBuf;
            Double r = myagg.count == 0 ? null : myagg.sum;
@@ -201,7 +185,7 @@ public class GenericUDAFAverage extends

          @Override
          protected Object[] getCurrentIntermediateResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Object[]>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>.SumAvgStreamingState ss)
              throws HiveException {
            AverageAggregationBuffer<Double> myagg = (AverageAggregationBuffer<Double>) ss.wrappedBuf;
            return myagg.count == 0 ? null : new Object[] {
@@ -306,20 +290,12 @@ public class GenericUDAFAverage extends
        BoundaryDef start = wFrmDef.getStart();
        BoundaryDef end = wFrmDef.getEnd();

- if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
- return null;
- }
-
- if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
- return null;
- }
-
- return new GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>(
+ return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>(
            this, start.getAmt(), end.getAmt()) {

          @Override
          protected HiveDecimalWritable getNextResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>.SumAvgStreamingState ss)
              throws HiveException {
            AverageAggregationBuffer<HiveDecimal> myagg = (AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf;
            HiveDecimal r = myagg.count == 0 ? null : myagg.sum;
@@ -338,7 +314,7 @@ public class GenericUDAFAverage extends

          @Override
          protected Object[] getCurrentIntermediateResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, Object[]>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>.SumAvgStreamingState ss)
              throws HiveException {
            AverageAggregationBuffer<HiveDecimal> myagg = (AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf;
            return myagg.count == 0 ? null : new Object[] { myagg.sum,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLag.java Sat Jun 7 16:07:34 2014
@@ -25,6 +25,12 @@ import org.apache.commons.logging.LogFac
  import org.apache.hadoop.hive.ql.exec.Description;
  import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead.GenericUDAFLeadEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead.GenericUDAFLeadEvaluatorStreaming;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead.LeadBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLeadLag.GenericUDAFLeadLagEvaluator;

  @WindowFunctionDescription
  (
@@ -53,10 +59,27 @@ public class GenericUDAFLag extends Gene

    public static class GenericUDAFLagEvaluator extends GenericUDAFLeadLagEvaluator {

+ public GenericUDAFLagEvaluator() {
+ }
+
+ /*
+ * used to initialize Streaming Evaluator.
+ */
+ protected GenericUDAFLagEvaluator(GenericUDAFLeadLagEvaluator src) {
+ super(src);
+ }
+
      @Override
      protected LeadLagBuffer getNewLLBuffer() throws HiveException {
       return new LagBuffer();
      }
+
+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+ return new GenericUDAFLagEvaluatorStreaming(this);
+ }
+
    }

    static class LagBuffer implements LeadLagBuffer {
@@ -88,6 +111,7 @@ public class GenericUDAFLag extends Gene
         * the entire partition is in lagValues.
         */
        if ( values.size() < lagAmt ) {
+ values = lagValues;
          return lagValues;
        }

@@ -99,4 +123,42 @@ public class GenericUDAFLag extends Gene
        return values;
      }
    }
+
+ /*
+ * StreamingEval: wraps the regular evaluator. On getNextResult, removes the
+ * first row from the buffered values and returns it.
+ */
+ static class GenericUDAFLagEvaluatorStreaming extends GenericUDAFLagEvaluator
+ implements ISupportStreamingModeForWindowing {
+
+ protected GenericUDAFLagEvaluatorStreaming(GenericUDAFLeadLagEvaluator src) {
+ super(src);
+ }
+
+ @Override
+ public Object getNextResult(AggregationBuffer agg) throws HiveException {
+ LagBuffer lb = (LagBuffer) agg;
+
+ if (!lb.lagValues.isEmpty()) {
+ Object res = lb.lagValues.remove(0);
+ if (res == null) {
+ return ISupportStreamingModeForWindowing.NULL_RESULT;
+ }
+ return res;
+ } else if (!lb.values.isEmpty()) {
+ Object res = lb.values.remove(0);
+ if (res == null) {
+ return ISupportStreamingModeForWindowing.NULL_RESULT;
+ }
+ return res;
+ }
+ return null;
+ }
+
+ @Override
+ public int getRowsRemainingAfterTerminate() throws HiveException {
+ return getAmt();
+ }
+ }
+
  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java Sat Jun 7 16:07:34 2014
@@ -25,6 +25,8 @@ import org.apache.commons.logging.LogFac
  import org.apache.hadoop.hive.ql.exec.Description;
  import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;

  @WindowFunctionDescription
  (
@@ -53,10 +55,26 @@ public class GenericUDAFLead extends Gen

    public static class GenericUDAFLeadEvaluator extends GenericUDAFLeadLagEvaluator {

+ public GenericUDAFLeadEvaluator() {
+ }
+
+ /*
+ * used to initialize Streaming Evaluator.
+ */
+ protected GenericUDAFLeadEvaluator(GenericUDAFLeadLagEvaluator src) {
+ super(src);
+ }
+
      @Override
      protected LeadLagBuffer getNewLLBuffer() throws HiveException {
       return new LeadBuffer();
      }
+
+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+
+ return new GenericUDAFLeadEvaluatorStreaming(this);
+ }

    }

@@ -103,4 +121,34 @@ public class GenericUDAFLead extends Gen

    }

+ /*
+ * StreamingEval: wraps the regular evaluator. On getNextResult, removes the
+ * first row from the buffered values and returns it.
+ */
+ static class GenericUDAFLeadEvaluatorStreaming extends
+ GenericUDAFLeadEvaluator implements ISupportStreamingModeForWindowing {
+
+ protected GenericUDAFLeadEvaluatorStreaming(GenericUDAFLeadLagEvaluator src) {
+ super(src);
+ }
+
+ @Override
+ public Object getNextResult(AggregationBuffer agg) throws HiveException {
+ LeadBuffer lb = (LeadBuffer) agg;
+ if (!lb.values.isEmpty()) {
+ Object res = lb.values.remove(0);
+ if (res == null) {
+ return ISupportStreamingModeForWindowing.NULL_RESULT;
+ }
+ return res;
+ }
+ return null;
+ }
+
+ @Override
+ public int getRowsRemainingAfterTerminate() throws HiveException {
+ return getAmt();
+ }
+ }
+
  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java Sat Jun 7 16:07:34 2014
@@ -89,6 +89,20 @@ public abstract class GenericUDAFLeadLag
      String fnName;
      private transient Converter defaultValueConverter;

+ public GenericUDAFLeadLagEvaluator() {
+ }
+
+ /*
+ * used to initialize Streaming Evaluator.
+ */
+ protected GenericUDAFLeadLagEvaluator(GenericUDAFLeadLagEvaluator src) {
+ this.inputOI = src.inputOI;
+ this.amt = src.amt;
+ this.fnName = src.fnName;
+ this.defaultValueConverter = src.defaultValueConverter;
+ this.mode = src.mode;
+ }
+
      @Override
      public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java Sat Jun 7 16:07:34 2014
@@ -17,13 +17,21 @@
   */
  package org.apache.hadoop.hive.ql.udf.generic;

+import java.util.ArrayDeque;
+import java.util.Deque;
+
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.hive.ql.exec.Description;
  import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
  import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -120,6 +128,177 @@ public class GenericUDAFMax extends Abst
        return myagg.o;
      }

+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+ BoundaryDef start = wFrmDef.getStart();
+ BoundaryDef end = wFrmDef.getEnd();
+ return new MaxStreamingFixedWindow(this, start.getAmt(), end.getAmt());
+ }
+
+ }
+
+ /*
+ * Based on the Paper by Daniel Lemire: Streaming Max-Min filter using no more
+ * than 3 comparisons per elem.
+ *
+ * 1. His algorithm works on fixed size windows up to the current row. For row
+ * 'i' and window 'w' it computes the min/max for window (i-w, i). 2. The core
+ * idea is to keep a queue of (max, idx) tuples. A tuple in the queue
+ * represents the max value in the range (prev tuple.idx, idx). Using the
+ * queue data structure and following 2 operations it is easy to see that
+ * maxes can be computed: - on receiving the ith row; drain the queue from the
+ * back of any entries whose value is less than the ith entry; add the ith
+ * value as a tuple in the queue (i-val, i) - on the ith step, check if the
+ * element at the front of the queue has reached its max range of influence;
+ * i.e. frontTuple.idx + w > i. If yes we can remove it from the queue. - on
+ * the ith step o/p the front of the queue as the max for the ith entry.
+ *
+ * Here we modify the algorithm: 1. to handle windows that are of the form
+ * (i-p, i+f), where p is numPreceding, f = numFollowing - we start outputting
+ * rows only after receiving f rows. - the formula for 'influence range' of an
+ * idx accounts for the following rows. 2. optimize for the case when
+ * numPreceding is Unbounded. In this case only 1 max needs to be tracked at
+ * any given time.
+ */
+ static class MaxStreamingFixedWindow extends
+ GenericUDAFStreamingEvaluator<Object> {
+
+ class State extends GenericUDAFStreamingEvaluator<Object>.StreamingState {
+ private final Deque<Object[]> maxChain;
+
+ public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
+ super(numPreceding, numFollowing, buf);
+ maxChain = new ArrayDeque<Object[]>(numPreceding + numFollowing + 1);
+ }
+
+ @Override
+ public int estimate() {
+ if (!(wrappedBuf instanceof AbstractAggregationBuffer)) {
+ return -1;
+ }
+ int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate();
+ if (underlying == -1) {
+ return -1;
+ }
+ if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return -1;
+ }
+ /*
+ * sz Estimate = sz needed by underlying AggBuffer + sz for results + sz
+ * for maxChain + 3 * JavaDataModel.PRIMITIVES1;
+ * sz of results = sz of underlying * wdwSz;
+ * sz of maxChain = sz of underlying * wdwSz
+ */
+
+ int wdwSz = numPreceding + numFollowing + 1;
+ return underlying + (underlying * wdwSz) + (underlying * wdwSz)
+ + (3 * JavaDataModel.PRIMITIVES1);
+ }
+
+ protected void reset() {
+ maxChain.clear();
+ super.reset();
+ }
+
+ }
+
+ public MaxStreamingFixedWindow(GenericUDAFEvaluator wrappedEval,
+ int numPreceding, int numFollowing) {
+ super(wrappedEval, numPreceding, numFollowing);
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
+ return new State(numPreceding, numFollowing, underlying);
+ }
+
+ protected ObjectInspector inputOI() {
+ return ((GenericUDAFMaxEvaluator) wrappedEval).inputOI;
+ }
+
+ protected ObjectInspector outputOI() {
+ return ((GenericUDAFMaxEvaluator) wrappedEval).outputOI;
+ }
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters)
+ throws HiveException {
+
+ State s = (State) agg;
+ Object o = parameters[0];
+
+ while (!s.maxChain.isEmpty()) {
+ if (!removeLast(o, s.maxChain.getLast()[0])) {
+ break;
+ } else {
+ s.maxChain.removeLast();
+ }
+ }
+
+ /*
+ * add row to chain. except in case of UNB preceding: - only 1 max needs
+ * to be tracked. - current max will never become out of range. It can
+ * only be replaced by a larger max.
+ */
+ if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ || s.maxChain.isEmpty()) {
+ o = o == null ? null : ObjectInspectorUtils.copyToStandardObject(o,
+ inputOI(), ObjectInspectorCopyOption.JAVA);
+ s.maxChain.addLast(new Object[] { o, s.numRows });
+ }
+
+ if (s.numRows >= (s.numFollowing)) {
+ s.results.add(s.maxChain.getFirst()[0]);
+ }
+ s.numRows++;
+
+ int fIdx = (Integer) s.maxChain.getFirst()[1];
+ if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && s.numRows > fIdx + s.numPreceding + s.numFollowing) {
+ s.maxChain.removeFirst();
+ }
+ }
+
+ protected boolean removeLast(Object in, Object last) {
+ return isGreater(in, last);
+ }
+
+ private boolean isGreater(Object in, Object last) {
+ if (in == null) {
+ return false;
+ }
+ if (last == null) {
+ return true;
+ }
+ return ObjectInspectorUtils.compare(in, inputOI(), last, outputOI()) > 0;
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+
+ State s = (State) agg;
+ Object[] r = s.maxChain.getFirst();
+
+ for (int i = 0; i < s.numFollowing; i++) {
+ s.results.add(r[0]);
+ s.numRows++;
+ int fIdx = (Integer) r[1];
+ if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && s.numRows - s.numFollowing + i > fIdx + s.numPreceding
+ && !s.maxChain.isEmpty()) {
+ s.maxChain.removeFirst();
+ r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public int getRowsRemainingAfterTerminate() throws HiveException {
+ throw new UnsupportedOperationException();
+ }
+
    }

  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMin.java Sat Jun 7 16:07:34 2014
@@ -23,7 +23,10 @@ import org.apache.hadoop.hive.ql.exec.De
  import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
  import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax.MaxStreamingFixedWindow;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -120,6 +123,44 @@ public class GenericUDAFMin extends Abst
        return myagg.o;
      }

+ @Override
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+ BoundaryDef start = wFrmDef.getStart();
+ BoundaryDef end = wFrmDef.getEnd();
+ return new MinStreamingFixedWindow(this, start.getAmt(), end.getAmt());
+ }
+
+ }
+
+ static class MinStreamingFixedWindow extends MaxStreamingFixedWindow {
+
+ public MinStreamingFixedWindow(GenericUDAFEvaluator wrappedEval,
+ int numPreceding, int numFollowing) {
+ super(wrappedEval, numPreceding, numFollowing);
+ }
+
+ protected ObjectInspector inputOI() {
+ return ((GenericUDAFMinEvaluator) wrappedEval).inputOI;
+ }
+
+ protected ObjectInspector outputOI() {
+ return ((GenericUDAFMinEvaluator) wrappedEval).outputOI;
+ }
+
+ protected boolean removeLast(Object in, Object last) {
+ return isLess(in, last);
+ }
+
+ private boolean isLess(Object in, Object last) {
+ if (in == null) {
+ return false;
+ }
+ if (last == null) {
+ return true;
+ }
+ return ObjectInspectorUtils.compare(in, inputOI(), last, outputOI()) < 0;
+ }
+
    }

  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFRank.java Sat Jun 7 16:07:34 2014
@@ -214,6 +214,11 @@ public class GenericUDAFRank extends Abs
        return this;
      }

+ @Override
+ public int getRowsRemainingAfterTerminate()
+ throws HiveException {
+ return 0;
+ }
    }

    public static int compare(Object[] o1, ObjectInspector[] oi1, Object[] o2,

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java?rev=1601146&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java Sat Jun 7 16:07:34 2014
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+@SuppressWarnings({ "deprecation", "unchecked" })
+public abstract class GenericUDAFStreamingEvaluator<T1> extends
+ GenericUDAFEvaluator implements ISupportStreamingModeForWindowing {
+
+ protected final GenericUDAFEvaluator wrappedEval;
+ protected final int numPreceding;
+ protected final int numFollowing;
+
+ public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval,
+ int numPreceding, int numFollowing) {
+ this.wrappedEval = wrappedEval;
+ this.numPreceding = numPreceding;
+ this.numFollowing = numFollowing;
+ this.mode = wrappedEval.mode;
+ }
+
+ class StreamingState extends AbstractAggregationBuffer {
+ final AggregationBuffer wrappedBuf;
+ final int numPreceding;
+ final int numFollowing;
+ final List<T1> results;
+ int numRows;
+
+ StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) {
+ this.wrappedBuf = buf;
+ this.numPreceding = numPreceding;
+ this.numFollowing = numFollowing;
+ results = new ArrayList<T1>();
+ numRows = 0;
+ }
+
+ protected void reset() {
+ results.clear();
+ numRows = 0;
+ }
+ }
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters)
+ throws HiveException {
+ throw new HiveException(getClass().getSimpleName() + ": init not supported");
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ StreamingState ss = (StreamingState) agg;
+ wrappedEval.reset(ss.wrappedBuf);
+ ss.reset();
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ throw new HiveException(getClass().getSimpleName()
+ + ": terminatePartial not supported");
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ throw new HiveException(getClass().getSimpleName()
+ + ": merge not supported");
+ }
+
+ @Override
+ public Object getNextResult(AggregationBuffer agg) throws HiveException {
+ StreamingState ss = (StreamingState) agg;
+ if (!ss.results.isEmpty()) {
+ T1 res = ss.results.remove(0);
+ if (res == null) {
+ return ISupportStreamingModeForWindowing.NULL_RESULT;
+ }
+ return res;
+ }
+ return null;
+ }
+
+ public static abstract class SumAvgEnhancer<T1, T2> extends
+ GenericUDAFStreamingEvaluator<T1> {
+
+ public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, int numPreceding,
+ int numFollowing) {
+ super(wrappedEval, numPreceding, numFollowing);
+ }
+
+ class SumAvgStreamingState extends StreamingState {
+
+ final List<T2> intermediateVals;
+
+ SumAvgStreamingState(int numPreceding, int numFollowing,
+ AggregationBuffer buf) {
+ super(numPreceding, numFollowing, buf);
+ intermediateVals = new ArrayList<T2>();
+ }
+
+ @Override
+ public int estimate() {
+ if (!(wrappedBuf instanceof AbstractAggregationBuffer)) {
+ return -1;
+ }
+ int underlying = ((AbstractAggregationBuffer) wrappedBuf).estimate();
+ if (underlying == -1) {
+ return -1;
+ }
+ if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return -1;
+ }
+ /*
+ * sz Estimate = sz needed by underlying AggBuffer + sz for results + sz
+ * for intermediates + 3 * JavaDataModel.PRIMITIVES1;
+ * sz of results = sz of underlying * wdwSz;
+ * sz of intermediates = sz of underlying * wdwSz
+ */
+
+ int wdwSz = numPreceding + numFollowing + 1;
+ return underlying + (underlying * wdwSz) + (underlying * wdwSz)
+ + (3 * JavaDataModel.PRIMITIVES1);
+ }
+
+ protected void reset() {
+ intermediateVals.clear();
+ super.reset();
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
+ return new SumAvgStreamingState(numPreceding, numFollowing, underlying);
+ }
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters)
+ throws HiveException {
+ SumAvgStreamingState ss = (SumAvgStreamingState) agg;
+
+ wrappedEval.iterate(ss.wrappedBuf, parameters);
+
+ if (ss.numRows >= ss.numFollowing) {
+ ss.results.add(getNextResult(ss));
+ }
+ if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) {
+ ss.intermediateVals.add(getCurrentIntermediateResult(ss));
+ }
+
+ ss.numRows++;
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ SumAvgStreamingState ss = (SumAvgStreamingState) agg;
+ Object o = wrappedEval.terminate(ss.wrappedBuf);
+
+ for (int i = 0; i < ss.numFollowing; i++) {
+ ss.results.add(getNextResult(ss));
+ }
+ return o;
+ }
+
+ @Override
+ public int getRowsRemainingAfterTerminate() throws HiveException {
+ throw new UnsupportedOperationException();
+ }
+
+ protected abstract T1 getNextResult(SumAvgStreamingState ss)
+ throws HiveException;
+
+ protected abstract T2 getCurrentIntermediateResult(SumAvgStreamingState ss)
+ throws HiveException;
+
+ }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java Sat Jun 7 16:07:34 2014
@@ -26,9 +26,7 @@ import org.apache.hadoop.hive.ql.metadat
  import org.apache.hadoop.hive.ql.parse.SemanticException;
  import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
  import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
-import org.apache.hadoop.hive.ql.plan.ptf.ValueBoundaryDef;
  import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
  import org.apache.hadoop.hive.ql.util.JavaDataModel;
  import org.apache.hadoop.hive.serde2.io.DoubleWritable;
  import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -189,20 +187,12 @@ public class GenericUDAFSum extends Abst
        BoundaryDef start = wFrmDef.getStart();
        BoundaryDef end = wFrmDef.getEnd();

- if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
- return null;
- }
-
- if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
- return null;
- }
-
- return new GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>(
+ return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>(
            this, start.getAmt(), end.getAmt()) {

          @Override
          protected HiveDecimalWritable getNextResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>.SumAvgStreamingState ss)
              throws HiveException {
            SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf;
            HiveDecimal r = myagg.empty ? null : myagg.sum;
@@ -218,7 +208,7 @@ public class GenericUDAFSum extends Abst

          @Override
          protected HiveDecimal getCurrentIntermediateResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<HiveDecimalWritable, HiveDecimal>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>.SumAvgStreamingState ss)
              throws HiveException {
            SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf;
            return myagg.empty ? null : myagg.sum;
@@ -313,24 +303,15 @@ public class GenericUDAFSum extends Abst

      @Override
      public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-
        BoundaryDef start = wFrmDef.getStart();
        BoundaryDef end = wFrmDef.getEnd();

- if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
- return null;
- }
-
- if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
- return null;
- }
-
- return new GenericUDAFStreamingEnhancer<DoubleWritable, Double>(this,
+ return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>(this,
            start.getAmt(), end.getAmt()) {

          @Override
          protected DoubleWritable getNextResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Double>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>.SumAvgStreamingState ss)
              throws HiveException {
            SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf;
            Double r = myagg.empty ? null : myagg.sum;
@@ -346,7 +327,7 @@ public class GenericUDAFSum extends Abst

          @Override
          protected Double getCurrentIntermediateResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<DoubleWritable, Double>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>.SumAvgStreamingState ss)
              throws HiveException {
            SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf;
            return myagg.empty ? null : new Double(myagg.sum);
@@ -443,20 +424,12 @@ public class GenericUDAFSum extends Abst
        BoundaryDef start = wFrmDef.getStart();
        BoundaryDef end = wFrmDef.getEnd();

- if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
- return null;
- }
-
- if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
- return null;
- }
-
- return new GenericUDAFStreamingEnhancer<LongWritable, Long>(this,
+ return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>(this,
            start.getAmt(), end.getAmt()) {

          @Override
          protected LongWritable getNextResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<LongWritable, Long>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>.SumAvgStreamingState ss)
              throws HiveException {
            SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
            Long r = myagg.empty ? null : myagg.sum;
@@ -472,7 +445,7 @@ public class GenericUDAFSum extends Abst

          @Override
          protected Long getCurrentIntermediateResult(
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEnhancer<LongWritable, Long>.StreamingState ss)
+ org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>.SumAvgStreamingState ss)
              throws HiveException {
            SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
            return myagg.empty ? null : new Long(myagg.sum);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/ISupportStreamingModeForWindowing.java Sat Jun 7 16:07:34 2014
@@ -33,6 +33,14 @@ import org.apache.hadoop.hive.ql.udf.ptf
  public interface ISupportStreamingModeForWindowing {

    Object getNextResult(AggregationBuffer agg) throws HiveException;
+
+ /*
+ * For functions that don't support a Window, this returns the number of rows
+ * remaining to be added to the output. Functions that support a Window may
+ * throw an UnsupportedOperationException, since this method shouldn't be called
+ * on them. Ranking functions return 0; lead/lag functions return the
+ * lead/lag amount.
+ */
+ int getRowsRemainingAfterTerminate() throws HiveException;

    public static Object NULL_RESULT = new Object();
  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java Sat Jun 7 16:07:34 2014
@@ -28,8 +28,10 @@ import org.apache.hadoop.hive.common.typ
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
  import org.apache.hadoop.hive.ql.exec.PTFOperator;
  import org.apache.hadoop.hive.ql.exec.PTFPartition;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
  import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
  import org.apache.hadoop.hive.ql.exec.PTFRollingPartition;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -142,6 +144,49 @@ public class WindowingTableFunction exte
      return true;
    }

+ private boolean streamingPossible(Configuration cfg, WindowFunctionDef wFnDef) {
+ WindowFrameDef wdwFrame = wFnDef.getWindowFrame();
+ WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFnDef
+ .getName());
+
+ if (!wFnInfo.isSupportsWindow()) {
+ return true;
+ }
+
+ BoundaryDef start = wdwFrame.getStart();
+ BoundaryDef end = wdwFrame.getEnd();
+
+ /*
+ * Currently we are not handling dynamic sized windows implied by range
+ * based windows.
+ */
+ if (start instanceof ValueBoundaryDef || end instanceof ValueBoundaryDef) {
+ return false;
+ }
+
+ /*
+ * Windows that are unbounded following don't benefit from Streaming.
+ */
+ if (end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return false;
+ }
+
+ /*
+ * let function decide if it can handle this special case.
+ */
+ if (start.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT) {
+ return true;
+ }
+
+ int windowLimit = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
+
+ if (windowLimit < (start.getAmt() + end.getAmt() + 1)) {
+ return false;
+ }
+
+ return true;
+ }
+
    /*
     * (non-Javadoc)
     *
@@ -155,6 +200,7 @@ public class WindowingTableFunction exte
     * ISupportStreamingModeForWindowing. 3. Is an invocation on a 'fixed' window.
     * So no Unbounded Preceding or Following.
     */
+ @SuppressWarnings("resource")
    private int[] setCanAcceptInputAsStream(Configuration cfg) {

      canAcceptInputAsStream = false;
@@ -171,8 +217,9 @@ public class WindowingTableFunction exte
        WindowFunctionDef wFnDef = tabDef.getWindowFunctions().get(i);
        WindowFrameDef wdwFrame = wFnDef.getWindowFrame();
        GenericUDAFEvaluator fnEval = wFnDef.getWFnEval();
- GenericUDAFEvaluator streamingEval = fnEval
- .getWindowingEvaluator(wdwFrame);
+ boolean streamingPossible = streamingPossible(cfg, wFnDef);
+ GenericUDAFEvaluator streamingEval = streamingPossible ? fnEval
+ .getWindowingEvaluator(wdwFrame) : null;
        if (streamingEval != null
            && streamingEval instanceof ISupportStreamingModeForWindowing) {
          continue;
@@ -343,6 +390,14 @@ public class WindowingTableFunction exte
        int numRowsRemaining = wFn.getWindowFrame().getEnd().getAmt();
        if (fnEval instanceof ISupportStreamingModeForWindowing) {
          fnEval.terminate(streamingState.aggBuffers[i]);
+
+ WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFn
+ .getName());
+ if (!wFnInfo.isSupportsWindow()) {
+ numRowsRemaining = ((ISupportStreamingModeForWindowing) fnEval)
+ .getRowsRemainingAfterTerminate();
+ }
+
          if (numRowsRemaining != BoundarySpec.UNBOUNDED_AMOUNT) {
            while (numRowsRemaining > 0) {
              Object out = ((ISupportStreamingModeForWindowing) fnEval)
@@ -411,13 +466,20 @@ public class WindowingTableFunction exte
        } else if (wFn.isPivotResult()) {
          GenericUDAFEvaluator streamingEval = wFn.getWFnEval().getWindowingEvaluator(wFn.getWindowFrame());
          if ( streamingEval != null && streamingEval instanceof ISupportStreamingModeForWindowing ) {
- wFn.setWFnEval(streamingEval);
- if ( wFn.getOI() instanceof ListObjectInspector ) {
- ListObjectInspector listOI = (ListObjectInspector) wFn.getOI();
- wFn.setOI(listOI.getListElementObjectInspector());
+ ISupportStreamingModeForWindowing strEval = (ISupportStreamingModeForWindowing) streamingEval;
+ if ( strEval.getRowsRemainingAfterTerminate() == 0 ) {
+ wFn.setWFnEval(streamingEval);
+ if ( wFn.getOI() instanceof ListObjectInspector ) {
+ ListObjectInspector listOI = (ListObjectInspector) wFn.getOI();
+ wFn.setOI(listOI.getListElementObjectInspector());
+ }
+ output.add(null);
+ wFnsWithWindows.add(i);
+ } else {
+ outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn,
+ pItr);
+ output.add(null);
            }
- output.add(null);
- wFnsWithWindows.add(i);
          } else {
            outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
            output.add(null);

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java?rev=1601146&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMax.java Sat Jun 7 16:07:34 2014
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udaf;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+public class TestStreamingMax {
+
+ public void maxLong(Iterator<Long> inVals, int inSz, int numPreceding,
+ int numFollowing, Iterator<Long> outVals) throws HiveException {
+
+ GenericUDAFMax fnR = new GenericUDAFMax();
+ TypeInfo[] inputTypes = { TypeInfoFactory.longTypeInfo };
+ ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableLongObjectInspector };
+
+ LongWritable[] in = new LongWritable[1];
+ in[0] = new LongWritable();
+
+ TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.LongHandler, in,
+ inputOIs, inSz, numPreceding, numFollowing, outVals);
+
+ }
+
+ @Test
+ public void testLong_3_4() throws HiveException {
+
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(5L, 6L, 7L, 8L, 9L, 10L, 10L, 10L, 10L,
+ 10L);
+ maxLong(inVals.iterator(), 10, 3, 4, outVals.iterator());
+ }
+
+ @Test
+ public void testLongr_3_4() throws HiveException {
+
+ List<Long> inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
+ List<Long> outVals = Arrays.asList(10L, 10L, 10L, 10L, 9L, 8L, 7L, 6L, 5L,
+ 4L);
+ maxLong(inVals.iterator(), 10, 3, 4, outVals.iterator());
+ }
+
+ @Test
+ public void testLongr_1_4() throws HiveException {
+
+ List<Long> inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
+ List<Long> outVals = Arrays
+ .asList(10L, 10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L);
+ maxLong(inVals.iterator(), 10, 1, 4, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_3_0() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ maxLong(inVals.iterator(), 10, 3, 0, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_0_5() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(6L, 7L, 8L, 9L, 10L, 10L, 10L, 10L, 10L,
+ 10L);
+ maxLong(inVals.iterator(), 10, 0, 5, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_7_2() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 10L,
+ 10L);
+ maxLong(inVals.iterator(), 10, 7, 2, outVals.iterator());
+ }
+
+ @Test
+ public void testLongr_7_2() throws HiveException {
+ List<Long> inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
+ List<Long> outVals = Arrays.asList(10L, 10L, 10L, 10L, 10L, 10L, 10L, 10L,
+ 9L, 8L);
+ maxLong(inVals.iterator(), 10, 7, 2, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_15_15() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(10L, 10L, 10L, 10L, 10L, 10L, 10L, 10L,
+ 10L, 10L);
+ maxLong(inVals.iterator(), 10, 15, 15, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_unb_0() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ maxLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0,
+ outVals.iterator());
+ }
+
+ @Test
+ public void testLong_unb_5() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(6L, 7L, 8L, 9L, 10L, 10L, 10L, 10L, 10L,
+ 10L);
+ maxLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5,
+ outVals.iterator());
+ }
+}

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java?rev=1601146&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingMin.java Sat Jun 7 16:07:34 2014
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udaf;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.udaf.TestStreamingSum.TypeHandler;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+public class TestStreamingMin {
+
+ public void minLong(Iterator<Long> inVals, int inSz, int numPreceding,
+ int numFollowing, Iterator<Long> outVals) throws HiveException {
+
+ GenericUDAFMin fnR = new GenericUDAFMin();
+ TypeInfo[] inputTypes = { TypeInfoFactory.longTypeInfo };
+ ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.writableLongObjectInspector };
+
+ LongWritable[] in = new LongWritable[1];
+ in[0] = new LongWritable();
+
+ TestStreamingSum._agg(fnR, inputTypes, inVals, TypeHandler.LongHandler, in,
+ inputOIs, inSz, numPreceding, numFollowing, outVals);
+
+ }
+
+ @Test
+ public void testLong_3_4() throws HiveException {
+
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(1L, 1L, 1L, 1L, 2L, 3L, 4L, 5L, 6L, 7L);
+ minLong(inVals.iterator(), 10, 3, 4, outVals.iterator());
+ }
+
+ @Test
+ public void testLongr_3_4() throws HiveException {
+
+ List<Long> inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
+ List<Long> outVals = Arrays.asList(6L, 5L, 4L, 3L, 2L, 1L, 1L, 1L, 1L, 1L);
+ minLong(inVals.iterator(), 10, 3, 4, outVals.iterator());
+ }
+
+ @Test
+ public void testLongr_1_4() throws HiveException {
+
+ List<Long> inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
+ List<Long> outVals = Arrays.asList(6L, 5L, 4L, 3L, 2L, 1L, 1L, 1L, 1L, 1L);
+ minLong(inVals.iterator(), 10, 1, 4, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_3_0() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(1L, 1L, 1L, 1L, 2L, 3L, 4L, 5L, 6L, 7L);
+ minLong(inVals.iterator(), 10, 3, 0, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_0_5() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ minLong(inVals.iterator(), 10, 0, 5, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_7_2() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 2L, 3L);
+ minLong(inVals.iterator(), 10, 7, 2, outVals.iterator());
+ }
+
+ @Test
+ public void testLongr_7_2() throws HiveException {
+ List<Long> inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
+ List<Long> outVals = Arrays.asList(8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L, 1L, 1L);
+ minLong(inVals.iterator(), 10, 7, 2, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_15_15() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L);
+ minLong(inVals.iterator(), 10, 15, 15, outVals.iterator());
+ }
+
+ @Test
+ public void testLong_unb_0() throws HiveException {
+ List<Long> inVals = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
+ List<Long> outVals = Arrays.asList(1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L);
+ minLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 0,
+ outVals.iterator());
+ }
+
+ @Test
+ public void testLongr_unb_5() throws HiveException {
+ List<Long> inVals = Arrays.asList(10L, 9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
+ List<Long> outVals = Arrays.asList(5L, 4L, 3L, 2L, 1L, 1L, 1L, 1L, 1L, 1L);
+ minLong(inVals.iterator(), 10, BoundarySpec.UNBOUNDED_AMOUNT, 5,
+ outVals.iterator());
+ }
+
+}

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java?rev=1601146&r1=1601145&r2=1601146&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java Sat Jun 7 16:07:34 2014
@@ -174,8 +174,14 @@ public class TestStreamingSum {
        fn.aggregate(agg, in);
        Object out = oS.getNextResult(agg);
        if (out != null) {
- out = out == ISupportStreamingModeForWindowing.NULL_RESULT ? null
- : typeHandler.get((TW) out);
+ if ( out == ISupportStreamingModeForWindowing.NULL_RESULT ) {
+ out = null;
+ } else {
+ try {
+ out = typeHandler.get((TW) out);
+ } catch(ClassCastException ce) {
+ }
+ }
          Assert.assertEquals(out, outVals.next());
          outSz++;
        }
@@ -185,8 +191,14 @@ public class TestStreamingSum {

      while (outSz < inSz) {
        Object out = oS.getNextResult(agg);
- out = out == ISupportStreamingModeForWindowing.NULL_RESULT ? null
- : typeHandler.get((TW) out);
+ if ( out == ISupportStreamingModeForWindowing.NULL_RESULT ) {
+ out = null;
+ } else {
+ try {
+ out = typeHandler.get((TW) out);
+ } catch(ClassCastException ce) {
+ }
+ }
        Assert.assertEquals(out, outVals.next());
        outSz++;
      }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Jun 7, '14 at 4:07p
active: Jun 7, '14 at 4:07p
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Hashutosh: 1 post

People

Translate

site design / logo © 2021 Grokbase