Author: hashutosh
Date: Mon Oct 7 06:43:23 2013
New Revision: 1529772

URL: http://svn.apache.org/r1529772
Log:
HIVE-5391 : make ORC predicate pushdown work with vectorization (Sergey Shelukhin via Ashutosh Chauhan)

Added:
     hive/trunk/ql/src/test/queries/clientpositive/vectorization_pushdown.q
     hive/trunk/ql/src/test/results/clientpositive/vectorization_pushdown.q.out
Modified:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
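
In short, the patch lifts the predicate-pushdown setup out of OrcRecordReader into three public static helpers on OrcInputFormat (findIncludedColumns, getIncludedColumnNames, createSarg) so the vectorized reader can reuse them instead of keeping its own copy. The following is a rough sketch, not part of the patch: the wrapper class is hypothetical, while the helper names and the Reader.rows() call are taken from the diffs below. It shows how a record reader over an ORC file is wired up after this change.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;

// Hypothetical illustration class; mirrors what the patched readers below do.
class OrcPushdownSketch {
  static RecordReader openRows(Reader file, Configuration conf,
                               long offset, long length) throws IOException {
    List<OrcProto.Type> types = file.getTypes();
    // Which file columns to read; null means "read all columns".
    boolean[] includedColumns = OrcInputFormat.findIncludedColumns(types, conf);
    // Column names aligned to the file schema; null when no filter is pushed down.
    String[] columnNames =
        OrcInputFormat.getIncludedColumnNames(types, includedColumns, conf);
    // The pushdown predicate (SARG); null when none was configured.
    SearchArgument sarg = OrcInputFormat.createSarg(types, conf);
    return file.rows(offset, length, includedColumns, sarg, columnNames);
  }
}

A null SearchArgument simply means no predicate was supplied, so rows() behaves as before and scans every row group rather than skipping some via column statistics.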

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1529772&r1=1529771&r2=1529772&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Oct 7 06:43:23 2013
@@ -27,6 +27,7 @@ import java.util.Map;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;
+
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
@@ -90,38 +91,12 @@ public class OrcInputFormat implements

      OrcRecordReader(Reader file, Configuration conf,
                      long offset, long length) throws IOException {
-      String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-      String columnNamesString =
-          conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
-      String[] columnNames = null;
-      SearchArgument sarg = null;
       List<OrcProto.Type> types = file.getTypes();
-      if (types.size() == 0) {
-        numColumns = 0;
-      } else {
-        numColumns = types.get(0).getSubtypesCount();
-      }
-      columnNames = new String[types.size()];
-      LOG.info("included column ids = " +
-          conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "null"));
-      LOG.info("included columns names = " +
-          conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "null"));
-      boolean[] includeColumn = findIncludedColumns(types, conf);
-      if (serializedPushdown != null && columnNamesString != null) {
-        sarg = SearchArgument.FACTORY.create
-            (Utilities.deserializeExpression(serializedPushdown, conf));
-        LOG.info("ORC pushdown predicate: " + sarg);
-        String[] neededColumnNames = columnNamesString.split(",");
-        int i = 0;
-        for(int columnId: types.get(0).getSubtypesList()) {
-          if (includeColumn == null || includeColumn[columnId]) {
-            columnNames[columnId] = neededColumnNames[i++];
-          }
-        }
-      } else {
-        LOG.info("No ORC pushdown predicate");
-      }
-      this.reader = file.rows(offset, length, includeColumn, sarg, columnNames);
+      numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+      boolean[] includedColumns = findIncludedColumns(types, conf);
+      String[] columnNames = getIncludedColumnNames(types, includedColumns, conf);
+      SearchArgument sarg = createSarg(types, conf);
+      this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
        this.offset = offset;
        this.length = length;
      }
@@ -187,15 +162,48 @@ public class OrcInputFormat implements
      }
    }

+  public static SearchArgument createSarg(List<OrcProto.Type> types, Configuration conf) {
+    String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+    if (serializedPushdown == null
+        || conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) == null) {
+      LOG.info("No ORC pushdown predicate");
+      return null;
+    }
+    SearchArgument sarg = SearchArgument.FACTORY.create
+        (Utilities.deserializeExpression(serializedPushdown, conf));
+    LOG.info("ORC pushdown predicate: " + sarg);
+    return sarg;
+  }
+
+  public static String[] getIncludedColumnNames(
+      List<OrcProto.Type> types, boolean[] includedColumns, Configuration conf) {
+    String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    LOG.info("included columns names = " + columnNamesString);
+    if (columnNamesString == null || conf.get(TableScanDesc.FILTER_EXPR_CONF_STR) == null) {
+      return null;
+    }
+    String[] neededColumnNames = columnNamesString.split(",");
+    int i = 0;
+    String[] columnNames = new String[types.size()];
+    for(int columnId: types.get(0).getSubtypesList()) {
+      if (includedColumns == null || includedColumns[columnId]) {
+        columnNames[columnId] = neededColumnNames[i++];
+      }
+    }
+    return columnNames;
+  }
+
    /**
     * Take the configuration and figure out which columns we need to include.
     * @param types the types of the file
     * @param conf the configuration
     * @return true for each column that should be included
     */
-  static boolean[] findIncludedColumns(List<OrcProto.Type> types,
-      Configuration conf) {
-    if (ColumnProjectionUtils.isReadAllColumns(conf)) {
+  public static boolean[] findIncludedColumns(List<OrcProto.Type> types, Configuration conf) {
+    String includedStr = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+    LOG.info("included column ids = " + includedStr);
+    if (ColumnProjectionUtils.isReadAllColumns(conf) ||
+        includedStr == null || includedStr.trim().length() == 0) {
        return null;
      } else {
        int numColumns = types.size();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1529772&r1=1529771&r2=1529772&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Mon Oct 7 06:43:23 2013
@@ -31,8 +31,8 @@ import org.apache.hadoop.hive.ql.exec.ve
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
  import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileSplit;
@@ -58,12 +58,14 @@ public class VectorizedOrcInputFormat ex

      VectorizedOrcRecordReader(Reader file, Configuration conf,
          FileSplit fileSplit) throws IOException {
+      List<OrcProto.Type> types = file.getTypes();
+      boolean[] includedColumns = OrcInputFormat.findIncludedColumns(types, conf);
+      String[] columnNames = OrcInputFormat.getIncludedColumnNames(types, includedColumns, conf);
+      SearchArgument sarg = OrcInputFormat.createSarg(types, conf);

       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
-      this.reader = file.rows(offset, length,
-          findIncludedColumns(file.getTypes(), conf));
-
+      this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
        try {
          rbCtx = new VectorizedRowBatchCtx();
          rbCtx.init(conf, fileSplit);
@@ -134,63 +136,6 @@ public class VectorizedOrcInputFormat ex
      setMinSplitSize(16 * 1024);
    }

-  /**
-   * Recurse down into a type subtree turning on all of the sub-columns.
-   *
-   * @param types
-   *          the types of the file
-   * @param result
-   *          the global view of columns that should be included
-   * @param typeId
-   *          the root of tree to enable
-   */
-  private static void includeColumnRecursive(List<OrcProto.Type> types,
-      boolean[] result,
-      int typeId) {
-    result[typeId] = true;
-    OrcProto.Type type = types.get(typeId);
-    int children = type.getSubtypesCount();
-    for (int i = 0; i < children; ++i) {
-      includeColumnRecursive(types, result, type.getSubtypes(i));
-    }
-  }
-
-  /**
-   * Take the configuration and figure out which columns we need to include.
-   *
-   * @param types
-   *          the types of the file
-   * @param conf
-   *          the configuration
-   * @return true for each column that should be included
-   */
-  private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
-      Configuration conf) {
-    String includedStr =
-        conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
-    if (includedStr == null || includedStr.trim().length() == 0) {
-      return null;
-    } else {
-      int numColumns = types.size();
-      boolean[] result = new boolean[numColumns];
-      result[0] = true;
-      OrcProto.Type root = types.get(0);
-      List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
-      for (int i = 0; i < root.getSubtypesCount(); ++i) {
-        if (included.contains(i)) {
-          includeColumnRecursive(types, result, root.getSubtypes(i));
-        }
-      }
-      // if we are filtering at least one column, return the boolean array
-      for (boolean include : result) {
-        if (!include) {
-          return result;
-        }
-      }
-      return null;
-    }
-  }
-
    @Override
    public RecordReader<NullWritable, VectorizedRowBatch>
        getRecordReader(InputSplit inputSplit, JobConf conf,

Added: hive/trunk/ql/src/test/queries/clientpositive/vectorization_pushdown.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/vectorization_pushdown.q?rev=1529772&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/vectorization_pushdown.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/vectorization_pushdown.q Mon Oct 7 06:43:23 2013
@@ -0,0 +1,4 @@
+SET hive.vectorized.execution.enabled=true;
+SET hive.optimize.index.filter=true;
+explain SELECT AVG(cbigint) FROM alltypesorc WHERE cbigint < cdouble;
+SELECT AVG(cbigint) FROM alltypesorc WHERE cbigint < cdouble;

Added: hive/trunk/ql/src/test/results/clientpositive/vectorization_pushdown.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/vectorization_pushdown.q.out?rev=1529772&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/vectorization_pushdown.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/vectorization_pushdown.q.out Mon Oct 7 06:43:23 2013
@@ -0,0 +1,81 @@
+WARNING: Comparing a bigint and a double may result in a loss of precision.
+PREHOOK: query: explain SELECT AVG(cbigint) FROM alltypesorc WHERE cbigint < cdouble
+PREHOOK: type: QUERY
+POSTHOOK: query: explain SELECT AVG(cbigint) FROM alltypesorc WHERE cbigint < cdouble
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION AVG (TOK_TABLE_OR_COL cbigint)))) (TOK_WHERE (< (TOK_TABLE_OR_COL cbigint) (TOK_TABLE_OR_COL cdouble)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        alltypesorc
+          TableScan
+            alias: alltypesorc
+            filterExpr:
+                expr: (cbigint < cdouble)
+                type: boolean
+            Filter Operator
+              predicate:
+                  expr: (cbigint < cdouble)
+                  type: boolean
+              Vectorized execution: true
+              Select Operator
+                expressions:
+                      expr: cbigint
+                      type: bigint
+                outputColumnNames: cbigint
+                Vectorized execution: true
+                Group By Operator
+                  aggregations:
+                        expr: avg(cbigint)
+                  bucketGroup: false
+                  mode: hash
+                  outputColumnNames: _col0
+                  Vectorized execution: true
+                  Reduce Output Operator
+                    sort order:
+                    tag: -1
+                    value expressions:
+                          expr: _col0
+                          type: struct<count:bigint,sum:double>
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: avg(VALUE._col0)
+          bucketGroup: false
+          mode: mergepartial
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: double
+            outputColumnNames: _col0
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+WARNING: Comparing a bigint and a double may result in a loss of precision.
+PREHOOK: query: SELECT AVG(cbigint) FROM alltypesorc WHERE cbigint < cdouble
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT AVG(cbigint) FROM alltypesorc WHERE cbigint < cdouble
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+#### A masked pattern was here ####
+-1.4670720493864927E9
