FAQ
Repository: hive
Updated Branches:
   refs/heads/branch-1 b71c687f0 -> e26697daa


HIVE-11357 ACID enable predicate pushdown for insert-only delta file 2 (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e26697da
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e26697da
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e26697da

Branch: refs/heads/branch-1
Commit: e26697daaf16f56754386aec0ef1404eeea4936e
Parents: b71c687
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Fri Aug 28 12:29:29 2015 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Fri Aug 28 12:29:29 2015 -0700

----------------------------------------------------------------------
  .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 18 ++++-
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 85 ++++++++++++++++----
  2 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e26697da/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index e5bd640..68da26c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -127,7 +127,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,

    /**
     * When picking the hosts for a split that crosses block boundaries,
- * any drop any host that has fewer than MIN_INCLUDED_LOCATION of the
+ * drop any host that has fewer than MIN_INCLUDED_LOCATION of the
     * number of bytes available on the host with the most.
     * If host1 has 10MB of the split, host2 has 20MB, and host3 has 18MB the
     * split will contain host2 (100% of host2) and host3 (90% of host2). Host1
@@ -1246,6 +1246,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
      } else {
        bucket = (int) split.getStart();
        reader = null;
+ if(deltas != null && deltas.length > 0) {
+ Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket);
+ OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+ FileSystem fs = readerOptions.getFilesystem();
+ if(fs == null) {
+ fs = path.getFileSystem(options.getConfiguration());
+ }
+ if(fs.exists(bucketPath)) {
+ /* w/o schema evolution (which ACID doesn't support yet) all delta
+ files have the same schema, so choosing the 1st one*/
+ final List<OrcProto.Type> types =
+ OrcFile.createReader(bucketPath, readerOptions).getTypes();
+ readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
+ setSearchArgument(readOptions, types, conf, split.isOriginal());
+ }
+ }
      }
      String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
                                  Long.MAX_VALUE + ":");

http://git-wip-us.apache.org/repos/asf/hive/blob/e26697da/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 58c2fca..5aa2500 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
  import org.apache.hadoop.fs.FileUtil;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
  import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
  import org.apache.hadoop.hive.ql.session.SessionState;
  import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -51,7 +52,7 @@ public class TestTxnCommands2 {
    ).getPath().replaceAll("\\\\", "/");
    private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
    //bucket count for test tables; set it to 1 for easier debugging
- private static int BUCKET_COUNT = 1;
+ private static int BUCKET_COUNT = 2;
    @Rule
    public TestName testName = new TestName();
    private HiveConf hiveConf;
@@ -107,7 +108,6 @@ public class TestTxnCommands2 {
    public void tearDown() throws Exception {
      try {
        if (d != null) {
- // runStatementOnDriver("set autocommit true");
          dropTables();
          d.destroy();
          d.close();
@@ -126,13 +126,51 @@ public class TestTxnCommands2 {
    public void testOrcNoPPD() throws Exception {
      testOrcPPD(false);
    }
- private void testOrcPPD(boolean enablePPD) throws Exception {
+
+ /**
+ * this is run 2 times: 1 with PPD on, 1 with off
+ * Also, the queries are such that if we were to push predicate down to an update/delete delta,
+ * the test would produce wrong results
+ * @param enablePPD
+ * @throws Exception
+ */
+ private void testOrcPPD(boolean enablePPD) throws Exception {
      boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
      hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD
- int[][] tableData = {{1,2},{3,4}};
- runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
- List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b");
- runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+ //create delta_0001_0001_0000 (should push predicate here)
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
+ List<String> explain;
+ String query = "update " + Table.ACIDTBL + " set b = 5 where a = 3";
+ if (enablePPD) {
+ explain = runStatementOnDriver("explain " + query);
+ /*
+ here is a portion of the above "explain". The "filterExpr:" in the TableScan is the pushed predicate
+ w/o PPD, the line is simply not there, otherwise the plan is the same
+ Map Operator Tree:,
+ TableScan,
+ alias: acidtbl,
+ filterExpr: (a = 3) (type: boolean),
+ Filter Operator,
+ predicate: (a = 3) (type: boolean),
+ Select Operator,
+ ...
+ */
+ assertPredicateIsPushed("filterExpr: (a = 3)", explain);
+ }
+ //create delta_0002_0002_0000 (can't push predicate)
+ runStatementOnDriver(query);
+ query = "select a,b from " + Table.ACIDTBL + " where b = 4 order by a,b";
+ if (enablePPD) {
+ /*at this point we have 2 delta files, 1 for insert 1 for update
+ * we should push predicate into 1st one but not 2nd. If the following 'select' were to
+ * push into the 'update' delta, we'd filter out {3,5} before doing merge and thus
+ * produce {3,4} as the value for 2nd row. The right result is 0-rows.*/
+ explain = runStatementOnDriver("explain " + query);
+ assertPredicateIsPushed("filterExpr: (b = 4)", explain);
+ }
+ List<String> rs0 = runStatementOnDriver(query);
+ Assert.assertEquals("Read failed", 0, rs0.size());
+ runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'");
      Worker t = new Worker();
      t.setThreadId((int) t.getId());
      t.setHiveConf(hiveConf);
@@ -142,18 +180,37 @@ public class TestTxnCommands2 {
      t.init(stop, looped);
      t.run();
      //now we have base_0001 file
- int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}};
+ int[][] tableData2 = {{1, 7}, {5, 6}, {7, 8}, {9, 10}};
      runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
- //now we have delta_0002_0002_0000 with inserts only (ok to push predicate)
+ //now we have delta_0003_0003_0000 with inserts only (ok to push predicate)
+ if (enablePPD) {
+ explain = runStatementOnDriver("explain delete from " + Table.ACIDTBL + " where a=7 and b=8");
+ assertPredicateIsPushed("filterExpr: ((a = 7) and (b = 8))", explain);
+ }
      runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8");
- //now we have delta_0003_0003_0000 with delete events (can't push predicate)
- runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 9");
- //and another delta with update op
- List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b");
- int [][] resultData = {{3,4},{5,6},{9,11}};
+ //now we have delta_0004_0004_0000 with delete events
+
+ /*(can't push predicate to 'delete' delta)
+ * if we were to push to 'delete' delta, we'd filter out all rows since the 'row' is always NULL for
+ * delete events and we'd produce data as if the delete never happened*/
+ query = "select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b";
+ if(enablePPD) {
+ explain = runStatementOnDriver("explain " + query);
+ assertPredicateIsPushed("filterExpr: (a > 1)", explain);
+ }
+ List<String> rs1 = runStatementOnDriver(query);
+ int [][] resultData = new int[][] {{3, 5}, {5, 6}, {9, 10}};
      Assert.assertEquals("Update failed", stringifyValues(resultData), rs1);
      hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd);
    }
+ private static void assertPredicateIsPushed(String ppd, List<String> queryPlan) {
+ for(String line : queryPlan) {
+ if(line != null && line.contains(ppd)) {
+ return;
+ }
+ }
+ Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true);
+ }
    @Ignore("alter table")
    @Test
    public void testAlterTable() throws Exception {

Search Discussions

Discussion Posts

Previous

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 2 of 2 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedAug 28, '15 at 7:19p
activeAug 28, '15 at 7:30p
posts2
users1
websitehive.apache.org

1 user in discussion

Ekoifman: 2 posts

People

Translate

site design / logo © 2021 Grokbase