FAQ
Author: gates
Date: Mon Oct 13 21:32:54 2014
New Revision: 1631562

URL: http://svn.apache.org/r1631562
Log:
HIVE-8258 Compactor cleaners can be starved on a busy table or partition. (Alan Gates reviewed by Eugene Koifman)

Modified:
     hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
     hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
     hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
     hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
     hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
     hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
     hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
     hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
     hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
     hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
     hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
     hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
     hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
     hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java

Modified: hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/branch-0.14/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Oct 13 21:32:54 2014
@@ -1296,6 +1296,9 @@ public class HiveConf extends Configurat
          "Number of aborted transactions involving a particular table or partition before major\n" +
          "compaction is initiated."),

+ HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
+
      // For HBase storage handler
      HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
          "Whether writes to HBase should be forced to the write-ahead log. \n" +

Modified: hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (original)
+++ hive/branches/branch-0.14/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Mon Oct 13 21:32:54 2014
@@ -230,8 +230,9 @@ public class TestCompactor {
      t.setThreadId((int) t.getId());
      t.setHiveConf(conf);
      MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+ MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer();
      stop.boolVal = true;
- t.init(stop);
+ t.init(stop, looped);
      t.run();
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();

Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Mon Oct 13 21:32:54 2014
@@ -5933,7 +5933,7 @@ public class HiveMetaStore extends Thrif
      LOG.info("Starting metastore thread of type " + thread.getClass().getName());
      thread.setHiveConf(conf);
      thread.setThreadId(nextThreadId++);
- thread.init(new MetaStoreThread.BooleanPointer());
+ thread.init(new MetaStoreThread.BooleanPointer(), new MetaStoreThread.BooleanPointer());
      thread.start();
    }
  }

Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java (original)
+++ hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java Mon Oct 13 21:32:54 2014
@@ -43,8 +43,13 @@ public interface MetaStoreThread {
     * have been called.
     * @param stop a flag to watch for when to stop. If this value is set to true,
     * the thread will terminate the next time through its main loop.
+ * @param looped a flag that is set to true everytime a thread goes through it's main loop.
+ * This is purely for testing so that tests can assure themselves that the thread
+ * has run through it's loop once. The test can set this value to false. The
+ * thread should then assure that the loop has been gone completely through at
+ * least once.
     */
- void init(BooleanPointer stop) throws MetaException;
+ void init(BooleanPointer stop, BooleanPointer looped) throws MetaException;

    /**
     * Run the thread in the background. This must not be called until

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Oct 13 21:32:54 2014
@@ -348,7 +348,7 @@ public class AcidUtils {
      long bestBaseTxn = 0;
      final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
      List<ParsedDelta> working = new ArrayList<ParsedDelta>();
- final List<FileStatus> original = new ArrayList<FileStatus>();
+ List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
      final List<FileStatus> obsolete = new ArrayList<FileStatus>();
      List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory,
          hiddenFileFilter);
@@ -375,16 +375,26 @@ public class AcidUtils {
            working.add(delta);
          }
        } else {
- findOriginals(fs, child, original);
+ // This is just the directory. We need to recurse and find the actual files. But don't
+ // do this until we have determined there is no base. This saves time. Plus,
+ // it is possible that the cleaner is running and removing these original files,
+ // in which case recursing through them could cause us to get an error.
+ originalDirectories.add(child);
        }
      }

+ final List<FileStatus> original = new ArrayList<FileStatus>();
      // if we have a base, the original files are obsolete.
      if (bestBase != null) {
- obsolete.addAll(original);
        // remove the entries so we don't get confused later and think we should
        // use them.
        original.clear();
+ } else {
+ // Okay, we're going to need these originals. Recurse through them and figure out what we
+ // really need.
+ for (FileStatus origDir : originalDirectories) {
+ findOriginals(fs, origDir, original);
+ }
      }

      Collections.sort(working);

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java Mon Oct 13 21:32:54 2014
@@ -24,15 +24,12 @@ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.ValidTxnList;
  import org.apache.hadoop.hive.common.ValidTxnListImpl;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
  import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.security.UserGroupInformation;
@@ -41,7 +38,12 @@ import org.apache.hadoop.util.StringUtil
  import java.io.IOException;
  import java.security.PrivilegedExceptionAction;
  import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
  import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;

  /**
   * A class to clean directories after compactions. This will run in a separate thread.
@@ -50,35 +52,85 @@ public class Cleaner extends CompactorTh
    static final private String CLASS_NAME = Cleaner.class.getName();
    static final private Log LOG = LogFactory.getLog(CLASS_NAME);

- private long cleanerCheckInterval = 5000;
+ private long cleanerCheckInterval = 0;
+
+ // List of compactions to clean.
+ private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>();
+ private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<Long, CompactionInfo>();

    @Override
    public void run() {
- // Make sure nothing escapes this run method and kills the metastore at large,
- // so wrap it in a big catch Throwable statement.
+ if (cleanerCheckInterval == 0) {
+ cleanerCheckInterval = conf.getTimeVar(
+ HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
      do {
+ // This is solely for testing. It checks if the test has set the looped value to false,
+ // and if so remembers that and then sets it to true at the end. We have to check here
+ // first to make sure we go through a complete iteration of the loop before resetting it.
+ boolean setLooped = !looped.boolVal;
+ // Make sure nothing escapes this run method and kills the metastore at large,
+ // so wrap it in a big catch Throwable statement.
        try {
          long startedAt = System.currentTimeMillis();

- // Now look for new entries ready to be cleaned.
+ // First look for all the compactions that are waiting to be cleaned. If we have not
+ // seen an entry before, look for all the locks held on that table or partition and
+ // record them. We will then only clean the partition once all of those locks have been
+ // released. This way we avoid removing the files while they are in use,
+ // while at the same time avoiding starving the cleaner as new readers come along.
+ // This works because we know that any reader who comes along after the worker thread has
+ // done the compaction will read the more up to date version of the data (either in a
+ // newer delta or in a newer base).
          List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- for (CompactionInfo ci : toClean) {
- LockComponent comp = null;
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, ci.dbname);
- comp.setTablename(ci.tableName);
- if (ci.partName != null) comp.setPartitionname(ci.partName);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest rqst = new LockRequest(components, System.getProperty("user.name"),
- Worker.hostname());
- LockResponse rsp = txnHandler.lockNoWait(rqst);
+ if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
+ ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
+
+ for (CompactionInfo ci : toClean) {
+ // Check to see if we have seen this request before. If so, ignore it. If not,
+ // add it to our queue.
+ if (!compactId2LockMap.containsKey(ci.id)) {
+ compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse));
+ compactId2CompactInfoMap.put(ci.id, ci);
+ }
+ }
+
+ // Now, for each entry in the queue, see if all of the associated locks are clear so we
+ // can clean
+ Set<Long> currentLocks = buildCurrentLockSet(locksResponse);
+ List<Long> expiredLocks = new ArrayList<Long>();
+ List<Long> compactionsCleaned = new ArrayList<Long>();
            try {
- if (rsp.getState() == LockState.ACQUIRED) {
- clean(ci);
+ for (Map.Entry<Long, Set<Long>> queueEntry : compactId2LockMap.entrySet()) {
+ boolean sawLock = false;
+ for (Long lockId : queueEntry.getValue()) {
+ if (currentLocks.contains(lockId)) {
+ sawLock = true;
+ break;
+ } else {
+ expiredLocks.add(lockId);
+ }
+ }
+
+ if (!sawLock) {
+ // Remember to remove this when we're out of the loop,
+ // we can't do it in the loop or we'll get a concurrent modification exception.
+ compactionsCleaned.add(queueEntry.getKey());
+ clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
+ } else {
+ // Remove the locks we didn't see so we don't look for them again next time
+ for (Long lockId : expiredLocks) {
+ queueEntry.getValue().remove(lockId);
+ }
+ }
              }
            } finally {
- if (rsp.getState() == LockState.ACQUIRED) {
- txnHandler.unlock(new UnlockRequest(rsp.getLockid()));
+ if (compactionsCleaned.size() > 0) {
+ for (Long compactId : compactionsCleaned) {
+ compactId2LockMap.remove(compactId);
+ compactId2CompactInfoMap.remove(compactId);
+ }
              }
            }
          }
@@ -91,9 +143,37 @@ public class Cleaner extends CompactorTh
          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
              StringUtils.stringifyException(t));
        }
+ if (setLooped) {
+ looped.boolVal = true;
+ }
      } while (!stop.boolVal);
    }

+ private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
+ Set<Long> relatedLocks = new HashSet<Long>();
+ for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
+ if (ci.dbname.equals(lock.getDbname())) {
+ if ((ci.tableName == null && lock.getTablename() == null) ||
+ (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) {
+ if ((ci.partName == null && lock.getPartname() == null) ||
+ (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
+ relatedLocks.add(lock.getLockid());
+ }
+ }
+ }
+ }
+
+ return relatedLocks;
+ }
+
+ private Set<Long> buildCurrentLockSet(ShowLocksResponse locksResponse) {
+ Set<Long> currentLocks = new HashSet<Long>(locksResponse.getLocks().size());
+ for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
+ currentLocks.add(lock.getLockid());
+ }
+ return currentLocks;
+ }
+
    private void clean(CompactionInfo ci) throws MetaException {
      LOG.info("Starting cleaning for " + ci.getFullPartitionName());
      try {

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java Mon Oct 13 21:32:54 2014
@@ -53,6 +53,7 @@ abstract class CompactorThread extends T
    protected RawStore rs;
    protected int threadId;
    protected BooleanPointer stop;
+ protected BooleanPointer looped;

    @Override
    public void setHiveConf(HiveConf conf) {
@@ -66,8 +67,9 @@ abstract class CompactorThread extends T
    }

    @Override
- public void init(BooleanPointer stop) throws MetaException {
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
      this.stop = stop;
+ this.looped = looped;
      setPriority(MIN_PRIORITY);
      setDaemon(true); // this means the process will exit without waiting for this thread


Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Mon Oct 13 21:32:54 2014
@@ -137,8 +137,8 @@ public class Initiator extends Compactor
    }

    @Override
- public void init(BooleanPointer stop) throws MetaException {
- super.init(stop);
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
+ super.init(stop, looped);
      checkInterval =
          conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ;
    }

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Mon Oct 13 21:32:54 2014
@@ -168,8 +168,8 @@ public class Worker extends CompactorThr
    }

    @Override
- public void init(BooleanPointer stop) throws MetaException {
- super.init(stop);
+ public void init(BooleanPointer stop, BooleanPointer looped) throws MetaException {
+ super.init(stop, looped);

      StringBuilder name = new StringBuilder(hostname());
      name.append("-");

Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java Mon Oct 13 21:32:54 2014
@@ -217,11 +217,11 @@ public class TestAcidUtils {
      Path part = new MockPath(fs, "/tbl/part1");
      AcidUtils.Directory dir =
          AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+ // The two original buckets won't be in the obsolete list because we don't look at those
+ // until we have determined there is no base.
      List<FileStatus> obsolete = dir.getObsolete();
- assertEquals(3, obsolete.size());
+ assertEquals(1, obsolete.size());
      assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString());
- assertEquals("mock:/tbl/part1/000000_0", obsolete.get(1).getPath().toString());
- assertEquals("mock:/tbl/part1/000001_1", obsolete.get(2).getPath().toString());
      assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
    }


Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java Mon Oct 13 21:32:54 2014
@@ -63,12 +63,13 @@ public abstract class CompactorTest {
    protected CompactionTxnHandler txnHandler;
    protected IMetaStoreClient ms;
    protected long sleepTime = 1000;
+ protected HiveConf conf;

    private final MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
    private final File tmpdir;

    protected CompactorTest() throws Exception {
- HiveConf conf = new HiveConf();
+ conf = new HiveConf();
      TxnDbUtil.setConfValues(conf);
      TxnDbUtil.cleanDb();
      ms = new HiveMetaStoreClient(conf);
@@ -79,16 +80,20 @@ public abstract class CompactorTest {
      tmpdir.deleteOnExit();
    }

- protected void startInitiator(HiveConf conf) throws Exception {
- startThread('i', conf);
+ protected void startInitiator() throws Exception {
+ startThread('i', true);
    }

- protected void startWorker(HiveConf conf) throws Exception {
- startThread('w', conf);
+ protected void startWorker() throws Exception {
+ startThread('w', true);
    }

- protected void startCleaner(HiveConf conf) throws Exception {
- startThread('c', conf);
+ protected void startCleaner() throws Exception {
+ startThread('c', true);
+ }
+
+ protected void startCleaner(MetaStoreThread.BooleanPointer looped) throws Exception {
+ startThread('c', false, looped);
    }

    protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
@@ -117,6 +122,9 @@ public abstract class CompactorTest {

      table.setParameters(parameters);

+ // drop the table first, in case some previous test created it
+ ms.dropTable(dbName, tableName);
+
      ms.createTable(table);
      return table;
    }
@@ -142,37 +150,27 @@ public abstract class CompactorTest {
      return txns.get(0);
    }

- protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
- int numRecords) throws Exception{
- addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
+ protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
+ throws Exception {
+ addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
    }

- protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
- int numRecords) throws Exception{
- addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
+ protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception {
+ addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
    }

- protected void addLegacyFile(HiveConf conf, Table t, Partition p,
- int numRecords) throws Exception {
- addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
+ protected void addLegacyFile(Table t, Partition p, int numRecords) throws Exception {
+ addFile(t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
    }

- protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
- int numRecords, int numBuckets, boolean allBucketsPresent)
- throws Exception {
- addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
+ protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
+ int numBuckets, boolean allBucketsPresent) throws Exception {
+ addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
    }

- protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
- int numRecords, int numBuckets, boolean allBucketsPresent)
- throws Exception {
- addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent);
- }
-
- protected void addLegacyFile(HiveConf conf, Table t, Partition p,
- int numRecords, int numBuckets, boolean allBucketsPresent)
- throws Exception {
- addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, numBuckets, allBucketsPresent);
+ protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, int numBuckets,
+ boolean allBucketsPresent) throws Exception {
+ addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent);
    }

    protected List<Path> getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
@@ -191,6 +189,10 @@ public abstract class CompactorTest {
      for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid));
    }

+ protected void stopThread() {
+ stop.boolVal = true;
+ }
+
    private StorageDescriptor newStorageDescriptor(String location, List<Order> sortCols) {
      StorageDescriptor sd = new StorageDescriptor();
      List<FieldSchema> cols = new ArrayList<FieldSchema>(2);
@@ -214,9 +216,13 @@ public abstract class CompactorTest {
      return sd;
    }

- // I can't do this with @Before because I want to be able to control the config file provided
- // to each test.
- private void startThread(char type, HiveConf conf) throws Exception {
+ // I can't do this with @Before because I want to be able to control when the thead starts
+ private void startThread(char type, boolean stopAfterOne) throws Exception {
+ startThread(type, stopAfterOne, new MetaStoreThread.BooleanPointer());
+ }
+
+ private void startThread(char type, boolean stopAfterOne, MetaStoreThread.BooleanPointer looped)
+ throws Exception {
      TxnDbUtil.setConfValues(conf);
      CompactorThread t = null;
      switch (type) {
@@ -227,9 +233,10 @@ public abstract class CompactorTest {
      }
      t.setThreadId((int) t.getId());
      t.setHiveConf(conf);
- stop.boolVal = true;
- t.init(stop);
- t.run();
+ stop.boolVal = stopAfterOne;
+ t.init(stop, looped);
+ if (stopAfterOne) t.run();
+ else t.start();
    }

    private String getLocation(String tableName, String partValue) {
@@ -243,7 +250,7 @@ public abstract class CompactorTest {

    private enum FileType {BASE, DELTA, LEGACY};

- private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
                         int numRecords, FileType type, int numBuckets,
                         boolean allBucketsPresent) throws Exception {
      String partValue = (p == null) ? null : p.getValues().get(0);

Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java Mon Oct 13 21:32:54 2014
@@ -18,21 +18,26 @@
  package org.apache.hadoop.hive.ql.txn.compactor;

  import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
  import org.apache.hadoop.hive.metastore.api.*;
  import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.junit.Before;
  import org.junit.Test;

  import java.util.ArrayList;
  import java.util.List;
+import java.util.concurrent.TimeUnit;

  /**
   * Tests for the compactor Cleaner thread
   */
  public class TestCleaner extends CompactorTest {
+
+ static final private Log LOG = LogFactory.getLog(TestCleaner.class.getName());
+
    public TestCleaner() throws Exception {
      super();
    }
@@ -41,19 +46,17 @@ public class TestCleaner extends Compact
    public void nothing() throws Exception {
      // Test that the whole things works when there's nothing in the queue. This is just a
      // survival test.
- startCleaner(new HiveConf());
+ startCleaner();
    }

    @Test
    public void cleanupAfterMajorTableCompaction() throws Exception {
      Table t = newTable("default", "camtc", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
- addBaseFile(conf, t, null, 25L, 25);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 25L, 25);

      burnThroughTransactions(25);

@@ -63,7 +66,7 @@ public class TestCleaner extends Compact
      txnHandler.markCompacted(ci);
      txnHandler.setRunAs(ci.id, System.getProperty("user.name"));

- startCleaner(conf);
+ startCleaner();

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -80,12 +83,10 @@ public class TestCleaner extends Compact
      Table t = newTable("default", "campc", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addBaseFile(conf, t, p, 25L, 25);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 25L, 25);

      burnThroughTransactions(25);

@@ -96,7 +97,7 @@ public class TestCleaner extends Compact
      txnHandler.markCompacted(ci);
      txnHandler.setRunAs(ci.id, System.getProperty("user.name"));

- startCleaner(conf);
+ startCleaner();

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -112,12 +113,10 @@ public class TestCleaner extends Compact
    public void cleanupAfterMinorTableCompaction() throws Exception {
      Table t = newTable("default", "camitc", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
- addDeltaFile(conf, t, null, 21L, 24L, 4);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);

      burnThroughTransactions(25);

@@ -127,7 +126,7 @@ public class TestCleaner extends Compact
      txnHandler.markCompacted(ci);
      txnHandler.setRunAs(ci.id, System.getProperty("user.name"));

- startCleaner(conf);
+ startCleaner();

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -151,12 +150,10 @@ public class TestCleaner extends Compact
      Table t = newTable("default", "camipc", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addDeltaFile(conf, t, p, 21L, 24L, 4);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);

      burnThroughTransactions(25);

@@ -167,7 +164,7 @@ public class TestCleaner extends Compact
      txnHandler.markCompacted(ci);
      txnHandler.setRunAs(ci.id, System.getProperty("user.name"));

- startCleaner(conf);
+ startCleaner();

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -190,12 +187,10 @@ public class TestCleaner extends Compact
    public void blockedByLockTable() throws Exception {
      Table t = newTable("default", "bblt", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
- addDeltaFile(conf, t, null, 21L, 24L, 4);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);

      burnThroughTransactions(25);

@@ -212,7 +207,7 @@ public class TestCleaner extends Compact
      LockRequest req = new LockRequest(components, "me", "localhost");
      LockResponse res = txnHandler.lock(req);

- startCleaner(conf);
+ startCleaner();

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -228,12 +223,10 @@ public class TestCleaner extends Compact
      Table t = newTable("default", "bblp", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addDeltaFile(conf, t, p, 21L, 24L, 4);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);

      burnThroughTransactions(25);

@@ -244,7 +237,7 @@ public class TestCleaner extends Compact
      txnHandler.markCompacted(ci);
      txnHandler.setRunAs(ci.id, System.getProperty("user.name"));

- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("bblp");
      comp.setPartitionname("ds=today");
      List<LockComponent> components = new ArrayList<LockComponent>(1);
@@ -252,7 +245,7 @@ public class TestCleaner extends Compact
      LockRequest req = new LockRequest(components, "me", "localhost");
      LockResponse res = txnHandler.lock(req);

- startCleaner(conf);
+ startCleaner();

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -265,15 +258,154 @@ public class TestCleaner extends Compact
    }

    @Test
+ public void notBlockedBySubsequentLock() throws Exception {
+ Table t = newTable("default", "bblt", false);
+
+ // Set the run frequency low on this test so it doesn't take long
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100,
+ TimeUnit.MILLISECONDS);
+
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
+ comp.setTablename("bblt");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+
+ MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer();
+ looped.boolVal = false;
+ startCleaner(looped);
+
+ // Make sure the compactor has a chance to run once
+ while (!looped.boolVal) {
+ Thread.currentThread().sleep(100);
+ }
+
+ // There should still be one request, as the locks still held.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+
+ // obtain a second lock. This shouldn't block cleaner as it was acquired after the initial
+ // clean request
+ LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
+ comp2.setTablename("bblt");
+ List<LockComponent> components2 = new ArrayList<LockComponent>(1);
+ components2.add(comp2);
+ LockRequest req2 = new LockRequest(components, "me", "localhost");
+ LockResponse res2 = txnHandler.lock(req2);
+
+ // Unlock the previous lock
+ txnHandler.unlock(new UnlockRequest(res.getLockid()));
+ looped.boolVal = false;
+
+ while (!looped.boolVal) {
+ Thread.currentThread().sleep(100);
+ }
+ stopThread();
+ Thread.currentThread().sleep(200);
+
+
+ // Check there are no compactions requests left.
+ rsp = txnHandler.showCompact(new ShowCompactRequest());
+ compacts = rsp.getCompacts();
+ Assert.assertEquals(0, compacts.size());
+ }
+
+ @Test
+ public void partitionNotBlockedBySubsequentLock() throws Exception {
+ Table t = newTable("default", "bblt", true);
+ Partition p = newPartition(t, "today");
+
+ // Set the run frequency low on this test so it doesn't take long
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100,
+ TimeUnit.MILLISECONDS);
+
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);
+
+ burnThroughTransactions(25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default");
+ comp.setTablename("bblt");
+ comp.setPartitionname("ds=today");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+
+ MetaStoreThread.BooleanPointer looped = new MetaStoreThread.BooleanPointer();
+ looped.boolVal = false;
+ startCleaner(looped);
+
+ // Make sure the compactor has a chance to run once
+ while (!looped.boolVal) {
+ Thread.currentThread().sleep(100);
+ }
+
+ // There should still be one request, as the locks still held.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+
+
+ // obtain a second lock. This shouldn't block cleaner as it was acquired after the initial
+ // clean request
+ LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default");
+ comp2.setTablename("bblt");
+ comp2.setPartitionname("ds=today");
+ List<LockComponent> components2 = new ArrayList<LockComponent>(1);
+ components2.add(comp2);
+ LockRequest req2 = new LockRequest(components, "me", "localhost");
+ LockResponse res2 = txnHandler.lock(req2);
+
+ // Unlock the previous lock
+ txnHandler.unlock(new UnlockRequest(res.getLockid()));
+ looped.boolVal = false;
+
+ while (!looped.boolVal) {
+ Thread.currentThread().sleep(100);
+ }
+ stopThread();
+ Thread.currentThread().sleep(200);
+
+
+ // Check there are no compactions requests left.
+ rsp = txnHandler.showCompact(new ShowCompactRequest());
+ compacts = rsp.getCompacts();
+ Assert.assertEquals(0, compacts.size());
+ }
+
+ @Test
    public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception {
      Table t = newTable("default", "campcnb", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
-
- addDeltaFile(conf, t, p, 1L, 22L, 22);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addBaseFile(conf, t, p, 25L, 25);
+ addDeltaFile(t, p, 1L, 22L, 22);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 25L, 25);

      burnThroughTransactions(25);

@@ -284,7 +416,7 @@ public class TestCleaner extends Compact
      txnHandler.markCompacted(ci);
      txnHandler.setRunAs(ci.id, System.getProperty("user.name"));

- startCleaner(conf);
+ startCleaner();

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -295,9 +427,4 @@ public class TestCleaner extends Compact
      Assert.assertEquals(1, paths.size());
      Assert.assertEquals("base_25", paths.get(0).getName());
    }
-
- @Before
- public void setUpTxnDb() throws Exception {
- TxnDbUtil.setConfValues(new HiveConf());
- }
  }

Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java Mon Oct 13 21:32:54 2014
@@ -47,7 +47,7 @@ public class TestInitiator extends Compa
    public void nothing() throws Exception {
      // Test that the whole things works when there's nothing in the queue. This is just a
      // survival test.
- startInitiator(new HiveConf());
+ startInitiator();
    }

    @Test
@@ -63,7 +63,7 @@ public class TestInitiator extends Compa
      txnHandler.findNextToCompact(Worker.hostname() + "-193892");
      txnHandler.findNextToCompact("nosuchhost-193892");

- startInitiator(new HiveConf());
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -89,10 +89,9 @@ public class TestInitiator extends Compa

      txnHandler.findNextToCompact("nosuchhost-193892");

- HiveConf conf = new HiveConf();
      conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -104,7 +103,6 @@ public class TestInitiator extends Compa
    public void majorCompactOnTableTooManyAborts() throws Exception {
      Table t = newTable("default", "mcottma", false);

- HiveConf conf = new HiveConf();
      HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);

      for (int i = 0; i < 11; i++) {
@@ -119,7 +117,7 @@ public class TestInitiator extends Compa
        txnHandler.abortTxn(new AbortTxnRequest(txnid));
      }

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -134,7 +132,6 @@ public class TestInitiator extends Compa
      Table t = newTable("default", "mcoptma", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
      HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);

      for (int i = 0; i < 11; i++) {
@@ -150,7 +147,7 @@ public class TestInitiator extends Compa
        txnHandler.abortTxn(new AbortTxnRequest(txnid));
      }

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -168,7 +165,6 @@ public class TestInitiator extends Compa
        Partition p = newPartition(t, "day-" + i);
      }

- HiveConf conf = new HiveConf();
      HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);

      for (int i = 0; i < 11; i++) {
@@ -184,7 +180,7 @@ public class TestInitiator extends Compa
        txnHandler.abortTxn(new AbortTxnRequest(txnid));
      }

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      Assert.assertEquals(0, rsp.getCompactsSize());
@@ -197,8 +193,6 @@ public class TestInitiator extends Compa
      // accidently clean it too.
      Table t = newTable("default", "ceat", false);

- HiveConf conf = new HiveConf();
-
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
      comp.setTablename("ceat");
@@ -216,7 +210,7 @@ public class TestInitiator extends Compa
      GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
      Assert.assertEquals(101, openTxns.getOpen_txnsSize());

- startInitiator(conf);
+ startInitiator();

      openTxns = txnHandler.getOpenTxns();
      Assert.assertEquals(1, openTxns.getOpen_txnsSize());
@@ -228,7 +222,6 @@ public class TestInitiator extends Compa
      parameters.put("NO_AUTO_COMPACTION", "true");
      Table t = newTable("default", "ncwncs", false, parameters);

- HiveConf conf = new HiveConf();
      HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);

      for (int i = 0; i < 11; i++) {
@@ -243,7 +236,7 @@ public class TestInitiator extends Compa
        txnHandler.abortTxn(new AbortTxnRequest(txnid));
      }

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      Assert.assertEquals(0, rsp.getCompactsSize());
@@ -253,7 +246,6 @@ public class TestInitiator extends Compa
    public void noCompactWhenCompactAlreadyScheduled() throws Exception {
      Table t = newTable("default", "ncwcas", false);

- HiveConf conf = new HiveConf();
      HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);

      for (int i = 0; i < 11; i++) {
@@ -277,7 +269,7 @@ public class TestInitiator extends Compa
      Assert.assertEquals("initiated", compacts.get(0).getState());
      Assert.assertEquals("ncwcas", compacts.get(0).getTablename());

- startInitiator(conf);
+ startInitiator();

      rsp = txnHandler.showCompact(new ShowCompactRequest());
      compacts = rsp.getCompacts();
@@ -291,11 +283,9 @@ public class TestInitiator extends Compa
    public void compactTableHighDeltaPct() throws Exception {
      Table t = newTable("default", "cthdp", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);

      burnThroughTransactions(23);

@@ -309,7 +299,7 @@ public class TestInitiator extends Compa
      LockResponse res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -324,11 +314,9 @@ public class TestInitiator extends Compa
      Table t = newTable("default", "cphdp", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);

      burnThroughTransactions(23);

@@ -343,7 +331,7 @@ public class TestInitiator extends Compa
      LockResponse res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -358,11 +346,9 @@ public class TestInitiator extends Compa
    public void noCompactTableDeltaPctNotHighEnough() throws Exception {
      Table t = newTable("default", "nctdpnhe", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 50L, 50);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 50L, 50);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);

      burnThroughTransactions(53);

@@ -376,7 +362,7 @@ public class TestInitiator extends Compa
      LockResponse res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      Assert.assertEquals(0, rsp.getCompactsSize());
@@ -386,20 +372,18 @@ public class TestInitiator extends Compa
    public void compactTableTooManyDeltas() throws Exception {
      Table t = newTable("default", "cttmd", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 200L, 200);
- addDeltaFile(conf, t, null, 201L, 201L, 1);
- addDeltaFile(conf, t, null, 202L, 202L, 1);
- addDeltaFile(conf, t, null, 203L, 203L, 1);
- addDeltaFile(conf, t, null, 204L, 204L, 1);
- addDeltaFile(conf, t, null, 205L, 205L, 1);
- addDeltaFile(conf, t, null, 206L, 206L, 1);
- addDeltaFile(conf, t, null, 207L, 207L, 1);
- addDeltaFile(conf, t, null, 208L, 208L, 1);
- addDeltaFile(conf, t, null, 209L, 209L, 1);
- addDeltaFile(conf, t, null, 210L, 210L, 1);
- addDeltaFile(conf, t, null, 211L, 211L, 1);
+ addBaseFile(t, null, 200L, 200);
+ addDeltaFile(t, null, 201L, 201L, 1);
+ addDeltaFile(t, null, 202L, 202L, 1);
+ addDeltaFile(t, null, 203L, 203L, 1);
+ addDeltaFile(t, null, 204L, 204L, 1);
+ addDeltaFile(t, null, 205L, 205L, 1);
+ addDeltaFile(t, null, 206L, 206L, 1);
+ addDeltaFile(t, null, 207L, 207L, 1);
+ addDeltaFile(t, null, 208L, 208L, 1);
+ addDeltaFile(t, null, 209L, 209L, 1);
+ addDeltaFile(t, null, 210L, 210L, 1);
+ addDeltaFile(t, null, 211L, 211L, 1);

      burnThroughTransactions(210);

@@ -413,7 +397,7 @@ public class TestInitiator extends Compa
      LockResponse res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -428,20 +412,18 @@ public class TestInitiator extends Compa
      Table t = newTable("default", "cptmd", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 200L, 200);
- addDeltaFile(conf, t, p, 201L, 201L, 1);
- addDeltaFile(conf, t, p, 202L, 202L, 1);
- addDeltaFile(conf, t, p, 203L, 203L, 1);
- addDeltaFile(conf, t, p, 204L, 204L, 1);
- addDeltaFile(conf, t, p, 205L, 205L, 1);
- addDeltaFile(conf, t, p, 206L, 206L, 1);
- addDeltaFile(conf, t, p, 207L, 207L, 1);
- addDeltaFile(conf, t, p, 208L, 208L, 1);
- addDeltaFile(conf, t, p, 209L, 209L, 1);
- addDeltaFile(conf, t, p, 210L, 210L, 1);
- addDeltaFile(conf, t, p, 211L, 211L, 1);
+ addBaseFile(t, p, 200L, 200);
+ addDeltaFile(t, p, 201L, 201L, 1);
+ addDeltaFile(t, p, 202L, 202L, 1);
+ addDeltaFile(t, p, 203L, 203L, 1);
+ addDeltaFile(t, p, 204L, 204L, 1);
+ addDeltaFile(t, p, 205L, 205L, 1);
+ addDeltaFile(t, p, 206L, 206L, 1);
+ addDeltaFile(t, p, 207L, 207L, 1);
+ addDeltaFile(t, p, 208L, 208L, 1);
+ addDeltaFile(t, p, 209L, 209L, 1);
+ addDeltaFile(t, p, 210L, 210L, 1);
+ addDeltaFile(t, p, 211L, 211L, 1);

      burnThroughTransactions(210);

@@ -456,7 +438,7 @@ public class TestInitiator extends Compa
      LockResponse res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -471,11 +453,9 @@ public class TestInitiator extends Compa
    public void noCompactTableNotEnoughDeltas() throws Exception {
      Table t = newTable("default", "nctned", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 200L, 200);
- addDeltaFile(conf, t, null, 201L, 205L, 5);
- addDeltaFile(conf, t, null, 206L, 211L, 6);
+ addBaseFile(t, null, 200L, 200);
+ addDeltaFile(t, null, 201L, 205L, 5);
+ addDeltaFile(t, null, 206L, 211L, 6);

      burnThroughTransactions(210);

@@ -489,7 +469,7 @@ public class TestInitiator extends Compa
      LockResponse res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      Assert.assertEquals(0, rsp.getCompactsSize());
@@ -499,20 +479,18 @@ public class TestInitiator extends Compa
    public void chooseMajorOverMinorWhenBothValid() throws Exception {
      Table t = newTable("default", "cmomwbv", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 200L, 200);
- addDeltaFile(conf, t, null, 201L, 211L, 11);
- addDeltaFile(conf, t, null, 212L, 222L, 11);
- addDeltaFile(conf, t, null, 223L, 233L, 11);
- addDeltaFile(conf, t, null, 234L, 244L, 11);
- addDeltaFile(conf, t, null, 245L, 255L, 11);
- addDeltaFile(conf, t, null, 256L, 266L, 11);
- addDeltaFile(conf, t, null, 267L, 277L, 11);
- addDeltaFile(conf, t, null, 278L, 288L, 11);
- addDeltaFile(conf, t, null, 289L, 299L, 11);
- addDeltaFile(conf, t, null, 300L, 310L, 11);
- addDeltaFile(conf, t, null, 311L, 321L, 11);
+ addBaseFile(t, null, 200L, 200);
+ addDeltaFile(t, null, 201L, 211L, 11);
+ addDeltaFile(t, null, 212L, 222L, 11);
+ addDeltaFile(t, null, 223L, 233L, 11);
+ addDeltaFile(t, null, 234L, 244L, 11);
+ addDeltaFile(t, null, 245L, 255L, 11);
+ addDeltaFile(t, null, 256L, 266L, 11);
+ addDeltaFile(t, null, 267L, 277L, 11);
+ addDeltaFile(t, null, 278L, 288L, 11);
+ addDeltaFile(t, null, 289L, 299L, 11);
+ addDeltaFile(t, null, 300L, 310L, 11);
+ addDeltaFile(t, null, 311L, 321L, 11);

      burnThroughTransactions(320);

@@ -526,7 +504,7 @@ public class TestInitiator extends Compa
      LockResponse res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -541,19 +519,17 @@ public class TestInitiator extends Compa
      Table t = newTable("default", "ednb", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
-
- addDeltaFile(conf, t, p, 1L, 201L, 200);
- addDeltaFile(conf, t, p, 202L, 202L, 1);
- addDeltaFile(conf, t, p, 203L, 203L, 1);
- addDeltaFile(conf, t, p, 204L, 204L, 1);
- addDeltaFile(conf, t, p, 205L, 205L, 1);
- addDeltaFile(conf, t, p, 206L, 206L, 1);
- addDeltaFile(conf, t, p, 207L, 207L, 1);
- addDeltaFile(conf, t, p, 208L, 208L, 1);
- addDeltaFile(conf, t, p, 209L, 209L, 1);
- addDeltaFile(conf, t, p, 210L, 210L, 1);
- addDeltaFile(conf, t, p, 211L, 211L, 1);
+ addDeltaFile(t, p, 1L, 201L, 200);
+ addDeltaFile(t, p, 202L, 202L, 1);
+ addDeltaFile(t, p, 203L, 203L, 1);
+ addDeltaFile(t, p, 204L, 204L, 1);
+ addDeltaFile(t, p, 205L, 205L, 1);
+ addDeltaFile(t, p, 206L, 206L, 1);
+ addDeltaFile(t, p, 207L, 207L, 1);
+ addDeltaFile(t, p, 208L, 208L, 1);
+ addDeltaFile(t, p, 209L, 209L, 1);
+ addDeltaFile(t, p, 210L, 210L, 1);
+ addDeltaFile(t, p, 211L, 211L, 1);

      burnThroughTransactions(210);

@@ -568,7 +544,7 @@ public class TestInitiator extends Compa
      LockResponse res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -584,11 +560,9 @@ public class TestInitiator extends Compa
      Table t = newTable("default", "ttospgocr", true);
      Partition p = newPartition(t, "today");

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);

      burnThroughTransactions(23);

@@ -614,7 +588,7 @@ public class TestInitiator extends Compa
      res = txnHandler.lock(req);
      txnHandler.commitTxn(new CommitTxnRequest(txnid));

- startInitiator(conf);
+ startInitiator();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -626,9 +600,4 @@ public class TestInitiator extends Compa
    }

    // TODO test compactions with legacy file types
-
- @Before
- public void setUpTxnDb() throws Exception {
- TxnDbUtil.setConfValues(new HiveConf());
- }
  }

Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1631562&r1=1631561&r2=1631562&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Mon Oct 13 21:32:54 2014
@@ -48,7 +48,7 @@ public class TestWorker extends Compacto
    public void nothing() throws Exception {
      // Test that the whole things works when there's nothing in the queue. This is just a
      // survival test.
- startWorker(new HiveConf());
+ startWorker();
    }

    @Test
@@ -205,19 +205,17 @@ public class TestWorker extends Compacto

      Table t = newTable("default", "st", false, new HashMap<String, String>(), sortCols);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
- addDeltaFile(conf, t, null, 21L, 24L, 4);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);

      burnThroughTransactions(25);

      CompactionRequest rqst = new CompactionRequest("default", "st", CompactionType.MINOR);
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      // There should still be four directories in the location.
      FileSystem fs = FileSystem.get(conf);
@@ -232,12 +230,11 @@ public class TestWorker extends Compacto

      Table t = newTable("default", "sp", true, new HashMap<String, String>(), sortCols);
      Partition p = newPartition(t, "today", sortCols);
- HiveConf conf = new HiveConf();

- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
- addDeltaFile(conf, t, p, 21L, 24L, 4);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);

      burnThroughTransactions(25);

@@ -245,7 +242,7 @@ public class TestWorker extends Compacto
      rqst.setPartitionname("ds=today");
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      // There should still be four directories in the location.
      FileSystem fs = FileSystem.get(conf);
@@ -258,18 +255,16 @@ public class TestWorker extends Compacto
      LOG.debug("Starting minorTableWithBase");
      Table t = newTable("default", "mtwb", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);

      burnThroughTransactions(25);

      CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
      txnHandler.compact(rqst);

- startWorker(conf);
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -304,11 +299,10 @@ public class TestWorker extends Compacto
    public void minorPartitionWithBase() throws Exception {
      Table t = newTable("default", "mpwb", true);
      Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();

- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);

      burnThroughTransactions(25);

@@ -316,7 +310,7 @@ public class TestWorker extends Compacto
      rqst.setPartitionname("ds=today");
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -351,17 +345,15 @@ public class TestWorker extends Compacto
      LOG.debug("Starting minorTableWithBase");
      Table t = newTable("default", "mtnb", false);

- HiveConf conf = new HiveConf();
-
- addDeltaFile(conf, t, null, 1L, 2L, 2);
- addDeltaFile(conf, t, null, 3L, 4L, 2);
+ addDeltaFile(t, null, 1L, 2L, 2);
+ addDeltaFile(t, null, 3L, 4L, 2);

      burnThroughTransactions(5);

      CompactionRequest rqst = new CompactionRequest("default", "mtnb", CompactionType.MINOR);
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -396,18 +388,16 @@ public class TestWorker extends Compacto
      LOG.debug("Starting majorTableWithBase");
      Table t = newTable("default", "matwb", false);

- HiveConf conf = new HiveConf();
-
- addBaseFile(conf, t, null, 20L, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);

      burnThroughTransactions(25);

      CompactionRequest rqst = new CompactionRequest("default", "matwb", CompactionType.MAJOR);
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -442,11 +432,10 @@ public class TestWorker extends Compacto
      LOG.debug("Starting majorPartitionWithBase");
      Table t = newTable("default", "mapwb", true);
      Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();

- addBaseFile(conf, t, p, 20L, 20);
- addDeltaFile(conf, t, p, 21L, 22L, 2);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);

      burnThroughTransactions(25);

@@ -454,7 +443,7 @@ public class TestWorker extends Compacto
      rqst.setPartitionname("ds=today");
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -489,17 +478,15 @@ public class TestWorker extends Compacto
      LOG.debug("Starting majorTableNoBase");
      Table t = newTable("default", "matnb", false);

- HiveConf conf = new HiveConf();
-
- addDeltaFile(conf, t, null, 1L, 2L, 2);
- addDeltaFile(conf, t, null, 3L, 4L, 2);
+ addDeltaFile(t, null, 1L, 2L, 2);
+ addDeltaFile(t, null, 3L, 4L, 2);

      burnThroughTransactions(5);

      CompactionRequest rqst = new CompactionRequest("default", "matnb", CompactionType.MAJOR);
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -534,18 +521,16 @@ public class TestWorker extends Compacto
      LOG.debug("Starting majorTableLegacy");
      Table t = newTable("default", "matl", false);

- HiveConf conf = new HiveConf();
-
- addLegacyFile(conf, t, null, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addLegacyFile(t, null, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);

      burnThroughTransactions(25);

      CompactionRequest rqst = new CompactionRequest("default", "matl", CompactionType.MAJOR);
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -580,18 +565,16 @@ public class TestWorker extends Compacto
      LOG.debug("Starting minorTableLegacy");
      Table t = newTable("default", "mtl", false);

- HiveConf conf = new HiveConf();
-
- addLegacyFile(conf, t, null, 20);
- addDeltaFile(conf, t, null, 21L, 22L, 2);
- addDeltaFile(conf, t, null, 23L, 24L, 2);
+ addLegacyFile(t, null, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);

      burnThroughTransactions(25);

      CompactionRequest rqst = new CompactionRequest("default", "mtl", CompactionType.MINOR);
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -622,11 +605,10 @@ public class TestWorker extends Compacto
    public void majorPartitionWithBaseMissingBuckets() throws Exception {
      Table t = newTable("default", "mapwbmb", true);
      Partition p = newPartition(t, "today");
- HiveConf conf = new HiveConf();

- addBaseFile(conf, t, p, 20L, 20, 2, false);
- addDeltaFile(conf, t, p, 21L, 22L, 2, 2, false);
- addDeltaFile(conf, t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 20L, 20, 2, false);
+ addDeltaFile(t, p, 21L, 22L, 2, 2, false);
+ addDeltaFile(t, p, 23L, 24L, 2);

      burnThroughTransactions(25);

@@ -634,7 +616,7 @@ public class TestWorker extends Compacto
      rqst.setPartitionname("ds=today");
      txnHandler.compact(rqst);

- startWorker(new HiveConf());
+ startWorker();

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -669,9 +651,4 @@ public class TestWorker extends Compacto
      }
      Assert.assertTrue(sawNewBase);
    }
-
- @Before
- public void setUpTxnDb() throws Exception {
- TxnDbUtil.setConfValues(new HiveConf());
- }
  }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedOct 13, '14 at 9:33p
activeOct 13, '14 at 9:33p
posts1
users1
websitehive.apache.org

1 user in discussion

Gates: 1 post

People

Translate

site design / logo © 2021 Grokbase