Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1430979&r1=1430978&r2=1430979&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Jan 9 17:59:23 2013
@@ -217,11 +217,11 @@ public class SemanticAnalyzer extends Ba
    private final UnparseTranslator unparseTranslator;
    private final GlobalLimitCtx globalLimitCtx = new GlobalLimitCtx();

- //prefix for column names auto generated by hive
+ // prefix for column names auto generated by hive
    private final String autogenColAliasPrfxLbl;
    private final boolean autogenColAliasPrfxIncludeFuncName;

- //Max characters when auto generating the column name with func name
+ // Max characters when auto generating the column name with func name
    private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20;

    private static class Phase1Ctx {
@@ -250,9 +250,9 @@ public class SemanticAnalyzer extends Ba
      prunedPartitions = new HashMap<String, PrunedPartitionList>();
      unparseTranslator = new UnparseTranslator();
      autogenColAliasPrfxLbl = HiveConf.getVar(conf,
- HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL);
+ HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL);
      autogenColAliasPrfxIncludeFuncName = HiveConf.getBoolVar(conf,
- HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME);
+ HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME);
      queryProperties = new QueryProperties();
      opToPartToSkewedPruner = new HashMap<TableScanOperator, Map<String, ExprNodeDesc>>();
    }
@@ -355,7 +355,8 @@ public class SemanticAnalyzer extends Ba
        ASTNode selectExpr, QBParseInfo qbp) {
      for (int i = 0; i < selectExpr.getChildCount(); ++i) {
        ASTNode selExpr = (ASTNode) selectExpr.getChild(i);
- if ((selExpr.getToken().getType() == HiveParser.TOK_SELEXPR) && (selExpr.getChildCount() == 2)) {
+ if ((selExpr.getToken().getType() == HiveParser.TOK_SELEXPR)
+ && (selExpr.getChildCount() == 2)) {
          String columnAlias = unescapeIdentifier(selExpr.getChild(1).getText());
          qbp.setExprToColumnAlias((ASTNode) selExpr.getChild(0), columnAlias);
        }
@@ -490,15 +491,15 @@ public class SemanticAnalyzer extends Ba
        // Need to change it to list of columns
        if (sampleCols.size() > 2) {
          throw new SemanticException(generateErrorMessage(
- (ASTNode) tabref.getChild(0),
- ErrorMsg.SAMPLE_RESTRICTION.getMsg()));
+ (ASTNode) tabref.getChild(0),
+ ErrorMsg.SAMPLE_RESTRICTION.getMsg()));
        }
        qb.getParseInfo().setTabSample(
            alias,
            new TableSample(
- unescapeIdentifier(sampleClause.getChild(0).getText()),
- unescapeIdentifier(sampleClause.getChild(1).getText()),
- sampleCols));
+ unescapeIdentifier(sampleClause.getChild(0).getText()),
+ unescapeIdentifier(sampleClause.getChild(1).getText()),
+ sampleCols));
        if (unparseTranslator.isEnabled()) {
          for (ASTNode sampleCol : sampleCols) {
            unparseTranslator.addIdentifierTranslation((ASTNode) sampleCol
@@ -622,7 +623,7 @@ public class SemanticAnalyzer extends Ba
      if ((numChildren != 2) && (numChildren != 3)
          && join.getToken().getType() != HiveParser.TOK_UNIQUEJOIN) {
        throw new SemanticException(generateErrorMessage(join,
- "Join with multiple children"));
+ "Join with multiple children"));
      }

      for (int num = 0; num < numChildren; num++) {
@@ -735,7 +736,7 @@ public class SemanticAnalyzer extends Ba

        case HiveParser.TOK_INSERT_INTO:
          String currentDatabase = db.getCurrentDatabase();
- String tab_name = getUnescapedName((ASTNode)ast.getChild(0).getChild(0), currentDatabase);
+ String tab_name = getUnescapedName((ASTNode) ast.getChild(0).getChild(0), currentDatabase);
          qbp.addInsertIntoTable(tab_name);

        case HiveParser.TOK_DESTINATION:
@@ -759,7 +760,7 @@ public class SemanticAnalyzer extends Ba
          int child_count = ast.getChildCount();
          if (child_count != 1) {
            throw new SemanticException(generateErrorMessage(ast,
- "Multiple Children " + child_count));
+ "Multiple Children " + child_count));
          }

          // Check if this is a subquery / lateral view
@@ -792,10 +793,10 @@ public class SemanticAnalyzer extends Ba
          qbp.setDistributeByExprForClause(ctx_1.dest, ast);
          if (qbp.getClusterByForClause(ctx_1.dest) != null) {
            throw new SemanticException(generateErrorMessage(ast,
- ErrorMsg.CLUSTERBY_DISTRIBUTEBY_CONFLICT.getMsg()));
+ ErrorMsg.CLUSTERBY_DISTRIBUTEBY_CONFLICT.getMsg()));
          } else if (qbp.getOrderByForClause(ctx_1.dest) != null) {
            throw new SemanticException(generateErrorMessage(ast,
- ErrorMsg.ORDERBY_DISTRIBUTEBY_CONFLICT.getMsg()));
+ ErrorMsg.ORDERBY_DISTRIBUTEBY_CONFLICT.getMsg()));
          }
          break;

@@ -806,10 +807,10 @@ public class SemanticAnalyzer extends Ba
          qbp.setSortByExprForClause(ctx_1.dest, ast);
          if (qbp.getClusterByForClause(ctx_1.dest) != null) {
            throw new SemanticException(generateErrorMessage(ast,
- ErrorMsg.CLUSTERBY_SORTBY_CONFLICT.getMsg()));
+ ErrorMsg.CLUSTERBY_SORTBY_CONFLICT.getMsg()));
          } else if (qbp.getOrderByForClause(ctx_1.dest) != null) {
            throw new SemanticException(generateErrorMessage(ast,
- ErrorMsg.ORDERBY_SORTBY_CONFLICT.getMsg()));
+ ErrorMsg.ORDERBY_SORTBY_CONFLICT.getMsg()));
          }

          break;
@@ -821,7 +822,7 @@ public class SemanticAnalyzer extends Ba
          qbp.setOrderByExprForClause(ctx_1.dest, ast);
          if (qbp.getClusterByForClause(ctx_1.dest) != null) {
            throw new SemanticException(generateErrorMessage(ast,
- ErrorMsg.CLUSTERBY_ORDERBY_CONFLICT.getMsg()));
+ ErrorMsg.CLUSTERBY_ORDERBY_CONFLICT.getMsg()));
          }
          break;

@@ -837,7 +838,7 @@ public class SemanticAnalyzer extends Ba
          }
          if (qbp.getSelForClause(ctx_1.dest).getToken().getType() == HiveParser.TOK_SELECTDI) {
            throw new SemanticException(generateErrorMessage(ast,
- ErrorMsg.SELECT_DISTINCT_WITH_GROUPBY.getMsg()));
+ ErrorMsg.SELECT_DISTINCT_WITH_GROUPBY.getMsg()));
          }
          qbp.setGroupByExprForClause(ctx_1.dest, ast);
          skipRecursion = true;
@@ -864,7 +865,7 @@ public class SemanticAnalyzer extends Ba
        case HiveParser.TOK_ANALYZE:
          // Case of analyze command

- String table_name = getUnescapedName((ASTNode)ast.getChild(0).getChild(0));
+ String table_name = getUnescapedName((ASTNode) ast.getChild(0).getChild(0));


          qb.setTabAlias(table_name, table_name);
@@ -882,7 +883,7 @@ public class SemanticAnalyzer extends Ba
          // select * from (subq1 union subq2) subqalias
          if (!qbp.getIsSubQ()) {
            throw new SemanticException(generateErrorMessage(ast,
- ErrorMsg.UNION_NOTIN_SUBQ.getMsg()));
+ ErrorMsg.UNION_NOTIN_SUBQ.getMsg()));
          }

        case HiveParser.TOK_INSERT:
@@ -925,7 +926,7 @@ public class SemanticAnalyzer extends Ba
              }
            } else {
              throw new SemanticException(ErrorMsg.INSERT_INTO_DYNAMICPARTITION_IFNOTEXISTS
- .getMsg(partition.toString()));
+ .getMsg(partition.toString()));
            }
          }

@@ -977,7 +978,7 @@ public class SemanticAnalyzer extends Ba
          }

          // Disallow INSERT INTO on bucketized tables
- if(qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()) &&
+ if (qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()) &&
              tab.getNumBuckets() > 0) {
            throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE.
                getMsg("Table: " + tab_name));
@@ -999,9 +1000,9 @@ public class SemanticAnalyzer extends Ba
            if (qb.getParseInfo().isAnalyzeCommand()) {
              throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg());
            }
- String fullViewName = tab.getDbName()+"."+tab.getTableName();
+ String fullViewName = tab.getDbName() + "." + tab.getTableName();
            // Prevent view cycles
- if(viewsExpanded.contains(fullViewName)){
+ if (viewsExpanded.contains(fullViewName)) {
              throw new SemanticException("Recursive view " + fullViewName +
                  " detected (cycle: " + StringUtils.join(viewsExpanded, " -> ") +
                  " -> " + fullViewName + ").");
@@ -1016,8 +1017,8 @@ public class SemanticAnalyzer extends Ba

          if (!InputFormat.class.isAssignableFrom(tab.getInputFormatClass())) {
            throw new SemanticException(generateErrorMessage(
- qb.getParseInfo().getSrcForAlias(alias),
- ErrorMsg.INVALID_INPUT_FORMAT_TYPE.getMsg()));
+ qb.getParseInfo().getSrcForAlias(alias),
+ ErrorMsg.INVALID_INPUT_FORMAT_TYPE.getMsg()));
          }

          qb.getMetaData().setSrcForAlias(alias, tab);
@@ -1028,8 +1029,9 @@ public class SemanticAnalyzer extends Ba
              try {
                ts.partitions = db.getPartitionsByNames(ts.tableHandle, ts.partSpec);
              } catch (HiveException e) {
- throw new SemanticException(generateErrorMessage(qb.getParseInfo().getSrcForAlias(alias),
- "Cannot get partitions for " + ts.partSpec), e);
+ throw new SemanticException(generateErrorMessage(
+ qb.getParseInfo().getSrcForAlias(alias),
+ "Cannot get partitions for " + ts.partSpec), e);
              }
            }
            qb.getParseInfo().addTableSpec(alias, ts);
@@ -1046,7 +1048,7 @@ public class SemanticAnalyzer extends Ba
          QBExpr qbexpr = qb.getSubqForAlias(alias);
          getMetaData(qbexpr);
          if (wasView) {
- viewsExpanded.remove(viewsExpanded.size()-1);
+ viewsExpanded.remove(viewsExpanded.size() - 1);
          }
        }

@@ -1106,7 +1108,7 @@ public class SemanticAnalyzer extends Ba
                ctx.setResFile(null);

                // allocate a temporary output dir on the location of the table
- String tableName = getUnescapedName((ASTNode)ast.getChild(0));
+ String tableName = getUnescapedName((ASTNode) ast.getChild(0));
                Table newTable = db.newTable(tableName);
                Path location;
                try {
@@ -1120,7 +1122,7 @@ public class SemanticAnalyzer extends Ba
                      FileUtils.makeQualified(location, conf).toUri());
                } catch (Exception e) {
                  throw new SemanticException(generateErrorMessage(ast,
- "Error creating temporary folder on: " + location.toString()), e);
+ "Error creating temporary folder on: " + location.toString()), e);
                }
                if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
                  tableSpec ts = new tableSpec(db, conf, this.ast);
@@ -1141,7 +1143,7 @@ public class SemanticAnalyzer extends Ba
          }
          default:
            throw new SemanticException(generateErrorMessage(ast,
- "Unknown Token Type " + ast.getToken().getType()));
+ "Unknown Token Type " + ast.getToken().getType()));
          }
        }
      } catch (HiveException e) {
@@ -1159,7 +1161,7 @@ public class SemanticAnalyzer extends Ba
      ASTNode viewTree;
      final ASTNodeOrigin viewOrigin = new ASTNodeOrigin("VIEW", tab.getTableName(),
          tab.getViewExpandedText(), alias, qb.getParseInfo().getSrcForAlias(
- alias));
+ alias));
      try {
        String viewText = tab.getViewExpandedText();
        // Reparse text, passing null for context to avoid clobbering
@@ -1341,6 +1343,7 @@ public class SemanticAnalyzer extends Ba
        joinTree.addFilterMapping(cond.getRight(), cond.getLeft(), filters.get(1).size());
      }
    }
+
    /**
     * Parse the join condition. If the condition is a join condition, throw an
     * error if it is not an equality. Otherwise, break it into left and right
@@ -1406,7 +1409,7 @@ public class SemanticAnalyzer extends Ba
          if ((rightCondAl1.size() != 0)
              || ((rightCondAl1.size() == 0) && (rightCondAl2.size() == 0))) {
            if (type.equals(JoinType.LEFTOUTER) ||
- type.equals(JoinType.FULLOUTER)) {
+ type.equals(JoinType.FULLOUTER)) {
              if (conf.getBoolVar(HiveConf.ConfVars.HIVEOUTERJOINSUPPORTSFILTERS)) {
                joinTree.getFilters().get(0).add(joinCond);
              } else {
@@ -1493,7 +1496,7 @@ public class SemanticAnalyzer extends Ba
        for (int ci = childrenBegin; ci < joinCond.getChildCount(); ci++) {
          parseJoinCondPopulateAlias(joinTree, (ASTNode) joinCond.getChild(ci),
              leftAlias.get(ci - childrenBegin), rightAlias.get(ci
- - childrenBegin), null);
+ - childrenBegin), null);
        }

        boolean leftAliasNull = true;
@@ -1574,7 +1577,7 @@ public class SemanticAnalyzer extends Ba

      Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
          new FilterDesc(genExprNodeDesc(condn, inputRR), false), new RowSchema(
- inputRR.getColumnInfos()), input), inputRR);
+ inputRR.getColumnInfos()), input), inputRR);

      return output;
    }
@@ -1605,11 +1608,11 @@ public class SemanticAnalyzer extends Ba
      RowResolver inputRR = inputCtx.getRowResolver();
      Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
          new FilterDesc(genExprNodeDesc(condn, inputRR), false), new RowSchema(
- inputRR.getColumnInfos()), input), inputRR);
+ inputRR.getColumnInfos()), input), inputRR);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Created Filter Plan for " + qb.getId() + " row schema: "
- + inputRR.toString());
+ + inputRR.toString());
      }
      return output;
    }
@@ -1677,8 +1680,8 @@ public class SemanticAnalyzer extends Ba
          col_list.add(expr);
          output.put(tmp[0], tmp[1],
              new ColumnInfo(getColumnInternalName(pos), colInfo.getType(),
- colInfo.getTabAlias(), colInfo.getIsVirtualCol(),
- colInfo.isHiddenVirtualCol()));
+ colInfo.getTabAlias(), colInfo.getIsVirtualCol(),
+ colInfo.isHiddenVirtualCol()));
          pos = Integer.valueOf(pos.intValue() + 1);
          matched++;

@@ -1800,7 +1803,7 @@ public class SemanticAnalyzer extends Ba
            tblDesc.getProperties().setProperty(serdeConstants.LINE_DELIM, lineDelim);
            if (!lineDelim.equals("\n") && !lineDelim.equals("10")) {
              throw new SemanticException(generateErrorMessage(rowChild,
- ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg()));
+ ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg()));
            }
            break;
          default:
@@ -1885,7 +1888,7 @@ public class SemanticAnalyzer extends Ba
            String intName = getColumnInternalName(i);
            ColumnInfo colInfo = new ColumnInfo(intName, TypeInfoUtils
                .getTypeInfoFromTypeString(getTypeStringFromAST((ASTNode) child
- .getChild(1))), null, false);
+ .getChild(1))), null, false);
            colInfo.setAlias(colAlias);
            outputCols.add(colInfo);
          }
@@ -1978,8 +1981,8 @@ public class SemanticAnalyzer extends Ba

      Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
          new ScriptDesc(
- fetchFilesNotInLocalFilesystem(stripQuotes(trfm.getChild(execPos).getText())),
- inInfo, inRecordWriter, outInfo, outRecordReader, errRecordReader, errInfo),
+ fetchFilesNotInLocalFilesystem(stripQuotes(trfm.getChild(execPos).getText())),
+ inInfo, inRecordWriter, outInfo, outRecordReader, errRecordReader, errInfo),
          new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch);

      return output;
@@ -2038,7 +2041,7 @@ public class SemanticAnalyzer extends Ba
    private List<Integer> getGroupingSetsForRollup(int size) {
      List<Integer> groupingSetKeys = new ArrayList<Integer>();
      for (int i = 0; i <= size; i++) {
- groupingSetKeys.add((1 << i) - 1);
+ groupingSetKeys.add((1 << i) - 1);
      }
      return groupingSetKeys;
    }
@@ -2046,7 +2049,7 @@ public class SemanticAnalyzer extends Ba
    private List<Integer> getGroupingSetsForCube(int size) {
      int count = 1 << size;
      List<Integer> results = new ArrayList<Integer>(count);
- for(int i = 0; i < count; ++i) {
+ for (int i = 0; i < count; ++i) {
        results.add(i);
      }
      return results;
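
The two bitmap helpers above (rollup in the previous hunk, cube in this one) encode each grouping set as an integer with one bit per group-by expression; as I read getGroupingSets further down, a set bit means that expression participates in the grouping set. A minimal standalone sketch of the same computation, for illustration only:

    import java.util.ArrayList;
    import java.util.List;

    public class GroupingSetBitmaps {

        // Rollup over n keys: the n+1 "prefix" grouping sets -> bitmaps 0, 1, 3, ..., (1 << n) - 1
        static List<Integer> rollup(int size) {
            List<Integer> sets = new ArrayList<Integer>();
            for (int i = 0; i <= size; i++) {
                sets.add((1 << i) - 1);
            }
            return sets;
        }

        // Cube over n keys: every subset of the keys -> bitmaps 0 .. (1 << n) - 1
        static List<Integer> cube(int size) {
            int count = 1 << size;
            List<Integer> sets = new ArrayList<Integer>(count);
            for (int i = 0; i < count; i++) {
                sets.add(i);
            }
            return sets;
        }

        public static void main(String[] args) {
            System.out.println("rollup(3) = " + rollup(3)); // [0, 1, 3, 7]
            System.out.println("cube(3)   = " + cube(3));   // [0, 1, 2, 3, 4, 5, 6, 7]
        }
    }
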
@@ -2083,16 +2086,16 @@ public class SemanticAnalyzer extends Ba
      if (root != null) {
        for (int i = 0; i < root.getChildCount(); ++i) {
          ASTNode child = (ASTNode) root.getChild(i);
- if(child.getType() != HiveParser.TOK_GROUPING_SETS_EXPRESSION) {
+ if (child.getType() != HiveParser.TOK_GROUPING_SETS_EXPRESSION) {
            continue;
          }
          int bitmap = 0;
          for (int j = 0; j < child.getChildCount(); ++j) {
            String treeAsString = child.getChild(j).toStringTree();
            Integer pos = exprPos.get(treeAsString);
- if(pos == null) {
+ if (pos == null) {
              throw new SemanticException(
- generateErrorMessage((ASTNode)child.getChild(j),
+ generateErrorMessage((ASTNode) child.getChild(j),
                      ErrorMsg.HIVE_GROUPING_SETS_EXPR_NOT_IN_GROUPBY.getErrorCodedMsg()));
            }
            bitmap = setBit(bitmap, pos);
@@ -2100,7 +2103,7 @@ public class SemanticAnalyzer extends Ba
          result.add(bitmap);
        }
      }
- if(checkForNoAggr(result)) {
+ if (checkForNoAggr(result)) {
        throw new SemanticException(
            ErrorMsg.HIVE_GROUPING_SETS_AGGR_NOFUNC.getMsg());
      }
@@ -2109,7 +2112,7 @@ public class SemanticAnalyzer extends Ba

    private boolean checkForNoAggr(List<Integer> bitmaps) {
      boolean ret = true;
- for(int mask : bitmaps) {
+ for (int mask : bitmaps) {
        ret &= mask == 0;
      }
      return ret;
@@ -2173,7 +2176,7 @@ public class SemanticAnalyzer extends Ba
      ASTNode root = (ASTNode) selExpr.getChild(0);
      if (root.getType() == HiveParser.TOK_TABLE_OR_COL) {
        colAlias =
- BaseSemanticAnalyzer.unescapeIdentifier(root.getChild(0).getText());
+ BaseSemanticAnalyzer.unescapeIdentifier(root.getChild(0).getText());
        colRef[0] = tabAlias;
        colRef[1] = colAlias;
        return colRef;
@@ -2195,23 +2198,23 @@ public class SemanticAnalyzer extends Ba
        }
      }

- //if specified generate alias using func name
+ // if specified generate alias using func name
      if (includeFuncName && (root.getType() == HiveParser.TOK_FUNCTION)) {

        String expr_flattened = root.toStringTree();

- //remove all TOK tokens
+ // remove all TOK tokens
        String expr_no_tok = expr_flattened.replaceAll("TOK_\\S+", "");

- //remove all non alphanumeric letters, replace whitespace spans with underscore
- String expr_formatted = expr_no_tok.replaceAll("\\W", " ").trim().replaceAll("\\s+", "_");
+ // remove all non alphanumeric letters, replace whitespace spans with underscore
+ String expr_formatted = expr_no_tok.replaceAll("\\W", " ").trim().replaceAll("\\s+", "_");

- //limit length to 20 chars
- if(expr_formatted.length()>AUTOGEN_COLALIAS_PRFX_MAXLENGTH) {
+ // limit length to 20 chars
+ if (expr_formatted.length() > AUTOGEN_COLALIAS_PRFX_MAXLENGTH) {
          expr_formatted = expr_formatted.substring(0, AUTOGEN_COLALIAS_PRFX_MAXLENGTH);
        }

- //append colnum to make it unique
+ // append colnum to make it unique
        colAlias = expr_formatted.concat("_" + colNum);
      }

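
For illustration, the alias auto-generation steps in this hunk can be reproduced in isolation. The sketch below mirrors the same regex pipeline and length limit; the flattened-tree string used in the example is hypothetical, not Hive's exact toStringTree() output:

    public class AutoAliasSketch {
        private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20;

        static String autoAlias(String exprFlattened, int colNum) {
            // remove all TOK tokens
            String noTok = exprFlattened.replaceAll("TOK_\\S+", "");
            // remove non-alphanumeric characters, collapse whitespace spans to underscores
            String formatted = noTok.replaceAll("\\W", " ").trim().replaceAll("\\s+", "_");
            // limit length to 20 chars
            if (formatted.length() > AUTOGEN_COLALIAS_PRFX_MAXLENGTH) {
                formatted = formatted.substring(0, AUTOGEN_COLALIAS_PRFX_MAXLENGTH);
            }
            // append the column number to make the alias unique
            return formatted + "_" + colNum;
        }

        public static void main(String[] args) {
            // prints "upper_name_1" for a hypothetical flattened expression tree
            System.out.println(autoAlias("(TOK_FUNCTION upper (TOK_TABLE_OR_COL name))", 1));
        }
    }
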
@@ -2274,7 +2277,7 @@ public class SemanticAnalyzer extends Ba
      }

      boolean isInTransform = (selExprList.getChild(posn).getChild(0).getType() ==
- HiveParser.TOK_TRANSFORM);
+ HiveParser.TOK_TRANSFORM);
      if (isInTransform) {
        queryProperties.setUsesScript(true);
        globalLimitCtx.setHasTransformOrUDTF(true);
@@ -2314,14 +2317,14 @@ public class SemanticAnalyzer extends Ba
        // Only support a single expression when it's a UDTF
        if (selExprList.getChildCount() > 1) {
          throw new SemanticException(generateErrorMessage(
- (ASTNode) selExprList.getChild(1),
- ErrorMsg.UDTF_MULTIPLE_EXPR.getMsg()));
+ (ASTNode) selExprList.getChild(1),
+ ErrorMsg.UDTF_MULTIPLE_EXPR.getMsg()));
        }
        // Require an AS for UDTFs for column aliases
        ASTNode selExpr = (ASTNode) selExprList.getChild(posn);
        if (selExpr.getChildCount() < 2) {
          throw new SemanticException(generateErrorMessage(udtfExpr,
- ErrorMsg.UDTF_REQUIRE_AS.getMsg()));
+ ErrorMsg.UDTF_REQUIRE_AS.getMsg()));
        }
        // Get the column / table aliases from the expression. Start from 1 as
        // 0 is the TOK_FUNCTION
@@ -2382,8 +2385,8 @@ public class SemanticAnalyzer extends Ba
        // AST's are slightly different.
        if (!isInTransform && !isUDTF && child.getChildCount() > 2) {
          throw new SemanticException(generateErrorMessage(
- (ASTNode) child.getChild(2),
- ErrorMsg.INVALID_AS.getMsg()));
+ (ASTNode) child.getChild(2),
+ ErrorMsg.INVALID_AS.getMsg()));
        }

        // The real expression
@@ -2399,7 +2402,7 @@ public class SemanticAnalyzer extends Ba
          // Get rid of TOK_SELEXPR
          expr = (ASTNode) child.getChild(0);
          String[] colRef = getColAlias(child, autogenColAliasPrfxLbl, inputRR,
- autogenColAliasPrfxIncludeFuncName, i);
+ autogenColAliasPrfxIncludeFuncName, i);
          tabAlias = colRef[0];
          colAlias = colRef[1];
          if (hasAsClause) {
@@ -2452,9 +2455,9 @@ public class SemanticAnalyzer extends Ba
          }

          ColumnInfo colInfo = new ColumnInfo(getColumnInternalName(pos),
- exp.getWritableObjectInspector(), tabAlias, false);
+ exp.getWritableObjectInspector(), tabAlias, false);
          colInfo.setSkewedCol((exp instanceof ExprNodeColumnDesc) ? ((ExprNodeColumnDesc) exp)
- .isSkewedCol() : false);
+ .isSkewedCol() : false);
          out_rwsch.put(tabAlias, colAlias, colInfo);

          pos = Integer.valueOf(pos.intValue() + 1);
@@ -2477,7 +2480,7 @@ public class SemanticAnalyzer extends Ba

      Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
          new SelectDesc(col_list, columnNames, selectStar), new RowSchema(
- out_rwsch.getColumnInfos()), input), out_rwsch);
+ out_rwsch.getColumnInfos()), input), out_rwsch);

      output.setColumnExprMap(colExprMap);
      if (isInTransform) {
@@ -2557,7 +2560,7 @@ public class SemanticAnalyzer extends Ba
        boolean isDistinct, boolean isAllColumns)
        throws SemanticException {
      ArrayList<ObjectInspector> originalParameterTypeInfos =
- getWritableObjectInspector(aggParameters);
+ getWritableObjectInspector(aggParameters);
      GenericUDAFEvaluator result = FunctionRegistry.getGenericUDAFEvaluator(
          aggName, originalParameterTypeInfos, isDistinct, isAllColumns);
      if (null == result) {
@@ -2733,9 +2736,9 @@ public class SemanticAnalyzer extends Ba
        List<String> inputKeyCols = ((ReduceSinkDesc)
            reduceSinkOperatorInfo.getConf()).getOutputKeyColumnNames();
        if (inputKeyCols.size() > 0) {
- lastKeyColName = inputKeyCols.get(inputKeyCols.size()-1);
+ lastKeyColName = inputKeyCols.get(inputKeyCols.size() - 1);
        }
- reduceValues = ((ReduceSinkDesc)reduceSinkOperatorInfo.getConf()).getValueCols();
+ reduceValues = ((ReduceSinkDesc) reduceSinkOperatorInfo.getConf()).getValueCols();
      }
      int numDistinctUDFs = 0;
      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
@@ -2752,7 +2755,7 @@ public class SemanticAnalyzer extends Ba
        for (int i = 1; i < value.getChildCount(); i++) {
          ASTNode paraExpr = (ASTNode) value.getChild(i);
          ColumnInfo paraExprInfo =
- groupByInputRowResolver.getExpression(paraExpr);
+ groupByInputRowResolver.getExpression(paraExpr);
          if (paraExprInfo == null) {
            throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(paraExpr));
          }
@@ -2763,8 +2766,8 @@ public class SemanticAnalyzer extends Ba
            // if aggr is distinct, the parameter is name is constructed as
            // KEY.lastKeyColName:<tag>._colx
            paraExpression = Utilities.ReduceField.KEY.name() + "." +
- lastKeyColName + ":" + numDistinctUDFs + "." +
- getColumnInternalName(i-1);
+ lastKeyColName + ":" + numDistinctUDFs + "." +
+ getColumnInternalName(i - 1);

          }

@@ -2806,13 +2809,14 @@ public class SemanticAnalyzer extends Ba
        }
      }
      float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
- float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+ float memoryThreshold = HiveConf
+ .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);

      Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false,groupByMemoryUsage,memoryThreshold, null, false, 0),
- new RowSchema(groupByOutputRowResolver.getColumnInfos()),
- reduceSinkOperatorInfo), groupByOutputRowResolver);
+ new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
+ false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+ new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+ reduceSinkOperatorInfo), groupByOutputRowResolver);
      op.setColumnExprMap(colExprMap);
      return op;
    }
@@ -2823,7 +2827,7 @@ public class SemanticAnalyzer extends Ba
    // For eg: consider: select key, value, count(1) from T group by key, value with rollup.
    // Assuming map-side aggregation and no skew, the plan would look like:
    //
- // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink
+ // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink
    //
    // This function is called for GroupBy2 to pass the additional grouping keys introduced by
    // GroupBy1 for the grouping set (corresponding to the rollup).
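
As a quick illustration of what that plan computes (standard ROLLUP semantics rather than anything specific to this patch): for the example query, each input row contributes to aggregates at the granularities (key, value), (key) and (), and the extra grouping key emitted by GroupBy1 is what lets GroupBy2 keep those granularities apart, including telling a genuine NULL value apart from a rolled-up one.
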
@@ -2834,9 +2838,9 @@ public class SemanticAnalyzer extends Ba
        Map<String, ExprNodeDesc> colExprMap) throws SemanticException {
      // For grouping sets, add a dummy grouping key
      String groupingSetColumnName =
- groupByInputRowResolver.get(null, VirtualColumn.GROUPINGID.getName()).getInternalName();
+ groupByInputRowResolver.get(null, VirtualColumn.GROUPINGID.getName()).getInternalName();
      ExprNodeDesc inputExpr = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
- groupingSetColumnName, null, false);
+ groupingSetColumnName, null, false);
      groupByKeys.add(inputExpr);

      String field = getColumnInternalName(groupByKeys.size() - 1);
@@ -2854,7 +2858,7 @@ public class SemanticAnalyzer extends Ba
    // For eg: consider: select key, value, count(1) from T group by key, value with rollup.
    // Assuming map-side aggregation and no skew, the plan would look like:
    //
- // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink
+ // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink
    //
    // This function is called for ReduceSink to add the additional grouping keys introduced by
    // GroupBy1 into the reduce keys.
@@ -2865,16 +2869,16 @@ public class SemanticAnalyzer extends Ba
        Map<String, ExprNodeDesc> colExprMap) throws SemanticException {
      // add a key for reduce sink
      String groupingSetColumnName =
- reduceSinkInputRowResolver.get(null, VirtualColumn.GROUPINGID.getName()).getInternalName();
+ reduceSinkInputRowResolver.get(null, VirtualColumn.GROUPINGID.getName()).getInternalName();
      ExprNodeDesc inputExpr = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
- groupingSetColumnName, null, false);
+ groupingSetColumnName, null, false);
      reduceKeys.add(inputExpr);

      outputKeyColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
      String field = Utilities.ReduceField.KEY.toString() + "."
- + getColumnInternalName(reduceKeys.size() - 1);
+ + getColumnInternalName(reduceKeys.size() - 1);
      ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get(
- reduceKeys.size() - 1).getTypeInfo(), null, true);
+ reduceKeys.size() - 1).getTypeInfo(), null, true);
      reduceSinkOutputRowResolver.put(null, VirtualColumn.GROUPINGID.getName(), colInfo);
      colExprMap.put(colInfo.getInternalName(), inputExpr);
    }
@@ -2891,6 +2895,12 @@ public class SemanticAnalyzer extends Ba
     * genericUDAFEvaluator.
     * @param distPartAggr
     * partial aggregation for distincts
+ * @param groupingSets
+ * list of grouping sets
+ * @param groupingSetsPresent
+ * whether grouping sets are present in this query
+ * @param groupingSetsConsumedCurrentMR
+ * whether grouping sets are consumed by this group by
     * @return the new GroupByOperator
     */
    @SuppressWarnings("nls")
@@ -2898,7 +2908,9 @@ public class SemanticAnalyzer extends Ba
        String dest, Operator reduceSinkOperatorInfo, GroupByDesc.Mode mode,
        Map<String, GenericUDAFEvaluator> genericUDAFEvaluators,
        boolean distPartAgg,
- boolean groupingSetsPresent) throws SemanticException {
+ List<Integer> groupingSets,
+ boolean groupingSetsPresent,
+ boolean groupingSetsNeedAdditionalMRJob) throws SemanticException {
      ArrayList<String> outputColumnNames = new ArrayList<String>();
      RowResolver groupByInputRowResolver = opParseCtx
          .get(reduceSinkOperatorInfo).getRowResolver();
@@ -2926,14 +2938,39 @@ public class SemanticAnalyzer extends Ba
        colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1));
      }

+ // This is only needed if a new grouping set key is being created
+ int groupingSetsPosition = 0;
+
      // For grouping sets, add a dummy grouping key
      if (groupingSetsPresent) {
- addGroupingSetKey(
- groupByKeys,
- groupByInputRowResolver,
- groupByOutputRowResolver,
- outputColumnNames,
- colExprMap);
+ // Consider the query: select a,b, count(1) from T group by a,b with cube;
+ // where it is being executed in a single map-reduce job
+ // The plan is TableScan -> GroupBy1 -> ReduceSink -> GroupBy2 -> FileSink
+ // GroupBy1 already added the grouping id as part of the row
+ // This function is called for GroupBy2 to add grouping id as part of the groupby keys
+ if (!groupingSetsNeedAdditionalMRJob) {
+ addGroupingSetKey(
+ groupByKeys,
+ groupByInputRowResolver,
+ groupByOutputRowResolver,
+ outputColumnNames,
+ colExprMap);
+ }
+ else {
+ groupingSetsPosition = groupByKeys.size();
+ // The grouping set has not yet been processed. Create a new grouping key
+ // Consider the query: select a,b, count(1) from T group by a,b with cube;
+ // where it is being executed in 2 map-reduce jobs
+ // The plan for 1st MR is TableScan -> GroupBy1 -> ReduceSink -> GroupBy2 -> FileSink
+ // GroupBy1/ReduceSink worked as if grouping sets were not present
+ // This function is called for GroupBy2 to create new rows for grouping sets
+ // For each input row (a,b), 4 rows are created for the example above:
+ // (a,b), (a,null), (null, b), (null, null)
+ createNewGroupingKey(groupByKeys,
+ outputColumnNames,
+ groupByOutputRowResolver,
+ colExprMap);
+ }
      }

      HashMap<String, ASTNode> aggregationTrees = parseInfo
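
To make the fan-out described in that comment concrete, here is an illustrative sketch (not the operator's actual code path) of how one input row over group-by columns (a, b) would be expanded into one row per grouping-set bitmap, with bit i taken to mean "keep column i":

    import java.util.ArrayList;
    import java.util.List;

    public class GroupingSetRowExpansion {
        static List<Object[]> expand(Object[] groupByKeys, List<Integer> groupingSetBitmaps) {
            List<Object[]> rows = new ArrayList<Object[]>();
            for (int bitmap : groupingSetBitmaps) {
                Object[] copy = new Object[groupByKeys.length];
                for (int i = 0; i < groupByKeys.length; i++) {
                    // keep the column if its bit is set, otherwise null it out
                    copy[i] = ((bitmap >> i) & 1) == 1 ? groupByKeys[i] : null;
                }
                rows.add(copy);
            }
            return rows;
        }

        public static void main(String[] args) {
            // cube over (a, b): bitmaps 0, 1, 2, 3
            List<Integer> cube = java.util.Arrays.asList(0, 1, 2, 3);
            for (Object[] r : expand(new Object[] {"a", "b"}, cube)) {
                System.out.println(java.util.Arrays.toString(r));
            }
            // prints [null, null], [a, null], [null, b], [a, b]
        }
    }
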
@@ -2946,9 +2983,9 @@ public class SemanticAnalyzer extends Ba
        List<String> inputKeyCols = ((ReduceSinkDesc)
            reduceSinkOperatorInfo.getConf()).getOutputKeyColumnNames();
        if (inputKeyCols.size() > 0) {
- lastKeyColName = inputKeyCols.get(inputKeyCols.size()-1);
+ lastKeyColName = inputKeyCols.get(inputKeyCols.size() - 1);
        }
- reduceValues = ((ReduceSinkDesc)reduceSinkOperatorInfo.getConf()).getValueCols();
+ reduceValues = ((ReduceSinkDesc) reduceSinkOperatorInfo.getConf()).getValueCols();
      }
      int numDistinctUDFs = 0;
      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
@@ -2973,7 +3010,7 @@ public class SemanticAnalyzer extends Ba
          for (int i = 1; i < value.getChildCount(); i++) {
            ASTNode paraExpr = (ASTNode) value.getChild(i);
            ColumnInfo paraExprInfo =
- groupByInputRowResolver.getExpression(paraExpr);
+ groupByInputRowResolver.getExpression(paraExpr);
            if (paraExprInfo == null) {
              throw new SemanticException(ErrorMsg.INVALID_COLUMN
                  .getMsg(paraExpr));
@@ -2985,8 +3022,8 @@ public class SemanticAnalyzer extends Ba
              // if aggr is distinct, the parameter is name is constructed as
              // KEY.lastKeyColName:<tag>._colx
              paraExpression = Utilities.ReduceField.KEY.name() + "." +
- lastKeyColName + ":" + numDistinctUDFs + "."
- + getColumnInternalName(i-1);
+ lastKeyColName + ":" + numDistinctUDFs + "."
+ + getColumnInternalName(i - 1);

            }

@@ -3013,7 +3050,7 @@ public class SemanticAnalyzer extends Ba
          assert (paraExpression != null);
          aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(),
              paraExpression, paraExprInfo.getTabAlias(), paraExprInfo
- .getIsVirtualCol()));
+ .getIsVirtualCol()));
        }
        if (isDistinct) {
          numDistinctUDFs++;
@@ -3044,20 +3081,50 @@ public class SemanticAnalyzer extends Ba
            field, udaf.returnType, "", false));
      }
      float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
- float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+ float memoryThreshold = HiveConf
+ .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);

- // Nothing special needs to be done for grouping sets.
- // This is the final group by operator, so multiple rows corresponding to the
+ // Nothing special needs to be done for grouping sets if
+ // this is the final group by operator, and multiple rows corresponding to the
      // grouping sets have been generated upstream.
+ // However, if an addition MR job has been created to handle grouping sets,
+ // additional rows corresponding to grouping sets need to be created here.
      Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- distPartAgg,groupByMemoryUsage,memoryThreshold, null, false, 0),
- new RowSchema(groupByOutputRowResolver.getColumnInfos()), reduceSinkOperatorInfo),
- groupByOutputRowResolver);
+ new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
+ distPartAgg, groupByMemoryUsage, memoryThreshold,
+ groupingSets,
+ groupingSetsPresent && groupingSetsNeedAdditionalMRJob,
+ groupingSetsPosition),
+ new RowSchema(groupByOutputRowResolver.getColumnInfos()), reduceSinkOperatorInfo),
+ groupByOutputRowResolver);
      op.setColumnExprMap(colExprMap);
      return op;
    }

+ /*
+ * Create a new grouping key for grouping id.
+ * A dummy grouping id. is added. At runtime, the group by operator
+ * creates 'n' rows per input row, where 'n' is the number of grouping sets.
+ */
+ private void createNewGroupingKey(List<ExprNodeDesc> groupByKeys,
+ List<String> outputColumnNames,
+ RowResolver groupByOutputRowResolver,
+ Map<String, ExprNodeDesc> colExprMap) {
+ // The value for the constant does not matter. It is replaced by the grouping set
+ // value for the actual implementation
+ ExprNodeConstantDesc constant = new ExprNodeConstantDesc("0");
+ groupByKeys.add(constant);
+ String field = getColumnInternalName(groupByKeys.size() - 1);
+ outputColumnNames.add(field);
+ groupByOutputRowResolver.put(null, VirtualColumn.GROUPINGID.getName(),
+ new ColumnInfo(
+ field,
+ TypeInfoFactory.stringTypeInfo,
+ null,
+ true));
+ colExprMap.put(field, constant);
+ }
+
    /**
     * Generate the map-side GroupByOperator for the Query Block
     * (qb.getParseInfo().getXXX(dest)). The new GroupByOperator will be a child
@@ -3093,13 +3160,13 @@ public class SemanticAnalyzer extends Ba
      for (int i = 0; i < grpByExprs.size(); ++i) {
        ASTNode grpbyExpr = grpByExprs.get(i);
        ExprNodeDesc grpByExprNode = genExprNodeDesc(grpbyExpr,
- groupByInputRowResolver);
+ groupByInputRowResolver);

        groupByKeys.add(grpByExprNode);
        String field = getColumnInternalName(i);
        outputColumnNames.add(field);
        groupByOutputRowResolver.putExpression(grpbyExpr,
- new ColumnInfo(field, grpByExprNode.getTypeInfo(), "", false));
+ new ColumnInfo(field, grpByExprNode.getTypeInfo(), "", false));
        colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1));
      }

@@ -3111,48 +3178,38 @@ public class SemanticAnalyzer extends Ba
      // For eg: consider: select key, value, count(1) from T group by key, value with rollup.
      // Assuming map-side aggregation and no skew, the plan would look like:
      //
- // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink
+ // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink
      //
      // This function is called for GroupBy1 to create an additional grouping key
      // for the grouping set (corresponding to the rollup).
      if (groupingSetsPresent) {
- // The value for the constant does not matter. It is replaced by the grouping set
- // value for the actual implementation
- ExprNodeConstantDesc constant = new ExprNodeConstantDesc("0");
- groupByKeys.add(constant);
- String field = getColumnInternalName(groupByKeys.size() - 1);
- outputColumnNames.add(field);
- groupByOutputRowResolver.put(null, VirtualColumn.GROUPINGID.getName(),
- new ColumnInfo(
- field,
- TypeInfoFactory.stringTypeInfo,
- null,
- true));
- colExprMap.put(field, constant);
+ createNewGroupingKey(groupByKeys,
+ outputColumnNames,
+ groupByOutputRowResolver,
+ colExprMap);
      }

      // If there is a distinctFuncExp, add all parameters to the reduceKeys.
      if (!parseInfo.getDistinctFuncExprsForClause(dest).isEmpty()) {
        List<ASTNode> list = parseInfo.getDistinctFuncExprsForClause(dest);
- for(ASTNode value: list) {
+ for (ASTNode value : list) {
          // 0 is function name
          for (int i = 1; i < value.getChildCount(); i++) {
            ASTNode parameter = (ASTNode) value.getChild(i);
            if (groupByOutputRowResolver.getExpression(parameter) == null) {
              ExprNodeDesc distExprNode = genExprNodeDesc(parameter,
- groupByInputRowResolver);
+ groupByInputRowResolver);
              groupByKeys.add(distExprNode);
- String field = getColumnInternalName(groupByKeys.size()-1);
+ String field = getColumnInternalName(groupByKeys.size() - 1);
              outputColumnNames.add(field);
              groupByOutputRowResolver.putExpression(parameter, new ColumnInfo(
- field, distExprNode.getTypeInfo(), "", false));
+ field, distExprNode.getTypeInfo(), "", false));
              colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1));
            }
          }
        }
      }

-
      // For each aggregation
      HashMap<String, ASTNode> aggregationTrees = parseInfo
          .getAggregationExprsForClause(dest);
@@ -3167,7 +3224,7 @@ public class SemanticAnalyzer extends Ba
        for (int i = 1; i < value.getChildCount(); i++) {
          ASTNode paraExpr = (ASTNode) value.getChild(i);
          ExprNodeDesc paraExprNode = genExprNodeDesc(paraExpr,
- groupByInputRowResolver);
+ groupByInputRowResolver);

          aggParameters.add(paraExprNode);
        }
@@ -3196,13 +3253,14 @@ public class SemanticAnalyzer extends Ba
        }
      }
      float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
- float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+ float memoryThreshold = HiveConf
+ .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
      Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false,groupByMemoryUsage,memoryThreshold,
- groupingSetKeys, groupingSetsPresent, groupingSetsPosition),
- new RowSchema(groupByOutputRowResolver.getColumnInfos()),
- inputOperatorInfo), groupByOutputRowResolver);
+ new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
+ false, groupByMemoryUsage, memoryThreshold,
+ groupingSetKeys, groupingSetsPresent, groupingSetsPosition),
+ new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+ inputOperatorInfo), groupByOutputRowResolver);
      op.setColumnExprMap(colExprMap);
      return op;
    }
@@ -3244,18 +3302,18 @@ public class SemanticAnalyzer extends Ba
      List<String> outputValueColumnNames = new ArrayList<String>();

      ArrayList<ExprNodeDesc> reduceKeys = getReduceKeysForReduceSink(grpByExprs, dest,
- reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
- colExprMap);
+ reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+ colExprMap);

      // add a key for reduce sink
      if (groupingSetsPresent) {
        // Process grouping set for the reduce sink operator
        processGroupingSetReduceSinkOperator(
- reduceSinkInputRowResolver,
- reduceSinkOutputRowResolver,
- reduceKeys,
- outputKeyColumnNames,
- colExprMap);
+ reduceSinkInputRowResolver,
+ reduceSinkOutputRowResolver,
+ reduceKeys,
+ outputKeyColumnNames,
+ colExprMap);

        if (changeNumPartitionFields) {
          numPartitionFields++;
@@ -3292,14 +3350,14 @@ public class SemanticAnalyzer extends Ba
      }

      ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
- OperatorFactory.getAndMakeChild(
- PlanUtils.getReduceSinkDesc(reduceKeys,
- groupingSetsPresent ? grpByExprs.size() + 1 : grpByExprs.size(),
- reduceValues, distinctColIndices,
- outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
- numReducers),
- new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), inputOperatorInfo),
- reduceSinkOutputRowResolver);
+ OperatorFactory.getAndMakeChild(
+ PlanUtils.getReduceSinkDesc(reduceKeys,
+ groupingSetsPresent ? grpByExprs.size() + 1 : grpByExprs.size(),
+ reduceValues, distinctColIndices,
+ outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
+ numReducers),
+ new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), inputOperatorInfo),
+ reduceSinkOutputRowResolver);
      rsOp.setColumnExprMap(colExprMap);
      return rsOp;
    }
@@ -3314,7 +3372,7 @@ public class SemanticAnalyzer extends Ba
      for (int i = 0; i < grpByExprs.size(); ++i) {
        ASTNode grpbyExpr = grpByExprs.get(i);
        ExprNodeDesc inputExpr = genExprNodeDesc(grpbyExpr,
- reduceSinkInputRowResolver);
+ reduceSinkInputRowResolver);
        reduceKeys.add(inputExpr);
        if (reduceSinkOutputRowResolver.getExpression(grpbyExpr) == null) {
          outputKeyColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
@@ -3333,7 +3391,8 @@ public class SemanticAnalyzer extends Ba
      return reduceKeys;
    }

- private List<List<Integer>> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, String dest,
+ private List<List<Integer>> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo,
+ String dest,
        List<ExprNodeDesc> reduceKeys, RowResolver reduceSinkInputRowResolver,
        RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames)
        throws SemanticException {
@@ -3369,8 +3428,8 @@ public class SemanticAnalyzer extends Ba
            distinctIndices.add(ri);
            String name = getColumnInternalName(numExprs);
            String field = Utilities.ReduceField.KEY.toString() + "." + colName
- + ":" + i
- + "." + name;
+ + ":" + i
+ + "." + name;
            ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false);
            reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
            numExprs++;
@@ -3470,10 +3529,10 @@ public class SemanticAnalyzer extends Ba

      ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
          OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
- grpByExprs.size(), reduceValues, distinctColIndices,
- outputKeyColumnNames, outputValueColumnNames, true, -1, grpByExprs.size(),
- -1), new RowSchema(reduceSinkOutputRowResolver
- .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver);
+ grpByExprs.size(), reduceValues, distinctColIndices,
+ outputKeyColumnNames, outputValueColumnNames, true, -1, grpByExprs.size(),
+ -1), new RowSchema(reduceSinkOutputRowResolver
+ .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver);
      rsOp.setColumnExprMap(colExprMap);
      return rsOp;
    }
@@ -3494,10 +3553,10 @@ public class SemanticAnalyzer extends Ba
        return nodes;
      }
      for (int i = 0; i < node.getChildCount(); i++) {
- ASTNode child = (ASTNode)node.getChild(i);
+ ASTNode child = (ASTNode) node.getChild(i);
        if (child.getType() == HiveParser.TOK_TABLE_OR_COL && child.getChild(0) != null &&
            inputRR.get(null,
- BaseSemanticAnalyzer.unescapeIdentifier(child.getChild(0).getText())) != null) {
+ BaseSemanticAnalyzer.unescapeIdentifier(child.getChild(0).getText())) != null) {
          nodes.add(child);
        } else {
          nodes.addAll(getColumnExprsFromASTNode(child, inputRR));
@@ -3558,11 +3617,11 @@ public class SemanticAnalyzer extends Ba
        // Note that partitioning fields dont need to change, since it is either
        // partitioned randomly, or by all grouping keys + distinct keys
        processGroupingSetReduceSinkOperator(
- reduceSinkInputRowResolver2,
- reduceSinkOutputRowResolver2,
- reduceKeys,
- outputColumnNames,
- colExprMap);
+ reduceSinkInputRowResolver2,
+ reduceSinkOutputRowResolver2,
+ reduceKeys,
+ outputColumnNames,
+ colExprMap);
      }

      // Get partial aggregation results and store in reduceValues
@@ -3586,9 +3645,9 @@ public class SemanticAnalyzer extends Ba

      ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
          OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
- reduceValues, outputColumnNames, true, -1, numPartitionFields,
- numReducers), new RowSchema(reduceSinkOutputRowResolver2
- .getColumnInfos()), groupByOperatorInfo),
+ reduceValues, outputColumnNames, true, -1, numPartitionFields,
+ numReducers), new RowSchema(reduceSinkOutputRowResolver2
+ .getColumnInfos()), groupByOperatorInfo),
          reduceSinkOutputRowResolver2);

      rsOp.setColumnExprMap(colExprMap);
@@ -3645,11 +3704,11 @@ public class SemanticAnalyzer extends Ba
      // For grouping sets, add a dummy grouping key
      if (groupingSetsPresent) {
        addGroupingSetKey(
- groupByKeys,
- groupByInputRowResolver2,
- groupByOutputRowResolver2,
- outputColumnNames,
- colExprMap);
+ groupByKeys,
+ groupByInputRowResolver2,
+ groupByOutputRowResolver2,
+ outputColumnNames,
+ colExprMap);
      }

      HashMap<String, ASTNode> aggregationTrees = parseInfo
@@ -3665,7 +3724,7 @@ public class SemanticAnalyzer extends Ba
        assert (paraExpression != null);
        aggParameters.add(new ExprNodeColumnDesc(paraExprInfo.getType(),
            paraExpression, paraExprInfo.getTabAlias(), paraExprInfo
- .getIsVirtualCol()));
+ .getIsVirtualCol()));

        String aggName = unescapeIdentifier(value.getChild(0).getText());

@@ -3683,7 +3742,7 @@ public class SemanticAnalyzer extends Ba
                udaf.genericUDAFEvaluator,
                udaf.convertedParameters,
                (mode != GroupByDesc.Mode.FINAL && value.getToken().getType() ==
- HiveParser.TOK_FUNCTIONDI),
+ HiveParser.TOK_FUNCTIONDI),
                amode));
        String field = getColumnInternalName(groupByKeys.size()
            + aggregations.size() - 1);
@@ -3692,13 +3751,14 @@ public class SemanticAnalyzer extends Ba
            field, udaf.returnType, "", false));
      }
      float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
- float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+ float memoryThreshold = HiveConf
+ .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);

      Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false,groupByMemoryUsage,memoryThreshold, null, false, 0),
- new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
- reduceSinkOperatorInfo2), groupByOutputRowResolver2);
+ new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
+ false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+ new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
+ reduceSinkOperatorInfo2), groupByOutputRowResolver2);
      op.setColumnExprMap(colExprMap);
      return op;
    }
@@ -3736,7 +3796,7 @@ public class SemanticAnalyzer extends Ba

      int numReducers = -1;
      ObjectPair<List<ASTNode>, List<Integer>> grpByExprsGroupingSets =
- getGroupByGroupingSetsForClause(parseInfo, dest);
+ getGroupByGroupingSetsForClause(parseInfo, dest);

      List<ASTNode> grpByExprs = grpByExprsGroupingSets.getFirst();
      List<Integer> groupingSets = grpByExprsGroupingSets.getSecond();
@@ -3752,15 +3812,15 @@ public class SemanticAnalyzer extends Ba

      // ////// 1. Generate ReduceSinkOperator
      Operator reduceSinkOperatorInfo =
- genGroupByPlanReduceSinkOperator(qb,
- dest,
- input,
- grpByExprs,
- grpByExprs.size(),
- false,
- numReducers,
- false,
- false);
+ genGroupByPlanReduceSinkOperator(qb,
+ dest,
+ input,
+ grpByExprs,
+ grpByExprs.size(),
+ false,
+ numReducers,
+ false,
+ false);

      // ////// 2. Generate GroupbyOperator
      Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
@@ -3788,7 +3848,7 @@ public class SemanticAnalyzer extends Ba
        if (whereExpr != null) {
          OpParseContext inputCtx = opParseCtx.get(input);
          RowResolver inputRR = inputCtx.getRowResolver();
- ExprNodeDesc current = genExprNodeDesc((ASTNode)whereExpr.getChild(0), inputRR);
+ ExprNodeDesc current = genExprNodeDesc((ASTNode) whereExpr.getChild(0), inputRR);

          // Check the list of where expressions already added so they aren't duplicated
          ExprNodeDesc.ExprNodeDescEqualityWrapper currentWrapped =
@@ -3825,8 +3885,8 @@ public class SemanticAnalyzer extends Ba
        FilterDesc orFilterDesc = new FilterDesc(previous, false);

        selectInput = putOpInsertMap(OperatorFactory.getAndMakeChild(
- orFilterDesc, new RowSchema(
- inputRR.getColumnInfos()), input), inputRR);
+ orFilterDesc, new RowSchema(
+ inputRR.getColumnInfos()), input), inputRR);
      }

      // insert a select operator here used by the ColumnPruner to reduce
@@ -3905,20 +3965,21 @@ public class SemanticAnalyzer extends Ba

      // ////// Generate GroupbyOperator for a map-side partial aggregation
      Map<String, GenericUDAFEvaluator> genericUDAFEvaluators =
- new LinkedHashMap<String, GenericUDAFEvaluator>();
+ new LinkedHashMap<String, GenericUDAFEvaluator>();

      QBParseInfo parseInfo = qb.getParseInfo();

      // ////// 2. Generate GroupbyOperator
      Operator groupByOperatorInfo = genGroupByPlanGroupByOperator1(parseInfo,
- dest, input, GroupByDesc.Mode.HASH, genericUDAFEvaluators, true, false);
+ dest, input, GroupByDesc.Mode.HASH, genericUDAFEvaluators, true,
+ null, false, false);

      int numReducers = -1;
      List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);

      // ////// 3. Generate ReduceSinkOperator2
      Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
- parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, false);
+ parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, false);

      // ////// 4. Generate GroupbyOperator2
      Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(parseInfo,
@@ -3976,7 +4037,7 @@ public class SemanticAnalyzer extends Ba
      QBParseInfo parseInfo = qb.getParseInfo();

      ObjectPair<List<ASTNode>, List<Integer>> grpByExprsGroupingSets =
- getGroupByGroupingSetsForClause(parseInfo, dest);
+ getGroupByGroupingSetsForClause(parseInfo, dest);

      List<ASTNode> grpByExprs = grpByExprsGroupingSets.getFirst();
      List<Integer> groupingSets = grpByExprsGroupingSets.getSecond();
@@ -3996,22 +4057,22 @@ public class SemanticAnalyzer extends Ba
      // operator. We set the numPartitionColumns to -1 for this purpose. This is
      // captured by WritableComparableHiveObject.hashCode() function.
      Operator reduceSinkOperatorInfo =
- genGroupByPlanReduceSinkOperator(qb,
- dest,
- input,
- grpByExprs,
- (parseInfo.getDistinctFuncExprsForClause(dest).isEmpty() ? -1 : Integer.MAX_VALUE),
- false,
- -1,
- false,
- false);
+ genGroupByPlanReduceSinkOperator(qb,
+ dest,
+ input,
+ grpByExprs,
+ (parseInfo.getDistinctFuncExprsForClause(dest).isEmpty() ? -1 : Integer.MAX_VALUE),
+ false,
+ -1,
+ false,
+ false);

      // ////// 2. Generate GroupbyOperator
      Map<String, GenericUDAFEvaluator> genericUDAFEvaluators =
- new LinkedHashMap<String, GenericUDAFEvaluator>();
+ new LinkedHashMap<String, GenericUDAFEvaluator>();
      GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanGroupByOperator(
- parseInfo, dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIAL1,
- genericUDAFEvaluators);
+ parseInfo, dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIAL1,
+ genericUDAFEvaluators);

      int numReducers = -1;
      if (grpByExprs.isEmpty()) {
@@ -4020,12 +4081,12 @@ public class SemanticAnalyzer extends Ba

      // ////// 3. Generate ReduceSinkOperator2
      Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
- parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, false);
+ parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, false);

      // ////// 4. Generate GroupbyOperator2
      Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(parseInfo,
- dest, reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
- genericUDAFEvaluators, false);
+ dest, reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
+ genericUDAFEvaluators, false);

      return groupByOperatorInfo2;
    }
@@ -4046,13 +4107,13 @@ public class SemanticAnalyzer extends Ba
    static private void extractColumns(Set<String> colNamesExprs,
        ExprNodeDesc exprNode) throws SemanticException {
      if (exprNode instanceof ExprNodeColumnDesc) {
- colNamesExprs.add(((ExprNodeColumnDesc)exprNode).getColumn());
+ colNamesExprs.add(((ExprNodeColumnDesc) exprNode).getColumn());
        return;
      }

      if (exprNode instanceof ExprNodeGenericFuncDesc) {
- ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc)exprNode;
- for (ExprNodeDesc childExpr: funcDesc.getChildExprs()) {
+ ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) exprNode;
+ for (ExprNodeDesc childExpr : funcDesc.getChildExprs()) {
          extractColumns(colNamesExprs, childExpr);
        }
      }
@@ -4071,7 +4132,7 @@ public class SemanticAnalyzer extends Ba
    private void checkExpressionsForGroupingSet(List<ASTNode> grpByExprs,
        List<ASTNode> distinctGrpByExprs,
        Map<String, ASTNode> aggregationTrees,
- RowResolver inputRowResolver) throws SemanticException {
+ RowResolver inputRowResolver) throws SemanticException {

      Set<String> colNamesGroupByExprs = new HashSet<String>();
      Set<String> colNamesGroupByDistinctExprs = new HashSet<String>();
@@ -4085,7 +4146,7 @@ public class SemanticAnalyzer extends Ba

      // If there is a distinctFuncExp, add all parameters to the reduceKeys.
      if (!distinctGrpByExprs.isEmpty()) {
- for(ASTNode value: distinctGrpByExprs) {
+ for (ASTNode value : distinctGrpByExprs) {
          // 0 is function name
          for (int i = 1; i < value.getChildCount(); i++) {
            ASTNode parameter = (ASTNode) value.getChild(i);
@@ -4095,8 +4156,7 @@ public class SemanticAnalyzer extends Ba
          }

          if (hasCommonElement(colNamesGroupByExprs, colNamesGroupByDistinctExprs)) {
- throw
- new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID.getMsg());
+ throw new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID.getMsg());
          }
        }
      }
@@ -4114,8 +4174,7 @@ public class SemanticAnalyzer extends Ba
        }

        if (hasCommonElement(colNamesGroupByExprs, colNamesAggregateParameters)) {
- throw
- new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID.getMsg());
+ throw new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID.getMsg());
        }
      }
    }
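
As I read this check, it rejects queries where a group-by column also appears as an aggregation (or DISTINCT) argument when grouping sets are in play, for example select a, b, count(a) from T group by a, b with cube, presumably because the grouping-set expansion nulls out that column in some of the generated rows and the aggregate would no longer be well defined.
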
@@ -4127,52 +4186,92 @@ public class SemanticAnalyzer extends Ba
     * spray by the group by key, and sort by the distinct key (if any), and
     * compute aggregates based on actual aggregates
     *
- * The agggregation evaluation functions are as follows: Mapper:
- * iterate/terminatePartial (mode = HASH)
+ * The aggregation evaluation functions are as follows:
     *
+ * No grouping sets:
+ * Group By Operator:
+ * grouping keys: group by expressions if no DISTINCT
+ * grouping keys: group by expressions + distinct keys if DISTINCT
+ * Mapper: iterate/terminatePartial (mode = HASH)
     * Partitioning Key: grouping key
+ * Sorting Key: grouping key if no DISTINCT
+ * grouping + distinct key if DISTINCT
+ * Reducer: iterate/terminate if DISTINCT
+ * merge/terminate if NO DISTINCT (mode MERGEPARTIAL)
+ *
+ * Grouping Sets:
+ * Group By Operator:
+ * grouping keys: group by expressions + grouping id. if no DISTINCT
+ * grouping keys: group by expressions + grouping id. + distinct keys if DISTINCT
+ * Mapper: iterate/terminatePartial (mode = HASH)
+ * Partitioning Key: grouping key + grouping id.
+ * Sorting Key: grouping key + grouping id. if no DISTINCT
+ * grouping + grouping id. + distinct key if DISTINCT
+ * Reducer: iterate/terminate if DISTINCT
+ * merge/terminate if NO DISTINCT (mode MERGEPARTIAL)
+ *
+ * Grouping Sets with an additional MR job introduced (distincts are not allowed):
+ * Group By Operator:
+ * grouping keys: group by expressions
+ * Mapper: iterate/terminatePartial (mode = HASH)
+ * Partitioning Key: grouping key
+ * Sorting Key: grouping key
+ * Reducer: merge/terminate (mode MERGEPARTIAL)
+ * Group by Operator:
+ * grouping keys: group by expressions + add a new grouping id. key
     *
- * Sorting Key: grouping key if no DISTINCT grouping + distinct key if
- * DISTINCT
- *
- * Reducer: iterate/terminate if DISTINCT merge/terminate if NO DISTINCT (mode
- * = MERGEPARTIAL)
+ * STAGE 2
+ * Partitioning Key: grouping key + grouping id.
+ * Sorting Key: grouping key + grouping id.
+ * Reducer: merge/terminate (mode = FINAL)
+ * Group by Operator:
+ * grouping keys: group by expressions + grouping id.
     */
    @SuppressWarnings("nls")
- private Operator genGroupByPlanMapAggr1MR(String dest, QB qb,
+ private Operator genGroupByPlanMapAggrNoSkew(String dest, QB qb,
        Operator inputOperatorInfo) throws SemanticException {

      QBParseInfo parseInfo = qb.getParseInfo();
      ObjectPair<List<ASTNode>, List<Integer>> grpByExprsGroupingSets =
- getGroupByGroupingSetsForClause(parseInfo, dest);
+ getGroupByGroupingSetsForClause(parseInfo, dest);

      List<ASTNode> grpByExprs = grpByExprsGroupingSets.getFirst();
      List<Integer> groupingSets = grpByExprsGroupingSets.getSecond();
      boolean groupingSetsPresent = !groupingSets.isEmpty();

+ int newMRJobGroupingSetsThreshold =
+ conf.getIntVar(HiveConf.ConfVars.HIVE_NEW_JOB_GROUPING_SET_CARDINALITY);
+
      if (groupingSetsPresent) {
        checkExpressionsForGroupingSet(grpByExprs,
- parseInfo.getDistinctFuncExprsForClause(dest),
- parseInfo.getAggregationExprsForClause(dest),
- opParseCtx.get(inputOperatorInfo).getRowResolver());
+ parseInfo.getDistinctFuncExprsForClause(dest),
+ parseInfo.getAggregationExprsForClause(dest),
+ opParseCtx.get(inputOperatorInfo).getRowResolver());
      }

      // ////// Generate GroupbyOperator for a map-side partial aggregation
      Map<String, GenericUDAFEvaluator> genericUDAFEvaluators =
- new LinkedHashMap<String, GenericUDAFEvaluator>();
+ new LinkedHashMap<String, GenericUDAFEvaluator>();
+
+ // Is the grouping sets data consumed in the current MR job, or
+ // does it need an additional MR job?
+ boolean groupingSetsNeedAdditionalMRJob =
+ groupingSetsPresent && groupingSets.size() > newMRJobGroupingSetsThreshold;
+
      GroupByOperator groupByOperatorInfo =
- (GroupByOperator) genGroupByPlanMapGroupByOperator(
- qb,
- dest,
- grpByExprs,
- inputOperatorInfo,
- GroupByDesc.Mode.HASH,
- genericUDAFEvaluators,
- groupingSets,
- groupingSetsPresent);
+ (GroupByOperator) genGroupByPlanMapGroupByOperator(
+ qb,
+ dest,
+ grpByExprs,
+ inputOperatorInfo,
+ GroupByDesc.Mode.HASH,
+ genericUDAFEvaluators,
+ groupingSets,
+ groupingSetsPresent && !groupingSetsNeedAdditionalMRJob);

      groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get(
- inputOperatorInfo).getRowResolver().getTableNames());
+ inputOperatorInfo).getRowResolver().getTableNames());
      int numReducers = -1;

      // Optimize the scenario when there are no grouping keys - only 1 reducer is
@@ -4182,27 +4281,64 @@ public class SemanticAnalyzer extends Ba
      }

      // ////// Generate ReduceSink Operator
+ boolean isDistinct = !qb.getParseInfo().getDistinctFuncExprsForClause(dest).isEmpty();
+
+ // Distincts are not allowed with an additional MR job
+ if (groupingSetsNeedAdditionalMRJob && isDistinct) {
+ String errorMsg = "The number of rows per input row due to grouping sets is "
+ + groupingSets.size();
+ throw new SemanticException(
+ ErrorMsg.HIVE_GROUPING_SETS_THRESHOLD_NOT_ALLOWED_WITH_DISTINCTS.getMsg(errorMsg));
+ }
+
      Operator reduceSinkOperatorInfo =
- genGroupByPlanReduceSinkOperator(qb,
- dest,
- groupByOperatorInfo,
- grpByExprs,
- grpByExprs.size(),
- true,
- numReducers,
- true,
- groupingSetsPresent);
-
- // This is a 1-stage map-reduce processing of the groupby. Tha map-side
- // aggregates was just used to
- // reduce output data. In case of distincts, partial results are not used,
- // and so iterate is again
- // invoked on the reducer. In case of non-distincts, partial results are
- // used, and merge is invoked
- // on the reducer.
- return genGroupByPlanGroupByOperator1(parseInfo, dest,
- reduceSinkOperatorInfo, GroupByDesc.Mode.MERGEPARTIAL,
- genericUDAFEvaluators, false, groupingSetsPresent);
+ genGroupByPlanReduceSinkOperator(qb,
+ dest,
+ groupByOperatorInfo,
+ grpByExprs,
+ grpByExprs.size(),
+ true,
+ numReducers,
+ true,
+ groupingSetsPresent && !groupingSetsNeedAdditionalMRJob);
+
+ // Does it require a new MR job for grouping sets
+ if (!groupingSetsPresent || !groupingSetsNeedAdditionalMRJob) {
+ // This is a 1-stage map-reduce processing of the groupby. The map-side
+ // aggregation was only used to reduce the output data. In the case of
+ // distincts, partial results are not used, so iterate is invoked again on
+ // the reducer. In the case of non-distincts, partial results are used, and
+ // merge is invoked on the reducer.
+ return genGroupByPlanGroupByOperator1(parseInfo, dest,
+ reduceSinkOperatorInfo, GroupByDesc.Mode.MERGEPARTIAL,
+ genericUDAFEvaluators, false,
+ groupingSets, groupingSetsPresent, groupingSetsNeedAdditionalMRJob);
+ } else {
+ // For each input row, add 'n' rows, one per grouping set. Since map-side
+ // aggregation has already been performed, the number of input rows has
+ // already been reduced. Moreover, rows with the same grouping keys arrive
+ // together, so there is a higher chance of finding them in the hash table.
+ Operator groupByOperatorInfo2 =
+ genGroupByPlanGroupByOperator1(parseInfo, dest,
+ reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIALS,
+ genericUDAFEvaluators, false,
+ groupingSets, groupingSetsPresent, groupingSetsNeedAdditionalMRJob);
+
+ // ////// Generate ReduceSinkOperator2
+ Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
+ parseInfo, dest, groupByOperatorInfo2, grpByExprs.size() + 1, numReducers,
+ groupingSetsPresent);
+
+ // ////// Generate GroupbyOperator3
+ return genGroupByPlanGroupByOperator2MR(parseInfo, dest,
+ reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
+ genericUDAFEvaluators, groupingSetsPresent);
+ }
    }
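The new-MR-job decision in genGroupByPlanMapAggrNoSkew reduces to a cardinality comparison against the HIVE_NEW_JOB_GROUPING_SET_CARDINALITY setting, with distincts rejected when the extra job is needed. Below is a minimal standalone sketch of that decision; the names (cardinalityThreshold, hasDistinct) and the threshold values in main are assumptions for illustration, not the actual HiveConf plumbing or defaults.

    import java.util.Arrays;
    import java.util.List;

    public class GroupingSetJobDecisionSketch {

      // Returns true when grouping-set expansion should be deferred to a second MR job.
      static boolean needsAdditionalMRJob(List<Integer> groupingSets, int cardinalityThreshold,
          boolean hasDistinct) {
        boolean groupingSetsPresent = !groupingSets.isEmpty();
        boolean extraJob = groupingSetsPresent && groupingSets.size() > cardinalityThreshold;
        if (extraJob && hasDistinct) {
          // The patch raises HIVE_GROUPING_SETS_THRESHOLD_NOT_ALLOWED_WITH_DISTINCTS here.
          throw new IllegalStateException(
              "distincts are not allowed when grouping sets need an additional MR job");
        }
        return extraJob;
      }

      public static void main(String[] args) {
        // e.g. GROUP BY a, b, c GROUPING SETS ((a,b,c), (a,b), (a), ()): 4 sets per input row.
        List<Integer> groupingSets = Arrays.asList(7, 3, 1, 0);
        System.out.println(needsAdditionalMRJob(groupingSets, 2, false));  // true: 4 > 2
        System.out.println(needsAdditionalMRJob(groupingSets, 30, false)); // false: 4 <= 30
      }
    }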

    /**
@@ -4215,25 +4351,48 @@ public class SemanticAnalyzer extends Ba
     * distinct is present) in hope of getting a uniform distribution, and compute
     * partial aggregates grouped by the reduction key (grouping key + distinct
     * key). Evaluate partial aggregates first, and spray by the grouping key to
- * compute actual aggregates in the second phase. The agggregation evaluation
- * functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
- *
- * Partitioning Key: random() if no DISTINCT grouping + distinct key if
- * DISTINCT
+ * compute actual aggregates in the second phase.
     *
- * Sorting Key: grouping key if no DISTINCT grouping + distinct key if
- * DISTINCT
+ * The aggregation evaluation functions are as follows:
     *
- * Reducer: iterate/terminatePartial if DISTINCT merge/terminatePartial if NO
- * DISTINCT (mode = MERGEPARTIAL)
+ * No grouping sets:
+ * STAGE 1
+ * Group by Operator:
+ * grouping keys: group by expressions if no DISTINCT
+ * grouping keys: group by expressions + distinct keys if DISTINCT
+ * Mapper: iterate/terminatePartial (mode = HASH)
+ * Partitioning Key: random() if no DISTINCT
+ * grouping + distinct key if DISTINCT
+ * Sorting Key: grouping key if no DISTINCT
+ * grouping + distinct key if DISTINCT
+ * Reducer: iterate/terminatePartial if DISTINCT
+ * merge/terminatePartial if NO DISTINCT (mode = MERGEPARTIAL)
+ * Group by Operator:
+ * grouping keys: group by expressions
     *
     * STAGE 2
+ * Partitioning Key: grouping key
+ * Sorting Key: grouping key
+ * Reducer: merge/terminate (mode = FINAL)
     *
- * Partitioining Key: grouping key
- *
- * Sorting Key: grouping key if no DISTINCT grouping + distinct key if
- * DISTINCT
+ * In the presence of grouping sets, the aggregation evaluation functions are as follows:
+ * STAGE 1
+ * Group by Operator:
+ * grouping keys: group by expressions + grouping id. if no DISTINCT
+ * grouping keys: group by expressions + grouping id. + distinct keys if DISTINCT
+ * Mapper: iterate/terminatePartial (mode = HASH)
+ * Partitioning Key: random() if no DISTINCT
+ * grouping + grouping id. + distinct key if DISTINCT
+ * Sorting Key: grouping key + grouping id. if no DISTINCT
+ * grouping + grouping id. + distinct key if DISTINCT
+ * Reducer: iterate/terminatePartial if DISTINCT
+ * merge/terminatePartial if NO DISTINCT (mode = MERGEPARTIAL)
+ * Group by Operator:
+ * grouping keys: group by expressions + grouping id.
     *
+ * STAGE 2
+ * Partitioning Key: grouping key
+ * Sorting Key: grouping key + grouping id.
     * Reducer: merge/terminate (mode = FINAL)
     */
    @SuppressWarnings("nls")
@@ -4243,7 +4402,7 @@ public class SemanticAnalyzer extends Ba
      QBParseInfo parseInfo = qb.getParseInfo();

      ObjectPair<List<ASTNode>, List<Integer>> grpByExprsGroupingSets =
- getGroupByGroupingSetsForClause(parseInfo, dest);
+ getGroupByGroupingSetsForClause(parseInfo, dest);

      List<ASTNode> grpByExprs = grpByExprsGroupingSets.getFirst();
      List<Integer> groupingSets = grpByExprsGroupingSets.getSecond();
@@ -4251,18 +4410,29 @@ public class SemanticAnalyzer extends Ba

      if (groupingSetsPresent) {
        checkExpressionsForGroupingSet(grpByExprs,
- parseInfo.getDistinctFuncExprsForClause(dest),
- parseInfo.getAggregationExprsForClause(dest),
- opParseCtx.get(inputOperatorInfo).getRowResolver());
+ parseInfo.getDistinctFuncExprsForClause(dest),
+ parseInfo.getAggregationExprsForClause(dest),
+ opParseCtx.get(inputOperatorInfo).getRowResolver());
+
+ int newMRJobGroupingSetsThreshold =
+ conf.getIntVar(HiveConf.ConfVars.HIVE_NEW_JOB_GROUPING_SET_CARDINALITY);
+
+ // Grouping sets that need an additional MR job cannot be combined with
+ // skewed group by processing: reject the query.
+ if (groupingSets.size() > newMRJobGroupingSetsThreshold) {
+ String errorMsg = "The number of rows per input row due to grouping sets is "
+ + groupingSets.size();
+ throw new SemanticException(
+ ErrorMsg.HIVE_GROUPING_SETS_THRESHOLD_NOT_ALLOWED_WITH_SKEW.getMsg(errorMsg));
+ }
      }

      // ////// Generate GroupbyOperator for a map-side partial aggregation
      Map<String, GenericUDAFEvaluator> genericUDAFEvaluators =
- new LinkedHashMap<String, GenericUDAFEvaluator>();
+ new LinkedHashMap<String, GenericUDAFEvaluator>();
      GroupByOperator groupByOperatorInfo =
- (GroupByOperator) genGroupByPlanMapGroupByOperator(
- qb, dest, grpByExprs, inputOperatorInfo, GroupByDesc.Mode.HASH,
- genericUDAFEvaluators, groupingSets, groupingSetsPresent);
+ (GroupByOperator) genGroupByPlanMapGroupByOperator(
+ qb, dest, grpByExprs, inputOperatorInfo, GroupByDesc.Mode.HASH,
+ genericUDAFEvaluators, groupingSets, groupingSetsPresent);

      groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get(
          inputOperatorInfo).getRowResolver().getTableNames());
@@ -4274,20 +4444,21 @@ public class SemanticAnalyzer extends Ba

        // ////// Generate ReduceSink Operator
        Operator reduceSinkOperatorInfo =
- genGroupByPlanReduceSinkOperator(qb,
- dest,
- groupByOperatorInfo,
- grpByExprs,
- distinctFuncExprs.isEmpty() ? -1 : Integer.MAX_VALUE,
- false,
- -1,
- true,
- groupingSetsPresent);
+ genGroupByPlanReduceSinkOperator(qb,
+ dest,
+ groupByOperatorInfo,
+ grpByExprs,
+ distinctFuncExprs.isEmpty() ? -1 : Integer.MAX_VALUE,
+ false,
+ -1,
+ true,
+ groupingSetsPresent);

        // ////// Generate GroupbyOperator for a partial aggregation
        Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo,
- dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIALS,
- genericUDAFEvaluators, false, groupingSetsPresent);
+ dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIALS,
+ genericUDAFEvaluators, false,
+ groupingSets, groupingSetsPresent, false);

        int numReducers = -1;
        if (grpByExprs.isEmpty()) {
@@ -4296,31 +4467,31 @@ public class SemanticAnalyzer extends Ba

        // ////// Generate ReduceSinkOperator2
        Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
- parseInfo, dest, groupByOperatorInfo2, grpByExprs.size(), numReducers,
- groupingSetsPresent);
+ parseInfo, dest, groupByOperatorInfo2, grpByExprs.size(), numReducers,
+ groupingSetsPresent);

        // ////// Generate GroupbyOperator3
        return genGroupByPlanGroupByOperator2MR(parseInfo, dest,
- reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
- genericUDAFEvaluators, groupingSetsPresent);
+ reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
+ genericUDAFEvaluators, groupingSetsPresent);
      } else {
        // If there are no grouping keys, grouping sets cannot be present
        assert !groupingSetsPresent;

        // ////// Generate ReduceSink Operator
        Operator reduceSinkOperatorInfo =
- genGroupByPlanReduceSinkOperator(qb,
- dest,
- groupByOperatorInfo,
- grpByExprs,
- grpByExprs.size(),
- false,
- 1,
- true,
- groupingSetsPresent);
+ genGroupByPlanReduceSinkOperator(qb,
+ dest,
+ groupByOperatorInfo,
+ grpByExprs,
+ grpByExprs.size(),
+ false,
+ 1,
+ true,
+ groupingSetsPresent);

        return genGroupByPlanGroupByOperator2MR(parseInfo, dest,
- reduceSinkOperatorInfo, GroupByDesc.Mode.FINAL, genericUDAFEvaluators, false);
+ reduceSinkOperatorInfo, GroupByDesc.Mode.FINAL, genericUDAFEvaluators, false);
      }
    }
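Both code paths above rely on the idea, described in the comments, that each input row is replicated once per grouping set with a grouping id appended to the key. The sketch below illustrates that expansion; the bitmask encoding and the null-ing of masked-out columns are assumptions made for the example, not a claim about Hive's exact key representation.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class GroupingSetExpansionSketch {

      // Expand a single input row into one row per grouping set. Each grouping set is a
      // bitmask over the group-by columns; masked-out columns become null, and the
      // grouping id is appended as an extra key column.
      static List<Object[]> expand(Object[] groupByKeys, List<Integer> groupingSets) {
        List<Object[]> out = new ArrayList<Object[]>();
        for (int set : groupingSets) {
          Object[] row = new Object[groupByKeys.length + 1];
          for (int i = 0; i < groupByKeys.length; i++) {
            row[i] = ((set >> i) & 1) == 1 ? groupByKeys[i] : null;
          }
          row[groupByKeys.length] = set;
          out.add(row);
        }
        return out;
      }

      public static void main(String[] args) {
        // GROUP BY a, b GROUPING SETS ((a, b), (a), ()) encoded as bitmasks 3, 1, 0.
        List<Integer> groupingSets = Arrays.asList(3, 1, 0);
        for (Object[] row : expand(new Object[] {"x", "y"}, groupingSets)) {
          System.out.println(Arrays.toString(row));
        }
        // Prints: [x, y, 3], [x, null, 1], [null, null, 0]
      }
    }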

@@ -4347,10 +4518,10 @@ public class SemanticAnalyzer extends Ba
    }

    private int getReducersBucketing(int totalFiles, int maxReducers) {
- int numFiles = totalFiles/maxReducers;
+ int numFiles = totalFiles / maxReducers;
      while (true) {
- if (totalFiles%numFiles == 0) {
- return totalFiles/numFiles;
+ if (totalFiles % numFiles == 0) {
+ return totalFiles / numFiles;
        }
        numFiles++;
      }
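For reference, the divisor search just above behaves like the standalone re-implementation below: it starts from totalFiles / maxReducers files per reducer and increases the per-reducer file count until it divides totalFiles evenly. The class and method names are only for this example.

    public class ReducersBucketingSketch {

      // Mirrors the loop in getReducersBucketing: find a reducer count no larger than
      // maxReducers such that each reducer writes the same whole number of files.
      static int reducersForBucketing(int totalFiles, int maxReducers) {
        int numFiles = totalFiles / maxReducers;
        while (true) {
          if (totalFiles % numFiles == 0) {
            return totalFiles / numFiles;
          }
          numFiles++;
        }
      }

      public static void main(String[] args) {
        // 32 bucket files but at most 10 reducers: the smallest per-reducer file count that
        // is >= 32/10 and divides 32 evenly is 4, giving 32 / 4 = 8 reducers.
        System.out.println(reducersForBucketing(32, 10)); // 8
      }
    }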
@@ -4359,8 +4530,8 @@ public class SemanticAnalyzer extends Ba
    private static class SortBucketRSCtx {
      ArrayList<ExprNodeDesc> partnCols;
      boolean multiFileSpray;
- int numFiles;
- int totalFiles;
+ int numFiles;
+ int totalFiles;

      public SortBucketRSCtx() {
        partnCols = null;
@@ -4377,7 +4548,8 @@ public class SemanticAnalyzer extends Ba
      }

      /**
- * @param partnCols the partnCols to set
+ * @param partnCols
+ * the partnCols to set
       */
      public void setPartnCols(ArrayList<ExprNodeDesc> partnCols) {
        this.partnCols = partnCols;
@@ -4391,7 +4563,8 @@ public class SemanticAnalyzer extends Ba
      }

      /**
- * @param multiFileSpray the multiFileSpray to set
+ * @param multiFileSpray
+ * the multiFileSpray to set
       */
      public void setMultiFileSpray(boolean multiFileSpray) {
        this.multiFileSpray = multiFileSpray;
@@ -4405,7 +4578,8 @@ public class SemanticAnalyzer extends Ba
      }

      /**
- * @param numFiles the numFiles to set
+ * @param numFiles
+ * the numFiles to set
       */
      public void setNumFiles(int numFiles) {
        this.numFiles = numFiles;
@@ -4419,7 +4593,8 @@ public class SemanticAnalyzer extends Ba
      }

      /**
- * @param totalFiles the totalFiles to set
+ * @param totalFiles
+ * the totalFiles to set
       */
      public void setTotalFiles(int totalFiles) {
        this.totalFiles = totalFiles;
@@ -4427,9 +4602,8 @@ public class SemanticAnalyzer extends Ba
    }

    @SuppressWarnings("nls")
- private Operator genBucketingSortingDest(String dest, Operator input, QB qb, TableDesc table_desc,
- Table dest_tab, SortBucketRSCtx ctx)
- throws SemanticException {
+ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
+ TableDesc table_desc, Table dest_tab, SortBucketRSCtx ctx) throws SemanticException {

      // If the table is bucketed, and bucketing is enforced, do the following:
      // If the number of buckets is smaller than the number of maximum reducers,
@@ -4438,20 +4612,21 @@ public class SemanticAnalyzer extends Ba
      // spray the data into multiple buckets. That way, we can support a very large
      // number of buckets without needing a very large number of reducers.
      boolean enforceBucketing = false;
- boolean enforceSorting = false;
+ boolean enforceSorting = false;
      ArrayList<ExprNodeDesc> partnCols = new ArrayList<ExprNodeDesc>();
      ArrayList<ExprNodeDesc> partnColsNoConvert = new ArrayList<ExprNodeDesc>();
- ArrayList<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
      ArrayList<Integer> sortOrders = new ArrayList<Integer>();
      boolean multiFileSpray = false;
- int numFiles = 1;
- int totalFiles = 1;
+ int numFiles = 1;
+ int totalFiles = 1;

      if ((dest_tab.getNumBuckets() > 0) &&
          (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
        enforceBucketing = true;
        partnCols = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
- partnColsNoConvert = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, false);
+ partnColsNoConvert = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
+ false);
      }

      if ((dest_tab.getSortCols() != null) &&
@@ -4471,7 +4646,7 @@ public class SemanticAnalyzer extends Ba
        if (conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS) > 0) {
          maxReducers = conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
        }
- int numBuckets = dest_tab.getNumBuckets();
+ int numBuckets = dest_tab.getNumBuckets();
        if (numBuckets > maxReducers) {
          multiFileSpray = true;
          totalFiles = numBuckets;
@@ -4481,7 +4656,7 @@ public class SemanticAnalyzer extends Ba
          else {
            // find the number of reducers such that it is a divisor of totalFiles
            maxReducers = getReducersBucketing(totalFiles, maxReducers);
- numFiles = totalFiles/maxReducers;
+ numFiles = totalFiles / maxReducers;
          }
        }
        else {
@@ -4500,6 +4675,7 @@ public class SemanticAnalyzer extends Ba

    /**
     * Check for HOLD_DDLTIME hint.
+ *
     * @param qb
     * @return true if HOLD_DDLTIME is set, false otherwise.
     */
@@ -4525,7 +4701,7 @@ public class SemanticAnalyzer extends Ba
      QBMetaData qbm = qb.getMetaData();
      Integer dest_type = qbm.getDestTypeForAlias(dest);

- Table dest_tab = null; // destination table if any
+ Table dest_tab = null; // destination table if any
      Partition dest_part = null;// destination partition if any
      String queryTmpdir = null; // the intermediate destination directory
      Path dest_path = null; // the final destination directory
@@ -4546,7 +4722,7 @@ public class SemanticAnalyzer extends Ba
      // Is the user trying to insert into an external table?
        if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
            (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
- throw new SemanticException(
+ throw new SemanticException(
              ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
        }

@@ -4556,17 +4732,17 @@ public class SemanticAnalyzer extends Ba
        // check for partition
        List<FieldSchema> parts = dest_tab.getPartitionKeys();
        if (parts != null && parts.size() > 0) { // table is partitioned
- if (partSpec== null || partSpec.size() == 0) { // user did NOT specify partition
+ if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
            throw new SemanticException(generateErrorMessage(
- qb.getParseInfo().getDestForClause(dest),
- ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
+ qb.getParseInfo().getDestForClause(dest),
+ ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
          }
        // the HOLD_DDLTIME hint should not be used with dynamic partitions since the
          // newly generated partitions should always update their DDLTIME
          if (holdDDLTime) {
            throw new SemanticException(generateErrorMessage(
- qb.getParseInfo().getDestForClause(dest),
- ErrorMsg.HOLD_DDLTIME_ON_NONEXIST_PARTITIONS.getMsg()));
+ qb.getParseInfo().getDestForClause(dest),
+ ErrorMsg.HOLD_DDLTIME_ON_NONEXIST_PARTITIONS.getMsg()));
          }
          dpCtx = qbm.getDPCtx(dest);
          if (dpCtx == null) {
@@ -4580,8 +4756,8 @@ public class SemanticAnalyzer extends Ba
          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
            if (dpCtx.getNumDPCols() > 0 &&
                (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES) ||
- HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) &&
- Utilities.supportCombineFileInputFormat() == false) {
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) &&
+ Utilities.supportCombineFileInputFormat() == false) {
              // Do not support merge for Hadoop versions (pre-0.20) that do not
              // support CombineHiveInputFormat
              HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
@@ -4592,8 +4768,8 @@ public class SemanticAnalyzer extends Ba

          } else { // QBMetaData.DEST_PARTITION capture the all-SP case
            throw new SemanticException(generateErrorMessage(
- qb.getParseInfo().getDestForClause(dest),
- ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
+ qb.getParseInfo().getDestForClause(dest),
+ ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
          }
          if (dpCtx.getSPPath() != null) {
            dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
@@ -4663,7 +4839,7 @@ public class SemanticAnalyzer extends Ba
          else {
            try {
              String ppath = dpCtx.getSPPath();
- ppath = ppath.substring(0, ppath.length()-1);
+ ppath = ppath.substring(0, ppath.length() - 1);
              DummyPartition p =
                  new DummyPartition(dest_tab, dest_tab.getDbName()
                      + "@" + dest_tab.getTableName() + "@" + ppath,
@@ -4683,15 +4859,15 @@ public class SemanticAnalyzer extends Ba
        dest_tab = dest_part.getTable();
        if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
            dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE)) {
- throw new SemanticException(
+ throw new SemanticException(
              ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
        }

        Path tabPath = dest_tab.getPath();
        Path partPath = dest_part.getPartitionPath();

- // if the table is in a different dfs than the partition,
- // replace the partition's dfs with the table's dfs.
+ // if the table is in a different dfs than the partition,
+ // replace the partition's dfs with the table's dfs.
        dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
            .getAuthority(), partPath.toUri().getPath());

@@ -4719,8 +4895,8 @@ public class SemanticAnalyzer extends Ba
            Partition part = db.getPartition(dest_tab, dest_part.getSpec(), false);
            if (part == null) {
              throw new SemanticException(generateErrorMessage(
- qb.getParseInfo().getDestForClause(dest),
- ErrorMsg.HOLD_DDLTIME_ON_NONEXIST_PARTITIONS.getMsg()));
+ qb.getParseInfo().getDestForClause(dest),
+ ErrorMsg.HOLD_DDLTIME_ON_NONEXIST_PARTITIONS.getMsg()));
            }
          } catch (HiveException e) {
            throw new SemanticException(e);
@@ -4920,16 +5096,16 @@ public class SemanticAnalyzer extends Ba
      }

      Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(fileSinkDesc,
- fsRS, input), inputRR);
+ fsRS, input), inputRR);

      if (ltd != null && SessionState.get() != null) {
        SessionState.get().getLineageState()
- .mapDirToFop(ltd.getSourceDir(), (FileSinkOperator)output);
+ .mapDirToFop(ltd.getSourceDir(), (FileSinkOperator) output);
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
- + dest_path + " row schema: " + inputRR.toString());
+ + dest_path + " row schema: " + inputRR.toString());
      }

      return output;
@@ -4959,7 +5135,7 @@ public class SemanticAnalyzer extends Ba
      int inColumnCnt = rowFields.size();
      int outColumnCnt = tableFields.size();
      if (dynPart && dpCtx != null) {
- outColumnCnt += dpCtx.getNumDPCols();
+ outColumnCnt += dpCtx.getNumDPCols();
      }

      if (inColumnCnt != outColumnCnt) {
@@ -4967,7 +5143,7 @@ public class SemanticAnalyzer extends Ba
            + " columns, but query has " + inColumnCnt + " columns.";
        throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
            qb.getParseInfo().getDestForClause(dest), reason));
- } else if (dynPart && dpCtx != null){
+ } else if (dynPart && dpCtx != null) {
        // create the mapping from input ExprNode to dest table DP column
        dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
      }
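The column-count check in this hunk adds the dynamic partition columns to the destination table's column count before comparing against the query's select list. A small hedged sketch of that arithmetic follows; the names are illustrative and are not the actual SemanticAnalyzer fields.

    public class ColumnCountCheckSketch {

      // Returns an error message when the query cannot line up with the destination schema,
      // or null when the counts match (analogous to the TARGET_TABLE_COLUMN_MISMATCH check).
      static String checkColumnCounts(int queryColumns, int tableColumns,
          int dynamicPartitionColumns) {
        int expected = tableColumns + dynamicPartitionColumns;
        if (queryColumns != expected) {
          return "Table declared " + expected + " columns, but query has "
              + queryColumns + " columns.";
        }
        return null;
      }

      public static void main(String[] args) {
        // INSERT ... PARTITION (dt) SELECT c1, c2, dt FROM src: 2 table columns + 1 DP column.
        System.out.println(checkColumnCounts(3, 2, 1)); // null: counts match
        System.out.println(checkColumnCounts(2, 2, 1)); // mismatch message
      }
    }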
@@ -4998,8 +5174,8 @@ public class SemanticAnalyzer extends Ba
          // JSON-format.
          if (!tableFieldTypeInfo.equals(rowFieldTypeInfo)
              && !(isLazySimpleSerDe
- && tableFieldTypeInfo.getCategory().equals(Category.PRIMITIVE) && tableFieldTypeInfo
- .equals(TypeInfoFactory.stringTypeInfo))) {
+ && tableFieldTypeInfo.getCategory().equals(Category.PRIMITIVE) && tableFieldTypeInfo
+ .equals(TypeInfoFactory.stringTypeInfo))) {
            // need to do some conversions here
            converted = true;
            if (tableFieldTypeInfo.getCategory() != Category.PRIMITIVE) {
@@ -5024,7 +5200,7 @@ public class SemanticAnalyzer extends Ba
      // deal with dynamic partition columns: convert ExprNodeDesc type to String??
      if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
        // DP columns starts with tableFields.size()

[... 1192 lines stripped ...]
