Repository: hive
Updated Branches:
   refs/heads/branch-1 f4020cfce -> e654efeb3


HIVE-11540 Too many delta files during Compaction - OOM (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e654efeb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e654efeb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e654efeb

Branch: refs/heads/branch-1
Commit: e654efeb32c62fb5cd56214b823526173cb009bb
Parents: f4020cf
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Sat Oct 24 22:01:20 2015 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Sat Oct 24 22:01:20 2015 -0700

----------------------------------------------------------------------
  .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
  .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 15 +-
  .../hive/ql/txn/compactor/CompactorMR.java | 143 ++++++++++++-------
  .../hadoop/hive/ql/txn/compactor/Worker.java | 6 +-
  .../hive/ql/txn/compactor/CompactorTest.java | 4 +
  .../hive/ql/txn/compactor/TestWorker.java | 120 ++++++++++++++--
  6 files changed, 225 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2febd39..4724523 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1500,6 +1500,8 @@ public class HiveConf extends Configuration {
      HIVE_COMPACTOR_DELTA_PCT_THRESHOLD("hive.compactor.delta.pct.threshold", 0.1f,
          "Percentage (fractional) size of the delta files relative to the base that will trigger\n" +
          "a major compaction. (1.0 = 100%, so the default 0.1 = 10%.)"),
+ COMPACTOR_MAX_NUM_DELTA("hive.compactor.max.num.delta", 500, "Maximum number of delta files that " +
+ "the compactor will attempt to handle in a single job."),

      HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
          "Number of aborted transactions involving a given table or partition that will trigger\n" +

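As a minimal sketch (not part of this commit), the new hive.compactor.max.num.delta limit can be read and overridden through HiveConf like any other ConfVars entry; the override value of 300 below is purely illustrative:

import org.apache.hadoop.hive.conf.HiveConf;

public class MaxDeltaConfigExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Default is 500; operators can lower or raise it in hive-site.xml or per session.
    conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 300);
    int maxDeltas = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
    System.out.println("Compactor will handle at most " + maxDeltas + " delta files per job");
  }
}
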
http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index c7e0780..8f60e9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -129,6 +129,9 @@ public class AcidUtils {
      return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
    }

+ public static String baseDir(long txnId) {
+ return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
+ }
    /**
     * Create a filename for a bucket file.
     * @param directory the partition directory
@@ -218,14 +221,16 @@ public class AcidUtils {
      Path getBaseDirectory();

      /**
- * Get the list of original files.
+ * Get the list of original files. Not {@code null}.
       * @return the list of original files (eg. 000000_0)
       */
      List<FileStatus> getOriginalFiles();

      /**
       * Get the list of base and delta directories that are valid and not
- * obsolete.
+ * obsolete. Not {@code null}. List must be sorted in a specific way.
+ * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta)}
+ * for details.
       * @return the minimal list of current directories
       */
      List<ParsedDelta> getCurrentDirectories();
@@ -234,7 +239,7 @@ public class AcidUtils {
       * Get the list of obsolete directories. After filtering out bases and
       * deltas that are not selected by the valid transaction list, return the
       * list of original files, bases, and deltas that have been replaced by
- * more up to date ones.
+ * more up to date ones. Not {@code null}.
       */
      List<FileStatus> getObsolete();
    }
@@ -281,6 +286,7 @@ public class AcidUtils {
       * happens in a different process; thus it's possible to have bases/deltas with
       * overlapping txnId boundaries. The sort order helps figure out the "best" set of files
       * to use to get data.
+ * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20)
       */
      @Override
      public int compareTo(ParsedDelta parsedDelta) {
@@ -493,6 +499,9 @@ public class AcidUtils {
      }

      Collections.sort(working);
+ //so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example
+ //and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60,
+ //subject to list of 'exceptions' in 'txnList' (not show in above example).
      long current = bestBaseTxn;
      int lastStmtId = -1;
      for(ParsedDelta next: working) {

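A minimal illustrative sketch (not the commit's code; the Delta class here is ad hoc) of the ordering the compareTo() comment above describes: deltas sort by ascending minimum transaction id, and on ties the wider range comes first, so delta_5_20 precedes delta_5_10 and delta_11_20:

import java.util.Arrays;

public class DeltaSortExample {
  static final class Delta {
    final long min, max;
    Delta(long min, long max) { this.min = min; this.max = max; }
    @Override public String toString() { return "delta_" + min + "_" + max; }
  }

  public static void main(String[] args) {
    Delta[] deltas = {
        new Delta(5, 10), new Delta(11, 20), new Delta(5, 20), new Delta(51, 60) };
    // Smaller minTxn first; on equal minTxn the larger maxTxn (the wider delta) sorts first.
    Arrays.sort(deltas, (a, b) -> a.min != b.min
        ? Long.compare(a.min, b.min) : Long.compare(b.max, a.max));
    // Prints: [delta_5_20, delta_5_10, delta_11_20, delta_51_60]
    System.out.println(Arrays.toString(deltas));
  }
}

AcidUtils then walks the sorted list to keep only the "best" covering set, e.g. delta_5_20 and delta_51_60 in the example from the commit comment.
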
http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index a45536e..436c36d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionType;
  import org.apache.hadoop.hive.metastore.api.FieldSchema;
  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
  import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
  import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.hive.ql.io.AcidInputFormat;
  import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -93,18 +94,8 @@ public class CompactorMR {
    public CompactorMR() {
    }

- /**
- * Run a compactor job.
- * @param conf Hive configuration file
- * @param jobName name to run this job with
- * @param t metastore table
- * @param sd metastore storage descriptor
- * @param txns list of valid transactions
- * @param isMajor is this a major compaction?
- * @throws java.io.IOException if the job fails
- */
- void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
- ValidTxnList txns, boolean isMajor, Worker.StatsUpdater su) throws IOException {
+ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
+ ValidTxnList txns) {
      JobConf job = new JobConf(conf);
      job.setJobName(jobName);
      job.setOutputKeyClass(NullWritable.class);
@@ -116,7 +107,7 @@ public class CompactorMR {
      job.setInputFormat(CompactorInputFormat.class);
      job.setOutputFormat(NullOutputFormat.class);
      job.setOutputCommitter(CompactorOutputCommitter.class);
-
+
      String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
      if(queueName != null && queueName.length() > 0) {
        job.setQueueName(queueName);
@@ -126,12 +117,26 @@ public class CompactorMR {
      job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
      job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
      job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
- job.setBoolean(IS_MAJOR, isMajor);
      job.setBoolean(IS_COMPRESSED, sd.isCompressed());
      job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
      job.setInt(NUM_BUCKETS, sd.getNumBuckets());
      job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
      setColumnTypes(job, sd.getCols());
+ return job;
+ }
+ /**
+ * Run Compaction which may consist of several jobs on the cluster.
+ * @param conf Hive configuration file
+ * @param jobName name to run this job with
+ * @param t metastore table
+ * @param sd metastore storage descriptor
+ * @param txns list of valid transactions
+ * @param ci CompactionInfo
+ * @throws java.io.IOException if the job fails
+ */
+ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
+ ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+ JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);

      // Figure out and encode what files we need to read. We do this here (rather than in
      // getSplits below) because as part of this we discover our minimum and maximum transactions,
@@ -139,9 +144,36 @@ public class CompactorMR {
      // mapper.

      AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+ List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
+ int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
+ if(parsedDeltas.size() > maxDeltastoHandle) {
+ /**
+ * if here, that means we have very high number of delta files. This may be sign of a temporary
+ * glitch or a real issue. For example, if transaction batch size or transaction size is set too
+ * low for the event flow rate in Streaming API, it may generate lots of delta files very
+ * quickly. Another possibility is that Compaction is repeatedly failing and not actually compacting.
+ * Thus, force N minor compactions first to reduce number of deltas and then follow up with
+ * the compaction actually requested in {@link ci} which now needs to compact a lot fewer deltas
+ */
+ LOG.warn(parsedDeltas.size() + " delta files found for " + ci.getFullPartitionName()
+ + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, " +
+ "especially if this message repeats. Check that compaction is running properly. Check for any " +
+ "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
+ int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
+ for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
+ JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns);
+ launchCompactionJob(jobMinorCompact,
+ null, CompactionType.MINOR, null,
+ parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
+ maxDeltastoHandle, -1);
+ }
+ //now recompute state since we've done minor compactions and have different 'best' set of deltas
+ dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+ }
+
      StringableList dirsToSearch = new StringableList();
      Path baseDir = null;
- if (isMajor) {
+ if (ci.isMajorCompaction()) {
        // There may not be a base dir if the partition was empty before inserts or if this
        // partition is just now being converted to ACID.
        baseDir = dir.getBaseDirectory();
@@ -163,14 +195,26 @@ public class CompactorMR {
        }
      }

- List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
-
- if (parsedDeltas == null || parsedDeltas.size() == 0) {
+ if (parsedDeltas.size() == 0) {
        // Seriously, no deltas? Can't compact that.
        LOG.error( "No delta files found to compact in " + sd.getLocation());
+ //couldn't someone want to run a Major compaction to convert old table to ACID?
        return;
      }

+ launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
+ dir.getCurrentDirectories().size(), dir.getObsolete().size());
+
+ su.gatherStats();
+ }
+ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
+ StringableList dirsToSearch,
+ List<AcidUtils.ParsedDelta> parsedDeltas,
+ int curDirNumber, int obsoleteDirNumber) throws IOException {
+ job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
+ if(dirsToSearch == null) {
+ dirsToSearch = new StringableList();
+ }
      StringableList deltaDirs = new StringableList();
      long minTxn = Long.MAX_VALUE;
      long maxTxn = Long.MIN_VALUE;
@@ -187,18 +231,15 @@ public class CompactorMR {
      job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
      job.setLong(MIN_TXN, minTxn);
      job.setLong(MAX_TXN, maxTxn);
- LOG.debug("Setting minimum transaction to " + minTxn);
- LOG.debug("Setting maximume transaction to " + maxTxn);

+ LOG.info("Submitting " + compactionType + " compaction job '" +
+ job.getJobName() + "' to " + job.getQueueName() + " queue. " +
+ "(current delta dirs count=" + curDirNumber +
+ ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");
      RunningJob rj = JobClient.runJob(job);
- LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" +
- jobName + "' with jobID=" + rj.getID() + " to " + job.getQueueName() + " queue. " +
- "(current delta dirs count=" + dir.getCurrentDirectories().size() +
- ", obsolete delta dirs count=" + dir.getObsolete());
+ LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID());
      rj.waitForCompletion();
- su.gatherStats();
    }
-
    /**
     * Set the column names and types into the job conf for the input format
     * to use.
@@ -245,8 +286,8 @@ public class CompactorMR {
       * @throws IOException
       */
      CompactorInputSplit(Configuration hadoopConf, int bucket, List<Path> files, Path base,
- Path[] deltas)
- throws IOException {
+ Path[] deltas)
+ throws IOException {
        bucketNum = bucket;
        this.base = base;
        this.deltas = deltas;
@@ -396,7 +437,7 @@ public class CompactorMR {
          // If this is a base or delta directory, then we need to be looking for the bucket files.
          // But if it's a legacy file then we need to add it directly.
          if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) ||
- dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+ dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
            boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
            FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
            for(FileStatus f : files) {
@@ -414,7 +455,7 @@ public class CompactorMR {
        for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) {
          BucketTracker bt = e.getValue();
          splits.add(new CompactorInputSplit(entries, e.getKey(), bt.buckets,
- bt.sawBase ? baseDir : null, deltaDirs));
+ bt.sawBase ? baseDir : null, deltaDirs));
        }

        LOG.debug("Returning " + splits.size() + " splits");
@@ -423,7 +464,7 @@ public class CompactorMR {

      @Override
      public RecordReader<NullWritable, CompactorInputSplit> getRecordReader(
- InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
+ InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
        return new CompactorRecordReader((CompactorInputSplit)inputSplit);
      }

@@ -431,7 +472,7 @@ public class CompactorMR {
                                Map<Integer, BucketTracker> splitToBucketMap) {
        if (!matcher.find()) {
          LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
- file.toString() + " Matcher=" + matcher.toString());
+ file.toString() + " Matcher=" + matcher.toString());
        }
        int bucketNum = Integer.valueOf(matcher.group());
        BucketTracker bt = splitToBucketMap.get(bucketNum);
@@ -456,7 +497,7 @@ public class CompactorMR {
    }

    static class CompactorRecordReader
- implements RecordReader<NullWritable, CompactorInputSplit> {
+ implements RecordReader<NullWritable, CompactorInputSplit> {
      private CompactorInputSplit split;

      CompactorRecordReader(CompactorInputSplit split) {
@@ -501,7 +542,7 @@ public class CompactorMR {
    }

    static class CompactorMap<V extends Writable>
- implements Mapper<WritableComparable, CompactorInputSplit, NullWritable, NullWritable> {
+ implements Mapper<WritableComparable, CompactorInputSplit, NullWritable, NullWritable> {

      JobConf jobConf;
      RecordWriter writer;
@@ -515,15 +556,15 @@ public class CompactorMR {
        // Based on the split we're passed we go instantiate the real reader and then iterate on it
        // until it finishes.
        @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
- AcidInputFormat<WritableComparable, V> aif =
- instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME));
+ AcidInputFormat<WritableComparable, V> aif =
+ instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME));
        ValidTxnList txnList =
- new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+ new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));

        boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
        AcidInputFormat.RawReader<V> reader =
- aif.getRawReader(jobConf, isMajor, split.getBucket(),
- txnList, split.getBaseDir(), split.getDeltaDirs());
+ aif.getRawReader(jobConf, isMajor, split.getBucket(),
+ txnList, split.getBaseDir(), split.getDeltaDirs());
        RecordIdentifier identifier = reader.createKey();
        V value = reader.createValue();
        getWriter(reporter, reader.getObjectInspector(), split.getBucket());
@@ -551,20 +592,20 @@ public class CompactorMR {
        if (writer == null) {
          AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
          options.inspector(inspector)
- .writingBase(jobConf.getBoolean(IS_MAJOR, false))
- .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
- .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
- .reporter(reporter)
- .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
- .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
- .bucket(bucket)
- .statementId(-1);//setting statementId == -1 makes compacted delta files use
+ .writingBase(jobConf.getBoolean(IS_MAJOR, false))
+ .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
+ .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
+ .reporter(reporter)
+ .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+ .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+ .bucket(bucket)
+ .statementId(-1);//setting statementId == -1 makes compacted delta files use
          //delta_xxxx_yyyy format

          // Instantiate the underlying output format
          @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
- AcidOutputFormat<WritableComparable, V> aof =
- instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
+ AcidOutputFormat<WritableComparable, V> aof =
+ instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));

          writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
        }
@@ -719,7 +760,7 @@ public class CompactorMR {
        Path finalLocation = new Path(conf.get(FINAL_LOCATION));
        FileSystem fs = tmpLocation.getFileSystem(conf);
        LOG.debug("Moving contents of " + tmpLocation.toString() + " to " +
- finalLocation.toString());
+ finalLocation.toString());

        FileStatus[] contents = fs.listStatus(tmpLocation);
        for (int i = 0; i < contents.length; i++) {
@@ -738,4 +779,4 @@ public class CompactorMR {
        fs.delete(tmpLocation, true);
      }
    }
-}
+}
\ No newline at end of file

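A minimal sketch (not from the commit) of the batching arithmetic in CompactorMR.run() above: with N deltas and a limit of M, N / M preliminary minor-compaction jobs are launched, each over a contiguous subList of M deltas, and the originally requested compaction then runs over the reduced directory set:

import java.util.ArrayList;
import java.util.List;

public class DeltaBatchingExample {
  public static void main(String[] args) {
    int maxDeltasToHandle = 2;               // stands in for hive.compactor.max.num.delta
    List<String> parsedDeltas = new ArrayList<>();
    for (int i = 1; i <= 5; i++) {
      parsedDeltas.add("delta_" + i + "_" + i);
    }
    int numMinorCompactions = parsedDeltas.size() / maxDeltasToHandle; // 5 / 2 = 2
    for (int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
      List<String> batch = parsedDeltas.subList(
          jobSubId * maxDeltasToHandle, (jobSubId + 1) * maxDeltasToHandle);
      System.out.println("minor compaction job " + jobSubId + " over " + batch);
    }
    // The leftover delta (here delta_5_5) is picked up by the requested compaction,
    // which re-reads the ACID directory state after the forced minor compactions.
  }
}
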
http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 0548117..cc7441a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -77,7 +77,7 @@ public class Worker extends CompactorThread {
        // Make sure nothing escapes this run method and kills the metastore at large,
        // so wrap it in a big catch Throwable statement.
        try {
- CompactionInfo ci = txnHandler.findNextToCompact(name);
+ final CompactionInfo ci = txnHandler.findNextToCompact(name);

          if (ci == null && !stop.get()) {
            try {
@@ -158,14 +158,14 @@ public class Worker extends CompactorThread {
          launchedJob = true;
          try {
            if (runJobAsSelf(runAs)) {
- mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
+ mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
            } else {
              UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
                UserGroupInformation.getLoginUser());
              ugi.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
- mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
+ mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
                  return null;
                }
              });

http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 5a8c932..39c0571 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -516,6 +516,10 @@ public abstract class CompactorTest {
    abstract boolean useHive130DeltaDirName();

    String makeDeltaDirName(long minTxnId, long maxTxnId) {
+ if(minTxnId != maxTxnId) {
+ //covers both streaming api and post compaction style.
+ return makeDeltaDirNameCompacted(minTxnId, maxTxnId);
+ }
      return useHive130DeltaDirName() ?
        AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
    }

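A short sketch (the digit widths are assumptions mirroring AcidUtils.deltaSubdir, not taken from this commit) of the two naming styles the helper above distinguishes: when minTxnId != maxTxnId the name looks like a compacted or streaming delta, while a single-transaction SQL write in Hive 1.3+ also carries a statement-id suffix:

public class DeltaNameExample {
  public static void main(String[] args) {
    // Compacted / streaming style: a txn id range with no statement id.
    System.out.println(String.format("delta_%07d_%07d", 21, 22));
    // Hive 1.3+ SQL write style: single-txn range plus a statement id suffix.
    System.out.println(String.format("delta_%07d_%07d_%04d", 23, 23, 0));
  }
}
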
http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 11e5333..245e839 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.fs.*;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;
@@ -29,6 +30,7 @@ import org.junit.Test;
  import java.io.*;
  import java.util.ArrayList;
  import java.util.Arrays;
+import java.util.BitSet;
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
@@ -37,6 +39,10 @@ import java.util.Map;

  /**
   * Tests for the worker thread and its MR jobs.
+ * todo: most delta files in this test suite use txn id range, i.e. [N,N+M]
+ * That means that they all look like they were created by compaction or by streaming api.
+ * Delta files created by SQL should have [N,N] range (and a suffix in v1.3 and later)
+ * Need to change some of these to have better test coverage.
   */
  public class TestWorker extends CompactorTest {
    static final private String CLASS_NAME = TestWorker.class.getName();
@@ -325,18 +331,14 @@ public class TestWorker extends CompactorTest {
      // There should still now be 5 directories in the location
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- boolean is130 = this instanceof TestWorker2;
- Assert.assertEquals(is130 ? 5 : 4, stat.length);
+ Assert.assertEquals(4, stat.length);

      // Find the new delta file and make sure it has the right contents
      Arrays.sort(stat);
      Assert.assertEquals("base_20", stat[0].getPath().getName());
- if(is130) {//in1.3.0 orig delta is delta_00021_00022_0000 and compacted one is delta_00021_00022...
- Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
- }
- Assert.assertEquals(makeDeltaDirName(21, 22), stat[1 + (is130 ? 1 : 0)].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(23, 25), stat[2 + (is130 ? 1 : 0)].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(26, 27), stat[3 + (is130 ? 1 : 0)].getPath().getName());
+ Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
    }

    @Test
@@ -508,6 +510,108 @@ public class TestWorker extends CompactorTest {
    }

    @Test
+ public void minorNoBaseLotsOfDeltas() throws Exception {
+ compactNoBaseLotsOfDeltas(CompactionType.MINOR);
+ }
+ @Test
+ public void majorNoBaseLotsOfDeltas() throws Exception {
+ compactNoBaseLotsOfDeltas(CompactionType.MAJOR);
+ }
+ private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
+ conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 2);
+ Table t = newTable("default", "mapwb", true);
+ Partition p = newPartition(t, "today");
+
+// addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 21L, 2);
+ addDeltaFile(t, p, 23L, 23L, 2);
+ //make it look like streaming API use case
+ addDeltaFile(t, p, 25L, 29L, 2);
+ addDeltaFile(t, p, 31L, 32L, 3);
+ //make it looks like 31-32 has been compacted, but not cleaned
+ addDeltaFile(t, p, 31L, 33L, 5);
+ addDeltaFile(t, p, 35L, 35L, 1);
+
+ /*since COMPACTOR_MAX_NUM_DELTA=2,
+ we expect files 1,2 to be minor compacted by 1 job to produce delta_21_23
+ * 3,5 to be minor compacted by 2nd job (file 4 is obsolete) to make delta_25_33 (4th is skipped)
+ *
+ * and then the 'requested'
+ * minor compaction to combine delta_21_23, delta_25_33 and delta_35_35 to make delta_21_35
+ * or major compaction to create base_35*/
+ burnThroughTransactions(35);
+ CompactionRequest rqst = new CompactionRequest("default", "mapwb", type);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+
+ startWorker();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+ Assert.assertEquals(9, stat.length);
+
+ // Find the new delta file and make sure it has the right contents
+ BitSet matchesFound = new BitSet(9);
+ for (int i = 0; i < stat.length; i++) {
+ if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) {
+ matchesFound.set(0);
+ }
+ else if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) {
+ matchesFound.set(1);
+ }
+ else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 29))) {
+ matchesFound.set(2);
+ }
+ else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 32))) {
+ matchesFound.set(3);
+ }
+ else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 33))) {
+ matchesFound.set(4);
+ }
+ else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) {
+ matchesFound.set(5);
+ }
+ else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) {
+ matchesFound.set(6);
+ }
+ else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) {
+ matchesFound.set(7);
+ }
+ switch (type) {
+ //yes, both do set(8)
+ case MINOR:
+ if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) {
+ matchesFound.set(8);
+ }
+ break;
+ case MAJOR:
+ if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) {
+ matchesFound.set(8);
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ StringBuilder sb = null;
+ for(int i = 0; i < stat.length; i++) {
+ if(!matchesFound.get(i)) {
+ if(sb == null) {
+ sb = new StringBuilder("Some files are missing at index: ");
+ }
+ sb.append(i).append(",");
+ }
+ }
+ if (sb != null) {
+ Assert.assertTrue(sb.toString(), false);
+ }
+ }
+ @Test
    public void majorPartitionWithBase() throws Exception {
      LOG.debug("Starting majorPartitionWithBase");
      Table t = newTable("default", "mapwb", true);
