FAQ
Repository: hive
Updated Branches:
   refs/heads/master 6ec72de79 -> 990416249


HIVE-11320 ACID enable predicate pushdown for insert-only delta file (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/99041624
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/99041624
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/99041624

Branch: refs/heads/master
Commit: 990416249833e722ca8a32dd9dd425883da0caaf
Parents: 6ec72de
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Tue Jul 21 11:42:14 2015 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Tue Jul 21 11:42:14 2015 -0700

----------------------------------------------------------------------
  .../hive/ql/io/orc/OrcRawRecordMerger.java | 20 ++++--
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 68 ++++++++++++++++++--
  2 files changed, 75 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/99041624/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 2f11611..58b85ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -478,10 +478,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{

      // we always want to read all of the deltas
      eventOptions.range(0, Long.MAX_VALUE);
- // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as
- // it can produce wrong results (if the latest valid version of the record is filtered out by
- // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
- eventOptions.searchArgument(null, null);
      if (deltaDirectory != null) {
        for(Path delta: deltaDirectory) {
          ReaderKey key = new ReaderKey();
@@ -492,8 +488,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          if (length != -1 && fs.exists(deltaFile)) {
            Reader deltaReader = OrcFile.createReader(deltaFile,
                OrcFile.readerOptions(conf).maxLength(length));
- ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
- maxKey, eventOptions, deltaDir.getStatementId());
+ Reader.Options deltaEventOptions = null;
+ if(eventOptions.getSearchArgument() != null) {
+ // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as
+ // it can produce wrong results (if the latest valid version of the record is filtered out by
+ // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
+ // unless the delta only has insert events
+ OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(deltaReader);
+ if(acidStats.deletes > 0 || acidStats.updates > 0) {
+ deltaEventOptions = eventOptions.clone().searchArgument(null, null);
+ }
+ }
+ ReaderPair deltaPair;
+ deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
+ maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
            if (deltaPair.nextRecord != null) {
              readers.put(key, deltaPair);
            }

http://git-wip-us.apache.org/repos/asf/hive/blob/99041624/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 33ca998..57e4fb9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.hive.ql;

  import org.apache.commons.io.FileUtils;
  import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.orc.FileDump;
  import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
  import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
  import org.junit.After;
  import org.junit.Assert;
  import org.junit.Before;
@@ -36,13 +34,11 @@ import org.junit.Test;
  import org.junit.rules.TestName;

  import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Comparator;
  import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;

  /**
   * TODO: this should be merged with TestTxnCommands once that is checked in
@@ -55,7 +51,7 @@ public class TestTxnCommands2 {
    ).getPath().replaceAll("\\\\", "/");
    private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
    //bucket count for test tables; set it to 1 for easier debugging
- private static int BUCKET_COUNT = 2;
+ private static int BUCKET_COUNT = 1;
    @Rule
    public TestName testName = new TestName();
    private HiveConf hiveConf;
@@ -122,6 +118,64 @@ public class TestTxnCommands2 {
        FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
      }
    }
+ @Test
+ public void testOrcPPD() throws Exception {
+ testOrcPPD(true);
+ }
+ @Test
+ public void testOrcNoPPD() throws Exception {
+ testOrcPPD(false);
+ }
+ private void testOrcPPD(boolean enablePPD) throws Exception {
+ boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD
+ int[][] tableData = {{1,2},{3,4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+ runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(hiveConf);
+ AtomicBoolean stop = new AtomicBoolean();
+ AtomicBoolean looped = new AtomicBoolean();
+ stop.set(true);
+ t.init(stop, looped);
+ t.run();
+ //now we have base_0001 file
+ int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+ //now we have delta_0002_0002_0000 with inserts only (ok to push predicate)
+ runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8");
+ //now we have delta_0003_0003_0000 with delete events (can't push predicate)
+ runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 9");
+ //and another delta with update op
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b");
+ int [][] resultData = {{3,4},{5,6},{9,11}};
+ Assert.assertEquals("Update failed", stringifyValues(resultData), rs1);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd);
+ }
+ @Ignore("alter table")
+ @Test
+ public void testAlterTable() throws Exception {
+ int[][] tableData = {{1,2}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+ runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(hiveConf);
+ AtomicBoolean stop = new AtomicBoolean();
+ AtomicBoolean looped = new AtomicBoolean();
+ stop.set(true);
+ t.init(stop, looped);
+ t.run();
+ int[][] tableData2 = {{5,6}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+ List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where b > 0 order by a,b");
+
+ runStatementOnDriver("alter table " + Table.ACIDTBL + " add columns(c int)");
+ int[][] moreTableData = {{7,8,9}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b,c) " + makeValuesClause(moreTableData));
+ List<String> rs0 = runStatementOnDriver("select a,b,c from " + Table.ACIDTBL + " where a > 0 order by a,b,c");
+ }
    @Ignore("not needed but useful for testing")
    @Test
    public void testNonAcidInsert() throws Exception {

Search Discussions

Discussion Posts

Previous

Follow ups

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 6 of 68 | next ›
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Jul 21, '15 at 12:17a
active: Jul 31, '15 at 12:43a
posts: 68
users: 8
website: hive.apache.org

People

Translate

site design / logo © 2021 Grokbase