HIVE-11030 - Enhance storage layer to create one delta file per write (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c30ab468
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c30ab468
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c30ab468

Branch: refs/heads/branch-1
Commit: c30ab4686cbfe73c3cf4552fa7e07c8ded3b4b17
Parents: 16d1b74
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Mon Jul 13 09:31:17 2015 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Mon Jul 13 09:31:17 2015 -0700

----------------------------------------------------------------------
  .../streaming/AbstractRecordWriter.java | 4 +-
  .../java/org/apache/hadoop/hive/ql/Driver.java | 1 +
  .../hadoop/hive/ql/io/AcidInputFormat.java | 60 +++++++-
  .../hadoop/hive/ql/io/AcidOutputFormat.java | 49 +++++-
  .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 152 +++++++++++++++----
  .../hadoop/hive/ql/io/HiveFileFormatUtils.java | 19 +--
  .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 20 +--
  .../hadoop/hive/ql/io/orc/OrcNewSplit.java | 13 +-
  .../hive/ql/io/orc/OrcRawRecordMerger.java | 66 ++++++--
  .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 58 +++++++
  .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 16 +-
  .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 20 ++-
  .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 4 +
  .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 3 +
  .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 +-
  .../hadoop/hive/ql/plan/FileSinkDesc.java | 27 +++-
  .../hive/ql/txn/compactor/CompactorMR.java | 4 +-
  .../hive/ql/exec/TestFileSinkOperator.java | 3 +-
  .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 73 ++++++++-
  .../hive/ql/io/orc/TestInputOutputFormat.java | 13 +-
  .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 57 ++++---
  .../hive/ql/io/orc/TestOrcRecordUpdater.java | 6 +-
  .../hive/ql/txn/compactor/CompactorTest.java | 20 ++-
  .../hive/ql/txn/compactor/TestCleaner.java | 8 +-
  .../hive/ql/txn/compactor/TestCleaner2.java | 14 ++
  .../hive/ql/txn/compactor/TestInitiator.java | 4 +
  .../hive/ql/txn/compactor/TestWorker.java | 49 +++---
  .../hive/ql/txn/compactor/TestWorker2.java | 16 ++
  28 files changed, 642 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index ed46bca..c959222 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -143,7 +143,9 @@ abstract class AbstractRecordWriter implements RecordWriter {
                        .inspector(getSerde().getObjectInspector())
                        .bucket(bucketId)
                        .minimumTransactionId(minTxnId)
- .maximumTransactionId(maxTxnID));
+ .maximumTransactionId(maxTxnID)
+ .statementId(-1)
+ .finalDestination(partitionPath));
      } catch (SerDeException e) {
        throw new SerializationError("Failed to get object inspector from Serde "
                + getSerde().getClass().getName(), e);

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e04165b..d161503 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -986,6 +986,7 @@ public class Driver implements CommandProcessor {
          if (acidSinks != null) {
            for (FileSinkDesc desc : acidSinks) {
              desc.setTransactionId(txnId);
+ desc.setStatementId(txnMgr.getStatementId());
            }
          }


http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index e1d2395..24506b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -22,13 +22,19 @@ import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.ValidTxnList;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.mapred.InputFormat;
  import org.apache.hadoop.mapred.InputSplit;
  import org.apache.hadoop.mapred.RecordReader;
  import org.apache.hadoop.mapred.Reporter;

+import java.io.DataInput;
+import java.io.DataOutput;
  import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;

  /**
   * The interface required for input formats that want to support ACID
@@ -62,7 +68,7 @@ import java.io.IOException;
   * <li>New format -
   * <pre>
   * $partition/base_$tid/$bucket
- * delta_$tid_$tid/$bucket
+ * delta_$tid_$tid_$stid/$bucket
   * </pre></li>
   * </ul>
   * <p>
@@ -71,6 +77,8 @@ import java.io.IOException;
   * stored sorted by the original transaction id (ascending), bucket (ascending),
   * row id (ascending), and current transaction id (descending). Thus the files
   * can be merged by advancing through the files in parallel.
+ * The stid is the unique id (within the transaction) of the statement that created
+ * this delta file.
   * <p>
   * The base files include all transactions from the beginning of time
   * (transaction id 0) to the transaction in the directory name. Delta
@@ -91,7 +99,7 @@ import java.io.IOException;
   * For row-at-a-time processing, KEY can conveniently pass RowId into the operator
   * pipeline. For vectorized execution the KEY could perhaps represent a range in the batch.
   * Since {@link org.apache.hadoop.hive.ql.io.orc.OrcInputFormat} is declared to return
- * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidRecordReader} is defined
+ * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader} is defined
   * to provide access to the RowId. Other implementations of AcidInputFormat can use either
   * mechanism.
   * </p>
@@ -101,6 +109,54 @@ import java.io.IOException;
  public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
      extends InputFormat<KEY, VALUE>, InputFormatChecker {

+ static final class DeltaMetaData implements Writable {
+ private long minTxnId;
+ private long maxTxnId;
+ private List<Integer> stmtIds;
+
+ public DeltaMetaData() {
+ this(0,0,null);
+ }
+ DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.stmtIds = stmtIds;
+ }
+ long getMinTxnId() {
+ return minTxnId;
+ }
+ long getMaxTxnId() {
+ return maxTxnId;
+ }
+ List<Integer> getStmtIds() {
+ return stmtIds;
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(minTxnId);
+ out.writeLong(maxTxnId);
+ if(stmtIds == null) {
+ out.writeInt(0);
+ return;
+ }
+ out.writeInt(stmtIds.size());
+ for(Integer id : stmtIds) {
+ out.writeInt(id);
+ }
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ minTxnId = in.readLong();
+ maxTxnId = in.readLong();
+ int numStatements = in.readInt();
+ if(numStatements <= 0) {
+ return;
+ }
+ stmtIds = new ArrayList<>();
+ for(int i = 0; i < numStatements; i++) {
+ stmtIds.add(in.readInt());
+ }
+ }
+ }
    /**
     * Options for controlling the record readers.
     */

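For illustration, a plain-JDK sketch (the class and variable names below are ours, not part of this patch) of the byte layout that DeltaMetaData.write() produces and readFields() consumes when delta metadata travels with a split:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeltaMetaDataWireFormat {
  public static void main(String[] args) throws IOException {
    long minTxn = 100, maxTxn = 100;
    List<Integer> stmtIds = Arrays.asList(0, 1);   // two statements wrote in txn 100

    // what DeltaMetaData.write() puts into each serialized split
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeLong(minTxn);
    out.writeLong(maxTxn);
    out.writeInt(stmtIds.size());
    for (int id : stmtIds) {
      out.writeInt(id);
    }

    // what readFields() recovers on the task side
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    long min = in.readLong();
    long max = in.readLong();
    int n = in.readInt();
    List<Integer> ids = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      ids.add(in.readInt());
    }
    System.out.println("delta range " + min + ".." + max + ", statements " + ids);
  }
}
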
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index 0d537e1..dd90a95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -39,7 +39,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
    /**
     * Options to control how the files are written
     */
- public static class Options {
+ public static class Options implements Cloneable {
      private final Configuration configuration;
      private FileSystem fs;
      private ObjectInspector inspector;
@@ -53,7 +53,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
      private PrintStream dummyStream = null;
      private boolean oldStyle = false;
      private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id
-
+ //unique within a transaction
+ private int statementId = 0;
+ private Path finalDestination;
      /**
       * Create the options object.
       * @param conf Use the given configuration
@@ -63,6 +65,18 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
      }

      /**
+ * shallow clone
+ */
+ @Override
+ public Options clone() {
+ try {
+ return (Options)super.clone();
+ }
+ catch(CloneNotSupportedException ex) {
+ throw new RuntimeException("clone() not properly implemented: " + ex.getMessage(), ex);
+ }
+ }
+ /**
       * Use the given ObjectInspector for each record written.
       * @param inspector the inspector to use.
       * @return this
@@ -185,6 +199,31 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
        return this;
      }

+ /**
+ * @since 1.3.0
+ * This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names.
+ * This is primarily needed for testing to make sure 1.3 code can still read files created
+ * by older code. Also used by the Compactor.
+ */
+ public Options statementId(int id) {
+ if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) {
+ throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId);
+ }
+ if(id < -1) {
+ throw new IllegalArgumentException("Illegal statementId value: " + id);
+ }
+ this.statementId = id;
+ return this;
+ }
+ /**
+ * @param p where the data for this operation will eventually end up;
+ * basically table or partition directory in FS
+ */
+ public Options finalDestination(Path p) {
+ this.finalDestination = p;
+ return this;
+ }
+
      public Configuration getConfiguration() {
        return configuration;
      }
@@ -236,6 +275,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
      boolean getOldStyle() {
        return oldStyle;
      }
+ public int getStatementId() {
+ return statementId;
+ }
+ public Path getFinalDestination() {
+ return finalDestination;
+ }
    }

    /**

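As a sketch of how a writer is expected to populate the two new builder fields, mirroring the AbstractRecordWriter and HiveFileFormatUtils call sites in this patch (the wrapper class and method below are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class AcidWriterOptionsExample {
  // Builds the write options for one statement of an open transaction.
  static AcidOutputFormat.Options forStatement(Configuration conf, ObjectInspector inspector,
      int bucketId, long txnId, int statementId, Path partitionPath) {
    return new AcidOutputFormat.Options(conf)
        .inspector(inspector)
        .bucket(bucketId)
        .minimumTransactionId(txnId)
        .maximumTransactionId(txnId)
        .statementId(statementId)         // -1 falls back to the pre-1.3 delta_x_y directory name
        .finalDestination(partitionPath); // table/partition dir the data will finally land in
  }
}
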
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2214733..c7e0780 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -67,6 +67,15 @@ public class AcidUtils {
    };
    public static final String BUCKET_DIGITS = "%05d";
    public static final String DELTA_DIGITS = "%07d";
+ /**
+ * 10K statements per tx. Probably overkill ... since that many delta files
+ * would not be good for performance
+ */
+ public static final String STATEMENT_DIGITS = "%04d";
+ /**
+ * This must be in sync with {@link #STATEMENT_DIGITS}
+ */
+ public static final int MAX_STATEMENTS_PER_TXN = 10000;
    public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
    public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
    public static final PathFilter originalBucketFilter = new PathFilter() {
@@ -79,7 +88,7 @@ public class AcidUtils {
    private AcidUtils() {
      // NOT USED
    }
- private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
+ private static final Log LOG = LogFactory.getLog(AcidUtils.class);

    private static final Pattern ORIGINAL_PATTERN =
        Pattern.compile("[0-9]+_[0-9]+");
@@ -104,12 +113,23 @@ public class AcidUtils {
          BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
    }

- private static String deltaSubdir(long min, long max) {
+ /**
+ * This is format of delta dir name prior to Hive 1.3.x
+ */
+ public static String deltaSubdir(long min, long max) {
      return DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
          String.format(DELTA_DIGITS, max);
    }

    /**
+ * Each write statement in a transaction creates its own delta dir.
+ * @since 1.3.x
+ */
+ public static String deltaSubdir(long min, long max, int statementId) {
+ return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
+ }
+
+ /**
     * Create a filename for a bucket file.
     * @param directory the partition directory
     * @param options the options for writing the bucket
@@ -124,9 +144,15 @@ public class AcidUtils {
      } else if (options.isWritingBase()) {
        subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
            options.getMaximumTransactionId());
+ } else if(options.getStatementId() == -1) {
+ //when minor compaction runs, we collapse per statement delta files inside a single
+ //transaction so we no longer need a statementId in the file name
+ subdir = deltaSubdir(options.getMinimumTransactionId(),
+ options.getMaximumTransactionId());
      } else {
        subdir = deltaSubdir(options.getMinimumTransactionId(),
- options.getMaximumTransactionId());
+ options.getMaximumTransactionId(),
+ options.getStatementId());
      }
      return createBucketFile(new Path(directory, subdir), options.getBucket());
    }
@@ -214,14 +240,24 @@ public class AcidUtils {
    }

    public static class ParsedDelta implements Comparable<ParsedDelta> {
- final long minTransaction;
- final long maxTransaction;
- final FileStatus path;
+ private final long minTransaction;
+ private final long maxTransaction;
+ private final FileStatus path;
+ //-1 is for internal (getAcidState()) purposes and means the delta dir
+ //had no statement ID
+ private final int statementId;

+ /**
+ * for pre 1.3.x delta files
+ */
      ParsedDelta(long min, long max, FileStatus path) {
+ this(min, max, path, -1);
+ }
+ ParsedDelta(long min, long max, FileStatus path, int statementId) {
        this.minTransaction = min;
        this.maxTransaction = max;
        this.path = path;
+ this.statementId = statementId;
      }

      public long getMinTransaction() {
@@ -236,6 +272,16 @@ public class AcidUtils {
        return path.getPath();
      }

+ public int getStatementId() {
+ return statementId == -1 ? 0 : statementId;
+ }
+
+ /**
+ * Compactions (Major/Minor) merge deltas/bases but delete of old files
+ * happens in a different process; thus it's possible to have bases/deltas with
+ * overlapping txnId boundaries. The sort order helps figure out the "best" set of files
+ * to use to get data.
+ */
      @Override
      public int compareTo(ParsedDelta parsedDelta) {
        if (minTransaction != parsedDelta.minTransaction) {
@@ -250,7 +296,22 @@ public class AcidUtils {
          } else {
            return -1;
          }
- } else {
+ }
+ else if(statementId != parsedDelta.statementId) {
+ /**
+ * We want deltas after minor compaction (w/o statementId) to sort
+ * earlier so that getAcidState() considers compacted files (into larger ones) obsolete
+ * Before compaction, include deltas with all statementIds for a given txnId
+ * in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory}
+ */
+ if(statementId < parsedDelta.statementId) {
+ return -1;
+ }
+ else {
+ return 1;
+ }
+ }
+ else {
          return path.compareTo(parsedDelta.path);
        }
      }
@@ -271,46 +332,72 @@ public class AcidUtils {

    /**
     * Convert the list of deltas into an equivalent list of begin/end
- * transaction id pairs.
+ * transaction id pairs. Assumes {@code deltas} is sorted.
     * @param deltas
     * @return the list of transaction ids to serialize
     */
- public static List<Long> serializeDeltas(List<ParsedDelta> deltas) {
- List<Long> result = new ArrayList<Long>(deltas.size() * 2);
- for(ParsedDelta delta: deltas) {
- result.add(delta.minTransaction);
- result.add(delta.maxTransaction);
+ public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) {
+ List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size());
+ AcidInputFormat.DeltaMetaData last = null;
+ for(ParsedDelta parsedDelta : deltas) {
+ if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) {
+ last.getStmtIds().add(parsedDelta.getStatementId());
+ continue;
+ }
+ last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList<Integer>());
+ result.add(last);
+ if(parsedDelta.statementId >= 0) {
+ last.getStmtIds().add(parsedDelta.getStatementId());
+ }
      }
      return result;
    }

    /**
     * Convert the list of begin/end transaction id pairs to a list of delta
- * directories.
+ * directories. Note that there may be multiple delta files for the exact same txn range starting
+ * with 1.3.x;
+ * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
     * @param root the root directory
     * @param deltas list of begin/end transaction id pairs
     * @return the list of delta paths
     */
- public static Path[] deserializeDeltas(Path root, List<Long> deltas) {
- int deltaSize = deltas.size() / 2;
- Path[] result = new Path[deltaSize];
- for(int i = 0; i < deltaSize; ++i) {
- result[i] = new Path(root, deltaSubdir(deltas.get(i * 2),
- deltas.get(i * 2 + 1)));
+ public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
+ List<Path> results = new ArrayList<Path>(deltas.size());
+ for(AcidInputFormat.DeltaMetaData dmd : deltas) {
+ if(dmd.getStmtIds().isEmpty()) {
+ results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
+ continue;
+ }
+ for(Integer stmtId : dmd.getStmtIds()) {
+ results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
+ }
      }
- return result;
+ return results.toArray(new Path[results.size()]);
    }

- static ParsedDelta parseDelta(FileStatus path) {
- String filename = path.getPath().getName();
+ private static ParsedDelta parseDelta(FileStatus path) {
+ ParsedDelta p = parsedDelta(path.getPath());
+ return new ParsedDelta(p.getMinTransaction(),
+ p.getMaxTransaction(), path, p.statementId);
+ }
+ public static ParsedDelta parsedDelta(Path deltaDir) {
+ String filename = deltaDir.getName();
      if (filename.startsWith(DELTA_PREFIX)) {
        String rest = filename.substring(DELTA_PREFIX.length());
        int split = rest.indexOf('_');
+ int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
        long min = Long.parseLong(rest.substring(0, split));
- long max = Long.parseLong(rest.substring(split + 1));
- return new ParsedDelta(min, max, path);
+ long max = split2 == -1 ?
+ Long.parseLong(rest.substring(split + 1)) :
+ Long.parseLong(rest.substring(split + 1, split2));
+ if(split2 == -1) {
+ return new ParsedDelta(min, max, null);
+ }
+ int statementId = Integer.parseInt(rest.substring(split2 + 1));
+ return new ParsedDelta(min, max, null, statementId);
      }
- throw new IllegalArgumentException(path + " does not start with " +
+ throw new IllegalArgumentException(deltaDir + " does not start with " +
                                         DELTA_PREFIX);
    }

@@ -407,15 +494,24 @@ public class AcidUtils {

      Collections.sort(working);
      long current = bestBaseTxn;
+ int lastStmtId = -1;
      for(ParsedDelta next: working) {
        if (next.maxTransaction > current) {
          // are any of the new transactions ones that we care about?
          if (txnList.isTxnRangeValid(current+1, next.maxTransaction) !=
- ValidTxnList.RangeResponse.NONE) {
+ ValidTxnList.RangeResponse.NONE) {
            deltas.add(next);
            current = next.maxTransaction;
+ lastStmtId = next.statementId;
          }
- } else {
+ }
+ else if(next.maxTransaction == current && lastStmtId >= 0) {
+ //make sure to get all deltas within a single transaction; multi-statement txns
+ //generate multiple delta files with the same txnId range
+ //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
+ deltas.add(next);
+ }
+ else {
          obsolete.add(next.path);
        }
      }

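To make the grouping concrete, here is a self-contained sketch (plain Java with our own names; the "delta_" prefix and digit widths follow the constants above and the test expectations in TestAcidUtils below) of the serializeDeltas()/deserializeDeltas() round trip:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeltaGroupingSketch {
  public static void main(String[] args) {
    // sorted parsed deltas as {minTxn, maxTxn, statementId}; -1 means "no statement id"
    long[][] deltas = { {40, 60, -1}, {60, 60, 1}, {60, 60, 4}, {61, 61, 0} };

    // group consecutive entries that share a txn range (what serializeDeltas() does)
    LinkedHashMap<String, List<Integer>> grouped = new LinkedHashMap<>();
    for (long[] d : deltas) {
      List<Integer> stmtIds = grouped.computeIfAbsent(d[0] + "_" + d[1], k -> new ArrayList<>());
      if (d[2] >= 0) {
        stmtIds.add((int) d[2]);
      }
    }

    // expand each group back into directory names (what deserializeDeltas() does)
    for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
      String[] range = e.getKey().split("_");
      String dir = String.format("delta_%07d_%07d",
          Long.parseLong(range[0]), Long.parseLong(range[1]));
      if (e.getValue().isEmpty()) {
        System.out.println(dir);                                    // delta_0000040_0000060
      } else {
        for (int stmtId : e.getValue()) {
          System.out.println(dir + String.format("_%04d", stmtId)); // e.g. delta_0000060_0000060_0001
        }
      }
    }
  }
}
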
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 7ad5aa0..50ba740 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -297,31 +297,32 @@ public final class HiveFileFormatUtils {
      // TODO not 100% sure about this. This call doesn't set the compression type in the conf
      // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not
      // sure if this is correct or not.
- return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(),
- bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum);
+ return getRecordUpdater(jc, acidOutputFormat,
+ bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf);
    }


    private static RecordUpdater getRecordUpdater(JobConf jc,
                                                  AcidOutputFormat<?, ?> acidOutputFormat,
- boolean isCompressed,
- long txnId,
                                                  int bucket,
                                                  ObjectInspector inspector,
                                                  Properties tableProp,
                                                  Path outPath,
                                                  Reporter reporter,
- int rowIdColNum) throws IOException {
+ int rowIdColNum,
+ FileSinkDesc conf) throws IOException {
      return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
- .isCompressed(isCompressed)
+ .isCompressed(conf.getCompressed())
          .tableProperties(tableProp)
          .reporter(reporter)
          .writingBase(false)
- .minimumTransactionId(txnId)
- .maximumTransactionId(txnId)
+ .minimumTransactionId(conf.getTransactionId())
+ .maximumTransactionId(conf.getTransactionId())
          .bucket(bucket)
          .inspector(inspector)
- .recordIdColumn(rowIdColNum));
+ .recordIdColumn(rowIdColNum)
+ .statementId(conf.getStatementId())
+ .finalDestination(conf.getDestPath()));
    }

    public static PartitionDesc getPartitionDescFromPathRecursively(

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 7346bc4..6fe0044 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -443,13 +443,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
      private final FileStatus file;
      private final FileInfo fileInfo;
      private final boolean isOriginal;
- private final List<Long> deltas;
+ private final List<DeltaMetaData> deltas;
      private final boolean hasBase;

      SplitInfo(Context context, FileSystem fs,
          FileStatus file, FileInfo fileInfo,
          boolean isOriginal,
- List<Long> deltas,
+ List<DeltaMetaData> deltas,
          boolean hasBase, Path dir, boolean[] covered) throws IOException {
        super(dir, context.numBuckets, deltas, covered);
        this.context = context;
@@ -471,12 +471,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
      FileSystem fs;
      List<FileStatus> files;
      boolean isOriginal;
- List<Long> deltas;
+ List<DeltaMetaData> deltas;
      Path dir;
      boolean[] covered;

      public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
- boolean isOriginal, List<Long> deltas, boolean[] covered) {
+ boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) {
        this.context = context;
        this.dir = dir;
        this.fs = fs;
@@ -547,14 +547,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    static final class BISplitStrategy extends ACIDSplitStrategy {
      List<FileStatus> fileStatuses;
      boolean isOriginal;
- List<Long> deltas;
+ List<DeltaMetaData> deltas;
      FileSystem fs;
      Context context;
      Path dir;

      public BISplitStrategy(Context context, FileSystem fs,
          Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
- List<Long> deltas, boolean[] covered) {
+ List<DeltaMetaData> deltas, boolean[] covered) {
        super(dir, context.numBuckets, deltas, covered);
        this.context = context;
        this.fileStatuses = fileStatuses;
@@ -591,11 +591,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     */
    static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
      Path dir;
- List<Long> deltas;
+ List<DeltaMetaData> deltas;
      boolean[] covered;
      int numBuckets;

- public ACIDSplitStrategy(Path dir, int numBuckets, List<Long> deltas, boolean[] covered) {
+ public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) {
        this.dir = dir;
        this.numBuckets = numBuckets;
        this.deltas = deltas;
@@ -644,7 +644,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
        final SplitStrategy splitStrategy;
        AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
            context.conf, context.transactionList);
- List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+ List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
        Path base = dirInfo.getBaseDirectory();
        List<FileStatus> original = dirInfo.getOriginalFiles();
        boolean[] covered = new boolean[context.numBuckets];
@@ -722,7 +722,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
      private Metadata metadata;
      private List<OrcProto.Type> types;
      private final boolean isOriginal;
- private final List<Long> deltas;
+ private final List<DeltaMetaData> deltas;
      private final boolean hasBase;
      private OrcFile.WriterVersion writerVersion;
      private long projColsUncompressedSize;

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
index da23544..b58c880 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.List;

+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableUtils;
  import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -37,7 +38,7 @@ public class OrcNewSplit extends FileSplit {
    private boolean hasFooter;
    private boolean isOriginal;
    private boolean hasBase;
- private final List<Long> deltas = new ArrayList<Long>();
+ private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
    private OrcFile.WriterVersion writerVersion;

    protected OrcNewSplit(){
@@ -67,8 +68,8 @@ public class OrcNewSplit extends FileSplit {
          (hasFooter ? OrcSplit.FOOTER_FLAG : 0);
      out.writeByte(flags);
      out.writeInt(deltas.size());
- for(Long delta: deltas) {
- out.writeLong(delta);
+ for(AcidInputFormat.DeltaMetaData delta: deltas) {
+ delta.write(out);
      }
      if (hasFooter) {
        // serialize FileMetaInfo fields
@@ -101,7 +102,9 @@ public class OrcNewSplit extends FileSplit {
      deltas.clear();
      int numDeltas = in.readInt();
      for(int i=0; i < numDeltas; i++) {
- deltas.add(in.readLong());
+ AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
+ dmd.readFields(in);
+ deltas.add(dmd);
      }
      if (hasFooter) {
        // deserialize FileMetaInfo fields
@@ -137,7 +140,7 @@ public class OrcNewSplit extends FileSplit {
      return hasBase;
    }

- public List<Long> getDeltas() {
+ public List<AcidInputFormat.DeltaMetaData> getDeltas() {
      return deltas;
    }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 728118a..2f11611 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -72,41 +72,55 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    /**
     * A RecordIdentifier extended with the current transaction id. This is the
     * key of our merge sort with the originalTransaction, bucket, and rowId
- * ascending and the currentTransaction descending. This means that if the
+ * ascending and the currentTransaction, statementId descending. This means that if the
     * reader is collapsing events to just the last update, just the first
     * instance of each record is required.
     */
    final static class ReaderKey extends RecordIdentifier{
      private long currentTransactionId;
+ private int statementId;//sort on this descending, like currentTransactionId

      public ReaderKey() {
- this(-1, -1, -1, -1);
+ this(-1, -1, -1, -1, 0);
      }

      public ReaderKey(long originalTransaction, int bucket, long rowId,
                       long currentTransactionId) {
+ this(originalTransaction, bucket, rowId, currentTransactionId, 0);
+ }
+ /**
+ * @param statementId - set this to 0 if N/A
+ */
+ public ReaderKey(long originalTransaction, int bucket, long rowId,
+ long currentTransactionId, int statementId) {
        super(originalTransaction, bucket, rowId);
        this.currentTransactionId = currentTransactionId;
+ this.statementId = statementId;
      }

      @Override
      public void set(RecordIdentifier other) {
        super.set(other);
        currentTransactionId = ((ReaderKey) other).currentTransactionId;
+ statementId = ((ReaderKey) other).statementId;
      }

      public void setValues(long originalTransactionId,
                            int bucket,
                            long rowId,
- long currentTransactionId) {
+ long currentTransactionId,
+ int statementId) {
        setValues(originalTransactionId, bucket, rowId);
        this.currentTransactionId = currentTransactionId;
+ this.statementId = statementId;
      }

      @Override
      public boolean equals(Object other) {
        return super.equals(other) &&
- currentTransactionId == ((ReaderKey) other).currentTransactionId;
+ currentTransactionId == ((ReaderKey) other).currentTransactionId
+ && statementId == ((ReaderKey) other).statementId//consistent with compareTo()
+ ;
      }

      @Override
@@ -118,6 +132,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            if (currentTransactionId != oth.currentTransactionId) {
              return currentTransactionId < oth.currentTransactionId ? +1 : -1;
            }
+ if(statementId != oth.statementId) {
+ return statementId < oth.statementId ? +1 : -1;
+ }
          } else {
            return -1;
          }
@@ -125,6 +142,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
        return sup;
      }

+ /**
+ * This means 1 txn modified the same row more than once
+ */
+ private boolean isSameRow(ReaderKey other) {
+ return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
+ }
+
      public long getCurrentTransactionId() {
        return currentTransactionId;
      }
@@ -142,7 +166,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      public String toString() {
        return "{originalTxn: " + getTransactionId() + ", bucket: " +
            getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
- currentTransactionId + "}";
+ currentTransactionId + ", statementId: "+ statementId + "}";
      }
    }

@@ -159,6 +183,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      final ReaderKey key;
      final RecordIdentifier maxKey;
      final int bucket;
+ private final int statementId;

      /**
       * Create a reader that reads from the first key larger than minKey to any
@@ -170,17 +195,19 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       * @param maxKey only return keys less than or equal to maxKey if it is
       * non-null
       * @param options options to provide to read the rows.
+ * @param statementId id of SQL statement within a transaction
       * @throws IOException
       */
      ReaderPair(ReaderKey key, Reader reader, int bucket,
                 RecordIdentifier minKey, RecordIdentifier maxKey,
- ReaderImpl.Options options) throws IOException {
+ ReaderImpl.Options options, int statementId) throws IOException {
        this.reader = reader;
        this.key = key;
        this.maxKey = maxKey;
        this.bucket = bucket;
        // TODO use stripe statistics to jump over stripes
        recordReader = reader.rowsOptions(options);
+ this.statementId = statementId;
        // advance the reader until we reach the minimum key
        do {
          next(nextRecord);
@@ -195,7 +222,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
              OrcRecordUpdater.getBucket(nextRecord),
              OrcRecordUpdater.getRowId(nextRecord),
- OrcRecordUpdater.getCurrentTransaction(nextRecord));
+ OrcRecordUpdater.getCurrentTransaction(nextRecord),
+ statementId);

          // if this record is larger than maxKey, we need to stop
          if (maxKey != null && key.compareRow(maxKey) > 0) {
@@ -223,7 +251,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
                         RecordIdentifier minKey, RecordIdentifier maxKey,
                         Reader.Options options) throws IOException {
- super(key, reader, bucket, minKey, maxKey, options);
+ super(key, reader, bucket, minKey, maxKey, options, 0);
      }

      @Override
@@ -263,7 +291,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            nextRecord.setFieldValue(OrcRecordUpdater.ROW,
                recordReader.next(OrcRecordUpdater.getRow(next)));
          }
- key.setValues(0L, bucket, nextRowId, 0L);
+ key.setValues(0L, bucket, nextRowId, 0L, 0);
          if (maxKey != null && key.compareRow(maxKey) > 0) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("key " + key + " > maxkey " + maxKey);
@@ -415,7 +443,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      this.offset = options.getOffset();
      this.length = options.getLength();
      this.validTxnList = validTxnList;
- // modify the optins to reflect the event instead of the base row
+ // modify the options to reflect the event instead of the base row
      Reader.Options eventOptions = createEventOptions(options);
      if (reader == null) {
        baseReader = null;
@@ -438,7 +466,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                                        options);
        } else {
          pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
- eventOptions);
+ eventOptions, 0);
        }

        // if there is at least one record, put it in the map
@@ -458,13 +486,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
        for(Path delta: deltaDirectory) {
          ReaderKey key = new ReaderKey();
          Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
+ AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
          FileSystem fs = deltaFile.getFileSystem(conf);
          long length = getLastFlushLength(fs, deltaFile);
          if (length != -1 && fs.exists(deltaFile)) {
            Reader deltaReader = OrcFile.createReader(deltaFile,
                OrcFile.readerOptions(conf).maxLength(length));
            ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
- maxKey, eventOptions);
+ maxKey, eventOptions, deltaDir.getStatementId());
            if (deltaPair.nextRecord != null) {
              readers.put(key, deltaPair);
            }
@@ -580,9 +609,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          continue;
        }

+ /*for multi-statement txns, you may have multiple events for the same
+ * row in the same (current) transaction. We want to collapse these to just the last one
+ * regardless of whether we are minor compacting. Consider INSERT/UPDATE/UPDATE of the
+ * same row in the same txn. There is no benefit in passing along anything except the last
+ * event. If we did want to pass it along, we'd have to include statementId in the row
+ * returned so that compaction could write it out or make minor compaction understand
+ * how to write out delta files in delta_xxx_yyy_stid format. There doesn't seem to be any
+ * value in this.*/
+ boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
        // if we are collapsing, figure out if this is a new row
- if (collapse) {
- keysSame = prevKey.compareRow(recordIdentifier) == 0;
+ if (collapse || isSameRow) {
+ keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow);
          if (!keysSame) {
            prevKey.set(recordIdentifier);
          }

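A stand-alone sketch of that sort order (this is not the Hive class; it only models the ReaderKey semantics described in the javadoc above: row coordinates ascending, then currentTransactionId and statementId descending):

import java.util.Comparator;

public class ReaderKeyOrderSketch {
  static final class Key {
    final long originalTxn;
    final int bucket;
    final long rowId;
    final long currentTxn;
    final int statementId;
    Key(long originalTxn, int bucket, long rowId, long currentTxn, int statementId) {
      this.originalTxn = originalTxn;
      this.bucket = bucket;
      this.rowId = rowId;
      this.currentTxn = currentTxn;
      this.statementId = statementId;
    }
  }

  // newest event for a given row sorts first
  static final Comparator<Key> MERGE_ORDER = Comparator
      .comparingLong((Key k) -> k.originalTxn)
      .thenComparingInt(k -> k.bucket)
      .thenComparingLong(k -> k.rowId)
      .thenComparing(Comparator.comparingLong((Key k) -> k.currentTxn).reversed())
      .thenComparing(Comparator.comparingInt((Key k) -> k.statementId).reversed());

  public static void main(String[] args) {
    Key insert = new Key(0, 0, 5, 100, 0); // statement 0 inserted row 5 in txn 100
    Key update = new Key(0, 0, 5, 100, 2); // statement 2 later updated the same row in txn 100
    // the update sorts first, so a collapsing reader keeps only the latest event
    System.out.println(MERGE_ORDER.compare(update, insert) < 0); // true
  }
}
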
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index b576496..e4651b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -89,6 +89,7 @@ public class OrcRecordUpdater implements RecordUpdater {
    private final IntWritable bucket = new IntWritable();
    private final LongWritable rowId = new LongWritable();
    private long insertedRows = 0;
+ private long rowIdOffset = 0;
    // This records how many rows have been inserted or deleted. It is separate from insertedRows
    // because that is monotonically increasing to give new unique row ids.
    private long rowCountDelta = 0;
@@ -263,6 +264,41 @@ public class OrcRecordUpdater implements RecordUpdater {
      item.setFieldValue(ROW_ID, rowId);
    }

+ /**
+ * To handle multiple INSERT... statements in a single transaction, we want to make sure
+ * to generate a unique {@code rowId} for each inserted row of the transaction.
+ * @return largest rowId created by previous statements (may be 0)
+ * @throws IOException
+ */
+ private long findRowIdOffsetForInsert() throws IOException {
+ /*
+ * 1. need to know bucket we are writing to
+ * 2. need to know which delta dir it's in
+ * Then,
+ * 1. find the same bucket file in previous delta dir for this txn
+ * 2. read the footer and get AcidStats which has insert count
+ * 2.1 if AcidStats.inserts>0 done
+ * else go to previous delta file
+ * For example, consider insert/update/insert case...*/
+ if(options.getStatementId() <= 0) {
+ return 0;//there is only 1 statement in this transaction (so far)
+ }
+ for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
+ Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
+ if(!fs.exists(matchingBucket)) {
+ continue;
+ }
+ Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration()));
+ //no close() on Reader?!
+ AcidStats acidStats = parseAcidStats(reader);
+ if(acidStats.inserts > 0) {
+ return acidStats.inserts;
+ }
+ }
+ //if we got here, we looked at all delta files in this txn, prior to current statement and didn't
+ //find any inserts...
+ return 0;
+ }
    // Find the record identifier column (if there) and return a possibly new ObjectInspector that
    // will strain out the record id for the underlying writer.
    private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
@@ -304,6 +340,9 @@ public class OrcRecordUpdater implements RecordUpdater {
            recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
        rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField));
      }
+ else if(operation == INSERT_OPERATION) {
+ rowId += rowIdOffset;
+ }
      this.rowId.set(rowId);
      this.originalTransaction.set(originalTransaction);
      item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
@@ -315,6 +354,9 @@ public class OrcRecordUpdater implements RecordUpdater {
    public void insert(long currentTransaction, Object row) throws IOException {
      if (this.currentTransaction.get() != currentTransaction) {
        insertedRows = 0;
+ //this method is almost no-op in hcatalog.streaming case since statementId == 0 is
+ //always true in that case
+ rowIdOffset = findRowIdOffsetForInsert();
      }
      addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
      rowCountDelta++;
@@ -407,6 +449,22 @@ public class OrcRecordUpdater implements RecordUpdater {
      }
      return result;
    }
+ /**
+ * {@link KeyIndexBuilder} creates these
+ */
+ static AcidStats parseAcidStats(Reader reader) {
+ String statsSerialized;
+ try {
+ ByteBuffer val =
+ reader.getMetadataValue(OrcRecordUpdater.ACID_STATS)
+ .duplicate();
+ statsSerialized = utf8Decoder.decode(val).toString();
+ } catch (CharacterCodingException e) {
+ throw new IllegalArgumentException("Bad string encoding for " +
+ OrcRecordUpdater.ACID_STATS, e);
+ }
+ return new AcidStats(statsSerialized);
+ }

    static class KeyIndexBuilder implements OrcFile.WriterCallback {
      StringBuilder lastKey = new StringBuilder();

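A simplified, self-contained model of the backward scan in findRowIdOffsetForInsert(); the insertCounts map below stands in for reading AcidStats out of each earlier statement's ORC footer, and all names are ours, not Hive's:

import java.util.HashMap;
import java.util.Map;

public class RowIdOffsetScanSketch {
  static long findRowIdOffsetForInsert(int statementId, Map<Integer, Long> insertCounts) {
    if (statementId <= 0) {
      return 0;                      // single-statement txn (so far): no offset needed
    }
    for (int pastStmt = statementId - 1; pastStmt >= 0; pastStmt--) {
      Long inserts = insertCounts.get(pastStmt);   // AcidStats.inserts of delta_t_t_<pastStmt>
      if (inserts != null && inserts > 0) {
        return inserts;
      }
    }
    return 0;                        // no earlier statement of this txn inserted anything
  }

  public static void main(String[] args) {
    Map<Integer, Long> insertCounts = new HashMap<>();
    insertCounts.put(0, 3L);         // statement 0 inserted 3 rows
    // statement 1 only updated existing rows, so it wrote no insert events
    System.out.println(findRowIdOffsetForInsert(2, insertCounts));  // 3
  }
}
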
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 0c7dd40..8cf4cc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -26,6 +26,8 @@ import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableUtils;
  import org.apache.hadoop.mapred.FileSplit;
@@ -41,7 +43,7 @@ public class OrcSplit extends FileSplit {
    private boolean hasFooter;
    private boolean isOriginal;
    private boolean hasBase;
- private final List<Long> deltas = new ArrayList<Long>();
+ private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
    private OrcFile.WriterVersion writerVersion;
    private long projColsUncompressedSize;

@@ -58,7 +60,7 @@ public class OrcSplit extends FileSplit {

    public OrcSplit(Path path, long offset, long length, String[] hosts,
        ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
- List<Long> deltas, long projectedDataSize) {
+ List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
      super(path, offset, length, hosts);
      this.fileMetaInfo = fileMetaInfo;
      hasFooter = this.fileMetaInfo != null;
@@ -78,8 +80,8 @@ public class OrcSplit extends FileSplit {
          (hasFooter ? FOOTER_FLAG : 0);
      out.writeByte(flags);
      out.writeInt(deltas.size());
- for(Long delta: deltas) {
- out.writeLong(delta);
+ for(AcidInputFormat.DeltaMetaData delta: deltas) {
+ delta.write(out);
      }
      if (hasFooter) {
        // serialize FileMetaInfo fields
@@ -112,7 +114,9 @@ public class OrcSplit extends FileSplit {
      deltas.clear();
      int numDeltas = in.readInt();
      for(int i=0; i < numDeltas; i++) {
- deltas.add(in.readLong());
+ AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
+ dmd.readFields(in);
+ deltas.add(dmd);
      }
      if (hasFooter) {
        // deserialize FileMetaInfo fields
@@ -148,7 +152,7 @@ public class OrcSplit extends FileSplit {
      return hasBase;
    }

- public List<Long> getDeltas() {
+ public List<AcidInputFormat.DeltaMetaData> getDeltas() {
      return deltas;
    }


http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index f8fff1a..445f606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -52,6 +52,14 @@ public class DbTxnManager extends HiveTxnManagerImpl {
    private DbLockManager lockMgr = null;
    private IMetaStoreClient client = null;
    private long txnId = 0;
+ /**
+ * assigns a unique monotonically increasing ID to each statement
+ * which is part of an open transaction. This is used by the storage
+ * layer (see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)})
+ * to keep apart multiple writes of the same data within the same transaction.
+ * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
+ */
+ private int statementId = -1;

    DbTxnManager() {
    }
@@ -69,6 +77,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
      init();
      try {
        txnId = client.openTxn(user);
+ statementId = 0;
        LOG.debug("Opened txn " + txnId);
        return txnId;
      } catch (TException e) {
@@ -222,7 +231,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
        return null;
      }

- List<HiveLock> locks = new ArrayList<HiveLock>(1);
+ List<HiveLock> locks = new ArrayList<HiveLock>(1);
+ if(txnId > 0) {
+ statementId++;
+ }
      LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks);
      ctx.setHiveLocks(locks);
      return lockState;
@@ -249,6 +261,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
            e);
      } finally {
        txnId = 0;
+ statementId = -1;
      }
    }

@@ -270,6 +283,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
            e);
      } finally {
        txnId = 0;
+ statementId = -1;
      }
    }

@@ -361,5 +375,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
        }
      }
    }
+ @Override
+ public int getStatementId() {
+ return statementId;
+ }

  }

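A toy model of the statementId bookkeeping added here (not the Hive class): -1 outside a transaction, 0 when the txn opens, incremented for each statement that acquires locks inside the open txn, reset on commit or rollback. The txn id 42 is a placeholder for whatever the metastore hands back:

public class StatementIdLifecycleSketch {
  private long txnId = 0;
  private int statementId = -1;

  void openTxn()          { txnId = 42; statementId = 0; }   // 42 stands in for the metastore txn id
  void acquireLocks()     { if (txnId > 0) statementId++; }  // called once per statement
  void commitOrRollback() { txnId = 0; statementId = -1; }
  int getStatementId()    { return statementId; }

  public static void main(String[] args) {
    StatementIdLifecycleSketch mgr = new StatementIdLifecycleSketch();
    mgr.openTxn();
    mgr.acquireLocks(); System.out.println(mgr.getStatementId()); // 1: first statement of the txn
    mgr.acquireLocks(); System.out.println(mgr.getStatementId()); // 2: second statement of the txn
    mgr.commitOrRollback();
    System.out.println(mgr.getStatementId());                     // -1
  }
}
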
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 21ab8ee..1906982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -54,6 +54,10 @@ class DummyTxnManager extends HiveTxnManagerImpl {
    }

    @Override
+ public int getStatementId() {
+ return 0;
+ }
+ @Override
    public HiveLockManager getLockManager() throws LockException {
      if (lockMgr == null) {
        boolean supportConcurrency =

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 2dd0c7d..6c3dc33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -127,4 +127,7 @@ public interface HiveTxnManager {
     * @return true if this transaction manager does ACID
     */
    boolean supportsAcid();
+
+ int getStatementId();
+
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 1ce98b8..5719cf4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6605,7 +6605,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
        rsCtx.getNumFiles(),
        rsCtx.getTotalFiles(),
        rsCtx.getPartnCols(),
- dpCtx);
+ dpCtx,
+ dest_path);

      // If this is an insert, update, or delete on an ACID table then mark that so the
      // FileSinkOperator knows how to properly write to it.

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index bb6cee5..f73b502 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -92,16 +92,21 @@ public class FileSinkDesc extends AbstractOperatorDesc {
    // Record what type of write this is. Default is non-ACID (ie old style).
    private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
    private long txnId = 0; // transaction id for this operation
+ private int statementId = -1;

    private transient Table table;
+ private Path destPath;

    public FileSinkDesc() {
    }

+ /**
+ * @param destPath - the final destination for data
+ */
    public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
        final boolean compressed, final int destTableId, final boolean multiFileSpray,
        final boolean canBeMerged, final int numFiles, final int totalFiles,
- final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx) {
+ final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) {

      this.dirName = dirName;
      this.tableInfo = tableInfo;
@@ -114,6 +119,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
      this.partitionCols = partitionCols;
      this.dpCtx = dpCtx;
      this.dpSortState = DPSortState.NONE;
+ this.destPath = destPath;
    }

    public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -135,7 +141,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
    public Object clone() throws CloneNotSupportedException {
      FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
          destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
- partitionCols, dpCtx);
+ partitionCols, dpCtx, destPath);
      ret.setCompressCodec(compressCodec);
      ret.setCompressType(compressType);
      ret.setGatherStats(gatherStats);
@@ -231,9 +237,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
      return temporary;
    }

- /**
- * @param totalFiles the totalFiles to set
- */
    public void setTemporary(boolean temporary) {
      this.temporary = temporary;
    }
@@ -438,11 +441,23 @@ public class FileSinkDesc extends AbstractOperatorDesc {
    public void setTransactionId(long id) {
      txnId = id;
    }
-
    public long getTransactionId() {
      return txnId;
    }

+ public void setStatementId(int id) {
+ statementId = id;
+ }
+ /**
+ * See {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options#statementId(int)}
+ */
+ public int getStatementId() {
+ return statementId;
+ }
+ public Path getDestPath() {
+ return destPath;
+ }
+
    public Table getTable() {
      return table;
    }

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c5f2d4d..6c77ba4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -545,7 +545,9 @@ public class CompactorMR {
              .reporter(reporter)
              .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
              .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
- .bucket(bucket);
+ .bucket(bucket)
+ .statementId(-1);//setting statementId == -1 makes compacted delta files use
+ //delta_xxxx_yyyy format

          // Instantiate the underlying output format
          @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class

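To show why the compactor passes statementId(-1): with -1, the createFilename() branch in AcidUtils above collapses the per-statement suffix, so compacted output comes out in the old two-part form. A tiny illustrative mirror in plain Java (not the Hive code):

public class CompactedDeltaNameSketch {
  // mirrors the createFilename() branch: statementId == -1 drops the statement suffix
  static String deltaDir(long minTxn, long maxTxn, int statementId) {
    String dir = String.format("delta_%07d_%07d", minTxn, maxTxn);
    return statementId == -1 ? dir : dir + String.format("_%04d", statementId);
  }

  public static void main(String[] args) {
    System.out.println(deltaDir(40, 60, -1)); // delta_0000040_0000060 (minor compaction output)
    System.out.println(deltaDir(60, 60, 4));  // delta_0000060_0000060_0004 (statement 4 of txn 60)
  }
}
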
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index e400778..c6ae030 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -303,7 +303,8 @@ public class TestFileSinkOperator {
        Map<String, String> partColNames = new HashMap<String, String>(1);
        partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
        dpCtx.setInputToDPCols(partColNames);
- desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx);
+ //todo: does this need the finalDestination?
+ desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
      } else {
        desc = new FileSinkDesc(basePath, tableDesc, false);
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 1e3df34..f8ded12 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -46,17 +46,23 @@ public class TestAcidUtils {
          AcidUtils.createFilename(p, options).toString());
      options.bucket(123);
      assertEquals("/tmp/00123_0",
- AcidUtils.createFilename(p, options).toString());
+ AcidUtils.createFilename(p, options).toString());
      options.bucket(23)
          .minimumTransactionId(100)
          .maximumTransactionId(200)
          .writingBase(true)
          .setOldStyle(false);
      assertEquals("/tmp/base_0000200/bucket_00023",
- AcidUtils.createFilename(p, options).toString());
+ AcidUtils.createFilename(p, options).toString());
      options.writingBase(false);
+ assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
+ options.statementId(-1);
      assertEquals("/tmp/delta_0000100_0000200/bucket_00023",
- AcidUtils.createFilename(p, options).toString());
+ AcidUtils.createFilename(p, options).toString());
+ options.statementId(7);
+ assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
    }

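The assertions above pin down the on-disk naming this patch introduces: transaction ids are zero-padded to 7 digits, the optional statement suffix to 4, and bucket files to 5, and passing statementId(-1) drops the suffix entirely, giving the pre-1.3.0 form that the compactor also uses (see the CompactorMR hunk earlier). The following is a minimal reconstruction of that format for reference, not the actual AcidUtils code:

// Minimal reconstruction of the naming implied by the assertions above;
// not the actual AcidUtils implementation.
public class DeltaNamingSketch {
  // Pre-1.3.0 style, also produced when statementId == -1 (e.g. by the compactor).
  static String deltaSubdir(long minTxn, long maxTxn) {
    return String.format("delta_%07d_%07d", minTxn, maxTxn);
  }

  // 1.3.0 style with a per-statement suffix.
  static String deltaSubdir(long minTxn, long maxTxn, int statementId) {
    return deltaSubdir(minTxn, maxTxn) + String.format("_%04d", statementId);
  }

  static String bucketFile(String dir, int bucket) {
    return dir + "/" + String.format("bucket_%05d", bucket);
  }

  public static void main(String[] args) {
    System.out.println(bucketFile("/tmp/" + deltaSubdir(100, 200), 23));
    // /tmp/delta_0000100_0000200/bucket_00023
    System.out.println(bucketFile("/tmp/" + deltaSubdir(100, 200, 7), 23));
    // /tmp/delta_0000100_0000200_0007/bucket_00023
    System.out.println(bucketFile("/tmp/" + String.format("base_%07d", 200), 23));
    // /tmp/base_0000200/bucket_00023
  }
}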
    @Test
@@ -236,7 +242,6 @@ public class TestAcidUtils {
          new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
          new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]),
          new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
- new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
          new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
      Path part = new MockPath(fs, "mock:/tbl/part1");
      AcidUtils.Directory dir =
@@ -254,6 +259,45 @@ public class TestAcidUtils {
      assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString());
    }

+ /**
+ * Hive 1.3.0 delta dir naming scheme which supports multi-statement txns
+ * @throws Exception
+ */
+ @Test
+ public void testOverlapingDelta2() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_0000063_63_0/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_000062_62_0/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_000062_62_3/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_00061_61_0/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_0060_60_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_0060_60_4/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_0060_60_7/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_058_58/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+ assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
+ List<FileStatus> obsolete = dir.getObsolete();
+ assertEquals(5, obsolete.size());
+ assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString());
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(5, delts.size());
+ assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_00061_61_0", delts.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_000062_62_0", delts.get(2).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_000062_62_3", delts.get(3).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0000063_63_0", delts.get(4).getPath().toString());
+ }
+
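testOverlapingDelta2 encodes the directory-selection rule for the new layout: walking the deltas in order, a delta whose transaction range is already covered by a wider delta that was kept becomes obsolete, while sibling deltas that share the same (min, max) range and differ only in statement id are all kept, since each holds a different statement of the same transaction. The sketch below is a simplified reconstruction of that rule as the assertions exercise it (the base_50 cut-off and open-transaction handling are omitted); it is not the actual AcidUtils.getAcidState logic:

// Simplified reconstruction of the selection rule exercised by the test above.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class DeltaSelectionSketch {
  record Delta(long min, long max, int stmt, String name) {}

  static List<Delta> selectCurrent(List<Delta> deltas) {
    // Order: earliest min first; for equal min, widest range first; then statement id.
    List<Delta> sorted = new ArrayList<>(deltas);
    sorted.sort(Comparator.comparingLong(Delta::min)
        .thenComparing(Comparator.comparingLong(Delta::max).reversed())
        .thenComparingInt(Delta::stmt));
    List<Delta> current = new ArrayList<>();
    long highWater = Long.MIN_VALUE;
    Delta lastKept = null;
    for (Delta d : sorted) {
      boolean sameRangeAsLastKept =
          lastKept != null && lastKept.min() == d.min() && lastKept.max() == d.max();
      if (d.max() > highWater || sameRangeAsLastKept) {
        current.add(d);              // covers new transactions, or is a multi-statement sibling
        highWater = Math.max(highWater, d.max());
        lastKept = d;
      }                              // otherwise: covered by a wider kept delta -> obsolete
    }
    return current;
  }

  public static void main(String[] args) {
    List<Delta> current = selectCurrent(List.of(
        new Delta(40, 60, -1, "delta_40_60"),
        new Delta(52, 55, -1, "delta_052_55"),
        new Delta(58, 58, -1, "delta_058_58"),
        new Delta(60, 60, 1, "delta_0060_60_1"),
        new Delta(60, 60, 4, "delta_0060_60_4"),
        new Delta(60, 60, 7, "delta_0060_60_7"),
        new Delta(61, 61, 0, "delta_00061_61_0"),
        new Delta(62, 62, 0, "delta_000062_62_0"),
        new Delta(62, 62, 3, "delta_000062_62_3"),
        new Delta(63, 63, 0, "delta_0000063_63_0")));
    current.forEach(d -> System.out.println(d.name()));
    // delta_40_60, delta_00061_61_0, delta_000062_62_0, delta_000062_62_3, delta_0000063_63_0
  }
}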
    @Test
    public void deltasWithOpenTxnInRead() throws Exception {
      Configuration conf = new Configuration();
@@ -268,6 +312,27 @@ public class TestAcidUtils {
      assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
    }

+ /**
+ * @since 1.3.0
+ * @throws Exception
+ */
+ @Test
+ public void deltasWithOpenTxnInRead2() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(2, delts.size());
+ assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
+ }
+
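deltasWithOpenTxnInRead2 is the read-side counterpart: the ValidReadTxnList string "100:4" describes a snapshot with high watermark 100 and transaction 4 still open (an assumption about the string format, but consistent with the "100:" lists used elsewhere in these tests). Deltas written entirely above the watermark (delta_101_101_1) or consisting solely of the open transaction (delta_4_4_1, delta_4_4_3) are skipped at the directory level, while delta_2_5 is still read because it also contains committed transactions; its rows from txn 4 are filtered later. A simplified sketch of that coarse filter, not the real ValidReadTxnList/AcidUtils check:

// Simplified directory-level filter matching the expectations of the test above.
import java.util.Set;

public class OpenTxnDeltaFilterSketch {
  static boolean isUsable(long minTxn, long maxTxn, long highWatermark, Set<Long> openTxns) {
    if (minTxn > highWatermark) {
      return false;                                    // written after our snapshot
    }
    // A delta covering exactly one, still-open transaction has nothing readable in it.
    return !(minTxn == maxTxn && openTxns.contains(minTxn));
  }

  public static void main(String[] args) {
    long hwm = 100;                                     // "100:4" => watermark 100, txn 4 open
    Set<Long> open = Set.of(4L);
    System.out.println(isUsable(1, 1, hwm, open));      // true  -> delta_1_1 kept
    System.out.println(isUsable(2, 5, hwm, open));      // true  -> delta_2_5 kept
    System.out.println(isUsable(4, 4, hwm, open));      // false -> delta_4_4_1 / delta_4_4_3 skipped
    System.out.println(isUsable(101, 101, hwm, open));  // false -> delta_101_101_1 skipped
  }
}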
    @Test
    public void deltasWithOpenTxnsNotInCompact() throws Exception {
      Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 56e5f9f..e96ab2a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
  import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
  import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -927,7 +928,7 @@ public class TestInputOutputFormat {
      OrcInputFormat.SplitGenerator splitter =
          new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
              fs.getFileStatus(new Path("/a/file")), null, true,
- new ArrayList<Long>(), true, null, null));
+ new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
      OrcSplit result = splitter.createSplit(0, 200, null);
      assertEquals(0, result.getStart());
      assertEquals(200, result.getLength());
@@ -968,7 +969,7 @@ public class TestInputOutputFormat {
      OrcInputFormat.SplitGenerator splitter =
          new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
              fs.getFileStatus(new Path("/a/file")), null, true,
- new ArrayList<Long>(), true, null, null));
+ new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
      List<OrcSplit> results = splitter.call();
      OrcSplit result = results.get(0);
      assertEquals(3, result.getStart());
@@ -990,7 +991,7 @@ public class TestInputOutputFormat {
      conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
      context = new OrcInputFormat.Context(conf);
      splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
- fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<Long>(),
+ fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(),
          true, null, null));
      results = splitter.call();
      for(int i=0; i < stripeSizes.length; ++i) {
@@ -1497,7 +1498,7 @@ public class TestInputOutputFormat {
      Path partDir = new Path(conf.get("mapred.input.dir"));
      OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
          new AcidOutputFormat.Options(conf).maximumTransactionId(10)
- .writingBase(true).bucket(0).inspector(inspector));
+ .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir));
      for(int i=0; i < 100; ++i) {
        BigRow row = new BigRow(i);
        writer.insert(10, row);
@@ -1648,7 +1649,7 @@ public class TestInputOutputFormat {
      // write a base file in partition 0
      OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
          new AcidOutputFormat.Options(conf).maximumTransactionId(10)
- .writingBase(true).bucket(0).inspector(inspector));
+ .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0]));
      for(int i=0; i < 10; ++i) {
        writer.insert(10, new MyRow(i, 2 * i));
      }
@@ -1661,7 +1662,7 @@ public class TestInputOutputFormat {
      // write a delta file in partition 0
      writer = new OrcRecordUpdater(partDir[0],
          new AcidOutputFormat.Options(conf).maximumTransactionId(10)
- .writingBase(true).bucket(1).inspector(inspector));
+ .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0]));
      for(int i=10; i < 20; ++i) {
        writer.insert(10, new MyRow(i, 2*i));
      }

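The TestInputOutputFormat changes reflect an API shift: a split used to carry its deltas as a flat List<Long> of transaction ids, but with one directory per statement a delta now needs richer metadata, which is what AcidInputFormat.DeltaMetaData carries. The class below is only an illustrative stand-in for that idea, assuming a minimal shape of (minTxnId, maxTxnId, statement ids); the real DeltaMetaData's fields and methods may differ:

// Illustrative stand-in for the kind of per-delta metadata a split can carry.
import java.util.ArrayList;
import java.util.List;

public class DeltaMetaSketch {
  final long minTxnId;
  final long maxTxnId;
  final List<Integer> statementIds;   // empty => single old-style delta_<min>_<max> directory

  DeltaMetaSketch(long minTxnId, long maxTxnId, List<Integer> statementIds) {
    this.minTxnId = minTxnId;
    this.maxTxnId = maxTxnId;
    this.statementIds = statementIds;
  }

  // Directory names a reader would have to open for this delta.
  List<String> directories() {
    List<String> dirs = new ArrayList<>();
    if (statementIds.isEmpty()) {
      dirs.add(String.format("delta_%07d_%07d", minTxnId, maxTxnId));
    } else {
      for (int stmt : statementIds) {
        dirs.add(String.format("delta_%07d_%07d_%04d", minTxnId, maxTxnId, stmt));
      }
    }
    return dirs;
  }

  public static void main(String[] args) {
    System.out.println(new DeltaMetaSketch(60, 60, List.of(1, 4, 7)).directories());
    // [delta_0000060_0000060_0001, delta_0000060_0000060_0004, delta_0000060_0000060_0007]
  }
}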
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 921e954..39f71f1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -62,12 +62,12 @@ import static org.junit.Assert.assertNull;
  public class TestOrcRawRecordMerger {

    private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class);
-
+//todo: why is statementId -1?

    @Test
    public void testOrdering() throws Exception {
      ReaderKey left = new ReaderKey(100, 200, 1200, 300);
      ReaderKey right = new ReaderKey();
- right.setValues(100, 200, 1000, 200);
+ right.setValues(100, 200, 1000, 200,1);
      assertTrue(right.compareTo(left) < 0);
      assertTrue(left.compareTo(right) > 0);
      assertEquals(false, left.equals(right));
@@ -76,16 +76,16 @@ public class TestOrcRawRecordMerger {
      assertEquals(true, right.equals(left));
      right.setRowId(2000);
      assertTrue(right.compareTo(left) > 0);
- left.setValues(1, 2, 3, 4);
- right.setValues(100, 2, 3, 4);
+ left.setValues(1, 2, 3, 4,-1);
+ right.setValues(100, 2, 3, 4,-1);
      assertTrue(left.compareTo(right) < 0);
      assertTrue(right.compareTo(left) > 0);
- left.setValues(1, 2, 3, 4);
- right.setValues(1, 100, 3, 4);
+ left.setValues(1, 2, 3, 4,-1);
+ right.setValues(1, 100, 3, 4,-1);
      assertTrue(left.compareTo(right) < 0);
      assertTrue(right.compareTo(left) > 0);
- left.setValues(1, 2, 3, 100);
- right.setValues(1, 2, 3, 4);
+ left.setValues(1, 2, 3, 100,-1);
+ right.setValues(1, 2, 3, 4,-1);
      assertTrue(left.compareTo(right) < 0);
      assertTrue(right.compareTo(left) > 0);

@@ -177,7 +177,7 @@ public class TestOrcRawRecordMerger {
      RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
      RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
      ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
- new Reader.Options());
+ new Reader.Options(), 0);
      RecordReader recordReader = pair.recordReader;
      assertEquals(10, key.getTransactionId());
      assertEquals(20, key.getBucketId());
@@ -203,7 +203,7 @@ public class TestOrcRawRecordMerger {
      Reader reader = createMockReader();

      ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
- new Reader.Options());
+ new Reader.Options(), 0);
      RecordReader recordReader = pair.recordReader;
      assertEquals(10, key.getTransactionId());
      assertEquals(20, key.getBucketId());
@@ -489,7 +489,7 @@ public class TestOrcRawRecordMerger {
      // write the empty base
      AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
          .inspector(inspector).bucket(BUCKET).writingBase(true)
- .maximumTransactionId(100);
+ .maximumTransactionId(100).finalDestination(root);
      of.getRecordUpdater(root, options).close(false);

      ValidTxnList txnList = new ValidReadTxnList("200:");
@@ -515,6 +515,10 @@ public class TestOrcRawRecordMerger {
     */
    @Test
    public void testNewBaseAndDelta() throws Exception {
+ testNewBaseAndDelta(false);
+ testNewBaseAndDelta(true);
+ }
+ private void testNewBaseAndDelta(boolean use130Format) throws Exception {
      final int BUCKET = 10;
      String[] values = new String[]{"first", "second", "third", "fourth",
                                     "fifth", "sixth", "seventh", "eighth",
@@ -532,7 +536,10 @@ public class TestOrcRawRecordMerger {

      // write the base
      AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
- .inspector(inspector).bucket(BUCKET);
+ .inspector(inspector).bucket(BUCKET).finalDestination(root);
+ if(!use130Format) {
+ options.statementId(-1);
+ }
      RecordUpdater ru = of.getRecordUpdater(root,
          options.writingBase(true).maximumTransactionId(100));
      for(String v: values) {
@@ -554,7 +561,8 @@ public class TestOrcRawRecordMerger {
      AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);

      assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
- assertEquals(new Path(root, "delta_0000200_0000200"),
+ assertEquals(new Path(root, use130Format ?
+ AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
          directory.getCurrentDirectories().get(0).getPath());

      Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
@@ -829,7 +837,7 @@ public class TestOrcRawRecordMerger {
      // write a delta
      AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
          .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
- .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5);
+ .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
      RecordUpdater ru = of.getRecordUpdater(root, options);
      values = new String[]{"0.0", null, null, "1.1", null, null, null,
          "ignore.7"};
@@ -920,6 +928,7 @@ public class TestOrcRawRecordMerger {
      options.orcOptions(OrcFile.writerOptions(conf)
        .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
        .memory(mgr));
+ options.finalDestination(root);
      RecordUpdater ru = of.getRecordUpdater(root, options);
      String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
          "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
@@ -1004,7 +1013,8 @@ public class TestOrcRawRecordMerger {
      AcidOutputFormat.Options options =
          new AcidOutputFormat.Options(conf)
              .bucket(BUCKET).inspector(inspector).filesystem(fs)
- .writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
+ .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+ .finalDestination(root);
      RecordUpdater ru = of.getRecordUpdater(root, options);
      String[] values = new String[]{"a", "b", "c", "d", "e"};
      for(int i=0; i < values.length; ++i) {
@@ -1047,6 +1057,14 @@ public class TestOrcRawRecordMerger {
     */
    @Test
    public void testRecordReaderIncompleteDelta() throws Exception {
+ testRecordReaderIncompleteDelta(false);
+ testRecordReaderIncompleteDelta(true);
+ }
+ /**
+ *
+ * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_0001
+ */
+ private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception {
      final int BUCKET = 1;
      Configuration conf = new Configuration();
      OrcOutputFormat of = new OrcOutputFormat();
@@ -1063,7 +1081,10 @@ public class TestOrcRawRecordMerger {
      AcidOutputFormat.Options options =
          new AcidOutputFormat.Options(conf)
              .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
- .bucket(BUCKET).inspector(inspector).filesystem(fs);
+ .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root);
+ if(!use130Format) {
+ options.statementId(-1);
+ }
      RecordUpdater ru = of.getRecordUpdater(root, options);
      String[] values= new String[]{"1", "2", "3", "4", "5"};
      for(int i=0; i < values.length; ++i) {
@@ -1110,8 +1131,8 @@ public class TestOrcRawRecordMerger {
      splits = inf.getSplits(job, 1);
      assertEquals(2, splits.length);
      rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
- Path sideFile = new Path(root +
- "/delta_0000010_0000019/bucket_00001_flush_length");
+ Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
+ AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
      assertEquals(true, fs.exists(sideFile));
      assertEquals(24, fs.getFileStatus(sideFile).getLen());


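A pattern worth noting in TestOrcRawRecordMerger: testNewBaseAndDelta and testRecordReaderIncompleteDelta keep a single test body and invoke it twice, once with use130Format=false (legacy layout, forced via statementId(-1)) and once with true, so the 1.3 reader is exercised against files written in both directory formats. Schematically, and with a hypothetical helper standing in for the real write-then-read assertions, the shape is:

// Sketch of the "run the same body against both delta layouts" pattern used above.
// writeAndVerify(...) is a hypothetical stand-in for the real test body.
import static org.junit.Assert.assertTrue;
import org.junit.Test;

public class BothFormatsPatternSketch {
  @Test
  public void testBothDeltaLayouts() throws Exception {
    runScenario(false);   // legacy: delta_<min>_<max>
    runScenario(true);    // 1.3.0:  delta_<min>_<max>_<statementId>
  }

  private void runScenario(boolean use130Format) throws Exception {
    String expectedDeltaDir = use130Format
        ? String.format("delta_%07d_%07d_%04d", 200, 200, 0)
        : String.format("delta_%07d_%07d", 200, 200);
    assertTrue(writeAndVerify(expectedDeltaDir, use130Format));
  }

  private boolean writeAndVerify(String expectedDeltaDir, boolean use130Format) {
    return expectedDeltaDir.startsWith("delta_");
  }
}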
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index 22bd4b9..22030b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -97,7 +97,8 @@ public class TestOrcRecordUpdater {
          .minimumTransactionId(10)
          .maximumTransactionId(19)
          .inspector(inspector)
- .reporter(Reporter.NULL);
+ .reporter(Reporter.NULL)
+ .finalDestination(root);
      RecordUpdater updater = new OrcRecordUpdater(root, options);
      updater.insert(11, new MyRow("first"));
      updater.insert(11, new MyRow("second"));
@@ -197,7 +198,8 @@ public class TestOrcRecordUpdater {
          .maximumTransactionId(100)
          .inspector(inspector)
          .reporter(Reporter.NULL)
- .recordIdColumn(1);
+ .recordIdColumn(1)
+ .finalDestination(root);
      RecordUpdater updater = new OrcRecordUpdater(root, options);
      updater.update(100, new MyRow("update", 30, 10, bucket));
      updater.delete(100, new MyRow("", 60, 40, bucket));

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 671e122..21adc9d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -241,7 +241,7 @@ public abstract class CompactorTest {
      return sd;
    }

- // I can't do this with @Before because I want to be able to control when the thead starts
+ // I can't do this with @Before because I want to be able to control when the thread starts
    private void startThread(char type, boolean stopAfterOne) throws Exception {
      startThread(type, stopAfterOne, new AtomicBoolean());
    }
@@ -284,7 +284,7 @@ public abstract class CompactorTest {
      switch (type) {
        case BASE: filename = "base_" + maxTxn; break;
        case LENGTH_FILE: // Fall through to delta
- case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
+ case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
        case LEGACY: break; // handled below
      }

@@ -508,5 +508,21 @@ public abstract class CompactorTest {
      }
    }

+ /**
+ * in Hive 1.3.0 delta file names changed to delta_xxxx_yyyy_zzzz; prior to that
+ * the name was delta_xxxx_yyyy. We want to run compaction tests such that both formats
+ * are used since new (1.3) code has to be able to read old files.
+ */
+ abstract boolean useHive130DeltaDirName();

+ String makeDeltaDirName(long minTxnId, long maxTxnId) {
+ return useHive130DeltaDirName() ?
+ AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
+ }
+ /**
+ * delta dir name after compaction
+ */
+ String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
+ return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
+ }
  }

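CompactorTest turns the expected delta name into a template method: each compactor test subclass picks a layout via useHive130DeltaDirName(), while makeDeltaDirNameCompacted() always returns the suffix-less form because the compactor writes with statementId == -1 (see the CompactorMR hunk above). The *2 subclasses (TestCleaner2, TestWorker2) then rerun the same assertions against the 1.3.0 layout. A stripped-down sketch of that arrangement, with illustrative class names rather than the real test classes:

// Sketch of the template-method pattern CompactorTest uses so one set of
// assertions can run against both delta directory layouts.
public class DeltaDirNamePatternSketch {
  abstract static class BaseCompactorTestSketch {
    abstract boolean useHive130DeltaDirName();

    String makeDeltaDirName(long minTxnId, long maxTxnId) {
      return useHive130DeltaDirName()
          ? String.format("delta_%07d_%07d_%04d", minTxnId, maxTxnId, 0)
          : String.format("delta_%07d_%07d", minTxnId, maxTxnId);
    }

    // Compacted deltas never carry a statement suffix (compactor uses statementId == -1).
    String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
      return String.format("delta_%07d_%07d", minTxnId, maxTxnId);
    }
  }

  static class LegacyLayout extends BaseCompactorTestSketch {
    boolean useHive130DeltaDirName() { return false; }
  }

  static class Hive130Layout extends BaseCompactorTestSketch {
    boolean useHive130DeltaDirName() { return true; }
  }

  public static void main(String[] args) {
    System.out.println(new LegacyLayout().makeDeltaDirName(21, 24));   // delta_0000021_0000024
    System.out.println(new Hive130Layout().makeDeltaDirName(21, 24));  // delta_0000021_0000024_0000
  }
}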
http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index ffdbb9a..0db732c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -139,7 +139,7 @@ public class TestCleaner extends CompactorTest {
      boolean sawBase = false, sawDelta = false;
      for (Path p : paths) {
        if (p.getName().equals("base_20")) sawBase = true;
- else if (p.getName().equals("delta_21_24")) sawDelta = true;
+ else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true;
        else Assert.fail("Unexpected file " + p.getName());
      }
      Assert.assertTrue(sawBase);
@@ -177,7 +177,7 @@ public class TestCleaner extends CompactorTest {
      boolean sawBase = false, sawDelta = false;
      for (Path path : paths) {
        if (path.getName().equals("base_20")) sawBase = true;
- else if (path.getName().equals("delta_21_24")) sawDelta = true;
+ else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true;
        else Assert.fail("Unexpected file " + path.getName());
      }
      Assert.assertTrue(sawBase);
@@ -480,4 +480,8 @@ public class TestCleaner extends CompactorTest {
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      Assert.assertEquals(0, rsp.getCompactsSize());
    }
+ @Override
+ boolean useHive130DeltaDirName() {
+ return false;
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
new file mode 100644
index 0000000..c637dd1
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+/**
+ * Same as TestCleaner but tests delta file names in Hive 1.3.0 format
+ */
+public class TestCleaner2 extends TestCleaner {
+ public TestCleaner2() throws Exception {
+ super();
+ }
+ @Override
+ boolean useHive130DeltaDirName() {
+ return true;
+ }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 00b13de..0b0b1da 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -713,5 +713,9 @@ public class TestInitiator extends CompactorTest {
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
      Assert.assertEquals(0, compacts.size());
    }
+ @Override
+ boolean useHive130DeltaDirName() {
+ return false;
+ }

  }
