Repository: hive
Updated Branches:
   refs/heads/master 6ec72de79 -> 990416249


HIVE-11320 ACID enable predicate pushdown for insert-only delta file (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/99041624
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/99041624
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/99041624

Branch: refs/heads/master
Commit: 990416249833e722ca8a32dd9dd425883da0caaf
Parents: 6ec72de
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Tue Jul 21 11:42:14 2015 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Tue Jul 21 11:42:14 2015 -0700

----------------------------------------------------------------------
  .../hive/ql/io/orc/OrcRawRecordMerger.java | 20 ++++--
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 68 ++++++++++++++++++--
  2 files changed, 75 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/99041624/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 2f11611..58b85ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -478,10 +478,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{

      // we always want to read all of the deltas
      eventOptions.range(0, Long.MAX_VALUE);
-      // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as
-      // it can produce wrong results (if the latest valid version of the record is filtered out by
-      // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
-      eventOptions.searchArgument(null, null);
      if (deltaDirectory != null) {
        for(Path delta: deltaDirectory) {
          ReaderKey key = new ReaderKey();
@@ -492,8 +488,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          if (length != -1 && fs.exists(deltaFile)) {
            Reader deltaReader = OrcFile.createReader(deltaFile,
                OrcFile.readerOptions(conf).maxLength(length));
-          ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
-              maxKey, eventOptions, deltaDir.getStatementId());
+          Reader.Options deltaEventOptions = null;
+          if(eventOptions.getSearchArgument() != null) {
+            // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as
+            // it can produce wrong results (if the latest valid version of the record is filtered out by
+            // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
+            // unless the delta only has insert events
+            OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(deltaReader);
+            if(acidStats.deletes > 0 || acidStats.updates > 0) {
+              deltaEventOptions = eventOptions.clone().searchArgument(null, null);
+            }
+          }
+          ReaderPair deltaPair;
+          deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
+              maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
            if (deltaPair.nextRecord != null) {
              readers.put(key, deltaPair);
            }

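To summarize the hunk above: instead of unconditionally stripping the SearchArgument (sarg) before reading a delta, the merger now inspects the delta's AcidStats and keeps the sarg when the file contains only insert events. A condensed sketch of that decision, using only calls that appear in the patch (the helper name selectDeltaOptions is illustrative, not part of the committed code):

    // Illustrative helper: choose the Reader.Options to use for one delta file.
    static Reader.Options selectDeltaOptions(Reader.Options eventOptions,
                                             Reader deltaReader) {
      if (eventOptions.getSearchArgument() == null) {
        return eventOptions;  // nothing to push down in the first place
      }
      // AcidStats counts the insert/update/delete events in the file
      OrcRecordUpdater.AcidStats acidStats =
          OrcRecordUpdater.parseAcidStats(deltaReader);
      if (acidStats.deletes > 0 || acidStats.updates > 0) {
        // update/delete events can supersede earlier versions of a row, so
        // filtering them with a sarg may hide the latest valid version or
        // apply predicates to delete records with null data columns
        return eventOptions.clone().searchArgument(null, null);
      }
      return eventOptions;  // insert-only delta: safe to push the sarg
    }
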
http://git-wip-us.apache.org/repos/asf/hive/blob/99041624/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 33ca998..57e4fb9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.hive.ql;

  import org.apache.commons.io.FileUtils;
  import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.orc.FileDump;
  import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
  import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
  import org.junit.After;
  import org.junit.Assert;
  import org.junit.Before;
@@ -36,13 +34,11 @@ import org.junit.Test;
  import org.junit.rules.TestName;

  import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Comparator;
  import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;

  /**
   * TODO: this should be merged with TestTxnCommands once that is checked in
@@ -55,7 +51,7 @@ public class TestTxnCommands2 {
    ).getPath().replaceAll("\\\\", "/");
    private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
    //bucket count for test tables; set it to 1 for easier debugging
-  private static int BUCKET_COUNT = 2;
+  private static int BUCKET_COUNT = 1;
    @Rule
    public TestName testName = new TestName();
    private HiveConf hiveConf;
@@ -122,6 +118,64 @@ public class TestTxnCommands2 {
        FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
      }
    }
+  @Test
+  public void testOrcPPD() throws Exception {
+    testOrcPPD(true);
+  }
+  @Test
+  public void testOrcNoPPD() throws Exception {
+    testOrcPPD(false);
+  }
+  private void testOrcPPD(boolean enablePPD) throws Exception {
+    boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD
+    int[][] tableData = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean looped = new AtomicBoolean();
+    stop.set(true);
+    t.init(stop, looped);
+    t.run();
+    //now we have base_0001 file
+    int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+    //now we have delta_0002_0002_0000 with inserts only (ok to push predicate)
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8");
+    //now we have delta_0003_0003_0000 with delete events (can't push predicate)
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 9");
+    //and another delta with update op
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b");
+    int [][] resultData = {{3,4},{5,6},{9,11}};
+    Assert.assertEquals("Update failed", stringifyValues(resultData), rs1);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd);
+  }
+  @Ignore("alter table")
+  @Test
+  public void testAlterTable() throws Exception {
+    int[][] tableData = {{1,2}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean looped = new AtomicBoolean();
+    stop.set(true);
+    t.init(stop, looped);
+    t.run();
+    int[][] tableData2 = {{5,6}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where b > 0 order by a,b");
+
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " add columns(c int)");
+    int[][] moreTableData = {{7,8,9}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b,c) " + makeValuesClause(moreTableData));
+    List<String> rs0 = runStatementOnDriver("select a,b,c from " + Table.ACIDTBL + " where a > 0 order by a,b,c");
+  }
    @Ignore("not needed but useful for testing")
    @Test
    public void testNonAcidInsert() throws Exception {

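The new test exercises the full scenario: a major compaction (run via a synchronous compactor Worker) produces base_0001, the second insert lands in an insert-only delta where the predicate may be pushed, and the delete and update create deltas where it must be withheld. A minimal sketch, assuming a configured HiveConf named conf, of the PPD toggle the test wraps around its statements (hive.optimize.index.filter gates whether the WHERE clause reaches the ORC reader as a sarg):

    // Save, set, and restore the ORC predicate-pushdown flag around a test.
    boolean savedPpd = conf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
    conf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, true);
    try {
      // run statements whose predicates should be pushed into ORC
    } finally {
      // restore even if an assertion fails, so other tests are unaffected
      conf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, savedPpd);
    }
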
  • Ekoifman at Jul 21, 2015 at 6:57 pm
    Repository: hive
    Updated Branches:
       refs/heads/branch-1 684d0e5e1 -> 77aefd6c8


    HIVE-11320 ACID enable predicate pushdown for insert-only delta file (Eugene Koifman, reviewed by Alan Gates)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/77aefd6c
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/77aefd6c
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/77aefd6c

    Branch: refs/heads/branch-1
    Commit: 77aefd6c8d0a59bdc20a3ba74ccec1e955888fcb
    Parents: 684d0e5
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Tue Jul 21 11:57:03 2015 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Tue Jul 21 11:57:03 2015 -0700

    ----------------------------------------------------------------------
      .../hive/ql/io/orc/OrcRawRecordMerger.java | 20 ++++--
      .../apache/hadoop/hive/ql/TestTxnCommands2.java | 68 ++++++++++++++++++--
      2 files changed, 75 insertions(+), 13 deletions(-)
    ----------------------------------------------------------------------


    [The diff for branch-1 is identical to the master commit 99041624 above.]
