Repository: hive
Updated Branches:
   refs/heads/master 5c94bda99 -> 66feedc55


http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index bebac54..11e5333 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -281,7 +281,7 @@ public class TestWorker extends CompactorTest {
      // Find the new delta file and make sure it has the right contents
      boolean sawNewDelta = false;
      for (int i = 0; i < stat.length; i++) {
- if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+ if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
          sawNewDelta = true;
          FileStatus[] buckets = fs.listStatus(stat[i].getPath());
          Assert.assertEquals(2, buckets.length);
@@ -296,6 +296,10 @@ public class TestWorker extends CompactorTest {
      Assert.assertTrue(sawNewDelta);
    }

+ /**
+ * todo: fix https://issues.apache.org/jira/browse/HIVE-9995
+ * @throws Exception
+ */
    @Test
    public void minorWithOpenInMiddle() throws Exception {
      LOG.debug("Starting minorWithOpenInMiddle");
@@ -321,15 +325,18 @@ public class TestWorker extends CompactorTest {
      // There should still now be 5 directories in the location
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(5, stat.length);
+ boolean is130 = this instanceof TestWorker2;
+ Assert.assertEquals(is130 ? 5 : 4, stat.length);

      // Find the new delta file and make sure it has the right contents
      Arrays.sort(stat);
      Assert.assertEquals("base_20", stat[0].getPath().getName());
- Assert.assertEquals("delta_0000021_0000022", stat[1].getPath().getName());
- Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
- Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
- Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+ if(is130) {//in 1.3.0 orig delta is delta_00021_00022_0000 and compacted one is delta_00021_00022...
+ Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
+ }
+ Assert.assertEquals(makeDeltaDirName(21, 22), stat[1 + (is130 ? 1 : 0)].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(23, 25), stat[2 + (is130 ? 1 : 0)].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(26, 27), stat[3 + (is130 ? 1 : 0)].getPath().getName());
    }

    @Test
@@ -362,10 +369,10 @@ public class TestWorker extends CompactorTest {
      // Find the new delta file and make sure it has the right contents
      Arrays.sort(stat);
      Assert.assertEquals("base_20", stat[0].getPath().getName());
- Assert.assertEquals("delta_0000021_0000027", stat[1].getPath().getName());
- Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
- Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
- Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
+ Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[2].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
    }

    @Test
@@ -398,7 +405,7 @@ public class TestWorker extends CompactorTest {
      // Find the new delta file and make sure it has the right contents
      boolean sawNewDelta = false;
      for (int i = 0; i < stat.length; i++) {
- if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+ if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
          sawNewDelta = true;
          FileStatus[] buckets = fs.listStatus(stat[i].getPath());
          Assert.assertEquals(2, buckets.length);
@@ -441,7 +448,7 @@ public class TestWorker extends CompactorTest {
      // Find the new delta file and make sure it has the right contents
      boolean sawNewDelta = false;
      for (int i = 0; i < stat.length; i++) {
- if (stat[i].getPath().getName().equals("delta_0000001_0000004")) {
+ if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4))) {
          sawNewDelta = true;
          FileStatus[] buckets = fs.listStatus(stat[i].getPath());
          Assert.assertEquals(2, buckets.length);
@@ -661,7 +668,7 @@ public class TestWorker extends CompactorTest {
      // Find the new delta file and make sure it has the right contents
      boolean sawNewDelta = false;
      for (int i = 0; i < stat.length; i++) {
- if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+ if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
          sawNewDelta = true;
          FileStatus[] buckets = fs.listStatus(stat[i].getPath());
          Assert.assertEquals(2, buckets.length);
@@ -760,9 +767,9 @@ public class TestWorker extends CompactorTest {
      Arrays.sort(stat);
      Assert.assertEquals("base_0000022", stat[0].getPath().getName());
      Assert.assertEquals("base_20", stat[1].getPath().getName());
- Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
- Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
- Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
    }

    @Test
@@ -796,9 +803,13 @@ public class TestWorker extends CompactorTest {
      Arrays.sort(stat);
      Assert.assertEquals("base_0000027", stat[0].getPath().getName());
      Assert.assertEquals("base_20", stat[1].getPath().getName());
- Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
- Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
- Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+ }
+ @Override
+ boolean useHive130DeltaDirName() {
+ return false;
    }

    @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
new file mode 100644
index 0000000..3b5283a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
@@ -0,0 +1,16 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+/**
+ * Same as TestWorker but tests delta file names in Hive 1.3.0 format
+ */
+public class TestWorker2 extends TestWorker {
+
+ public TestWorker2() throws Exception {
+ super();
+ }
+
+ @Override
+ boolean useHive130DeltaDirName() {
+ return true;
+ }
+}
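
The TestWorker/TestWorker2 split above hinges on useHive130DeltaDirName() and on the makeDeltaDirName/makeDeltaDirNameCompacted helpers added to CompactorTest.java (listed in the diffstat below but not shown in this excerpt). A minimal sketch of the shape such helpers could take, assuming the AcidUtils.deltaSubdir overloads introduced later in this commit; the bodies here are illustrative, not the committed ones:

import org.apache.hadoop.hive.ql.io.AcidUtils;

// Hypothetical sketch only: the real helpers live in CompactorTest.java, which this
// excerpt does not show; names and behavior are inferred from the assertions above.
abstract class DeltaNameHelperSketch {
  abstract boolean useHive130DeltaDirName();

  String makeDeltaDirName(long minTxnId, long maxTxnId) {
    // a freshly written delta: statement-qualified in 1.3.0, plain before that
    return useHive130DeltaDirName()
        ? AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0)   // e.g. delta_0000021_0000022_0000
        : AcidUtils.deltaSubdir(minTxnId, maxTxnId);     // e.g. delta_0000021_0000022
  }

  String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
    // minor compaction collapses per-statement deltas, so no statement suffix
    return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
  }
}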


  • Ekoifman at Jul 13, 2015 at 4:11 pm
    HIVE-11030 - Enhance storage layer to create one delta file per write (Eugene Koifman, reviewed by Alan Gates)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/66feedc5
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/66feedc5
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/66feedc5

    Branch: refs/heads/master
    Commit: 66feedc5569de959a383e0a58d9e8768bbad0e2c
    Parents: 5c94bda
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Mon Jul 13 09:11:28 2015 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Mon Jul 13 09:11:28 2015 -0700

    ----------------------------------------------------------------------
      .../streaming/AbstractRecordWriter.java | 4 +-
      .../streaming/mutate/worker/MutatorImpl.java | 4 +-
      .../java/org/apache/hadoop/hive/ql/Driver.java | 1 +
      .../hadoop/hive/ql/io/AcidInputFormat.java | 60 +++++++-
      .../hadoop/hive/ql/io/AcidOutputFormat.java | 49 +++++-
      .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 152 +++++++++++++++----
      .../hadoop/hive/ql/io/HiveFileFormatUtils.java | 19 +--
      .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 20 +--
      .../hadoop/hive/ql/io/orc/OrcNewSplit.java | 13 +-
      .../hive/ql/io/orc/OrcRawRecordMerger.java | 66 ++++++--
      .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 58 +++++++
      .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 16 +-
      .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 20 ++-
      .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 4 +
      .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 3 +
      .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 +-
      .../hadoop/hive/ql/plan/FileSinkDesc.java | 27 +++-
      .../hive/ql/txn/compactor/CompactorMR.java | 4 +-
      .../hive/ql/exec/TestFileSinkOperator.java | 3 +-
      .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 73 ++++++++-
      .../hive/ql/io/orc/TestInputOutputFormat.java | 13 +-
      .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 57 ++++---
      .../hive/ql/io/orc/TestOrcRecordUpdater.java | 6 +-
      .../hive/ql/txn/compactor/CompactorTest.java | 20 ++-
      .../hive/ql/txn/compactor/TestCleaner.java | 8 +-
      .../hive/ql/txn/compactor/TestCleaner2.java | 14 ++
      .../hive/ql/txn/compactor/TestInitiator.java | 4 +
      .../hive/ql/txn/compactor/TestWorker.java | 49 +++---
      .../hive/ql/txn/compactor/TestWorker2.java | 16 ++
      29 files changed, 645 insertions(+), 141 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    ----------------------------------------------------------------------
    diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    index ed46bca..c959222 100644
    --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    @@ -143,7 +143,9 @@ abstract class AbstractRecordWriter implements RecordWriter {
                            .inspector(getSerde().getObjectInspector())
                            .bucket(bucketId)
                            .minimumTransactionId(minTxnId)
    - .maximumTransactionId(maxTxnID));
    + .maximumTransactionId(maxTxnID)
    + .statementId(-1)
    + .finalDestination(partitionPath));
          } catch (SerDeException e) {
            throw new SerializationError("Failed to get object inspector from Serde "
                    + getSerde().getClass().getName(), e);

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
    ----------------------------------------------------------------------
    diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
    index 0fe41d5..52062f8 100644
    --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
    +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
    @@ -78,7 +78,9 @@ public class MutatorImpl implements Mutator {
                  .bucket(bucketId)
                  .minimumTransactionId(transactionId)
                  .maximumTransactionId(transactionId)
    - .recordIdColumn(recordIdColumn));
    + .recordIdColumn(recordIdColumn)
    + .finalDestination(partitionPath)
    + .statementId(-1));
        }

      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    index 934cb42..b74e5fa 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    @@ -988,6 +988,7 @@ public class Driver implements CommandProcessor {
              if (acidSinks != null) {
                for (FileSinkDesc desc : acidSinks) {
                  desc.setTransactionId(txnId);
    + desc.setStatementId(txnMgr.getStatementId());
                }
              }


    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    index e1d2395..24506b7 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    @@ -22,13 +22,19 @@ import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.common.ValidTxnList;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.io.Writable;
      import org.apache.hadoop.io.WritableComparable;
      import org.apache.hadoop.mapred.InputFormat;
      import org.apache.hadoop.mapred.InputSplit;
      import org.apache.hadoop.mapred.RecordReader;
      import org.apache.hadoop.mapred.Reporter;

    +import java.io.DataInput;
    +import java.io.DataOutput;
      import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;

      /**
      * The interface required for input formats that want to support ACID
    @@ -62,7 +68,7 @@ import java.io.IOException;
       * <li>New format -
       * <pre>
       * $partition/base_$tid/$bucket
    - * delta_$tid_$tid/$bucket
    + * delta_$tid_$tid_$stid/$bucket
       * </pre></li>
       * </ul>
       * <p>
    @@ -71,6 +77,8 @@ import java.io.IOException;
       * stored sorted by the original transaction id (ascending), bucket (ascending),
       * row id (ascending), and current transaction id (descending). Thus the files
       * can be merged by advancing through the files in parallel.
    + * The stid is the unique id (within the transaction) of the statement that created
    + * this delta file.
       * <p>
       * The base files include all transactions from the beginning of time
       * (transaction id 0) to the transaction in the directory name. Delta
    @@ -91,7 +99,7 @@ import java.io.IOException;
       * For row-at-a-time processing, KEY can conveniently pass RowId into the operator
       * pipeline. For vectorized execution the KEY could perhaps represent a range in the batch.
       * Since {@link org.apache.hadoop.hive.ql.io.orc.OrcInputFormat} is declared to return
    - * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidRecordReader} is defined
    + * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader} is defined
       * to provide access to the RowId. Other implementations of AcidInputFormat can use either
       * mechanism.
       * </p>
    @@ -101,6 +109,54 @@ import java.io.IOException;
      public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
          extends InputFormat<KEY, VALUE>, InputFormatChecker {

    + static final class DeltaMetaData implements Writable {
    + private long minTxnId;
    + private long maxTxnId;
    + private List<Integer> stmtIds;
    +
    + public DeltaMetaData() {
    + this(0,0,null);
    + }
    + DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
    + this.minTxnId = minTxnId;
    + this.maxTxnId = maxTxnId;
    + this.stmtIds = stmtIds;
    + }
    + long getMinTxnId() {
    + return minTxnId;
    + }
    + long getMaxTxnId() {
    + return maxTxnId;
    + }
    + List<Integer> getStmtIds() {
    + return stmtIds;
    + }
    + @Override
    + public void write(DataOutput out) throws IOException {
    + out.writeLong(minTxnId);
    + out.writeLong(maxTxnId);
    + out.writeInt(stmtIds.size());
    + if(stmtIds == null) {
    + return;
    + }
    + for(Integer id : stmtIds) {
    + out.writeInt(id);
    + }
    + }
    + @Override
    + public void readFields(DataInput in) throws IOException {
    + minTxnId = in.readLong();
    + maxTxnId = in.readLong();
    + int numStatements = in.readInt();
    + if(numStatements <= 0) {
    + return;
    + }
    + stmtIds = new ArrayList<>();
    + for(int i = 0; i < numStatements; i++) {
    + stmtIds.add(in.readInt());
    + }
    + }
    + }
        /**
         * Options for controlling the record readers.
         */
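
    For reference, a small sketch (not part of the commit) of how the new DeltaMetaData round-trips as a Writable, using Hadoop's DataOutputBuffer/DataInputBuffer. Note that write() calls stmtIds.size() before its null check, so a caller should pass a non-null (possibly empty) list; the three-argument constructor is package-private, so this assumes same-package placement (org.apache.hadoop.hive.ql.io, e.g. a test):

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    // Sketch: serialize the delta metadata for txns 21-22 with statement ids 0 and 1,
    // then read it back into a fresh instance.
    static AcidInputFormat.DeltaMetaData roundTrip() throws IOException {
      AcidInputFormat.DeltaMetaData original =
          new AcidInputFormat.DeltaMetaData(21, 22, Arrays.asList(0, 1));
      DataOutputBuffer out = new DataOutputBuffer();
      original.write(out);               // two longs, a count, then one int per statement id

      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      AcidInputFormat.DeltaMetaData copy = new AcidInputFormat.DeltaMetaData();
      copy.readFields(in);               // rebuilds minTxnId=21, maxTxnId=22, stmtIds=[0, 1]
      return copy;
    }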

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
    index 0d537e1..dd90a95 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
    @@ -39,7 +39,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
        /**
         * Options to control how the files are written
         */
    - public static class Options {
    + public static class Options implements Cloneable {
          private final Configuration configuration;
          private FileSystem fs;
          private ObjectInspector inspector;
    @@ -53,7 +53,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
          private PrintStream dummyStream = null;
          private boolean oldStyle = false;
          private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id
    -
    + //unique within a transaction
    + private int statementId = 0;
    + private Path finalDestination;
          /**
           * Create the options object.
           * @param conf Use the given configuration
    @@ -63,6 +65,18 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
          }

          /**
    + * shallow clone
    + */
    + @Override
    + public Options clone() {
    + try {
    + return (Options)super.clone();
    + }
    + catch(CloneNotSupportedException ex) {
    + throw new RuntimeException("clone() not properly implemented: " + ex.getMessage(), ex);
    + }
    + }
    + /**
           * Use the given ObjectInspector for each record written.
           * @param inspector the inspector to use.
           * @return this
    @@ -185,6 +199,31 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
            return this;
          }

    + /**
    + * @since 1.3.0
    + * This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names.
    + * This is primarily needed for testing to make sure 1.3 code can still read files created
    + * by older code. Also used by Compactor.
    + */
    + public Options statementId(int id) {
    + if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) {
    + throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId);
    + }
    + if(id < -1) {
    + throw new IllegalArgumentException("Illegal statementId value: " + id);
    + }
    + this.statementId = id;
    + return this;
    + }
    + /**
    + * @param p where the data for this operation will eventually end up;
    + * basically table or partition directory in FS
    + */
    + public Options finalDestination(Path p) {
    + this.finalDestination = p;
    + return this;
    + }
    +
          public Configuration getConfiguration() {
            return configuration;
          }
    @@ -236,6 +275,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
          boolean getOldStyle() {
            return oldStyle;
          }
    + public int getStatementId() {
    + return statementId;
    + }
    + public Path getFinalDestination() {
    + return finalDestination;
    + }
        }

        /**

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    index 2214733..c7e0780 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    @@ -67,6 +67,15 @@ public class AcidUtils {
        };
        public static final String BUCKET_DIGITS = "%05d";
        public static final String DELTA_DIGITS = "%07d";
    + /**
    + * 10K statements per tx. Probably overkill ... since that many delta files
    + * would not be good for performance
    + */
    + public static final String STATEMENT_DIGITS = "%04d";
    + /**
    + * This must be in sync with {@link #STATEMENT_DIGITS}
    + */
    + public static final int MAX_STATEMENTS_PER_TXN = 10000;
        public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
        public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
        public static final PathFilter originalBucketFilter = new PathFilter() {
    @@ -79,7 +88,7 @@ public class AcidUtils {
        private AcidUtils() {
          // NOT USED
        }
    - private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
    + private static final Log LOG = LogFactory.getLog(AcidUtils.class);

        private static final Pattern ORIGINAL_PATTERN =
            Pattern.compile("[0-9]+_[0-9]+");
    @@ -104,12 +113,23 @@ public class AcidUtils {
              BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
        }

    - private static String deltaSubdir(long min, long max) {
    + /**
    + * This is format of delta dir name prior to Hive 1.3.x
    + */
    + public static String deltaSubdir(long min, long max) {
          return DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
              String.format(DELTA_DIGITS, max);
        }

        /**
    + * Each write statement in a transaction creates its own delta dir.
    + * @since 1.3.x
    + */
    + public static String deltaSubdir(long min, long max, int statementId) {
    + return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
    + }
    +
    + /**
         * Create a filename for a bucket file.
         * @param directory the partition directory
         * @param options the options for writing the bucket
    @@ -124,9 +144,15 @@ public class AcidUtils {
          } else if (options.isWritingBase()) {
            subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
                options.getMaximumTransactionId());
    + } else if(options.getStatementId() == -1) {
    + //when minor compaction runs, we collapse per statement delta files inside a single
    + //transaction so we no longer need a statementId in the file name
    + subdir = deltaSubdir(options.getMinimumTransactionId(),
    + options.getMaximumTransactionId());
          } else {
            subdir = deltaSubdir(options.getMinimumTransactionId(),
    - options.getMaximumTransactionId());
    + options.getMaximumTransactionId(),
    + options.getStatementId());
          }
          return createBucketFile(new Path(directory, subdir), options.getBucket());
        }
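
    To make the naming rules above concrete, a hedged sketch of how an Options instance drives the file name (the builder calls are the ones shown in this diff; the paths are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    Configuration conf = new Configuration();
    Path partition = new Path("/warehouse/t/ds=1");

    AcidOutputFormat.Options perStatement = new AcidOutputFormat.Options(conf)
        .writingBase(false)
        .minimumTransactionId(21).maximumTransactionId(22)
        .bucket(0)
        .statementId(0)
        .finalDestination(partition);
    Path statementFile = AcidUtils.createFilename(partition, perStatement);
    // -> /warehouse/t/ds=1/delta_0000021_0000022_0000/bucket_00000

    // statementId(-1) is what minor compaction (and old-style writers) use: no statement suffix
    Path compactedFile = AcidUtils.createFilename(partition, perStatement.clone().statementId(-1));
    // -> /warehouse/t/ds=1/delta_0000021_0000022/bucket_00000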
    @@ -214,14 +240,24 @@ public class AcidUtils {
        }

        public static class ParsedDelta implements Comparable<ParsedDelta> {
    - final long minTransaction;
    - final long maxTransaction;
    - final FileStatus path;
    + private final long minTransaction;
    + private final long maxTransaction;
    + private final FileStatus path;
    + //-1 is for internal (getAcidState()) purposes and means the delta dir
    + //had no statement ID
    + private final int statementId;

    + /**
    + * for pre 1.3.x delta files
    + */
          ParsedDelta(long min, long max, FileStatus path) {
    + this(min, max, path, -1);
    + }
    + ParsedDelta(long min, long max, FileStatus path, int statementId) {
            this.minTransaction = min;
            this.maxTransaction = max;
            this.path = path;
    + this.statementId = statementId;
          }

          public long getMinTransaction() {
    @@ -236,6 +272,16 @@ public class AcidUtils {
            return path.getPath();
          }

    + public int getStatementId() {
    + return statementId == -1 ? 0 : statementId;
    + }
    +
    + /**
    + * Compactions (Major/Minor) merge deltas/bases but delete of old files
    + * happens in a different process; thus it's possible to have bases/deltas with
    + * overlapping txnId boundaries. The sort order helps figure out the "best" set of files
    + * to use to get data.
    + */
          @Override
          public int compareTo(ParsedDelta parsedDelta) {
            if (minTransaction != parsedDelta.minTransaction) {
    @@ -250,7 +296,22 @@ public class AcidUtils {
              } else {
                return -1;
              }
    - } else {
    + }
    + else if(statementId != parsedDelta.statementId) {
    + /**
    + * We want deltas after minor compaction (w/o statementId) to sort
    + * earlier so that getAcidState() considers compacted files (into larger ones) obsolete
    + * Before compaction, include deltas with all statementIds for a given txnId
    + * in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory}
    + */
    + if(statementId < parsedDelta.statementId) {
    + return -1;
    + }
    + else {
    + return 1;
    + }
    + }
    + else {
              return path.compareTo(parsedDelta.path);
            }
          }
    @@ -271,46 +332,72 @@ public class AcidUtils {

        /**
         * Convert the list of deltas into an equivalent list of begin/end
    - * transaction id pairs.
    + * transaction id pairs. Assumes {@code deltas} is sorted.
         * @param deltas
         * @return the list of transaction ids to serialize
         */
    - public static List<Long> serializeDeltas(List<ParsedDelta> deltas) {
    - List<Long> result = new ArrayList<Long>(deltas.size() * 2);
    - for(ParsedDelta delta: deltas) {
    - result.add(delta.minTransaction);
    - result.add(delta.maxTransaction);
    + public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) {
    + List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size());
    + AcidInputFormat.DeltaMetaData last = null;
    + for(ParsedDelta parsedDelta : deltas) {
    + if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) {
    + last.getStmtIds().add(parsedDelta.getStatementId());
    + continue;
    + }
    + last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList<Integer>());
    + result.add(last);
    + if(parsedDelta.statementId >= 0) {
    + last.getStmtIds().add(parsedDelta.getStatementId());
    + }
          }
          return result;
        }

        /**
         * Convert the list of begin/end transaction id pairs to a list of delta
    - * directories.
    + * directories. Note that there may be multiple delta files for the exact same txn range starting
    + * with 1.3.x;
    + * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
         * @param root the root directory
         * @param deltas list of begin/end transaction id pairs
         * @return the list of delta paths
         */
    - public static Path[] deserializeDeltas(Path root, List<Long> deltas) {
    - int deltaSize = deltas.size() / 2;
    - Path[] result = new Path[deltaSize];
    - for(int i = 0; i < deltaSize; ++i) {
    - result[i] = new Path(root, deltaSubdir(deltas.get(i * 2),
    - deltas.get(i * 2 + 1)));
    + public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
    + List<Path> results = new ArrayList<Path>(deltas.size());
    + for(AcidInputFormat.DeltaMetaData dmd : deltas) {
    + if(dmd.getStmtIds().isEmpty()) {
    + results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
    + continue;
    + }
    + for(Integer stmtId : dmd.getStmtIds()) {
    + results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
    + }
          }
    - return result;
    + return results.toArray(new Path[results.size()]);
        }

    - static ParsedDelta parseDelta(FileStatus path) {
    - String filename = path.getPath().getName();
    + private static ParsedDelta parseDelta(FileStatus path) {
    + ParsedDelta p = parsedDelta(path.getPath());
    + return new ParsedDelta(p.getMinTransaction(),
    + p.getMaxTransaction(), path, p.statementId);
    + }
    + public static ParsedDelta parsedDelta(Path deltaDir) {
    + String filename = deltaDir.getName();
          if (filename.startsWith(DELTA_PREFIX)) {
            String rest = filename.substring(DELTA_PREFIX.length());
            int split = rest.indexOf('_');
    + int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
            long min = Long.parseLong(rest.substring(0, split));
    - long max = Long.parseLong(rest.substring(split + 1));
    - return new ParsedDelta(min, max, path);
    + long max = split2 == -1 ?
    + Long.parseLong(rest.substring(split + 1)) :
    + Long.parseLong(rest.substring(split + 1, split2));
    + if(split2 == -1) {
    + return new ParsedDelta(min, max, null);
    + }
    + int statementId = Integer.parseInt(rest.substring(split2 + 1));
    + return new ParsedDelta(min, max, null, statementId);
          }
    - throw new IllegalArgumentException(path + " does not start with " +
    + throw new IllegalArgumentException(deltaDir + " does not start with " +
                                             DELTA_PREFIX);
        }
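
    A quick sketch of the naming/parsing round trip defined by deltaSubdir and parsedDelta (the directory names in the comments are what the DELTA_DIGITS/STATEMENT_DIGITS formats produce):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    // AcidUtils.deltaSubdir(21, 22)    -> "delta_0000021_0000022"       (pre-1.3.0 form)
    // AcidUtils.deltaSubdir(21, 22, 5) -> "delta_0000021_0000022_0005"  (per-statement form)
    Path deltaDir = new Path("/warehouse/t/ds=1", AcidUtils.deltaSubdir(21, 22, 5));

    AcidUtils.ParsedDelta parsed = AcidUtils.parsedDelta(deltaDir);
    // parsed.getMinTransaction() == 21, parsed.getMaxTransaction() == 22,
    // parsed.getStatementId() == 5; for the statement-less form getStatementId() returns 0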

    @@ -407,15 +494,24 @@ public class AcidUtils {

          Collections.sort(working);
          long current = bestBaseTxn;
    + int lastStmtId = -1;
          for(ParsedDelta next: working) {
            if (next.maxTransaction > current) {
              // are any of the new transactions ones that we care about?
              if (txnList.isTxnRangeValid(current+1, next.maxTransaction) !=
    - ValidTxnList.RangeResponse.NONE) {
    + ValidTxnList.RangeResponse.NONE) {
                deltas.add(next);
                current = next.maxTransaction;
    + lastStmtId = next.statementId;
              }
    - } else {
    + }
    + else if(next.maxTransaction == current && lastStmtId >= 0) {
    + //make sure to get all deltas within a single transaction; multi-statement txn
    + //generate multiple delta files with the same txnId range
    + //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
    + deltas.add(next);
    + }
    + else {
              obsolete.add(next.path);
            }
          }
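
    And a hedged sketch of how serialized delta metadata expands back into directories via deserializeDeltas (the DeltaMetaData constructor used here is package-private, so this assumes same-package placement in org.apache.hadoop.hive.ql.io):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.Path;

    // Sketch: two statements of txns 21-22 plus an already-compacted 23-25 delta.
    static Path[] expand(Path root) throws IOException {
      List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
      deltas.add(new AcidInputFormat.DeltaMetaData(21, 22, new ArrayList<>(Arrays.asList(0, 1))));
      deltas.add(new AcidInputFormat.DeltaMetaData(23, 25, new ArrayList<Integer>()));
      return AcidUtils.deserializeDeltas(root, deltas);
      // -> root/delta_0000021_0000022_0000, root/delta_0000021_0000022_0001,
      //    root/delta_0000023_0000025
    }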

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    index 7ad5aa0..50ba740 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    @@ -297,31 +297,32 @@ public final class HiveFileFormatUtils {
          // TODO not 100% sure about this. This call doesn't set the compression type in the conf
          // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not
          // sure if this is correct or not.
    - return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(),
    - bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum);
    + return getRecordUpdater(jc, acidOutputFormat,
    + bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf);
        }


        private static RecordUpdater getRecordUpdater(JobConf jc,
                                                      AcidOutputFormat<?, ?> acidOutputFormat,
    - boolean isCompressed,
    - long txnId,
                                                      int bucket,
                                                      ObjectInspector inspector,
                                                      Properties tableProp,
                                                      Path outPath,
                                                      Reporter reporter,
    - int rowIdColNum) throws IOException {
    + int rowIdColNum,
    + FileSinkDesc conf) throws IOException {
          return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
    - .isCompressed(isCompressed)
    + .isCompressed(conf.getCompressed())
              .tableProperties(tableProp)
              .reporter(reporter)
              .writingBase(false)
    - .minimumTransactionId(txnId)
    - .maximumTransactionId(txnId)
    + .minimumTransactionId(conf.getTransactionId())
    + .maximumTransactionId(conf.getTransactionId())
              .bucket(bucket)
              .inspector(inspector)
    - .recordIdColumn(rowIdColNum));
    + .recordIdColumn(rowIdColNum)
    + .statementId(conf.getStatementId())
    + .finalDestination(conf.getDestPath()));
        }

        public static PartitionDesc getPartitionDescFromPathRecursively(

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    index 8864013..3a9e64e 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    @@ -439,13 +439,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
          private final FileStatus file;
          private final FileInfo fileInfo;
          private final boolean isOriginal;
    - private final List<Long> deltas;
    + private final List<DeltaMetaData> deltas;
          private final boolean hasBase;

          SplitInfo(Context context, FileSystem fs,
              FileStatus file, FileInfo fileInfo,
              boolean isOriginal,
    - List<Long> deltas,
    + List<DeltaMetaData> deltas,
              boolean hasBase, Path dir, boolean[] covered) throws IOException {
            super(dir, context.numBuckets, deltas, covered);
            this.context = context;
    @@ -467,12 +467,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
          FileSystem fs;
          List<FileStatus> files;
          boolean isOriginal;
    - List<Long> deltas;
    + List<DeltaMetaData> deltas;
          Path dir;
          boolean[] covered;

          public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
    - boolean isOriginal, List<Long> deltas, boolean[] covered) {
    + boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) {
            this.context = context;
            this.dir = dir;
            this.fs = fs;
    @@ -543,14 +543,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
        static final class BISplitStrategy extends ACIDSplitStrategy {
          List<FileStatus> fileStatuses;
          boolean isOriginal;
    - List<Long> deltas;
    + List<DeltaMetaData> deltas;
          FileSystem fs;
          Context context;
          Path dir;

          public BISplitStrategy(Context context, FileSystem fs,
              Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
    - List<Long> deltas, boolean[] covered) {
    + List<DeltaMetaData> deltas, boolean[] covered) {
            super(dir, context.numBuckets, deltas, covered);
            this.context = context;
            this.fileStatuses = fileStatuses;
    @@ -587,11 +587,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         */
        static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
          Path dir;
    - List<Long> deltas;
    + List<DeltaMetaData> deltas;
          boolean[] covered;
          int numBuckets;

    - public ACIDSplitStrategy(Path dir, int numBuckets, List<Long> deltas, boolean[] covered) {
    + public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) {
            this.dir = dir;
            this.numBuckets = numBuckets;
            this.deltas = deltas;
    @@ -640,7 +640,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
            final SplitStrategy splitStrategy;
            AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
                context.conf, context.transactionList);
    - List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
    + List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
            Path base = dirInfo.getBaseDirectory();
            List<FileStatus> original = dirInfo.getOriginalFiles();
            boolean[] covered = new boolean[context.numBuckets];
    @@ -718,7 +718,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
          private Metadata metadata;
          private List<OrcProto.Type> types;
          private final boolean isOriginal;
    - private final List<Long> deltas;
    + private final List<DeltaMetaData> deltas;
          private final boolean hasBase;
          private OrcFile.WriterVersion writerVersion;
          private long projColsUncompressedSize;

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    index da23544..b58c880 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    @@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
      import java.util.ArrayList;
      import java.util.List;

    +import org.apache.hadoop.hive.ql.io.AcidInputFormat;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.io.WritableUtils;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    @@ -37,7 +38,7 @@ public class OrcNewSplit extends FileSplit {
        private boolean hasFooter;
        private boolean isOriginal;
        private boolean hasBase;
    - private final List<Long> deltas = new ArrayList<Long>();
    + private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
        private OrcFile.WriterVersion writerVersion;

        protected OrcNewSplit(){
    @@ -67,8 +68,8 @@ public class OrcNewSplit extends FileSplit {
              (hasFooter ? OrcSplit.FOOTER_FLAG : 0);
          out.writeByte(flags);
          out.writeInt(deltas.size());
    - for(Long delta: deltas) {
    - out.writeLong(delta);
    + for(AcidInputFormat.DeltaMetaData delta: deltas) {
    + delta.write(out);
          }
          if (hasFooter) {
            // serialize FileMetaInfo fields
    @@ -101,7 +102,9 @@ public class OrcNewSplit extends FileSplit {
          deltas.clear();
          int numDeltas = in.readInt();
          for(int i=0; i < numDeltas; i++) {
    - deltas.add(in.readLong());
    + AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
    + dmd.readFields(in);
    + deltas.add(dmd);
          }
          if (hasFooter) {
            // deserialize FileMetaInfo fields
    @@ -137,7 +140,7 @@ public class OrcNewSplit extends FileSplit {
          return hasBase;
        }

    - public List<Long> getDeltas() {
    + public List<AcidInputFormat.DeltaMetaData> getDeltas() {
          return deltas;
        }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    index 728118a..2f11611 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    @@ -72,41 +72,55 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
        /**
         * A RecordIdentifier extended with the current transaction id. This is the
         * key of our merge sort with the originalTransaction, bucket, and rowId
    - * ascending and the currentTransaction descending. This means that if the
    + * ascending and the currentTransaction, statementId descending. This means that if the
         * reader is collapsing events to just the last update, just the first
         * instance of each record is required.
         */
        final static class ReaderKey extends RecordIdentifier{
          private long currentTransactionId;
    + private int statementId;//sort on this descending, like currentTransactionId

          public ReaderKey() {
    - this(-1, -1, -1, -1);
    + this(-1, -1, -1, -1, 0);
          }

          public ReaderKey(long originalTransaction, int bucket, long rowId,
                           long currentTransactionId) {
    + this(originalTransaction, bucket, rowId, currentTransactionId, 0);
    + }
    + /**
    + * @param statementId - set this to 0 if N/A
    + */
    + public ReaderKey(long originalTransaction, int bucket, long rowId,
    + long currentTransactionId, int statementId) {
            super(originalTransaction, bucket, rowId);
            this.currentTransactionId = currentTransactionId;
    + this.statementId = statementId;
          }

          @Override
          public void set(RecordIdentifier other) {
            super.set(other);
            currentTransactionId = ((ReaderKey) other).currentTransactionId;
    + statementId = ((ReaderKey) other).statementId;
          }

          public void setValues(long originalTransactionId,
                                int bucket,
                                long rowId,
    - long currentTransactionId) {
    + long currentTransactionId,
    + int statementId) {
            setValues(originalTransactionId, bucket, rowId);
            this.currentTransactionId = currentTransactionId;
    + this.statementId = statementId;
          }

          @Override
          public boolean equals(Object other) {
            return super.equals(other) &&
    - currentTransactionId == ((ReaderKey) other).currentTransactionId;
    + currentTransactionId == ((ReaderKey) other).currentTransactionId
    + && statementId == ((ReaderKey) other).statementId//consistent with compareTo()
    + ;
          }

          @Override
    @@ -118,6 +132,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                if (currentTransactionId != oth.currentTransactionId) {
                  return currentTransactionId < oth.currentTransactionId ? +1 : -1;
                }
    + if(statementId != oth.statementId) {
    + return statementId < oth.statementId ? +1 : -1;
    + }
              } else {
                return -1;
              }
    @@ -125,6 +142,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            return sup;
          }

    + /**
    + * This means 1 txn modified the same row more than once
    + */
    + private boolean isSameRow(ReaderKey other) {
    + return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
    + }
    +
          public long getCurrentTransactionId() {
            return currentTransactionId;
          }
    @@ -142,7 +166,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          public String toString() {
            return "{originalTxn: " + getTransactionId() + ", bucket: " +
                getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
    - currentTransactionId + "}";
    + currentTransactionId + ", statementId: "+ statementId + "}";
          }
        }
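
    To illustrate the ordering the statementId field adds (ReaderKey is package-private, so this is test-style code assumed to sit in org.apache.hadoop.hive.ql.io.orc; the constructor is the five-argument form above):

    // The same row (originalTxn=1, bucket=0, rowId=7) written twice by txn 100,
    // once by statement 2 (later) and once by statement 1 (earlier).
    OrcRawRecordMerger.ReaderKey later =
        new OrcRawRecordMerger.ReaderKey(1L, 0, 7L, 100L, 2);
    OrcRawRecordMerger.ReaderKey earlier =
        new OrcRawRecordMerger.ReaderKey(1L, 0, 7L, 100L, 1);

    // statementId sorts descending, like currentTransactionId, so the merge sees the
    // last statement's version of the row first and can collapse to just that event.
    assert later.compareTo(earlier) < 0;
    assert !later.equals(earlier);   // equals() now also compares statementId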

    @@ -159,6 +183,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          final ReaderKey key;
          final RecordIdentifier maxKey;
          final int bucket;
    + private final int statementId;

          /**
           * Create a reader that reads from the first key larger than minKey to any
    @@ -170,17 +195,19 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           * @param maxKey only return keys less than or equal to maxKey if it is
           * non-null
           * @param options options to provide to read the rows.
    + * @param statementId id of SQL statement within a transaction
           * @throws IOException
           */
          ReaderPair(ReaderKey key, Reader reader, int bucket,
                     RecordIdentifier minKey, RecordIdentifier maxKey,
    - ReaderImpl.Options options) throws IOException {
    + ReaderImpl.Options options, int statementId) throws IOException {
            this.reader = reader;
            this.key = key;
            this.maxKey = maxKey;
            this.bucket = bucket;
            // TODO use stripe statistics to jump over stripes
            recordReader = reader.rowsOptions(options);
    + this.statementId = statementId;
            // advance the reader until we reach the minimum key
            do {
              next(nextRecord);
    @@ -195,7 +222,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
                  OrcRecordUpdater.getBucket(nextRecord),
                  OrcRecordUpdater.getRowId(nextRecord),
    - OrcRecordUpdater.getCurrentTransaction(nextRecord));
    + OrcRecordUpdater.getCurrentTransaction(nextRecord),
    + statementId);

              // if this record is larger than maxKey, we need to stop
              if (maxKey != null && key.compareRow(maxKey) > 0) {
    @@ -223,7 +251,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
                             RecordIdentifier minKey, RecordIdentifier maxKey,
                             Reader.Options options) throws IOException {
    - super(key, reader, bucket, minKey, maxKey, options);
    + super(key, reader, bucket, minKey, maxKey, options, 0);
          }

          @Override
    @@ -263,7 +291,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                nextRecord.setFieldValue(OrcRecordUpdater.ROW,
                    recordReader.next(OrcRecordUpdater.getRow(next)));
              }
    - key.setValues(0L, bucket, nextRowId, 0L);
    + key.setValues(0L, bucket, nextRowId, 0L, 0);
              if (maxKey != null && key.compareRow(maxKey) > 0) {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("key " + key + " > maxkey " + maxKey);
    @@ -415,7 +443,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          this.offset = options.getOffset();
          this.length = options.getLength();
          this.validTxnList = validTxnList;
    - // modify the optins to reflect the event instead of the base row
    + // modify the options to reflect the event instead of the base row
          Reader.Options eventOptions = createEventOptions(options);
          if (reader == null) {
            baseReader = null;
    @@ -438,7 +466,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                                            options);
            } else {
              pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
    - eventOptions);
    + eventOptions, 0);
            }

            // if there is at least one record, put it in the map
    @@ -458,13 +486,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            for(Path delta: deltaDirectory) {
              ReaderKey key = new ReaderKey();
              Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
    + AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
              FileSystem fs = deltaFile.getFileSystem(conf);
              long length = getLastFlushLength(fs, deltaFile);
              if (length != -1 && fs.exists(deltaFile)) {
                Reader deltaReader = OrcFile.createReader(deltaFile,
                    OrcFile.readerOptions(conf).maxLength(length));
                ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
    - maxKey, eventOptions);
    + maxKey, eventOptions, deltaDir.getStatementId());
                if (deltaPair.nextRecord != null) {
                  readers.put(key, deltaPair);
                }
    @@ -580,9 +609,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              continue;
            }

    + /*for multi-statement txns, you may have multiple events for the same
    + * row in the same (current) transaction. We want to collapse these to just the last one
    + * regardless whether we are minor compacting. Consider INSERT/UPDATE/UPDATE of the
    + * same row in the same txn. There is no benefit passing along anything except the last
    + * event. If we did want to pass it along, we'd have to include statementId in the row
    + * returned so that compaction could write it out or make minor compaction understand
    + * how to write out delta files in delta_xxx_yyy_stid format. There doesn't seem to be any
    + * value in this.*/
    + boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
            // if we are collapsing, figure out if this is a new row
    - if (collapse) {
    - keysSame = prevKey.compareRow(recordIdentifier) == 0;
    + if (collapse || isSameRow) {
    + keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow);
              if (!keysSame) {
                prevKey.set(recordIdentifier);
              }

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    index b576496..e4651b8 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    @@ -89,6 +89,7 @@ public class OrcRecordUpdater implements RecordUpdater {
        private final IntWritable bucket = new IntWritable();
        private final LongWritable rowId = new LongWritable();
        private long insertedRows = 0;
    + private long rowIdOffset = 0;
        // This records how many rows have been inserted or deleted. It is separate from insertedRows
        // because that is monotonically increasing to give new unique row ids.
        private long rowCountDelta = 0;
    @@ -263,6 +264,41 @@ public class OrcRecordUpdater implements RecordUpdater {
          item.setFieldValue(ROW_ID, rowId);
        }

    + /**
    + * To handle multiple INSERT... statements in a single transaction, we want to make sure
    + * to generate unique {@code rowId} for all inserted rows of the transaction.
    + * @return largest rowId created by previous statements (maybe 0)
    + * @throws IOException
    + */
    + private long findRowIdOffsetForInsert() throws IOException {
    + /*
    + * 1. need to know bucket we are writing to
    + * 2. need to know which delta dir it's in
    + * Then,
    + * 1. find the same bucket file in previous delta dir for this txn
    + * 2. read the footer and get AcidStats which has insert count
    + * 2.1 if AcidStats.inserts>0 done
    + * else go to previous delta file
    + * For example, consider insert/update/insert case...*/
    + if(options.getStatementId() <= 0) {
    + return 0;//there is only 1 statement in this transaction (so far)
    + }
    + for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
    + Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
    + if(!fs.exists(matchingBucket)) {
    + continue;
    + }
    + Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration()));
    + //no close() on Reader?!
    + AcidStats acidStats = parseAcidStats(reader);
    + if(acidStats.inserts > 0) {
    + return acidStats.inserts;
    + }
    + }
    + //if we got here, we looked at all delta files in this txn, prior to current statement and didn't
    + //find any inserts...
    + return 0;
    + }
        // Find the record identifier column (if there) and return a possibly new ObjectInspector that
        // will strain out the record id for the underlying writer.
        private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
    @@ -304,6 +340,9 @@ public class OrcRecordUpdater implements RecordUpdater {
                recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
            rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField));
          }
    + else if(operation == INSERT_OPERATION) {
    + rowId += rowIdOffset;
    + }
          this.rowId.set(rowId);
          this.originalTransaction.set(originalTransaction);
          item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
    @@ -315,6 +354,9 @@ public class OrcRecordUpdater implements RecordUpdater {
        public void insert(long currentTransaction, Object row) throws IOException {
          if (this.currentTransaction.get() != currentTransaction) {
            insertedRows = 0;
    + //this method is almost no-op in hcatalog.streaming case since statementId == 0 is
    + //always true in that case
    + rowIdOffset = findRowIdOffsetForInsert();
          }
          addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
          rowCountDelta++;
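
    A short worked example of the offset logic above (the numbers are hypothetical; the variables mirror the fields used in insert()/addEvent()):

    // Txn 100, statement 0 writes rows with rowId 0, 1, 2 into delta_0000100_0000100_0000;
    // its ACID_STATS footer records inserts = 3.
    // Txn 100, statement 1: findRowIdOffsetForInsert() walks back to statement 0's bucket
    // file, reads inserts = 3, and so:
    long rowIdOffset  = 3;                              // from the previous delta's AcidStats
    long insertedRows = 0;                              // fresh writer for the new delta dir
    long firstRowId   = insertedRows++ + rowIdOffset;   // 3: rowIds stay unique within the txn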
    @@ -407,6 +449,22 @@ public class OrcRecordUpdater implements RecordUpdater {
          }
          return result;
        }
    + /**
    + * {@link KeyIndexBuilder} creates these
    + */
    + static AcidStats parseAcidStats(Reader reader) {
    + String statsSerialized;
    + try {
    + ByteBuffer val =
    + reader.getMetadataValue(OrcRecordUpdater.ACID_STATS)
    + .duplicate();
    + statsSerialized = utf8Decoder.decode(val).toString();
    + } catch (CharacterCodingException e) {
    + throw new IllegalArgumentException("Bad string encoding for " +
    + OrcRecordUpdater.ACID_STATS, e);
    + }
    + return new AcidStats(statsSerialized);
    + }

        static class KeyIndexBuilder implements OrcFile.WriterCallback {
          StringBuilder lastKey = new StringBuilder();

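The rowIdOffset lookup added above only consults the insert counts recorded in earlier per-statement deltas of the same transaction: it walks back from the current statement and returns the count of the most recent statement that actually inserted rows. Below is a minimal, self-contained sketch of just that loop; it is plain Java with no ORC or Hive APIs, and the class name, method name and per-statement history list are illustrative, not part of the patch.

import java.util.Arrays;
import java.util.List;

// Illustrative only: mirrors the loop in findRowIdOffsetForInsert() above.
// insertsPerPastStmt is indexed by statementId; -1 means "no bucket file written".
public class RowIdOffsetSketch {
  static long offsetFor(int statementId, List<Long> insertsPerPastStmt) {
    if (statementId <= 0) {
      return 0;                        // only one statement in the txn so far
    }
    for (int past = statementId - 1; past >= 0; past--) {
      long inserts = insertsPerPastStmt.get(past);
      if (inserts < 0) {
        continue;                      // that statement left no bucket file behind
      }
      if (inserts > 0) {
        return inserts;                // most recent prior statement that inserted rows
      }
      // inserts == 0: keep walking further back
    }
    return 0;                          // no prior statement of this txn inserted anything
  }

  public static void main(String[] args) {
    // txn so far: stmt 0 inserted 5 rows, stmt 1 wrote a delta with 0 inserts
    List<Long> history = Arrays.asList(5L, 0L);
    System.out.println(offsetFor(0, history));  // 0
    System.out.println(offsetFor(2, history));  // 5 (skips stmt 1, uses stmt 0's count)
  }
}
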
    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    index 0c7dd40..8cf4cc0 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    @@ -26,6 +26,8 @@ import java.util.ArrayList;
      import java.util.List;

      import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.io.AcidInputFormat;
    +import org.apache.hadoop.hive.ql.io.AcidUtils;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.io.WritableUtils;
      import org.apache.hadoop.mapred.FileSplit;
    @@ -41,7 +43,7 @@ public class OrcSplit extends FileSplit {
        private boolean hasFooter;
        private boolean isOriginal;
        private boolean hasBase;
    - private final List<Long> deltas = new ArrayList<Long>();
    + private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
        private OrcFile.WriterVersion writerVersion;
        private long projColsUncompressedSize;

    @@ -58,7 +60,7 @@ public class OrcSplit extends FileSplit {

        public OrcSplit(Path path, long offset, long length, String[] hosts,
            ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
    - List<Long> deltas, long projectedDataSize) {
    + List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
          super(path, offset, length, hosts);
          this.fileMetaInfo = fileMetaInfo;
          hasFooter = this.fileMetaInfo != null;
    @@ -78,8 +80,8 @@ public class OrcSplit extends FileSplit {
              (hasFooter ? FOOTER_FLAG : 0);
          out.writeByte(flags);
          out.writeInt(deltas.size());
    - for(Long delta: deltas) {
    - out.writeLong(delta);
    + for(AcidInputFormat.DeltaMetaData delta: deltas) {
    + delta.write(out);
          }
          if (hasFooter) {
            // serialize FileMetaInfo fields
    @@ -112,7 +114,9 @@ public class OrcSplit extends FileSplit {
          deltas.clear();
          int numDeltas = in.readInt();
          for(int i=0; i < numDeltas; i++) {
    - deltas.add(in.readLong());
    + AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
    + dmd.readFields(in);
    + deltas.add(dmd);
          }
          if (hasFooter) {
            // deserialize FileMetaInfo fields
    @@ -148,7 +152,7 @@ public class OrcSplit extends FileSplit {
          return hasBase;
        }

    - public List<Long> getDeltas() {
    + public List<AcidInputFormat.DeltaMetaData> getDeltas() {
          return deltas;
        }


    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    index f8fff1a..445f606 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    @@ -52,6 +52,14 @@ public class DbTxnManager extends HiveTxnManagerImpl {
        private DbLockManager lockMgr = null;
        private IMetaStoreClient client = null;
        private long txnId = 0;
    + /**
+ * Assigns a unique, monotonically increasing ID to each statement
+ * that is part of an open transaction. This is used by the storage
+ * layer (see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)})
+ * to keep multiple writes of the same data within the same transaction apart.
+ * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}.
    + */
    + private int statementId = -1;

        DbTxnManager() {
        }
    @@ -69,6 +77,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
          init();
          try {
            txnId = client.openTxn(user);
    + statementId = 0;
            LOG.debug("Opened txn " + txnId);
            return txnId;
          } catch (TException e) {
    @@ -222,7 +231,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
            return null;
          }

    - List<HiveLock> locks = new ArrayList<HiveLock>(1);
    + List<HiveLock> locks = new ArrayList<HiveLock>(1);
    + if(txnId > 0) {
    + statementId++;
    + }
          LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks);
          ctx.setHiveLocks(locks);
          return lockState;
    @@ -249,6 +261,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
                e);
          } finally {
            txnId = 0;
    + statementId = -1;
          }
        }

    @@ -270,6 +283,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
                e);
          } finally {
            txnId = 0;
    + statementId = -1;
          }
        }

    @@ -361,5 +375,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
            }
          }
        }
    + @Override
    + public int getStatementId() {
    + return statementId;
    + }

      }

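The statementId bookkeeping added to DbTxnManager is just a counter scoped to the open transaction: set to 0 when the transaction is opened, incremented each time locks are acquired for a statement, and reset to -1 when the transaction commits or rolls back. A tiny illustrative sketch of that lifecycle follows; no metastore or lock manager is involved and all names are made up for the example.

// Illustrative only: the statementId lifecycle the DbTxnManager changes above implement.
public class StatementIdLifecycleSketch {
  private long txnId = 0;
  private int statementId = -1;        // -1 = no open transaction

  long openTxn() {                     // openTxn(): transaction opened, counter armed
    txnId = 42;                        // stand-in for the id the metastore would hand out
    statementId = 0;
    return txnId;
  }
  void acquireLocks() {                // one call per statement while the txn is open
    if (txnId > 0) {
      statementId++;
    }
  }
  void endTxn() {                      // commitTxn()/rollbackTxn(): counter disarmed
    txnId = 0;
    statementId = -1;
  }

  public static void main(String[] args) {
    StatementIdLifecycleSketch mgr = new StatementIdLifecycleSketch();
    mgr.openTxn();
    mgr.acquireLocks(); System.out.println(mgr.statementId);  // 1
    mgr.acquireLocks(); System.out.println(mgr.statementId);  // 2
    mgr.endTxn();       System.out.println(mgr.statementId);  // -1
  }
}
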
    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    index 21ab8ee..1906982 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    @@ -54,6 +54,10 @@ class DummyTxnManager extends HiveTxnManagerImpl {
        }

        @Override
    + public int getStatementId() {
    + return 0;
    + }
    + @Override
        public HiveLockManager getLockManager() throws LockException {
          if (lockMgr == null) {
            boolean supportConcurrency =

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    index 2dd0c7d..6c3dc33 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    @@ -127,4 +127,7 @@ public interface HiveTxnManager {
         * @return true if this transaction manager does ACID
         */
        boolean supportsAcid();
    +
    + int getStatementId();
    +
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    index b02374e..8516631 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    @@ -6605,7 +6605,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
            rsCtx.getNumFiles(),
            rsCtx.getTotalFiles(),
            rsCtx.getPartnCols(),
    - dpCtx);
    + dpCtx,
    + dest_path);

          // If this is an insert, update, or delete on an ACID table then mark that so the
          // FileSinkOperator knows how to properly write to it.

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    index bb6cee5..f73b502 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    @@ -92,16 +92,21 @@ public class FileSinkDesc extends AbstractOperatorDesc {
        // Record what type of write this is. Default is non-ACID (ie old style).
        private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
        private long txnId = 0; // transaction id for this operation
    + private int statementId = -1;

        private transient Table table;
    + private Path destPath;

        public FileSinkDesc() {
        }

    + /**
    + * @param destPath - the final destination for data
    + */
        public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
            final boolean compressed, final int destTableId, final boolean multiFileSpray,
            final boolean canBeMerged, final int numFiles, final int totalFiles,
    - final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx) {
    + final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) {

          this.dirName = dirName;
          this.tableInfo = tableInfo;
    @@ -114,6 +119,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
          this.partitionCols = partitionCols;
          this.dpCtx = dpCtx;
          this.dpSortState = DPSortState.NONE;
    + this.destPath = destPath;
        }

        public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
    @@ -135,7 +141,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
        public Object clone() throws CloneNotSupportedException {
          FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
              destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
    - partitionCols, dpCtx);
    + partitionCols, dpCtx, destPath);
          ret.setCompressCodec(compressCodec);
          ret.setCompressType(compressType);
          ret.setGatherStats(gatherStats);
    @@ -231,9 +237,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
          return temporary;
        }

    - /**
    - * @param totalFiles the totalFiles to set
    - */
        public void setTemporary(boolean temporary) {
          this.temporary = temporary;
        }
    @@ -438,11 +441,23 @@ public class FileSinkDesc extends AbstractOperatorDesc {
        public void setTransactionId(long id) {
          txnId = id;
        }
    -
        public long getTransactionId() {
          return txnId;
        }

    + public void setStatementId(int id) {
    + statementId = id;
    + }
    + /**
    + * See {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options#statementId(int)}
    + */
    + public int getStatementId() {
    + return statementId;
    + }
    + public Path getDestPath() {
    + return destPath;
    + }
    +
        public Table getTable() {
          return table;
        }

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    index c5f2d4d..6c77ba4 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    @@ -545,7 +545,9 @@ public class CompactorMR {
                  .reporter(reporter)
                  .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
                  .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
    - .bucket(bucket);
    + .bucket(bucket)
    + .statementId(-1);//setting statementId == -1 makes compacted delta files use
    + //delta_xxxx_yyyy format

              // Instantiate the underlying output format
              @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    index e400778..c6ae030 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    @@ -303,7 +303,8 @@ public class TestFileSinkOperator {
            Map<String, String> partColNames = new HashMap<String, String>(1);
            partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
            dpCtx.setInputToDPCols(partColNames);
    - desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx);
    + //todo: does this need the finalDestination?
    + desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
          } else {
            desc = new FileSinkDesc(basePath, tableDesc, false);
          }

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    index 1e3df34..f8ded12 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    @@ -46,17 +46,23 @@ public class TestAcidUtils {
              AcidUtils.createFilename(p, options).toString());
          options.bucket(123);
          assertEquals("/tmp/00123_0",
    - AcidUtils.createFilename(p, options).toString());
    + AcidUtils.createFilename(p, options).toString());
          options.bucket(23)
              .minimumTransactionId(100)
              .maximumTransactionId(200)
              .writingBase(true)
              .setOldStyle(false);
          assertEquals("/tmp/base_0000200/bucket_00023",
    - AcidUtils.createFilename(p, options).toString());
    + AcidUtils.createFilename(p, options).toString());
          options.writingBase(false);
    + assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023",
    + AcidUtils.createFilename(p, options).toString());
    + options.statementId(-1);
          assertEquals("/tmp/delta_0000100_0000200/bucket_00023",
    - AcidUtils.createFilename(p, options).toString());
    + AcidUtils.createFilename(p, options).toString());
    + options.statementId(7);
    + assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
    + AcidUtils.createFilename(p, options).toString());
        }

        @Test
    @@ -236,7 +242,6 @@ public class TestAcidUtils {
              new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
              new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]),
              new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
    - new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
              new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
          Path part = new MockPath(fs, "mock:/tbl/part1");
          AcidUtils.Directory dir =
    @@ -254,6 +259,45 @@ public class TestAcidUtils {
          assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString());
        }

    + /**
    + * Hive 1.3.0 delta dir naming scheme which supports multi-statement txns
    + * @throws Exception
    + */
    + @Test
    + public void testOverlapingDelta2() throws Exception {
    + Configuration conf = new Configuration();
    + MockFileSystem fs = new MockFileSystem(conf,
    + new MockFile("mock:/tbl/part1/delta_0000063_63_0/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_000062_62_0/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_000062_62_3/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_00061_61_0/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_0060_60_1/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_0060_60_4/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_0060_60_7/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_058_58/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
    + Path part = new MockPath(fs, "mock:/tbl/part1");
    + AcidUtils.Directory dir =
    + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
    + assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
    + List<FileStatus> obsolete = dir.getObsolete();
    + assertEquals(5, obsolete.size());
    + assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString());
    + List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
    + assertEquals(5, delts.size());
    + assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_00061_61_0", delts.get(1).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_000062_62_0", delts.get(2).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_000062_62_3", delts.get(3).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_0000063_63_0", delts.get(4).getPath().toString());
    + }
    +
        @Test
        public void deltasWithOpenTxnInRead() throws Exception {
          Configuration conf = new Configuration();
    @@ -268,6 +312,27 @@ public class TestAcidUtils {
          assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
        }

    + /**
    + * @since 1.3.0
    + * @throws Exception
    + */
    + @Test
    + public void deltasWithOpenTxnInRead2() throws Exception {
    + Configuration conf = new Configuration();
    + MockFileSystem fs = new MockFileSystem(conf,
    + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
    + Path part = new MockPath(fs, "mock:/tbl/part1");
    + AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
    + List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
    + assertEquals(2, delts.size());
    + assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
    + }
    +
        @Test
        public void deltasWithOpenTxnsNotInCompact() throws Exception {
          Configuration conf = new Configuration();

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    index 56e5f9f..e96ab2a 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
      import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
      import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
      import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.hadoop.hive.ql.io.AcidInputFormat;
      import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
      import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
      import org.apache.hadoop.hive.ql.io.HiveInputFormat;
    @@ -927,7 +928,7 @@ public class TestInputOutputFormat {
          OrcInputFormat.SplitGenerator splitter =
              new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
                  fs.getFileStatus(new Path("/a/file")), null, true,
    - new ArrayList<Long>(), true, null, null));
    + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
          OrcSplit result = splitter.createSplit(0, 200, null);
          assertEquals(0, result.getStart());
          assertEquals(200, result.getLength());
    @@ -968,7 +969,7 @@ public class TestInputOutputFormat {
          OrcInputFormat.SplitGenerator splitter =
              new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
                  fs.getFileStatus(new Path("/a/file")), null, true,
    - new ArrayList<Long>(), true, null, null));
    + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
          List<OrcSplit> results = splitter.call();
          OrcSplit result = results.get(0);
          assertEquals(3, result.getStart());
    @@ -990,7 +991,7 @@ public class TestInputOutputFormat {
          conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
          context = new OrcInputFormat.Context(conf);
          splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
    - fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<Long>(),
    + fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(),
              true, null, null));
          results = splitter.call();
          for(int i=0; i < stripeSizes.length; ++i) {
    @@ -1497,7 +1498,7 @@ public class TestInputOutputFormat {
          Path partDir = new Path(conf.get("mapred.input.dir"));
          OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
              new AcidOutputFormat.Options(conf).maximumTransactionId(10)
    - .writingBase(true).bucket(0).inspector(inspector));
    + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir));
          for(int i=0; i < 100; ++i) {
            BigRow row = new BigRow(i);
            writer.insert(10, row);
    @@ -1648,7 +1649,7 @@ public class TestInputOutputFormat {
          // write a base file in partition 0
          OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
              new AcidOutputFormat.Options(conf).maximumTransactionId(10)
    - .writingBase(true).bucket(0).inspector(inspector));
    + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0]));
          for(int i=0; i < 10; ++i) {
            writer.insert(10, new MyRow(i, 2 * i));
          }
    @@ -1661,7 +1662,7 @@ public class TestInputOutputFormat {
          // write a delta file in partition 0
          writer = new OrcRecordUpdater(partDir[0],
              new AcidOutputFormat.Options(conf).maximumTransactionId(10)
    - .writingBase(true).bucket(1).inspector(inspector));
    + .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0]));
          for(int i=10; i < 20; ++i) {
            writer.insert(10, new MyRow(i, 2*i));
          }

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    index 921e954..39f71f1 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    @@ -62,12 +62,12 @@ import static org.junit.Assert.assertNull;
      public class TestOrcRawRecordMerger {

        private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class);
    -
    +//todo: why is statementId -1?
        @Test
        public void testOrdering() throws Exception {
          ReaderKey left = new ReaderKey(100, 200, 1200, 300);
          ReaderKey right = new ReaderKey();
    - right.setValues(100, 200, 1000, 200);
    + right.setValues(100, 200, 1000, 200,1);
          assertTrue(right.compareTo(left) < 0);
          assertTrue(left.compareTo(right) > 0);
          assertEquals(false, left.equals(right));
    @@ -76,16 +76,16 @@ public class TestOrcRawRecordMerger {
          assertEquals(true, right.equals(left));
          right.setRowId(2000);
          assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 4);
    - right.setValues(100, 2, 3, 4);
    + left.setValues(1, 2, 3, 4,-1);
    + right.setValues(100, 2, 3, 4,-1);
          assertTrue(left.compareTo(right) < 0);
          assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 4);
    - right.setValues(1, 100, 3, 4);
    + left.setValues(1, 2, 3, 4,-1);
    + right.setValues(1, 100, 3, 4,-1);
          assertTrue(left.compareTo(right) < 0);
          assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 100);
    - right.setValues(1, 2, 3, 4);
    + left.setValues(1, 2, 3, 100,-1);
    + right.setValues(1, 2, 3, 4,-1);
          assertTrue(left.compareTo(right) < 0);
          assertTrue(right.compareTo(left) > 0);

    @@ -177,7 +177,7 @@ public class TestOrcRawRecordMerger {
          RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
          RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
          ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
    - new Reader.Options());
    + new Reader.Options(), 0);
          RecordReader recordReader = pair.recordReader;
          assertEquals(10, key.getTransactionId());
          assertEquals(20, key.getBucketId());
    @@ -203,7 +203,7 @@ public class TestOrcRawRecordMerger {
          Reader reader = createMockReader();

          ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
    - new Reader.Options());
    + new Reader.Options(), 0);
          RecordReader recordReader = pair.recordReader;
          assertEquals(10, key.getTransactionId());
          assertEquals(20, key.getBucketId());
    @@ -489,7 +489,7 @@ public class TestOrcRawRecordMerger {
          // write the empty base
          AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
              .inspector(inspector).bucket(BUCKET).writingBase(true)
    - .maximumTransactionId(100);
    + .maximumTransactionId(100).finalDestination(root);
          of.getRecordUpdater(root, options).close(false);

          ValidTxnList txnList = new ValidReadTxnList("200:");
    @@ -515,6 +515,10 @@ public class TestOrcRawRecordMerger {
         */
        @Test
        public void testNewBaseAndDelta() throws Exception {
    + testNewBaseAndDelta(false);
    + testNewBaseAndDelta(true);
    + }
    + private void testNewBaseAndDelta(boolean use130Format) throws Exception {
          final int BUCKET = 10;
          String[] values = new String[]{"first", "second", "third", "fourth",
                                         "fifth", "sixth", "seventh", "eighth",
    @@ -532,7 +536,10 @@ public class TestOrcRawRecordMerger {

          // write the base
          AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
    - .inspector(inspector).bucket(BUCKET);
    + .inspector(inspector).bucket(BUCKET).finalDestination(root);
    + if(!use130Format) {
    + options.statementId(-1);
    + }
          RecordUpdater ru = of.getRecordUpdater(root,
              options.writingBase(true).maximumTransactionId(100));
          for(String v: values) {
    @@ -554,7 +561,8 @@ public class TestOrcRawRecordMerger {
          AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);

          assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
    - assertEquals(new Path(root, "delta_0000200_0000200"),
    + assertEquals(new Path(root, use130Format ?
    + AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
              directory.getCurrentDirectories().get(0).getPath());

          Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
    @@ -829,7 +837,7 @@ public class TestOrcRawRecordMerger {
          // write a delta
          AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
              .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
    - .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5);
    + .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
          RecordUpdater ru = of.getRecordUpdater(root, options);
          values = new String[]{"0.0", null, null, "1.1", null, null, null,
              "ignore.7"};
    @@ -920,6 +928,7 @@ public class TestOrcRawRecordMerger {
          options.orcOptions(OrcFile.writerOptions(conf)
            .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
            .memory(mgr));
    + options.finalDestination(root);
          RecordUpdater ru = of.getRecordUpdater(root, options);
          String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
              "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
    @@ -1004,7 +1013,8 @@ public class TestOrcRawRecordMerger {
          AcidOutputFormat.Options options =
              new AcidOutputFormat.Options(conf)
                  .bucket(BUCKET).inspector(inspector).filesystem(fs)
    - .writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
    + .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
    + .finalDestination(root);
          RecordUpdater ru = of.getRecordUpdater(root, options);
          String[] values = new String[]{"a", "b", "c", "d", "e"};
          for(int i=0; i < values.length; ++i) {
    @@ -1047,6 +1057,14 @@ public class TestOrcRawRecordMerger {
         */
        @Test
        public void testRecordReaderIncompleteDelta() throws Exception {
    + testRecordReaderIncompleteDelta(false);
    + testRecordReaderIncompleteDelta(true);
    + }
    + /**
    + *
+ * @param use130Format true means use the delta_0001_0001_0000 format, else delta_0001_0001
    + */
    + private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception {
          final int BUCKET = 1;
          Configuration conf = new Configuration();
          OrcOutputFormat of = new OrcOutputFormat();
    @@ -1063,7 +1081,10 @@ public class TestOrcRawRecordMerger {
          AcidOutputFormat.Options options =
              new AcidOutputFormat.Options(conf)
                  .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
    - .bucket(BUCKET).inspector(inspector).filesystem(fs);
    + .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root);
    + if(!use130Format) {
    + options.statementId(-1);
    + }
          RecordUpdater ru = of.getRecordUpdater(root, options);
          String[] values= new String[]{"1", "2", "3", "4", "5"};
          for(int i=0; i < values.length; ++i) {
    @@ -1110,8 +1131,8 @@ public class TestOrcRawRecordMerger {
          splits = inf.getSplits(job, 1);
          assertEquals(2, splits.length);
          rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
    - Path sideFile = new Path(root +
    - "/delta_0000010_0000019/bucket_00001_flush_length");
    + Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
    + AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
          assertEquals(true, fs.exists(sideFile));
          assertEquals(24, fs.getFileStatus(sideFile).getLen());


    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
    index 22bd4b9..22030b4 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
    @@ -97,7 +97,8 @@ public class TestOrcRecordUpdater {
              .minimumTransactionId(10)
              .maximumTransactionId(19)
              .inspector(inspector)
    - .reporter(Reporter.NULL);
    + .reporter(Reporter.NULL)
    + .finalDestination(root);
          RecordUpdater updater = new OrcRecordUpdater(root, options);
          updater.insert(11, new MyRow("first"));
          updater.insert(11, new MyRow("second"));
    @@ -197,7 +198,8 @@ public class TestOrcRecordUpdater {
              .maximumTransactionId(100)
              .inspector(inspector)
              .reporter(Reporter.NULL)
    - .recordIdColumn(1);
    + .recordIdColumn(1)
    + .finalDestination(root);
          RecordUpdater updater = new OrcRecordUpdater(root, options);
          updater.update(100, new MyRow("update", 30, 10, bucket));
          updater.delete(100, new MyRow("", 60, 40, bucket));

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    index 671e122..21adc9d 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    @@ -241,7 +241,7 @@ public abstract class CompactorTest {
          return sd;
        }

    - // I can't do this with @Before because I want to be able to control when the thead starts
    + // I can't do this with @Before because I want to be able to control when the thread starts
        private void startThread(char type, boolean stopAfterOne) throws Exception {
          startThread(type, stopAfterOne, new AtomicBoolean());
        }
    @@ -284,7 +284,7 @@ public abstract class CompactorTest {
          switch (type) {
            case BASE: filename = "base_" + maxTxn; break;
            case LENGTH_FILE: // Fall through to delta
    - case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
    + case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
            case LEGACY: break; // handled below
          }

    @@ -508,5 +508,21 @@ public abstract class CompactorTest {
          }
        }

    + /**
+ * In Hive 1.3.0 delta file names changed to delta_xxxx_yyyy_zzzz; prior to that
+ * the name was delta_xxxx_yyyy. We want to run the compaction tests with both formats
+ * since new (1.3) code has to be able to read old files.
    + */
    + abstract boolean useHive130DeltaDirName();

    + String makeDeltaDirName(long minTxnId, long maxTxnId) {
    + return useHive130DeltaDirName() ?
    + AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
    + }
    + /**
+ * name of the delta dir produced by compaction (always the old delta_xxxx_yyyy form)
    + */
    + String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
    + return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
    + }
      }

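The two helpers above are what let the same compactor tests run against both naming schemes: makeDeltaDirName() follows whichever format the concrete test class selects, while makeDeltaDirNameCompacted() always yields the old delta_xxxx_yyyy form because the compactor passes statementId(-1). The sketch below prints the names involved, assuming the zero padding the TestAcidUtils expectations use (%07d for txn ids, %04d for statement ids); the formatting methods are illustrative stand-ins, not AcidUtils.deltaSubdir itself.

// Illustrative only: the directory names the compactor tests compare against.
public class DeltaDirNameSketch {
  static String oldStyle(long minTxn, long maxTxn) {                 // pre-1.3.0 / compacted
    return String.format("delta_%07d_%07d", minTxn, maxTxn);
  }
  static String hive130Style(long minTxn, long maxTxn, int stmtId) { // per-statement deltas
    return String.format("delta_%07d_%07d_%04d", minTxn, maxTxn, stmtId);
  }

  public static void main(String[] args) {
    // what a TestWorker2-style (1.3.0) run would see:
    System.out.println(hive130Style(21, 22, 0)); // delta_0000021_0000022_0000 (original delta)
    System.out.println(oldStyle(21, 24));        // delta_0000021_0000024 (minor compaction of 21..24)
  }
}
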
    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    index ffdbb9a..0db732c 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    @@ -139,7 +139,7 @@ public class TestCleaner extends CompactorTest {
          boolean sawBase = false, sawDelta = false;
          for (Path p : paths) {
            if (p.getName().equals("base_20")) sawBase = true;
    - else if (p.getName().equals("delta_21_24")) sawDelta = true;
    + else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true;
            else Assert.fail("Unexpected file " + p.getName());
          }
          Assert.assertTrue(sawBase);
    @@ -177,7 +177,7 @@ public class TestCleaner extends CompactorTest {
          boolean sawBase = false, sawDelta = false;
          for (Path path : paths) {
            if (path.getName().equals("base_20")) sawBase = true;
    - else if (path.getName().equals("delta_21_24")) sawDelta = true;
    + else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true;
            else Assert.fail("Unexpected file " + path.getName());
          }
          Assert.assertTrue(sawBase);
    @@ -480,4 +480,8 @@ public class TestCleaner extends CompactorTest {
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(0, rsp.getCompactsSize());
        }
    + @Override
    + boolean useHive130DeltaDirName() {
    + return false;
    + }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
    new file mode 100644
    index 0000000..c637dd1
    --- /dev/null
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
    @@ -0,0 +1,14 @@
    +package org.apache.hadoop.hive.ql.txn.compactor;
    +
    +/**
    + * Same as TestCleaner but tests delta file names in Hive 1.3.0 format
    + */
    +public class TestCleaner2 extends TestCleaner {
    + public TestCleaner2() throws Exception {
    + super();
    + }
    + @Override
    + boolean useHive130DeltaDirName() {
+ return true;
    + }
    +}

    http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    index 00b13de..0b0b1da 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    @@ -713,5 +713,9 @@ public class TestInitiator extends CompactorTest {
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(0, compacts.size());
        }
    + @Override
    + boolean useHive130DeltaDirName() {
    + return false;
    + }

      }
  • Ekoifman at Jul 13, 2015 at 4:31 pm
    Repository: hive
    Updated Branches:
       refs/heads/branch-1 16d1b7459 -> c30ab4686


    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    index bebac54..11e5333 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    @@ -281,7 +281,7 @@ public class TestWorker extends CompactorTest {
          // Find the new delta file and make sure it has the right contents
          boolean sawNewDelta = false;
          for (int i = 0; i < stat.length; i++) {
    - if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
    + if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
              sawNewDelta = true;
              FileStatus[] buckets = fs.listStatus(stat[i].getPath());
              Assert.assertEquals(2, buckets.length);
    @@ -296,6 +296,10 @@ public class TestWorker extends CompactorTest {
          Assert.assertTrue(sawNewDelta);
        }

    + /**
    + * todo: fix https://issues.apache.org/jira/browse/HIVE-9995
    + * @throws Exception
    + */
        @Test
        public void minorWithOpenInMiddle() throws Exception {
          LOG.debug("Starting minorWithOpenInMiddle");
    @@ -321,15 +325,18 @@ public class TestWorker extends CompactorTest {
          // There should still now be 5 directories in the location
          FileSystem fs = FileSystem.get(conf);
          FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
    - Assert.assertEquals(5, stat.length);
    + boolean is130 = this instanceof TestWorker2;
    + Assert.assertEquals(is130 ? 5 : 4, stat.length);

          // Find the new delta file and make sure it has the right contents
          Arrays.sort(stat);
          Assert.assertEquals("base_20", stat[0].getPath().getName());
    - Assert.assertEquals("delta_0000021_0000022", stat[1].getPath().getName());
    - Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
    - Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
    - Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
+ if(is130) {//in 1.3.0 the orig delta is delta_00021_00022_0000 and the compacted one is delta_00021_00022...
    + Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
    + }
    + Assert.assertEquals(makeDeltaDirName(21, 22), stat[1 + (is130 ? 1 : 0)].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(23, 25), stat[2 + (is130 ? 1 : 0)].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(26, 27), stat[3 + (is130 ? 1 : 0)].getPath().getName());
        }

        @Test
    @@ -362,10 +369,10 @@ public class TestWorker extends CompactorTest {
          // Find the new delta file and make sure it has the right contents
          Arrays.sort(stat);
          Assert.assertEquals("base_20", stat[0].getPath().getName());
    - Assert.assertEquals("delta_0000021_0000027", stat[1].getPath().getName());
    - Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
    - Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
    - Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
    + Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[2].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
        }

        @Test
    @@ -398,7 +405,7 @@ public class TestWorker extends CompactorTest {
          // Find the new delta file and make sure it has the right contents
          boolean sawNewDelta = false;
          for (int i = 0; i < stat.length; i++) {
    - if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
    + if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
              sawNewDelta = true;
              FileStatus[] buckets = fs.listStatus(stat[i].getPath());
              Assert.assertEquals(2, buckets.length);
    @@ -441,7 +448,7 @@ public class TestWorker extends CompactorTest {
          // Find the new delta file and make sure it has the right contents
          boolean sawNewDelta = false;
          for (int i = 0; i < stat.length; i++) {
    - if (stat[i].getPath().getName().equals("delta_0000001_0000004")) {
    + if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4))) {
              sawNewDelta = true;
              FileStatus[] buckets = fs.listStatus(stat[i].getPath());
              Assert.assertEquals(2, buckets.length);
    @@ -661,7 +668,7 @@ public class TestWorker extends CompactorTest {
          // Find the new delta file and make sure it has the right contents
          boolean sawNewDelta = false;
          for (int i = 0; i < stat.length; i++) {
    - if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
    + if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) {
              sawNewDelta = true;
              FileStatus[] buckets = fs.listStatus(stat[i].getPath());
              Assert.assertEquals(2, buckets.length);
    @@ -760,9 +767,9 @@ public class TestWorker extends CompactorTest {
          Arrays.sort(stat);
          Assert.assertEquals("base_0000022", stat[0].getPath().getName());
          Assert.assertEquals("base_20", stat[1].getPath().getName());
    - Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
    - Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
    - Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
        }

        @Test
    @@ -796,9 +803,13 @@ public class TestWorker extends CompactorTest {
          Arrays.sort(stat);
          Assert.assertEquals("base_0000027", stat[0].getPath().getName());
          Assert.assertEquals("base_20", stat[1].getPath().getName());
    - Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
    - Assert.assertEquals("delta_23_25", stat[3].getPath().getName());
    - Assert.assertEquals("delta_26_27", stat[4].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
    + Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
    + }
    + @Override
    + boolean useHive130DeltaDirName() {
    + return false;
        }

        @Test

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
    new file mode 100644
    index 0000000..3b5283a
    --- /dev/null
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
    @@ -0,0 +1,16 @@
    +package org.apache.hadoop.hive.ql.txn.compactor;
    +
    +/**
    + * Same as TestWorker but tests delta file names in Hive 1.3.0 format
    + */
    +public class TestWorker2 extends TestWorker {
    +
    + public TestWorker2() throws Exception {
    + super();
    + }
    +
    + @Override
    + boolean useHive130DeltaDirName() {
    + return true;
    + }
    +}
  • Ekoifman at Jul 13, 2015 at 4:31 pm
    HIVE-11030 - Enhance storage layer to create one delta file per write (Eugene Koifman, reviewed by Alan Gates)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c30ab468
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c30ab468
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c30ab468

    Branch: refs/heads/branch-1
    Commit: c30ab4686cbfe73c3cf4552fa7e07c8ded3b4b17
    Parents: 16d1b74
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Mon Jul 13 09:31:17 2015 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Mon Jul 13 09:31:17 2015 -0700

    ----------------------------------------------------------------------
      .../streaming/AbstractRecordWriter.java | 4 +-
      .../java/org/apache/hadoop/hive/ql/Driver.java | 1 +
      .../hadoop/hive/ql/io/AcidInputFormat.java | 60 +++++++-
      .../hadoop/hive/ql/io/AcidOutputFormat.java | 49 +++++-
      .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 152 +++++++++++++++----
      .../hadoop/hive/ql/io/HiveFileFormatUtils.java | 19 +--
      .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 20 +--
      .../hadoop/hive/ql/io/orc/OrcNewSplit.java | 13 +-
      .../hive/ql/io/orc/OrcRawRecordMerger.java | 66 ++++++--
      .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 58 +++++++
      .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 16 +-
      .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 20 ++-
      .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 4 +
      .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 3 +
      .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 +-
      .../hadoop/hive/ql/plan/FileSinkDesc.java | 27 +++-
      .../hive/ql/txn/compactor/CompactorMR.java | 4 +-
      .../hive/ql/exec/TestFileSinkOperator.java | 3 +-
      .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 73 ++++++++-
      .../hive/ql/io/orc/TestInputOutputFormat.java | 13 +-
      .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 57 ++++---
      .../hive/ql/io/orc/TestOrcRecordUpdater.java | 6 +-
      .../hive/ql/txn/compactor/CompactorTest.java | 20 ++-
      .../hive/ql/txn/compactor/TestCleaner.java | 8 +-
      .../hive/ql/txn/compactor/TestCleaner2.java | 14 ++
      .../hive/ql/txn/compactor/TestInitiator.java | 4 +
      .../hive/ql/txn/compactor/TestWorker.java | 49 +++---
      .../hive/ql/txn/compactor/TestWorker2.java | 16 ++
      28 files changed, 642 insertions(+), 140 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    ----------------------------------------------------------------------
    diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    index ed46bca..c959222 100644
    --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
    @@ -143,7 +143,9 @@ abstract class AbstractRecordWriter implements RecordWriter {
                            .inspector(getSerde().getObjectInspector())
                            .bucket(bucketId)
                            .minimumTransactionId(minTxnId)
    - .maximumTransactionId(maxTxnID));
    + .maximumTransactionId(maxTxnID)
    + .statementId(-1)
    + .finalDestination(partitionPath));
          } catch (SerDeException e) {
            throw new SerializationError("Failed to get object inspector from Serde "
                    + getSerde().getClass().getName(), e);

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    index e04165b..d161503 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    @@ -986,6 +986,7 @@ public class Driver implements CommandProcessor {
              if (acidSinks != null) {
                for (FileSinkDesc desc : acidSinks) {
                  desc.setTransactionId(txnId);
    + desc.setStatementId(txnMgr.getStatementId());
                }
              }


    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    index e1d2395..24506b7 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    @@ -22,13 +22,19 @@ import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.common.ValidTxnList;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import org.apache.hadoop.io.Writable;
      import org.apache.hadoop.io.WritableComparable;
      import org.apache.hadoop.mapred.InputFormat;
      import org.apache.hadoop.mapred.InputSplit;
      import org.apache.hadoop.mapred.RecordReader;
      import org.apache.hadoop.mapred.Reporter;

    +import java.io.DataInput;
    +import java.io.DataOutput;
      import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;

      /**
* The interface required for input formats that want to support ACID
    @@ -62,7 +68,7 @@ import java.io.IOException;
       * <li>New format -
       * <pre>
       * $partition/base_$tid/$bucket
    - * delta_$tid_$tid/$bucket
    + * delta_$tid_$tid_$stid/$bucket
       * </pre></li>
       * </ul>
       * <p>
    @@ -71,6 +77,8 @@ import java.io.IOException;
       * stored sorted by the original transaction id (ascending), bucket (ascending),
       * row id (ascending), and current transaction id (descending). Thus the files
       * can be merged by advancing through the files in parallel.
+ * The stid is the unique id (within the transaction) of the statement that created
    + * this delta file.
       * <p>
       * The base files include all transactions from the beginning of time
       * (transaction id 0) to the transaction in the directory name. Delta
    @@ -91,7 +99,7 @@ import java.io.IOException;
       * For row-at-a-time processing, KEY can conveniently pass RowId into the operator
       * pipeline. For vectorized execution the KEY could perhaps represent a range in the batch.
       * Since {@link org.apache.hadoop.hive.ql.io.orc.OrcInputFormat} is declared to return
    - * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidRecordReader} is defined
    + * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader} is defined
       * to provide access to the RowId. Other implementations of AcidInputFormat can use either
       * mechanism.
       * </p>
    @@ -101,6 +109,54 @@ import java.io.IOException;
      public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
          extends InputFormat<KEY, VALUE>, InputFormatChecker {

    + static final class DeltaMetaData implements Writable {
    + private long minTxnId;
    + private long maxTxnId;
    + private List<Integer> stmtIds;
    +
    + public DeltaMetaData() {
    + this(0,0,null);
    + }
    + DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
    + this.minTxnId = minTxnId;
    + this.maxTxnId = maxTxnId;
    + this.stmtIds = stmtIds;
    + }
    + long getMinTxnId() {
    + return minTxnId;
    + }
    + long getMaxTxnId() {
    + return maxTxnId;
    + }
    + List<Integer> getStmtIds() {
    + return stmtIds;
    + }
    + @Override
    + public void write(DataOutput out) throws IOException {
    + out.writeLong(minTxnId);
    + out.writeLong(maxTxnId);
+ if(stmtIds == null) {
+ out.writeInt(0);
+ return;
+ }
+ out.writeInt(stmtIds.size());
    + for(Integer id : stmtIds) {
    + out.writeInt(id);
    + }
    + }
    + @Override
    + public void readFields(DataInput in) throws IOException {
    + minTxnId = in.readLong();
    + maxTxnId = in.readLong();
    + int numStatements = in.readInt();
    + if(numStatements <= 0) {
    + return;
    + }
    + stmtIds = new ArrayList<>();
    + for(int i = 0; i < numStatements; i++) {
    + stmtIds.add(in.readInt());
    + }
    + }
    + }
        /**
         * Options for controlling the record readers.
         */

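    For orientation, the sketch below (not part of the patch) shows the Writable round trip that the ORC split classes later in this commit perform on DeltaMetaData. It assumes hive-exec with this change on the classpath and is declared in the org.apache.hadoop.hive.ql.io package because the three-argument constructor and the getters are package-private; the class name and main() wrapper are illustrative only.

        package org.apache.hadoop.hive.ql.io;

        import java.io.ByteArrayInputStream;
        import java.io.ByteArrayOutputStream;
        import java.io.DataInputStream;
        import java.io.DataOutputStream;
        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.Arrays;

        public class DeltaMetaDataRoundTripSketch {
          public static void main(String[] args) throws IOException {
            // txn range [62, 62] written by statements 0 and 3 of the same transaction
            AcidInputFormat.DeltaMetaData written = new AcidInputFormat.DeltaMetaData(
                62, 62, new ArrayList<Integer>(Arrays.asList(0, 3)));

            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            written.write(new DataOutputStream(buffer));     // minTxnId, maxTxnId, count, stmtIds...

            AcidInputFormat.DeltaMetaData read = new AcidInputFormat.DeltaMetaData();
            read.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

            // AcidUtils.deserializeDeltas() would expand this to delta_...62_...62_0000 and _0003
            System.out.println(read.getMinTxnId() + "_" + read.getMaxTxnId()
                + " stmtIds=" + read.getStmtIds());
          }
        }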
    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
    index 0d537e1..dd90a95 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
    @@ -39,7 +39,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
        /**
         * Options to control how the files are written
         */
    - public static class Options {
    + public static class Options implements Cloneable {
          private final Configuration configuration;
          private FileSystem fs;
          private ObjectInspector inspector;
    @@ -53,7 +53,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
          private PrintStream dummyStream = null;
          private boolean oldStyle = false;
          private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id
    -
    + //unique within a transaction
    + private int statementId = 0;
    + private Path finalDestination;
          /**
           * Create the options object.
           * @param conf Use the given configuration
    @@ -63,6 +65,18 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
          }

          /**
    + * shallow clone
    + */
    + @Override
    + public Options clone() {
    + try {
    + return (Options)super.clone();
    + }
    + catch(CloneNotSupportedException ex) {
    + throw new RuntimeException("clone() not properly implemented: " + ex.getMessage(), ex);
    + }
    + }
    + /**
           * Use the given ObjectInspector for each record written.
           * @param inspector the inspector to use.
           * @return this
    @@ -185,6 +199,31 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
            return this;
          }

     + /**
     + * This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names.
     + * This is primarily needed for testing to make sure 1.3 code can still read files created
     + * by older code. Also used by the Compactor.
     + * @since 1.3.0
     + */
    + public Options statementId(int id) {
    + if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) {
    + throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId);
    + }
    + if(id < -1) {
    + throw new IllegalArgumentException("Illegal statementId value: " + id);
    + }
    + this.statementId = id;
    + return this;
    + }
    + /**
    + * @param p where the data for this operation will eventually end up;
    + * basically table or partition directory in FS
    + */
    + public Options finalDestination(Path p) {
    + this.finalDestination = p;
    + return this;
    + }
    +
          public Configuration getConfiguration() {
            return configuration;
          }
    @@ -236,6 +275,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
          boolean getOldStyle() {
            return oldStyle;
          }
    + public int getStatementId() {
    + return statementId;
    + }
    + public Path getFinalDestination() {
    + return finalDestination;
    + }
        }

        /**

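    A brief usage sketch of the two new Options fields (statementId and finalDestination): every builder call below appears in this patch, while the Configuration, path and numeric values are invented for illustration.

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.hive.ql.io.AcidOutputFormat;

        public class AcidOptionsSketch {
          public static void main(String[] args) {
            Path tableDir = new Path("/warehouse/t");               // hypothetical final destination
            AcidOutputFormat.Options options = new AcidOutputFormat.Options(new Configuration())
                .writingBase(false)
                .minimumTransactionId(100)
                .maximumTransactionId(100)
                .bucket(0)
                .statementId(2)                                     // third statement of the open txn
                .finalDestination(tableDir);                        // where the data eventually lands

            // The compactor passes -1 instead, which collapses the name back to delta_xxxx_yyyy
            AcidOutputFormat.Options compactorOptions = options.clone().statementId(-1);
          }
        }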
    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    index 2214733..c7e0780 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    @@ -67,6 +67,15 @@ public class AcidUtils {
        };
        public static final String BUCKET_DIGITS = "%05d";
        public static final String DELTA_DIGITS = "%07d";
    + /**
    + * 10K statements per tx. Probably overkill ... since that many delta files
    + * would not be good for performance
    + */
    + public static final String STATEMENT_DIGITS = "%04d";
    + /**
    + * This must be in sync with {@link #STATEMENT_DIGITS}
    + */
    + public static final int MAX_STATEMENTS_PER_TXN = 10000;
        public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
        public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
        public static final PathFilter originalBucketFilter = new PathFilter() {
    @@ -79,7 +88,7 @@ public class AcidUtils {
        private AcidUtils() {
          // NOT USED
        }
    - private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
    + private static final Log LOG = LogFactory.getLog(AcidUtils.class);

        private static final Pattern ORIGINAL_PATTERN =
            Pattern.compile("[0-9]+_[0-9]+");
    @@ -104,12 +113,23 @@ public class AcidUtils {
              BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
        }

    - private static String deltaSubdir(long min, long max) {
    + /**
     + * This is the format of the delta dir name prior to Hive 1.3.x.
    + */
    + public static String deltaSubdir(long min, long max) {
          return DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
              String.format(DELTA_DIGITS, max);
        }

        /**
    + * Each write statement in a transaction creates its own delta dir.
    + * @since 1.3.x
    + */
    + public static String deltaSubdir(long min, long max, int statementId) {
    + return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
    + }
    +
    + /**
         * Create a filename for a bucket file.
         * @param directory the partition directory
         * @param options the options for writing the bucket
    @@ -124,9 +144,15 @@ public class AcidUtils {
          } else if (options.isWritingBase()) {
            subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
                options.getMaximumTransactionId());
    + } else if(options.getStatementId() == -1) {
     + //when minor compaction runs, we collapse per-statement delta files inside a single
     + //transaction so we no longer need a statementId in the file name
    + subdir = deltaSubdir(options.getMinimumTransactionId(),
    + options.getMaximumTransactionId());
          } else {
            subdir = deltaSubdir(options.getMinimumTransactionId(),
    - options.getMaximumTransactionId());
    + options.getMaximumTransactionId(),
    + options.getStatementId());
          }
          return createBucketFile(new Path(directory, subdir), options.getBucket());
        }
    @@ -214,14 +240,24 @@ public class AcidUtils {
        }

        public static class ParsedDelta implements Comparable<ParsedDelta> {
    - final long minTransaction;
    - final long maxTransaction;
    - final FileStatus path;
    + private final long minTransaction;
    + private final long maxTransaction;
    + private final FileStatus path;
    + //-1 is for internal (getAcidState()) purposes and means the delta dir
    + //had no statement ID
    + private final int statementId;

    + /**
    + * for pre 1.3.x delta files
    + */
          ParsedDelta(long min, long max, FileStatus path) {
    + this(min, max, path, -1);
    + }
    + ParsedDelta(long min, long max, FileStatus path, int statementId) {
            this.minTransaction = min;
            this.maxTransaction = max;
            this.path = path;
    + this.statementId = statementId;
          }

          public long getMinTransaction() {
    @@ -236,6 +272,16 @@ public class AcidUtils {
            return path.getPath();
          }

    + public int getStatementId() {
    + return statementId == -1 ? 0 : statementId;
    + }
    +
    + /**
     + * Compactions (Major/Minor) merge deltas/bases, but deletion of the old files
     + * happens in a different process; thus it's possible to have bases/deltas with
     + * overlapping txnId boundaries. The sort order helps figure out the "best" set of files
     + * to use to read the data.
    + */
          @Override
          public int compareTo(ParsedDelta parsedDelta) {
            if (minTransaction != parsedDelta.minTransaction) {
    @@ -250,7 +296,22 @@ public class AcidUtils {
              } else {
                return -1;
              }
    - } else {
    + }
    + else if(statementId != parsedDelta.statementId) {
    + /**
     + * We want deltas after minor compaction (w/o statementId) to sort
     + * earlier so that getAcidState() considers the files that were compacted (into larger ones) obsolete.
     + * Before compaction, include deltas with all statementIds for a given txnId
     + * in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory}.
    + */
    + if(statementId < parsedDelta.statementId) {
    + return -1;
    + }
    + else {
    + return 1;
    + }
    + }
    + else {
              return path.compareTo(parsedDelta.path);
            }
          }
    @@ -271,46 +332,72 @@ public class AcidUtils {

        /**
         * Convert the list of deltas into an equivalent list of begin/end
    - * transaction id pairs.
    + * transaction id pairs. Assumes {@code deltas} is sorted.
         * @param deltas
         * @return the list of transaction ids to serialize
         */
    - public static List<Long> serializeDeltas(List<ParsedDelta> deltas) {
    - List<Long> result = new ArrayList<Long>(deltas.size() * 2);
    - for(ParsedDelta delta: deltas) {
    - result.add(delta.minTransaction);
    - result.add(delta.maxTransaction);
    + public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) {
    + List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size());
    + AcidInputFormat.DeltaMetaData last = null;
    + for(ParsedDelta parsedDelta : deltas) {
    + if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) {
    + last.getStmtIds().add(parsedDelta.getStatementId());
    + continue;
    + }
    + last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList<Integer>());
    + result.add(last);
    + if(parsedDelta.statementId >= 0) {
    + last.getStmtIds().add(parsedDelta.getStatementId());
    + }
          }
          return result;
        }

        /**
         * Convert the list of begin/end transaction id pairs to a list of delta
    - * directories.
     + * directories. Note that starting with 1.3.x there may be multiple delta directories
     + * for the exact same txn range;
     + * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
         * @param root the root directory
         * @param deltas list of begin/end transaction id pairs
         * @return the list of delta paths
         */
    - public static Path[] deserializeDeltas(Path root, List<Long> deltas) {
    - int deltaSize = deltas.size() / 2;
    - Path[] result = new Path[deltaSize];
    - for(int i = 0; i < deltaSize; ++i) {
    - result[i] = new Path(root, deltaSubdir(deltas.get(i * 2),
    - deltas.get(i * 2 + 1)));
    + public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
    + List<Path> results = new ArrayList<Path>(deltas.size());
    + for(AcidInputFormat.DeltaMetaData dmd : deltas) {
    + if(dmd.getStmtIds().isEmpty()) {
    + results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
    + continue;
    + }
    + for(Integer stmtId : dmd.getStmtIds()) {
    + results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
    + }
          }
    - return result;
    + return results.toArray(new Path[results.size()]);
        }

    - static ParsedDelta parseDelta(FileStatus path) {
    - String filename = path.getPath().getName();
    + private static ParsedDelta parseDelta(FileStatus path) {
    + ParsedDelta p = parsedDelta(path.getPath());
    + return new ParsedDelta(p.getMinTransaction(),
    + p.getMaxTransaction(), path, p.statementId);
    + }
    + public static ParsedDelta parsedDelta(Path deltaDir) {
    + String filename = deltaDir.getName();
          if (filename.startsWith(DELTA_PREFIX)) {
            String rest = filename.substring(DELTA_PREFIX.length());
            int split = rest.indexOf('_');
    + int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
            long min = Long.parseLong(rest.substring(0, split));
    - long max = Long.parseLong(rest.substring(split + 1));
    - return new ParsedDelta(min, max, path);
    + long max = split2 == -1 ?
    + Long.parseLong(rest.substring(split + 1)) :
    + Long.parseLong(rest.substring(split + 1, split2));
    + if(split2 == -1) {
    + return new ParsedDelta(min, max, null);
    + }
    + int statementId = Integer.parseInt(rest.substring(split2 + 1));
    + return new ParsedDelta(min, max, null, statementId);
          }
    - throw new IllegalArgumentException(path + " does not start with " +
    + throw new IllegalArgumentException(deltaDir + " does not start with " +
                                             DELTA_PREFIX);
        }

    @@ -407,15 +494,24 @@ public class AcidUtils {

          Collections.sort(working);
          long current = bestBaseTxn;
    + int lastStmtId = -1;
          for(ParsedDelta next: working) {
            if (next.maxTransaction > current) {
              // are any of the new transactions ones that we care about?
              if (txnList.isTxnRangeValid(current+1, next.maxTransaction) !=
    - ValidTxnList.RangeResponse.NONE) {
    + ValidTxnList.RangeResponse.NONE) {
                deltas.add(next);
                current = next.maxTransaction;
    + lastStmtId = next.statementId;
              }
    - } else {
    + }
    + else if(next.maxTransaction == current && lastStmtId >= 0) {
     + //make sure to get all deltas within a single transaction; multi-statement txns
     + //generate multiple delta files with the same txnId range
     + //of course, if maxTransaction has already been minor compacted, all per-statement deltas are obsolete
    + deltas.add(next);
    + }
    + else {
              obsolete.add(next.path);
            }
          }

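    To make the naming scheme concrete, the sketch below exercises the methods added above; the expected strings match the assertions in TestAcidUtils later in this commit, and the wrapper class/main() are illustrative only.

        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.hive.ql.io.AcidUtils;

        public class DeltaNamingSketch {
          public static void main(String[] args) {
            // Pre-1.3.x / compacted form: no statement suffix
            System.out.println(AcidUtils.deltaSubdir(100, 200));    // delta_0000100_0000200
            // 1.3.x form: per-statement suffix, padded with STATEMENT_DIGITS (%04d)
            System.out.println(AcidUtils.deltaSubdir(100, 200, 7)); // delta_0000100_0000200_0007

            // Parsing recovers the txn range; a dir without the suffix reports statement id 0
            AcidUtils.ParsedDelta pd =
                AcidUtils.parsedDelta(new Path("/tmp/delta_0000100_0000200_0007"));
            System.out.println(pd.getMinTransaction() + ".." + pd.getMaxTransaction()
                + " stmt " + pd.getStatementId());                  // 100..200 stmt 7
          }
        }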
    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    index 7ad5aa0..50ba740 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    @@ -297,31 +297,32 @@ public final class HiveFileFormatUtils {
          // TODO not 100% sure about this. This call doesn't set the compression type in the conf
          // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not
          // sure if this is correct or not.
    - return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(),
    - bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum);
    + return getRecordUpdater(jc, acidOutputFormat,
    + bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf);
        }


        private static RecordUpdater getRecordUpdater(JobConf jc,
                                                      AcidOutputFormat<?, ?> acidOutputFormat,
    - boolean isCompressed,
    - long txnId,
                                                      int bucket,
                                                      ObjectInspector inspector,
                                                      Properties tableProp,
                                                      Path outPath,
                                                      Reporter reporter,
    - int rowIdColNum) throws IOException {
    + int rowIdColNum,
    + FileSinkDesc conf) throws IOException {
          return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
    - .isCompressed(isCompressed)
    + .isCompressed(conf.getCompressed())
              .tableProperties(tableProp)
              .reporter(reporter)
              .writingBase(false)
    - .minimumTransactionId(txnId)
    - .maximumTransactionId(txnId)
    + .minimumTransactionId(conf.getTransactionId())
    + .maximumTransactionId(conf.getTransactionId())
              .bucket(bucket)
              .inspector(inspector)
    - .recordIdColumn(rowIdColNum));
    + .recordIdColumn(rowIdColNum)
    + .statementId(conf.getStatementId())
    + .finalDestination(conf.getDestPath()));
        }

        public static PartitionDesc getPartitionDescFromPathRecursively(

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    index 7346bc4..6fe0044 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    @@ -443,13 +443,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
          private final FileStatus file;
          private final FileInfo fileInfo;
          private final boolean isOriginal;
    - private final List<Long> deltas;
    + private final List<DeltaMetaData> deltas;
          private final boolean hasBase;

          SplitInfo(Context context, FileSystem fs,
              FileStatus file, FileInfo fileInfo,
              boolean isOriginal,
    - List<Long> deltas,
    + List<DeltaMetaData> deltas,
              boolean hasBase, Path dir, boolean[] covered) throws IOException {
            super(dir, context.numBuckets, deltas, covered);
            this.context = context;
    @@ -471,12 +471,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
          FileSystem fs;
          List<FileStatus> files;
          boolean isOriginal;
    - List<Long> deltas;
    + List<DeltaMetaData> deltas;
          Path dir;
          boolean[] covered;

          public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
    - boolean isOriginal, List<Long> deltas, boolean[] covered) {
    + boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) {
            this.context = context;
            this.dir = dir;
            this.fs = fs;
    @@ -547,14 +547,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
        static final class BISplitStrategy extends ACIDSplitStrategy {
          List<FileStatus> fileStatuses;
          boolean isOriginal;
    - List<Long> deltas;
    + List<DeltaMetaData> deltas;
          FileSystem fs;
          Context context;
          Path dir;

          public BISplitStrategy(Context context, FileSystem fs,
              Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
    - List<Long> deltas, boolean[] covered) {
    + List<DeltaMetaData> deltas, boolean[] covered) {
            super(dir, context.numBuckets, deltas, covered);
            this.context = context;
            this.fileStatuses = fileStatuses;
    @@ -591,11 +591,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         */
        static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
          Path dir;
    - List<Long> deltas;
    + List<DeltaMetaData> deltas;
          boolean[] covered;
          int numBuckets;

    - public ACIDSplitStrategy(Path dir, int numBuckets, List<Long> deltas, boolean[] covered) {
    + public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) {
            this.dir = dir;
            this.numBuckets = numBuckets;
            this.deltas = deltas;
    @@ -644,7 +644,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
            final SplitStrategy splitStrategy;
            AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
                context.conf, context.transactionList);
    - List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
    + List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
            Path base = dirInfo.getBaseDirectory();
            List<FileStatus> original = dirInfo.getOriginalFiles();
            boolean[] covered = new boolean[context.numBuckets];
    @@ -722,7 +722,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
          private Metadata metadata;
          private List<OrcProto.Type> types;
          private final boolean isOriginal;
    - private final List<Long> deltas;
    + private final List<DeltaMetaData> deltas;
          private final boolean hasBase;
          private OrcFile.WriterVersion writerVersion;
          private long projColsUncompressedSize;

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    index da23544..b58c880 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    @@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
      import java.util.ArrayList;
      import java.util.List;

    +import org.apache.hadoop.hive.ql.io.AcidInputFormat;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.io.WritableUtils;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    @@ -37,7 +38,7 @@ public class OrcNewSplit extends FileSplit {
        private boolean hasFooter;
        private boolean isOriginal;
        private boolean hasBase;
    - private final List<Long> deltas = new ArrayList<Long>();
    + private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
        private OrcFile.WriterVersion writerVersion;

        protected OrcNewSplit(){
    @@ -67,8 +68,8 @@ public class OrcNewSplit extends FileSplit {
              (hasFooter ? OrcSplit.FOOTER_FLAG : 0);
          out.writeByte(flags);
          out.writeInt(deltas.size());
    - for(Long delta: deltas) {
    - out.writeLong(delta);
    + for(AcidInputFormat.DeltaMetaData delta: deltas) {
    + delta.write(out);
          }
          if (hasFooter) {
            // serialize FileMetaInfo fields
    @@ -101,7 +102,9 @@ public class OrcNewSplit extends FileSplit {
          deltas.clear();
          int numDeltas = in.readInt();
          for(int i=0; i < numDeltas; i++) {
    - deltas.add(in.readLong());
    + AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
    + dmd.readFields(in);
    + deltas.add(dmd);
          }
          if (hasFooter) {
            // deserialize FileMetaInfo fields
    @@ -137,7 +140,7 @@ public class OrcNewSplit extends FileSplit {
          return hasBase;
        }

    - public List<Long> getDeltas() {
    + public List<AcidInputFormat.DeltaMetaData> getDeltas() {
          return deltas;
        }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    index 728118a..2f11611 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    @@ -72,41 +72,55 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
        /**
         * A RecordIdentifier extended with the current transaction id. This is the
         * key of our merge sort with the originalTransaction, bucket, and rowId
    - * ascending and the currentTransaction descending. This means that if the
    + * ascending and the currentTransaction, statementId descending. This means that if the
         * reader is collapsing events to just the last update, just the first
         * instance of each record is required.
         */
        final static class ReaderKey extends RecordIdentifier{
          private long currentTransactionId;
    + private int statementId;//sort on this descending, like currentTransactionId

          public ReaderKey() {
    - this(-1, -1, -1, -1);
    + this(-1, -1, -1, -1, 0);
          }

          public ReaderKey(long originalTransaction, int bucket, long rowId,
                           long currentTransactionId) {
    + this(originalTransaction, bucket, rowId, currentTransactionId, 0);
    + }
    + /**
    + * @param statementId - set this to 0 if N/A
    + */
    + public ReaderKey(long originalTransaction, int bucket, long rowId,
    + long currentTransactionId, int statementId) {
            super(originalTransaction, bucket, rowId);
            this.currentTransactionId = currentTransactionId;
    + this.statementId = statementId;
          }

          @Override
          public void set(RecordIdentifier other) {
            super.set(other);
            currentTransactionId = ((ReaderKey) other).currentTransactionId;
    + statementId = ((ReaderKey) other).statementId;
          }

          public void setValues(long originalTransactionId,
                                int bucket,
                                long rowId,
    - long currentTransactionId) {
    + long currentTransactionId,
    + int statementId) {
            setValues(originalTransactionId, bucket, rowId);
            this.currentTransactionId = currentTransactionId;
    + this.statementId = statementId;
          }

          @Override
          public boolean equals(Object other) {
            return super.equals(other) &&
    - currentTransactionId == ((ReaderKey) other).currentTransactionId;
    + currentTransactionId == ((ReaderKey) other).currentTransactionId
    + && statementId == ((ReaderKey) other).statementId//consistent with compareTo()
    + ;
          }

          @Override
    @@ -118,6 +132,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                if (currentTransactionId != oth.currentTransactionId) {
                  return currentTransactionId < oth.currentTransactionId ? +1 : -1;
                }
    + if(statementId != oth.statementId) {
    + return statementId < oth.statementId ? +1 : -1;
    + }
              } else {
                return -1;
              }
    @@ -125,6 +142,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            return sup;
          }

    + /**
    + * This means 1 txn modified the same row more than once
    + */
    + private boolean isSameRow(ReaderKey other) {
    + return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
    + }
    +
          public long getCurrentTransactionId() {
            return currentTransactionId;
          }
    @@ -142,7 +166,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          public String toString() {
            return "{originalTxn: " + getTransactionId() + ", bucket: " +
                getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
    - currentTransactionId + "}";
    + currentTransactionId + ", statementId: "+ statementId + "}";
          }
        }

    @@ -159,6 +183,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          final ReaderKey key;
          final RecordIdentifier maxKey;
          final int bucket;
    + private final int statementId;

          /**
           * Create a reader that reads from the first key larger than minKey to any
    @@ -170,17 +195,19 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           * @param maxKey only return keys less than or equal to maxKey if it is
           * non-null
           * @param options options to provide to read the rows.
    + * @param statementId id of SQL statement within a transaction
           * @throws IOException
           */
          ReaderPair(ReaderKey key, Reader reader, int bucket,
                     RecordIdentifier minKey, RecordIdentifier maxKey,
    - ReaderImpl.Options options) throws IOException {
    + ReaderImpl.Options options, int statementId) throws IOException {
            this.reader = reader;
            this.key = key;
            this.maxKey = maxKey;
            this.bucket = bucket;
            // TODO use stripe statistics to jump over stripes
            recordReader = reader.rowsOptions(options);
    + this.statementId = statementId;
            // advance the reader until we reach the minimum key
            do {
              next(nextRecord);
    @@ -195,7 +222,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
                  OrcRecordUpdater.getBucket(nextRecord),
                  OrcRecordUpdater.getRowId(nextRecord),
    - OrcRecordUpdater.getCurrentTransaction(nextRecord));
    + OrcRecordUpdater.getCurrentTransaction(nextRecord),
    + statementId);

              // if this record is larger than maxKey, we need to stop
              if (maxKey != null && key.compareRow(maxKey) > 0) {
    @@ -223,7 +251,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
                             RecordIdentifier minKey, RecordIdentifier maxKey,
                             Reader.Options options) throws IOException {
    - super(key, reader, bucket, minKey, maxKey, options);
    + super(key, reader, bucket, minKey, maxKey, options, 0);
          }

          @Override
    @@ -263,7 +291,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                nextRecord.setFieldValue(OrcRecordUpdater.ROW,
                    recordReader.next(OrcRecordUpdater.getRow(next)));
              }
    - key.setValues(0L, bucket, nextRowId, 0L);
    + key.setValues(0L, bucket, nextRowId, 0L, 0);
              if (maxKey != null && key.compareRow(maxKey) > 0) {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("key " + key + " > maxkey " + maxKey);
    @@ -415,7 +443,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          this.offset = options.getOffset();
          this.length = options.getLength();
          this.validTxnList = validTxnList;
    - // modify the optins to reflect the event instead of the base row
    + // modify the options to reflect the event instead of the base row
          Reader.Options eventOptions = createEventOptions(options);
          if (reader == null) {
            baseReader = null;
    @@ -438,7 +466,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                                            options);
            } else {
              pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
    - eventOptions);
    + eventOptions, 0);
            }

            // if there is at least one record, put it in the map
    @@ -458,13 +486,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            for(Path delta: deltaDirectory) {
              ReaderKey key = new ReaderKey();
              Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
    + AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
              FileSystem fs = deltaFile.getFileSystem(conf);
              long length = getLastFlushLength(fs, deltaFile);
              if (length != -1 && fs.exists(deltaFile)) {
                Reader deltaReader = OrcFile.createReader(deltaFile,
                    OrcFile.readerOptions(conf).maxLength(length));
                ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
    - maxKey, eventOptions);
    + maxKey, eventOptions, deltaDir.getStatementId());
                if (deltaPair.nextRecord != null) {
                  readers.put(key, deltaPair);
                }
    @@ -580,9 +609,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              continue;
            }

     + /*for multi-statement txns, you may have multiple events for the same
     + * row in the same (current) transaction. We want to collapse these to just the last one
     + * regardless of whether we are minor compacting. Consider INSERT/UPDATE/UPDATE of the
     + * same row in the same txn. There is no benefit in passing along anything except the last
     + * event. If we did want to pass it along, we'd have to include statementId in the row
     + * returned so that compaction could write it out or make minor compaction understand
     + * how to write out delta files in delta_xxx_yyy_stid format. There doesn't seem to be any
     + * value in this.*/
    + boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
            // if we are collapsing, figure out if this is a new row
    - if (collapse) {
    - keysSame = prevKey.compareRow(recordIdentifier) == 0;
    + if (collapse || isSameRow) {
    + keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow);
              if (!keysSame) {
                prevKey.set(recordIdentifier);
              }

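    The merge ordering above is easier to see with a toy example. The class below deliberately does not reuse the package-private ReaderKey; it simply restates the rule, (originalTxn, bucket, rowId) ascending and then (currentTxn, statementId) descending, so the newest event for a row sorts first.

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.Collections;
        import java.util.List;

        public class ReaderKeyOrderSketch {
          static final class Key implements Comparable<Key> {
            final long origTxn, rowId, currentTxn;
            final int bucket, stmtId;
            Key(long origTxn, int bucket, long rowId, long currentTxn, int stmtId) {
              this.origTxn = origTxn; this.bucket = bucket; this.rowId = rowId;
              this.currentTxn = currentTxn; this.stmtId = stmtId;
            }
            @Override public int compareTo(Key o) {
              int c = Long.compare(origTxn, o.origTxn);                  // row identity: ascending
              if (c == 0) c = Integer.compare(bucket, o.bucket);
              if (c == 0) c = Long.compare(rowId, o.rowId);
              if (c == 0) c = -Long.compare(currentTxn, o.currentTxn);   // newest txn first
              if (c == 0) c = -Integer.compare(stmtId, o.stmtId);        // newest statement first
              return c;
            }
            @Override public String toString() { return "txn=" + currentTxn + ",stmt=" + stmtId; }
          }

          public static void main(String[] args) {
            // INSERT (stmt 0) then two UPDATEs (stmts 1 and 2) of the same row within txn 7
            List<Key> events = new ArrayList<Key>(Arrays.asList(
                new Key(7, 0, 0, 7, 0), new Key(7, 0, 0, 7, 1), new Key(7, 0, 0, 7, 2)));
            Collections.sort(events);
            System.out.println(events);   // [txn=7,stmt=2, txn=7,stmt=1, txn=7,stmt=0] -> last update wins
          }
        }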
    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    index b576496..e4651b8 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    @@ -89,6 +89,7 @@ public class OrcRecordUpdater implements RecordUpdater {
        private final IntWritable bucket = new IntWritable();
        private final LongWritable rowId = new LongWritable();
        private long insertedRows = 0;
    + private long rowIdOffset = 0;
        // This records how many rows have been inserted or deleted. It is separate from insertedRows
        // because that is monotonically increasing to give new unique row ids.
        private long rowCountDelta = 0;
    @@ -263,6 +264,41 @@ public class OrcRecordUpdater implements RecordUpdater {
          item.setFieldValue(ROW_ID, rowId);
        }

    + /**
     + * To handle multiple INSERT... statements in a single transaction, we want to make sure
     + * to generate a unique {@code rowId} for every row inserted in the transaction.
     + * @return the largest rowId created by previous statements (may be 0)
    + * @throws IOException
    + */
    + private long findRowIdOffsetForInsert() throws IOException {
    + /*
    + * 1. need to know bucket we are writing to
    + * 2. need to know which delta dir it's in
    + * Then,
    + * 1. find the same bucket file in previous delta dir for this txn
    + * 2. read the footer and get AcidStats which has insert count
    + * 2.1 if AcidStats.inserts>0 done
    + * else go to previous delta file
    + * For example, consider insert/update/insert case...*/
    + if(options.getStatementId() <= 0) {
    + return 0;//there is only 1 statement in this transaction (so far)
    + }
    + for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
    + Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
    + if(!fs.exists(matchingBucket)) {
    + continue;
    + }
    + Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration()));
    + //no close() on Reader?!
    + AcidStats acidStats = parseAcidStats(reader);
    + if(acidStats.inserts > 0) {
    + return acidStats.inserts;
    + }
    + }
    + //if we got here, we looked at all delta files in this txn, prior to current statement and didn't
    + //find any inserts...
    + return 0;
    + }
        // Find the record identifier column (if there) and return a possibly new ObjectInspector that
        // will strain out the record id for the underlying writer.
        private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
    @@ -304,6 +340,9 @@ public class OrcRecordUpdater implements RecordUpdater {
                recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
            rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField));
          }
    + else if(operation == INSERT_OPERATION) {
    + rowId += rowIdOffset;
    + }
          this.rowId.set(rowId);
          this.originalTransaction.set(originalTransaction);
          item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
    @@ -315,6 +354,9 @@ public class OrcRecordUpdater implements RecordUpdater {
        public void insert(long currentTransaction, Object row) throws IOException {
          if (this.currentTransaction.get() != currentTransaction) {
            insertedRows = 0;
     + //this method is almost a no-op in the hcatalog.streaming case since statementId == 0 is
     + //always true in that case
    + rowIdOffset = findRowIdOffsetForInsert();
          }
          addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
          rowCountDelta++;
    @@ -407,6 +449,22 @@ public class OrcRecordUpdater implements RecordUpdater {
          }
          return result;
        }
    + /**
     + * {@link KeyIndexBuilder} creates the serialized {@code AcidStats} that this method parses
    + */
    + static AcidStats parseAcidStats(Reader reader) {
    + String statsSerialized;
    + try {
    + ByteBuffer val =
    + reader.getMetadataValue(OrcRecordUpdater.ACID_STATS)
    + .duplicate();
    + statsSerialized = utf8Decoder.decode(val).toString();
    + } catch (CharacterCodingException e) {
    + throw new IllegalArgumentException("Bad string encoding for " +
    + OrcRecordUpdater.ACID_STATS, e);
    + }
    + return new AcidStats(statsSerialized);
    + }

        static class KeyIndexBuilder implements OrcFile.WriterCallback {
          StringBuilder lastKey = new StringBuilder();

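    As a toy illustration of what findRowIdOffsetForInsert() is for (the goal of keeping inserted rowIds unique across the statements of one transaction, not the AcidStats file-footer lookup the method actually performs), with made-up per-statement insert counts:

        public class RowIdOffsetSketch {
          public static void main(String[] args) {
            int[] insertsPerStatement = {3, 2, 4};       // e.g. three INSERT statements in one txn
            long rowIdOffset = 0;
            for (int stmt = 0; stmt < insertsPerStatement.length; stmt++) {
              for (long i = 0; i < insertsPerStatement[stmt]; i++) {
                // what addEvent() assigns for INSERT_OPERATION: offset + running insertedRows
                System.out.println("stmt " + stmt + " -> rowId " + (rowIdOffset + i));
              }
              rowIdOffset += insertsPerStatement[stmt];  // the next statement starts numbering here
            }
          }
        }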
    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    index 0c7dd40..8cf4cc0 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    @@ -26,6 +26,8 @@ import java.util.ArrayList;
      import java.util.List;

      import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.io.AcidInputFormat;
    +import org.apache.hadoop.hive.ql.io.AcidUtils;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.io.WritableUtils;
      import org.apache.hadoop.mapred.FileSplit;
    @@ -41,7 +43,7 @@ public class OrcSplit extends FileSplit {
        private boolean hasFooter;
        private boolean isOriginal;
        private boolean hasBase;
    - private final List<Long> deltas = new ArrayList<Long>();
    + private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
        private OrcFile.WriterVersion writerVersion;
        private long projColsUncompressedSize;

    @@ -58,7 +60,7 @@ public class OrcSplit extends FileSplit {

        public OrcSplit(Path path, long offset, long length, String[] hosts,
            ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
    - List<Long> deltas, long projectedDataSize) {
    + List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
          super(path, offset, length, hosts);
          this.fileMetaInfo = fileMetaInfo;
          hasFooter = this.fileMetaInfo != null;
    @@ -78,8 +80,8 @@ public class OrcSplit extends FileSplit {
              (hasFooter ? FOOTER_FLAG : 0);
          out.writeByte(flags);
          out.writeInt(deltas.size());
    - for(Long delta: deltas) {
    - out.writeLong(delta);
    + for(AcidInputFormat.DeltaMetaData delta: deltas) {
    + delta.write(out);
          }
          if (hasFooter) {
            // serialize FileMetaInfo fields
    @@ -112,7 +114,9 @@ public class OrcSplit extends FileSplit {
          deltas.clear();
          int numDeltas = in.readInt();
          for(int i=0; i < numDeltas; i++) {
    - deltas.add(in.readLong());
    + AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
    + dmd.readFields(in);
    + deltas.add(dmd);
          }
          if (hasFooter) {
            // deserialize FileMetaInfo fields
    @@ -148,7 +152,7 @@ public class OrcSplit extends FileSplit {
          return hasBase;
        }

    - public List<Long> getDeltas() {
    + public List<AcidInputFormat.DeltaMetaData> getDeltas() {
          return deltas;
        }


    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    index f8fff1a..445f606 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    @@ -52,6 +52,14 @@ public class DbTxnManager extends HiveTxnManagerImpl {
        private DbLockManager lockMgr = null;
        private IMetaStoreClient client = null;
        private long txnId = 0;
    + /**
     + * Assigns a unique, monotonically increasing ID to each statement
     + * which is part of an open transaction. This is used by the storage
     + * layer (see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)})
     + * to keep apart multiple writes of the same data within the same transaction.
     + * See also {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}.
    + */
    + private int statementId = -1;

        DbTxnManager() {
        }
    @@ -69,6 +77,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
          init();
          try {
            txnId = client.openTxn(user);
    + statementId = 0;
            LOG.debug("Opened txn " + txnId);
            return txnId;
          } catch (TException e) {
    @@ -222,7 +231,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
            return null;
          }

    - List<HiveLock> locks = new ArrayList<HiveLock>(1);
    + List<HiveLock> locks = new ArrayList<HiveLock>(1);
    + if(txnId > 0) {
    + statementId++;
    + }
          LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks);
          ctx.setHiveLocks(locks);
          return lockState;
    @@ -249,6 +261,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
                e);
          } finally {
            txnId = 0;
    + statementId = -1;
          }
        }

    @@ -270,6 +283,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
                e);
          } finally {
            txnId = 0;
    + statementId = -1;
          }
        }

    @@ -361,5 +375,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
            }
          }
        }
    + @Override
    + public int getStatementId() {
    + return statementId;
    + }

      }

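    A toy restatement (no Hive dependencies) of the statementId lifecycle this hunk adds to DbTxnManager: -1 outside a transaction, 0 after openTxn(), incremented once per statement that acquires locks while the transaction is open, and back to -1 on commit or rollback. The txn id value and class name are invented.

        public class StatementIdLifecycleSketch {
          private long txnId = 0;
          private int statementId = -1;

          void openTxn()      { txnId = 42; statementId = 0; }   // 42 stands in for the metastore-assigned id
          void acquireLocks() { if (txnId > 0) statementId++; }
          void commitTxn()    { txnId = 0; statementId = -1; }

          public static void main(String[] args) {
            StatementIdLifecycleSketch mgr = new StatementIdLifecycleSketch();
            mgr.openTxn();
            mgr.acquireLocks();                  // first statement  -> statementId 1
            mgr.acquireLocks();                  // second statement -> statementId 2
            System.out.println(mgr.statementId); // 2
            mgr.commitTxn();                     // statementId back to -1
          }
        }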
    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    index 21ab8ee..1906982 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    @@ -54,6 +54,10 @@ class DummyTxnManager extends HiveTxnManagerImpl {
        }

        @Override
    + public int getStatementId() {
    + return 0;
    + }
    + @Override
        public HiveLockManager getLockManager() throws LockException {
          if (lockMgr == null) {
            boolean supportConcurrency =

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    index 2dd0c7d..6c3dc33 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
    @@ -127,4 +127,7 @@ public interface HiveTxnManager {
         * @return true if this transaction manager does ACID
         */
        boolean supportsAcid();
    +
    + int getStatementId();
    +
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    index 1ce98b8..5719cf4 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    @@ -6605,7 +6605,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
            rsCtx.getNumFiles(),
            rsCtx.getTotalFiles(),
            rsCtx.getPartnCols(),
    - dpCtx);
    + dpCtx,
    + dest_path);

          // If this is an insert, update, or delete on an ACID table then mark that so the
          // FileSinkOperator knows how to properly write to it.

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    index bb6cee5..f73b502 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    @@ -92,16 +92,21 @@ public class FileSinkDesc extends AbstractOperatorDesc {
        // Record what type of write this is. Default is non-ACID (ie old style).
        private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
        private long txnId = 0; // transaction id for this operation
    + private int statementId = -1;

        private transient Table table;
    + private Path destPath;

        public FileSinkDesc() {
        }

    + /**
    + * @param destPath - the final destination for data
    + */
        public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
            final boolean compressed, final int destTableId, final boolean multiFileSpray,
            final boolean canBeMerged, final int numFiles, final int totalFiles,
    - final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx) {
    + final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) {

          this.dirName = dirName;
          this.tableInfo = tableInfo;
    @@ -114,6 +119,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
          this.partitionCols = partitionCols;
          this.dpCtx = dpCtx;
          this.dpSortState = DPSortState.NONE;
    + this.destPath = destPath;
        }

        public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
    @@ -135,7 +141,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
        public Object clone() throws CloneNotSupportedException {
          FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
              destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
    - partitionCols, dpCtx);
    + partitionCols, dpCtx, destPath);
          ret.setCompressCodec(compressCodec);
          ret.setCompressType(compressType);
          ret.setGatherStats(gatherStats);
    @@ -231,9 +237,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
          return temporary;
        }

    - /**
    - * @param totalFiles the totalFiles to set
    - */
        public void setTemporary(boolean temporary) {
          this.temporary = temporary;
        }
    @@ -438,11 +441,23 @@ public class FileSinkDesc extends AbstractOperatorDesc {
        public void setTransactionId(long id) {
          txnId = id;
        }
    -
        public long getTransactionId() {
          return txnId;
        }

    + public void setStatementId(int id) {
    + statementId = id;
    + }
    + /**
    + * See {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options#statementId(int)}
    + */
    + public int getStatementId() {
    + return statementId;
    + }
    + public Path getDestPath() {
    + return destPath;
    + }
    +
        public Table getTable() {
          return table;
        }

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    index c5f2d4d..6c77ba4 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    @@ -545,7 +545,9 @@ public class CompactorMR {
                  .reporter(reporter)
                  .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
                  .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
    - .bucket(bucket);
    + .bucket(bucket)
    + .statementId(-1);//setting statementId == -1 makes compacted delta files use
    + //delta_xxxx_yyyy format

              // Instantiate the underlying output format
              @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    index e400778..c6ae030 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    @@ -303,7 +303,8 @@ public class TestFileSinkOperator {
            Map<String, String> partColNames = new HashMap<String, String>(1);
            partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
            dpCtx.setInputToDPCols(partColNames);
    - desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx);
    + //todo: does this need the finalDestination?
    + desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
          } else {
            desc = new FileSinkDesc(basePath, tableDesc, false);
          }

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    index 1e3df34..f8ded12 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    @@ -46,17 +46,23 @@ public class TestAcidUtils {
              AcidUtils.createFilename(p, options).toString());
          options.bucket(123);
          assertEquals("/tmp/00123_0",
    - AcidUtils.createFilename(p, options).toString());
    + AcidUtils.createFilename(p, options).toString());
          options.bucket(23)
              .minimumTransactionId(100)
              .maximumTransactionId(200)
              .writingBase(true)
              .setOldStyle(false);
          assertEquals("/tmp/base_0000200/bucket_00023",
    - AcidUtils.createFilename(p, options).toString());
    + AcidUtils.createFilename(p, options).toString());
          options.writingBase(false);
    + assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023",
    + AcidUtils.createFilename(p, options).toString());
    + options.statementId(-1);
          assertEquals("/tmp/delta_0000100_0000200/bucket_00023",
    - AcidUtils.createFilename(p, options).toString());
    + AcidUtils.createFilename(p, options).toString());
    + options.statementId(7);
    + assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
    + AcidUtils.createFilename(p, options).toString());
        }

        @Test
    @@ -236,7 +242,6 @@ public class TestAcidUtils {
              new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
              new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]),
              new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
    - new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
              new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
          Path part = new MockPath(fs, "mock:/tbl/part1");
          AcidUtils.Directory dir =
    @@ -254,6 +259,45 @@ public class TestAcidUtils {
          assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString());
        }

    + /**
    + * Hive 1.3.0 delta dir naming scheme which supports multi-statement txns
    + * @throws Exception
    + */
    + @Test
    + public void testOverlapingDelta2() throws Exception {
    + Configuration conf = new Configuration();
    + MockFileSystem fs = new MockFileSystem(conf,
    + new MockFile("mock:/tbl/part1/delta_0000063_63_0/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_000062_62_0/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_000062_62_3/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_00061_61_0/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_0060_60_1/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_0060_60_4/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_0060_60_7/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_058_58/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
    + Path part = new MockPath(fs, "mock:/tbl/part1");
    + AcidUtils.Directory dir =
    + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
    + assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
    + List<FileStatus> obsolete = dir.getObsolete();
    + assertEquals(5, obsolete.size());
    + assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString());
    + List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
    + assertEquals(5, delts.size());
    + assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_00061_61_0", delts.get(1).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_000062_62_0", delts.get(2).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_000062_62_3", delts.get(3).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_0000063_63_0", delts.get(4).getPath().toString());
    + }
    +
        @Test
        public void deltasWithOpenTxnInRead() throws Exception {
          Configuration conf = new Configuration();
    @@ -268,6 +312,27 @@ public class TestAcidUtils {
          assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
        }

    + /**
    + * @since 1.3.0
    + * @throws Exception
    + */
    + @Test
    + public void deltasWithOpenTxnInRead2() throws Exception {
    + Configuration conf = new Configuration();
    + MockFileSystem fs = new MockFileSystem(conf,
    + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
    + new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
    + Path part = new MockPath(fs, "mock:/tbl/part1");
    + AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
    + List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
    + assertEquals(2, delts.size());
    + assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
    + assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
    + }
    +
        @Test
        public void deltasWithOpenTxnsNotInCompact() throws Exception {
          Configuration conf = new Configuration();
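
The testOverlapingDelta2 case above exercises the Hive 1.3.0 delta directory naming, in which a per-statement suffix is appended (delta_<minTxn>_<maxTxn>_<stmtId>) so that a multi-statement transaction can write several deltas over the same transaction range. As a minimal sketch, assuming a simplified parser rather than the real AcidUtils code, a name like that breaks down as follows:

    // Illustrative only; the actual parsing lives in AcidUtils and may differ.
    final class DeltaName {
      final long minTxn;
      final long maxTxn;
      final int statementId;  // -1 when there is no statement suffix (pre-1.3.0 or compacted dir)

      private DeltaName(long minTxn, long maxTxn, int statementId) {
        this.minTxn = minTxn;
        this.maxTxn = maxTxn;
        this.statementId = statementId;
      }

      /** Accepts "delta_<min>_<max>" or "delta_<min>_<max>_<stmtId>". */
      static DeltaName parse(String dirName) {
        if (!dirName.startsWith("delta_")) {
          throw new IllegalArgumentException("not a delta dir: " + dirName);
        }
        String[] parts = dirName.substring("delta_".length()).split("_");
        long min = Long.parseLong(parts[0]);
        long max = Long.parseLong(parts[1]);
        int stmt = parts.length > 2 ? Integer.parseInt(parts[2]) : -1;
        return new DeltaName(min, max, stmt);
      }
    }

For example, parse("delta_0000063_63_0") yields minTxn=63, maxTxn=63, statementId=0, while parse("delta_40_60") yields statementId=-1; all three per-statement deltas for transaction 60 share the range [60, 60], which is why delta_0060_60_1, _4 and _7 end up in the obsolete list once delta_40_60 covers that range.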

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    index 56e5f9f..e96ab2a 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
      import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
      import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
      import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.hadoop.hive.ql.io.AcidInputFormat;
      import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
      import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
      import org.apache.hadoop.hive.ql.io.HiveInputFormat;
    @@ -927,7 +928,7 @@ public class TestInputOutputFormat {
          OrcInputFormat.SplitGenerator splitter =
              new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
                  fs.getFileStatus(new Path("/a/file")), null, true,
    - new ArrayList<Long>(), true, null, null));
    + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
          OrcSplit result = splitter.createSplit(0, 200, null);
          assertEquals(0, result.getStart());
          assertEquals(200, result.getLength());
    @@ -968,7 +969,7 @@ public class TestInputOutputFormat {
          OrcInputFormat.SplitGenerator splitter =
              new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
                  fs.getFileStatus(new Path("/a/file")), null, true,
    - new ArrayList<Long>(), true, null, null));
    + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
          List<OrcSplit> results = splitter.call();
          OrcSplit result = results.get(0);
          assertEquals(3, result.getStart());
    @@ -990,7 +991,7 @@ public class TestInputOutputFormat {
          conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
          context = new OrcInputFormat.Context(conf);
          splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
    - fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<Long>(),
    + fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(),
              true, null, null));
          results = splitter.call();
          for(int i=0; i < stripeSizes.length; ++i) {
    @@ -1497,7 +1498,7 @@ public class TestInputOutputFormat {
          Path partDir = new Path(conf.get("mapred.input.dir"));
          OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
              new AcidOutputFormat.Options(conf).maximumTransactionId(10)
    - .writingBase(true).bucket(0).inspector(inspector));
    + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir));
          for(int i=0; i < 100; ++i) {
            BigRow row = new BigRow(i);
            writer.insert(10, row);
    @@ -1648,7 +1649,7 @@ public class TestInputOutputFormat {
          // write a base file in partition 0
          OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
              new AcidOutputFormat.Options(conf).maximumTransactionId(10)
    - .writingBase(true).bucket(0).inspector(inspector));
    + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0]));
          for(int i=0; i < 10; ++i) {
            writer.insert(10, new MyRow(i, 2 * i));
          }
    @@ -1661,7 +1662,7 @@ public class TestInputOutputFormat {
          // write a delta file in partition 0
          writer = new OrcRecordUpdater(partDir[0],
              new AcidOutputFormat.Options(conf).maximumTransactionId(10)
    - .writingBase(true).bucket(1).inspector(inspector));
    + .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0]));
          for(int i=10; i < 20; ++i) {
            writer.insert(10, new MyRow(i, 2*i));
          }
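
The SplitInfo constructor changes above replace a flat List<Long> of delta transaction ids with List<AcidInputFormat.DeltaMetaData>. A rough sketch of the idea behind that change, with the field names assumed from the directory layout rather than taken from the Hive class:

    // Hypothetical stand-in for the idea behind AcidInputFormat.DeltaMetaData:
    // describe a delta by its transaction range plus the statement ids written
    // within that range, instead of a flat list of transaction ids.
    import java.util.ArrayList;
    import java.util.List;

    final class DeltaMeta {
      final long minTxnId;
      final long maxTxnId;
      final List<Integer> statementIds = new ArrayList<>();  // empty for pre-1.3.0 deltas

      DeltaMeta(long minTxnId, long maxTxnId) {
        this.minTxnId = minTxnId;
        this.maxTxnId = maxTxnId;
      }
    }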

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    index 921e954..39f71f1 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    @@ -62,12 +62,12 @@ import static org.junit.Assert.assertNull;
      public class TestOrcRawRecordMerger {

        private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class);
    -
    +//todo: why is statementId -1?
        @Test
        public void testOrdering() throws Exception {
          ReaderKey left = new ReaderKey(100, 200, 1200, 300);
          ReaderKey right = new ReaderKey();
    - right.setValues(100, 200, 1000, 200);
    + right.setValues(100, 200, 1000, 200,1);
          assertTrue(right.compareTo(left) < 0);
          assertTrue(left.compareTo(right) > 0);
          assertEquals(false, left.equals(right));
    @@ -76,16 +76,16 @@ public class TestOrcRawRecordMerger {
          assertEquals(true, right.equals(left));
          right.setRowId(2000);
          assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 4);
    - right.setValues(100, 2, 3, 4);
    + left.setValues(1, 2, 3, 4,-1);
    + right.setValues(100, 2, 3, 4,-1);
          assertTrue(left.compareTo(right) < 0);
          assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 4);
    - right.setValues(1, 100, 3, 4);
    + left.setValues(1, 2, 3, 4,-1);
    + right.setValues(1, 100, 3, 4,-1);
          assertTrue(left.compareTo(right) < 0);
          assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 100);
    - right.setValues(1, 2, 3, 4);
    + left.setValues(1, 2, 3, 100,-1);
    + right.setValues(1, 2, 3, 4,-1);
          assertTrue(left.compareTo(right) < 0);
          assertTrue(right.compareTo(left) > 0);

    @@ -177,7 +177,7 @@ public class TestOrcRawRecordMerger {
          RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
          RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
          ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
    - new Reader.Options());
    + new Reader.Options(), 0);
          RecordReader recordReader = pair.recordReader;
          assertEquals(10, key.getTransactionId());
          assertEquals(20, key.getBucketId());
    @@ -203,7 +203,7 @@ public class TestOrcRawRecordMerger {
          Reader reader = createMockReader();

          ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
    - new Reader.Options());
    + new Reader.Options(), 0);
          RecordReader recordReader = pair.recordReader;
          assertEquals(10, key.getTransactionId());
          assertEquals(20, key.getBucketId());
    @@ -489,7 +489,7 @@ public class TestOrcRawRecordMerger {
          // write the empty base
          AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
              .inspector(inspector).bucket(BUCKET).writingBase(true)
    - .maximumTransactionId(100);
    + .maximumTransactionId(100).finalDestination(root);
          of.getRecordUpdater(root, options).close(false);

          ValidTxnList txnList = new ValidReadTxnList("200:");
    @@ -515,6 +515,10 @@ public class TestOrcRawRecordMerger {
         */
        @Test
        public void testNewBaseAndDelta() throws Exception {
    + testNewBaseAndDelta(false);
    + testNewBaseAndDelta(true);
    + }
    + private void testNewBaseAndDelta(boolean use130Format) throws Exception {
          final int BUCKET = 10;
          String[] values = new String[]{"first", "second", "third", "fourth",
                                         "fifth", "sixth", "seventh", "eighth",
    @@ -532,7 +536,10 @@ public class TestOrcRawRecordMerger {

          // write the base
          AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
    - .inspector(inspector).bucket(BUCKET);
    + .inspector(inspector).bucket(BUCKET).finalDestination(root);
    + if(!use130Format) {
    + options.statementId(-1);
    + }
          RecordUpdater ru = of.getRecordUpdater(root,
              options.writingBase(true).maximumTransactionId(100));
          for(String v: values) {
    @@ -554,7 +561,8 @@ public class TestOrcRawRecordMerger {
          AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);

          assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
    - assertEquals(new Path(root, "delta_0000200_0000200"),
    + assertEquals(new Path(root, use130Format ?
    + AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
              directory.getCurrentDirectories().get(0).getPath());

          Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
    @@ -829,7 +837,7 @@ public class TestOrcRawRecordMerger {
          // write a delta
          AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
              .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
    - .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5);
    + .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
          RecordUpdater ru = of.getRecordUpdater(root, options);
          values = new String[]{"0.0", null, null, "1.1", null, null, null,
              "ignore.7"};
    @@ -920,6 +928,7 @@ public class TestOrcRawRecordMerger {
          options.orcOptions(OrcFile.writerOptions(conf)
            .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
            .memory(mgr));
    + options.finalDestination(root);
          RecordUpdater ru = of.getRecordUpdater(root, options);
          String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
              "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
    @@ -1004,7 +1013,8 @@ public class TestOrcRawRecordMerger {
          AcidOutputFormat.Options options =
              new AcidOutputFormat.Options(conf)
                  .bucket(BUCKET).inspector(inspector).filesystem(fs)
    - .writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
    + .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
    + .finalDestination(root);
          RecordUpdater ru = of.getRecordUpdater(root, options);
          String[] values = new String[]{"a", "b", "c", "d", "e"};
          for(int i=0; i < values.length; ++i) {
    @@ -1047,6 +1057,14 @@ public class TestOrcRawRecordMerger {
         */
        @Test
        public void testRecordReaderIncompleteDelta() throws Exception {
    + testRecordReaderIncompleteDelta(false);
    + testRecordReaderIncompleteDelta(true);
    + }
    + /**
    + *
    + * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_00001
    + */
    + private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception {
          final int BUCKET = 1;
          Configuration conf = new Configuration();
          OrcOutputFormat of = new OrcOutputFormat();
    @@ -1063,7 +1081,10 @@ public class TestOrcRawRecordMerger {
          AcidOutputFormat.Options options =
              new AcidOutputFormat.Options(conf)
                  .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
    - .bucket(BUCKET).inspector(inspector).filesystem(fs);
    + .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root);
    + if(!use130Format) {
    + options.statementId(-1);
    + }
          RecordUpdater ru = of.getRecordUpdater(root, options);
          String[] values= new String[]{"1", "2", "3", "4", "5"};
          for(int i=0; i < values.length; ++i) {
    @@ -1110,8 +1131,8 @@ public class TestOrcRawRecordMerger {
          splits = inf.getSplits(job, 1);
          assertEquals(2, splits.length);
          rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
    - Path sideFile = new Path(root +
    - "/delta_0000010_0000019/bucket_00001_flush_length");
    + Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
    + AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
          assertEquals(true, fs.exists(sideFile));
          assertEquals(24, fs.getFileStatus(sideFile).getLen());
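
testNewBaseAndDelta and testRecordReaderIncompleteDelta now run their bodies twice, once per naming scheme, with options.statementId(-1) selecting the pre-1.3.0 delta_<min>_<max> form. A minimal sketch of that run-twice pattern (the class and helper names here are hypothetical; the real test bodies write files through AcidOutputFormat and assert against the resulting directories):

    import org.junit.Assert;
    import org.junit.Test;

    public class DualFormatSketch {

      @Test
      public void testSomething() {
        checkDeltaName(false);  // pre-1.3.0 naming: delta_0000200_0000200
        checkDeltaName(true);   // 1.3.0 naming:     delta_0000200_0000200_0000
      }

      // Hypothetical helper; in the real tests use130Format == false is signalled by
      // options.statementId(-1), which makes the writer fall back to the old name.
      private void checkDeltaName(boolean use130Format) {
        String expected = use130Format ? "delta_0000200_0000200_0000"
                                       : "delta_0000200_0000200";
        Assert.assertEquals(use130Format, expected.matches("delta_\\d{7}_\\d{7}_\\d{4}"));
      }
    }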


    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
    index 22bd4b9..22030b4 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
    @@ -97,7 +97,8 @@ public class TestOrcRecordUpdater {
              .minimumTransactionId(10)
              .maximumTransactionId(19)
              .inspector(inspector)
    - .reporter(Reporter.NULL);
    + .reporter(Reporter.NULL)
    + .finalDestination(root);
          RecordUpdater updater = new OrcRecordUpdater(root, options);
          updater.insert(11, new MyRow("first"));
          updater.insert(11, new MyRow("second"));
    @@ -197,7 +198,8 @@ public class TestOrcRecordUpdater {
              .maximumTransactionId(100)
              .inspector(inspector)
              .reporter(Reporter.NULL)
    - .recordIdColumn(1);
    + .recordIdColumn(1)
    + .finalDestination(root);
          RecordUpdater updater = new OrcRecordUpdater(root, options);
          updater.update(100, new MyRow("update", 30, 10, bucket));
          updater.delete(100, new MyRow("", 60, 40, bucket));

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    index 671e122..21adc9d 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    @@ -241,7 +241,7 @@ public abstract class CompactorTest {
          return sd;
        }

    - // I can't do this with @Before because I want to be able to control when the thead starts
    + // I can't do this with @Before because I want to be able to control when the thread starts
        private void startThread(char type, boolean stopAfterOne) throws Exception {
          startThread(type, stopAfterOne, new AtomicBoolean());
        }
    @@ -284,7 +284,7 @@ public abstract class CompactorTest {
          switch (type) {
            case BASE: filename = "base_" + maxTxn; break;
            case LENGTH_FILE: // Fall through to delta
    - case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
    + case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
            case LEGACY: break; // handled below
          }

    @@ -508,5 +508,21 @@ public abstract class CompactorTest {
          }
        }

    + /**
    + * in Hive 1.3.0 delta file names changed to delta_xxxx_yyyy_zzzz; prior to that
    + * the name was delta_xxxx_yyyy. We want to run compaction tests such that both formats
    + * are used since new (1.3) code has to be able to read old files.
    + */
    + abstract boolean useHive130DeltaDirName();

    + String makeDeltaDirName(long minTxnId, long maxTxnId) {
    + return useHive130DeltaDirName() ?
    + AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
    + }
    + /**
    + * delta dir name after compaction
    + */
    + String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
    + return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
    + }
      }
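
The new makeDeltaDirName/makeDeltaDirNameCompacted helpers above delegate to AcidUtils.deltaSubdir. Purely as an illustration of the strings they produce, here is a standalone sketch; the padding widths (seven digits for transaction ids, four for the statement id) are inferred from the directory names asserted in these tests, not taken from the AcidUtils source:

    // Illustrative only; the canonical implementation is AcidUtils.deltaSubdir.
    final class DeltaDirNames {
      // pre-1.3.0 / compacted form, e.g. deltaDirName(21, 24) -> "delta_0000021_0000024"
      static String deltaDirName(long minTxnId, long maxTxnId) {
        return String.format("delta_%07d_%07d", minTxnId, maxTxnId);
      }

      // 1.3.0 per-statement form, e.g. deltaDirName(21, 22, 0) -> "delta_0000021_0000022_0000"
      static String deltaDirName(long minTxnId, long maxTxnId, int statementId) {
        return String.format("delta_%07d_%07d_%04d", minTxnId, maxTxnId, statementId);
      }
    }

Note that makeDeltaDirNameCompacted always uses the two-argument form, since the compactor writes the statement-less name regardless of which scheme produced its inputs.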

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    index ffdbb9a..0db732c 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    @@ -139,7 +139,7 @@ public class TestCleaner extends CompactorTest {
          boolean sawBase = false, sawDelta = false;
          for (Path p : paths) {
            if (p.getName().equals("base_20")) sawBase = true;
    - else if (p.getName().equals("delta_21_24")) sawDelta = true;
    + else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true;
            else Assert.fail("Unexpected file " + p.getName());
          }
          Assert.assertTrue(sawBase);
    @@ -177,7 +177,7 @@ public class TestCleaner extends CompactorTest {
          boolean sawBase = false, sawDelta = false;
          for (Path path : paths) {
            if (path.getName().equals("base_20")) sawBase = true;
    - else if (path.getName().equals("delta_21_24")) sawDelta = true;
    + else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true;
            else Assert.fail("Unexpected file " + path.getName());
          }
          Assert.assertTrue(sawBase);
    @@ -480,4 +480,8 @@ public class TestCleaner extends CompactorTest {
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(0, rsp.getCompactsSize());
        }
    + @Override
    + boolean useHive130DeltaDirName() {
    + return false;
    + }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
    new file mode 100644
    index 0000000..c637dd1
    --- /dev/null
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
    @@ -0,0 +1,14 @@
    +package org.apache.hadoop.hive.ql.txn.compactor;
    +
    +/**
    + * Same as TestCleaner but tests delta file names in Hive 1.3.0 format
    + */
    +public class TestCleaner2 extends TestCleaner {
    + public TestCleaner2() throws Exception {
    + super();
    + }
    + @Override
    + boolean useHive130DeltaDirName() {
    + return true;
    + }
    +}
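
TestCleaner2 subclasses the whole TestCleaner class and overrides only the naming flag; since JUnit runs every @Test method inherited from the parent, a one-method override re-runs the entire suite under the other delta directory format without duplicating any test code. A sketch of that pattern with hypothetical class names:

    // Hypothetical names; sketch of the "subclass re-runs the parent's tests
    // under the 1.3.0 delta naming" pattern.
    abstract class FormatAwareCompactorTest {
      abstract boolean useHive130DeltaDirName();

      String makeDeltaDirName(long minTxnId, long maxTxnId) {
        return useHive130DeltaDirName()
            ? String.format("delta_%07d_%07d_%04d", minTxnId, maxTxnId, 0)
            : String.format("delta_%07d_%07d", minTxnId, maxTxnId);
      }
      // ... @Test methods that build expected paths via makeDeltaDirName(...) ...
    }

    class FormatAwareCompactorTest2 extends FormatAwareCompactorTest {
      @Override
      boolean useHive130DeltaDirName() {
        return true;  // every inherited @Test now expects delta_x_y_z style names
      }
    }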

    http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    index 00b13de..0b0b1da 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    @@ -713,5 +713,9 @@ public class TestInitiator extends CompactorTest {
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(0, compacts.size());
        }
    + @Override
    + boolean useHive130DeltaDirName() {
    + return false;
    + }

      }
