Repository: hive
Updated Branches:
   refs/heads/master 8dec6a09b -> 165430b64


HIVE-12353 When the Compactor fails it calls CompactionTxnHandler.markCleaned(). It should not. (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/165430b6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/165430b6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/165430b6

Branch: refs/heads/master
Commit: 165430b6406832afabca906a8fdae231e9dd901c
Parents: 8dec6a0
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Thu Jan 21 17:55:49 2016 -0800
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Thu Jan 21 17:55:49 2016 -0800

----------------------------------------------------------------------
  .../org/apache/hadoop/hive/conf/HiveConf.java | 24 +-
  .../hive/ql/txn/compactor/TestCompactor.java | 57 -----
  .../hadoop/hive/metastore/HiveMetaStore.java | 5 +-
  .../hive/metastore/HouseKeeperService.java | 6 +
  .../hive/metastore/txn/CompactionInfo.java | 56 +++-
  .../metastore/txn/CompactionTxnHandler.java | 256 +++++++++++++++++--
  .../hadoop/hive/metastore/txn/TxnDbUtil.java | 21 +-
  .../hadoop/hive/metastore/txn/TxnHandler.java | 93 ++++---
  .../metastore/txn/TestCompactionTxnHandler.java | 3 +-
  .../ql/txn/AcidCompactionHistoryService.java | 83 ++++++
  .../hive/ql/txn/AcidHouseKeeperService.java | 65 ++---
  .../hadoop/hive/ql/txn/compactor/Cleaner.java | 10 +-
  .../hive/ql/txn/compactor/CompactorMR.java | 4 +
  .../txn/compactor/HouseKeeperServiceBase.java | 92 +++++++
  .../hadoop/hive/ql/txn/compactor/Initiator.java | 13 +-
  .../hadoop/hive/ql/txn/compactor/Worker.java | 8 +-
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 217 ++++++++++++++++
  .../hive/ql/txn/compactor/TestCleaner.java | 28 +-
  .../hive/ql/txn/compactor/TestWorker.java | 7 +-
  19 files changed, 853 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cf3280f..50a525c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -992,6 +992,7 @@ public class HiveConf extends Configuration {
      HIVETESTMODEDUMMYSTATPUB("hive.test.dummystats.publisher", "", "internal variable for test", false),
      HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
      HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. Will mark every ACID transaction aborted", false),
+ HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false),

      HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
          "Merge small files at the end of a map-only job"),
@@ -1570,11 +1571,32 @@ public class HiveConf extends Configuration {
      HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
          "Number of aborted transactions involving a given table or partition that will trigger\n" +
          "a major compaction."),
-
+
+ COMPACTOR_INITIATOR_FAILED_THRESHOLD("hive.compactor.initiator.failed.compacts.threshold", 2,
+ new RangeValidator(1, 20), "Number of consecutive compaction failures (per table/partition) " +
+ "after which automatic compactions will not be scheduled any more. Note that this must not be " +
+ "greater than hive.compactor.history.retention.failed."),
+
      HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
          new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
      COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
        "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
+
+ COMPACTOR_HISTORY_RETENTION_SUCCEEDED("hive.compactor.history.retention.succeeded", 3,
+ new RangeValidator(0, 100), "Determines how many successful compaction records will be " +
+ "retained in compaction history for a given table/partition."),
+
+ COMPACTOR_HISTORY_RETENTION_FAILED("hive.compactor.history.retention.failed", 3,
+ new RangeValidator(0, 100), "Determines how many failed compaction records will be " +
+ "retained in compaction history for a given table/partition."),
+
+ COMPACTOR_HISTORY_RETENTION_ATTEMPTED("hive.compactor.history.retention.attempted", 2,
+ new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
+ "retained in compaction history for a given table/partition."),
+
+ COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m",
+ new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"),
+
      HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
        new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
      HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
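
For illustration only (not part of this patch), the new knobs could be set programmatically in a
test setup; the ConfVars are the ones added above, the values are arbitrary:

  // assumes an existing HiveConf instance named conf
  conf.setIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD, 2);
  // failed-entry retention must be >= the failed threshold
  // (see CompactionTxnHandler.getFailedCompactionRetention() further down)
  conf.setIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED, 3);
  conf.setIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED, 3);
  conf.setIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED, 2);
  conf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 2, TimeUnit.MINUTES);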

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index da367ca..226a1fa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -790,63 +790,6 @@ public class TestCompactor {
      }
    }

- /**
- * HIVE-12352 has details
- * @throws Exception
- */
- @Test
- public void writeBetweenWorkerAndCleaner() throws Exception {
- String tblName = "HIVE12352";
- executeStatementOnDriver("drop table if exists " + tblName, driver);
- executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
- " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
-
- //create some data
- executeStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')", driver);
- executeStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3", driver);
-
- //run Worker to execute compaction
- CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
- txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
- Worker t = new Worker();
- t.setThreadId((int) t.getId());
- t.setHiveConf(conf);
- AtomicBoolean stop = new AtomicBoolean(true);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
- t.run();
-
- //delete something, but make sure txn is rolled back
- conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
- executeStatementOnDriver("delete from " + tblName + " where a = 1", driver);
- conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
-
- List<String> expected = new ArrayList<>();
- expected.add("1\tfoo");
- expected.add("2\tbar");
- expected.add("3\tblah");
- Assert.assertEquals("", expected,
- execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()"));
-
- //run Cleaner
- Cleaner c = new Cleaner();
- c.setThreadId((int)c.getId());
- c.setHiveConf(conf);
- c.init(stop, new AtomicBoolean());
- c.run();
-
- //this seems odd, but we want to make sure that CompactionTxnHandler.cleanEmptyAbortedTxns() runs
- Initiator i = new Initiator();
- i.setThreadId((int)i.getId());
- i.setHiveConf(conf);
- i.init(stop, new AtomicBoolean());
- i.run();
-
- //check that aborted operation didn't become committed
- Assert.assertEquals("", expected,
- execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()"));
- }
    @Test
    public void majorCompactAfterAbort() throws Exception {
      String dbName = "default";

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index ace644b..7830f17 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6406,7 +6406,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
      if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
        return;
      }
- Class c = Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService");
+ startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
+ startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
+ }
+ private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
      //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
      //should be called from it
      HouseKeeperService houseKeeper = (HouseKeeperService)c.newInstance();

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
index eb4ea93..539ace0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
@@ -36,4 +36,10 @@ public interface HouseKeeperService {
     * Returns short description of services this module provides.
     */
    public String getServiceDescription();
+
+ /**
+ * This counter is incremented each time the service's task runs. Can be useful to
+ * check if the service is still alive.
+ */
+ public int getIsAliveCounter();
  }
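
A minimal sketch of how the new counter might be used to probe liveness in a test; the service
class is the AcidCompactionHistoryService added below, and hiveConf/reaperIntervalMs are assumed
to exist in the surrounding test:

  HouseKeeperService svc = new AcidCompactionHistoryService();
  svc.start(hiveConf);                  // schedules the background task
  int before = svc.getIsAliveCounter();
  Thread.sleep(2 * reaperIntervalMs);   // wait for at least one scheduled run
  Assert.assertTrue("housekeeper did not run", svc.getIsAliveCounter() > before);
  svc.stop();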

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index d3cb7d5..73255d2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -19,6 +19,10 @@ package org.apache.hadoop.hive.metastore.txn;

  import org.apache.hadoop.hive.metastore.api.CompactionType;

+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
  /**
   * Information on a possible or running compaction.
   */
@@ -27,13 +31,18 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
    public String dbname;
    public String tableName;
    public String partName;
+ char state;
    public CompactionType type;
+ String workerId;
+ long start;
    public String runAs;
    public boolean tooManyAborts = false;
    /**
- * {@code null} means it wasn't set (e.g. in case of upgrades)
+ * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
     */
- public Long highestTxnId;
+ public long highestTxnId;
+ byte[] metaInfo;
+ String hadoopJobId;

    private String fullPartitionName = null;
    private String fullTableName = null;
@@ -44,6 +53,11 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
      this.partName = partName;
      this.type = type;
    }
+ CompactionInfo(long id, String dbname, String tableName, String partName, char state) {
+ this(dbname, tableName, partName, null);
+ this.id = id;
+ this.state = state;
+ }
    CompactionInfo() {}

    public String getFullPartitionName() {
@@ -82,9 +96,47 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
        "dbname:" + dbname + "," +
        "tableName:" + tableName + "," +
        "partName:" + partName + "," +
+ "state:" + state + "," +
        "type:" + type + "," +
        "runAs:" + runAs + "," +
        "tooManyAborts:" + tooManyAborts + "," +
        "highestTxnId:" + highestTxnId;
    }
+
+ /**
+ * loads object from a row in Select * from COMPACTION_QUEUE
+ * @param rs ResultSet after call to rs.next()
+ * @throws SQLException
+ */
+ static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLException {
+ CompactionInfo fullCi = new CompactionInfo();
+ fullCi.id = rs.getLong(1);
+ fullCi.dbname = rs.getString(2);
+ fullCi.tableName = rs.getString(3);
+ fullCi.partName = rs.getString(4);
+ fullCi.state = rs.getString(5).charAt(0);//cq_state
+ fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0));
+ fullCi.workerId = rs.getString(7);
+ fullCi.start = rs.getLong(8);
+ fullCi.runAs = rs.getString(9);
+ fullCi.highestTxnId = rs.getLong(10);
+ fullCi.metaInfo = rs.getBytes(11);
+ fullCi.hadoopJobId = rs.getString(12);
+ return fullCi;
+ }
+ static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException {
+ pStmt.setLong(1, ci.id);
+ pStmt.setString(2, ci.dbname);
+ pStmt.setString(3, ci.tableName);
+ pStmt.setString(4, ci.partName);
+ pStmt.setString(5, Character.toString(ci.state));
+ pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type)));
+ pStmt.setString(7, ci.workerId);
+ pStmt.setLong(8, ci.start);
+ pStmt.setLong(9, endTime);
+ pStmt.setString(10, ci.runAs);
+ pStmt.setLong(11, ci.highestTxnId);
+ pStmt.setBytes(12, ci.metaInfo);
+ pStmt.setString(13, ci.hadoopJobId);
+ }
  }
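
Together, these two helpers move a queue entry into the history table. A condensed sketch of the
round trip as markCleaned()/markFailed() below perform it (connection and error handling elided):

  ResultSet rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, " +
      "CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, " +
      "CQ_HADOOP_JOB_ID from COMPACTION_QUEUE where CQ_ID = " + id);
  if (rs.next()) {
    CompactionInfo ci = CompactionInfo.loadFullFromCompactionQueue(rs);
    ci.state = FAILED_STATE; // or SUCCEEDED_STATE
    PreparedStatement pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, " +
        "CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, " +
        "CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) values(?,?,?,?,?,?,?,?,?,?,?,?,?)");
    CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, endTimeMs);
    pStmt.executeUpdate();
  }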

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 9130322..18b288d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -174,16 +174,7 @@ public class CompactionTxnHandler extends TxnHandler {
            info.dbname = rs.getString(2);
            info.tableName = rs.getString(3);
            info.partName = rs.getString(4);
- switch (rs.getString(5).charAt(0)) {
- case MAJOR_TYPE:
- info.type = CompactionType.MAJOR;
- break;
- case MINOR_TYPE:
- info.type = CompactionType.MINOR;
- break;
- default:
- throw new MetaException("Unexpected compaction type " + rs.getString(5));
- }
+ info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
            // Now, update this record as being worked on by this worker.
            long now = getDbTime(dbConn);
            s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
@@ -291,8 +282,7 @@ public class CompactionTxnHandler extends TxnHandler {
              default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
            }
            info.runAs = rs.getString(6);
- long highestTxnId = rs.getLong(7);
- info.highestTxnId = rs.wasNull() ? null : highestTxnId;
+ info.highestTxnId = rs.getLong(7);
            rc.add(info);
          }
          LOG.debug("Going to rollback");
@@ -323,13 +313,19 @@ public class CompactionTxnHandler extends TxnHandler {
      try {
        Connection dbConn = null;
        Statement stmt = null;
+ PreparedStatement pStmt = null;
        ResultSet rs = null;
        try {
- //do we need serializable? Once we have the HWM as above, no. Before that
- //it's debatable, but problem described above applies either way
- //Thus can drop to RC
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
+ rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
+ if(rs.next()) {
+ info = CompactionInfo.loadFullFromCompactionQueue(rs);
+ }
+ else {
+ throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
+ }
+ close(rs);
          String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
          LOG.debug("Going to execute update <" + s + ">");
          int updCount = stmt.executeUpdate(s);
@@ -338,6 +334,10 @@ public class CompactionTxnHandler extends TxnHandler {
            LOG.debug("Going to rollback");
            dbConn.rollback();
          }
+ pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+ info.state = SUCCEEDED_STATE;
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
+ updCount = pStmt.executeUpdate();

          // Remove entries from completed_txn_components as well, so we don't start looking there
          // again, but only up to the highest txn ID included in this compaction job.
@@ -347,7 +347,7 @@ public class CompactionTxnHandler extends TxnHandler {
          if (info.partName != null) {
            s += " and ctc_partition = '" + info.partName + "'";
          }
- if(info.highestTxnId != null) {
+ if(info.highestTxnId != 0) {
            s += " and ctc_txnid <= " + info.highestTxnId;
          }
          LOG.debug("Going to execute update <" + s + ">");
@@ -358,7 +358,7 @@ public class CompactionTxnHandler extends TxnHandler {

          s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
            TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
- info.tableName + "'" + (info.highestTxnId == null ? "" : " and txn_id <= " + info.highestTxnId);
+ info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId);
          if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
          LOG.debug("Going to execute update <" + s + ">");
          rs = stmt.executeQuery(s);
@@ -406,6 +406,7 @@ public class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
+ closeStmt(pStmt);
          close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
@@ -668,6 +669,225 @@ public class CompactionTxnHandler extends TxnHandler {
        setCompactionHighestTxnId(ci, highestTxnId);
      }
    }
+ private static class RetentionCounters {
+ int attemptedRetention = 0;
+ int failedRetention = 0;
+ int succeededRetention = 0;
+ RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) {
+ this.attemptedRetention = attemptedRetention;
+ this.failedRetention = failedRetention;
+ this.succeededRetention = succeededRetention;
+ }
+ }
+ private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) {
+ switch (ci.state) {
+ case ATTEMPTED_STATE:
+ if(--rc.attemptedRetention < 0) {
+ deleteSet.add(ci.id);
+ }
+ break;
+ case FAILED_STATE:
+ if(--rc.failedRetention < 0) {
+ deleteSet.add(ci.id);
+ }
+ break;
+ case SUCCEEDED_STATE:
+ if(--rc.succeededRetention < 0) {
+ deleteSet.add(ci.id);
+ }
+ break;
+ default:
+ //do nothing, to handle future RU/D where we may want to add new state types
+ }
+ }
+
+ /**
+ * For any given compactable entity (partition, table if not partitioned) the history of compactions
+ * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
+ * history such that a configurable number of each type of state is present. Any other entries
+ * can be purged. This scheme has the advantage of always retaining the last failure/success even if
+ * it's not recent.
+ * @throws MetaException
+ */
+ public void purgeCompactionHistory() throws MetaException {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ List<Long> deleteSet = new ArrayList<>();
+ RetentionCounters rc = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ /*cc_id is monotonically increasing, so for any entity it sorts in order of compaction history;
+ thus this query groups by entity and, within each group, sorts most recent first*/
+ rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " +
+ "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc");
+ String lastCompactedEntity = null;
+ /*In each group, walk from most recent and count occurrences of each state type. Once you
+ * have counted enough (for each state) to satisfy the retention policy, delete all other
+ * instances of this status.*/
+ while(rs.next()) {
+ CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
+ if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
+ lastCompactedEntity = ci.getFullPartitionName();
+ rc = new RetentionCounters(conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ getFailedCompactionRetention(),
+ conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
+ }
+ checkForDeletion(deleteSet, ci, rc);
+ }
+ close(rs);
+
+ String baseDeleteSql = "delete from COMPLETED_COMPACTIONS where cc_id IN(";
+ StringBuilder queryStr = new StringBuilder(baseDeleteSql);
+ for(int i = 0; i < deleteSet.size(); i++) {
+ if(i > 0 && i % TIMED_OUT_TXN_ABORT_BATCH_SIZE == 0) {
+ queryStr.setCharAt(queryStr.length() - 1, ')');
+ stmt.executeUpdate(queryStr.toString());
+ dbConn.commit();
+ queryStr = new StringBuilder(baseDeleteSql);
+ }
+ queryStr.append(deleteSet.get(i)).append(',');
+ }
+ if(queryStr.length() > baseDeleteSql.length()) {
+ queryStr.setCharAt(queryStr.length() - 1, ')');
+ int updCnt = stmt.executeUpdate(queryStr.toString());
+ dbConn.commit();
+ }
+ dbConn.commit();
+ } catch (SQLException e) {
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "purgeCompactionHistory()");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ } catch (RetryException ex) {
+ purgeCompactionHistory();
+ }
+ }
+ /**
+ * This ensures that the number of failed compaction entries retained is at least the failed
+ * compaction threshold that prevents new compactions from being scheduled.
+ */
+ public int getFailedCompactionRetention() {
+ int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+ int failedRetention = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
+ if(failedRetention < failedThreshold) {
+ LOG.warn("Invalid configuration " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.varname +
+ "=" + failedRetention + " < " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname + "=" +
+ failedThreshold + ". Will use " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.varname +
+ "=" + failedThreshold);
+ failedRetention = failedThreshold;
+ }
+ return failedRetention;
+ }
+ /**
+ * Returns {@code true} if there already exists a sufficient number of consecutive failures for
+ * this table/partition so that no new automatic compactions will be scheduled.
+ * User initiated compactions don't do this check.
+ *
+ * Do we allow compacting the whole table (when it's partitioned)? No, though perhaps we should.
+ * That would be a meta operation, i.e. first find all partitions for this table (which have
+ * txn info) and schedule each compaction separately. This avoids complications in this logic.
+ */
+ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " +
+ "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
+ "CC_TABLE = " + quoteString(ci.tableName) +
+ (ci.partName != null ? " and CC_PARTITION = " + quoteString(ci.partName) : "") +
+ " order by CC_ID desc");
+ int numFailed = 0;
+ int numTotal = 0;
+ int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+ while(rs.next() && ++numTotal <= failedThreshold) {
+ if(rs.getString(1).charAt(0) == FAILED_STATE) {
+ numFailed++;
+ }
+ else {
+ numFailed--;
+ }
+ }
+ return numFailed == failedThreshold;
+ }
+ catch (SQLException e) {
+ LOG.error("Unable to check for failed compactions: " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
+ LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+ return false;//weren't able to check
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ return checkFailedCompactions(ci);
+ }
+ }
+ /**
+ * If there is an entry in COMPACTION_QUEUE with ci.id, remove it and
+ * make an entry in COMPLETED_COMPACTIONS with status 'f'.
+ *
+ * But what about markCleaned(), which is called when the table has been deleted...
+ */
+ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
+ //todo: this should take "comment" as a parameter to set in CC_META_INFO to provide some context for the failure
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ PreparedStatement pStmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
+ if(rs.next()) {
+ ci = CompactionInfo.loadFullFromCompactionQueue(rs);
+ String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
+ LOG.debug("Going to execute update <" + s + ">");
+ int updCnt = stmt.executeUpdate(s);
+ }
+ else {
+ throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+ }
+ close(rs, stmt, null);
+
+ pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+ ci.state = FAILED_STATE;
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ LOG.debug("Going to commit");
+ closeStmt(pStmt);
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to delete from compaction queue " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ try {
+ checkRetryable(dbConn, e, "markFailed(" + ci + ")");
+ }
+ catch(MetaException ex) {
+ LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+ }
+ LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, null);
+ close(null, pStmt, dbConn);
+ }
+ } catch (RetryException e) {
+ markFailed(ci);
+ }
+ }
+
  }
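
To make the purge policy concrete, a standalone simulation of the checkForDeletion() walk, using
the example history from the javadoc above ("sssfffaaasffss", oldest first) and the default
retention counts; everything here is illustrative only:

  import java.util.ArrayList;
  import java.util.List;

  public class RetentionDemo {
    public static void main(String[] args) {
      // purgeCompactionHistory() scans newest-first (order by cc_id desc)
      String history = new StringBuilder("sssfffaaasffss").reverse().toString();
      int attempted = 2, failed = 3, succeeded = 3;  // default retention counts
      List<Integer> purge = new ArrayList<Integer>();
      for (int i = 0; i < history.length(); i++) {
        switch (history.charAt(i)) {
          case 'a': if (--attempted < 0) purge.add(i); break;
          case 'f': if (--failed < 0) purge.add(i); break;
          case 's': if (--succeeded < 0) purge.add(i); break;
          default: break; // unknown states are kept (future RU/D)
        }
      }
      // prints [7, 9, 10, 11, 12, 13]: positions (0 = most recent) to delete
      System.out.println(purge);
    }
  }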



http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 2015526..2a7545c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -118,10 +118,27 @@ public final class TxnDbUtil {
            " CQ_WORKER_ID varchar(128)," +
            " CQ_START bigint," +
            " CQ_RUN_AS varchar(128)," +
- " CQ_HIGHEST_TXN_ID bigint)");
+ " CQ_HIGHEST_TXN_ID bigint," +
+ " CQ_META_INFO varchar(2048) for bit data," +
+ " CQ_HADOOP_JOB_ID varchar(32))");

        stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
        stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+
+ stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
+ " CC_ID bigint PRIMARY KEY," +
+ " CC_DATABASE varchar(128) NOT NULL," +
+ " CC_TABLE varchar(128) NOT NULL," +
+ " CC_PARTITION varchar(767)," +
+ " CC_STATE char(1) NOT NULL," +
+ " CC_TYPE char(1) NOT NULL," +
+ " CC_WORKER_ID varchar(128)," +
+ " CC_START bigint," +
+ " CC_END bigint," +
+ " CC_RUN_AS varchar(128)," +
+ " CC_HIGHEST_TXN_ID bigint," +
+ " CC_META_INFO varchar(2048) for bit data," +
+ " CC_HADOOP_JOB_ID varchar(32))");

        conn.commit();
      } catch (SQLException e) {
@@ -161,7 +178,7 @@ public final class TxnDbUtil {
        dropTable(stmt, "NEXT_LOCK_ID");
        dropTable(stmt, "COMPACTION_QUEUE");
        dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
-
+ dropTable(stmt, "COMPLETED_COMPACTIONS");
        conn.commit();
      } finally {
        closeResources(conn, stmt, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 50d8892..c836f80 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -64,14 +64,20 @@ import java.util.concurrent.TimeUnit;
  @InterfaceAudience.Private
  @InterfaceStability.Evolving
  public class TxnHandler {
- // Compactor states
+ // Compactor states (should really be an enum)
    static final public String INITIATED_RESPONSE = "initiated";
    static final public String WORKING_RESPONSE = "working";
    static final public String CLEANING_RESPONSE = "ready for cleaning";
+ static final public String FAILED_RESPONSE = "failed";
+ static final public String SUCCEEDED_RESPONSE = "succeeded";
+ static final public String ATTEMPTED_RESPONSE = "attempted";

    static final protected char INITIATED_STATE = 'i';
    static final protected char WORKING_STATE = 'w';
    static final protected char READY_FOR_CLEANING = 'r';
+ static final char FAILED_STATE = 'f';
+ static final char SUCCEEDED_STATE = 's';
+ static final char ATTEMPTED_STATE = 'a';

    // Compactor types
    static final protected char MAJOR_TYPE = 'a';
@@ -759,7 +765,7 @@ public class TxnHandler {
      }
    }

- public void compact(CompactionRequest rqst) throws MetaException {
+ public long compact(CompactionRequest rqst) throws MetaException {
      // Put a compaction request in the queue.
      try {
        Connection dbConn = null;
@@ -826,6 +832,7 @@ public class TxnHandler {
          stmt.executeUpdate(s);
          LOG.debug("Going to commit");
          dbConn.commit();
+ return id;
        } catch (SQLException e) {
          LOG.debug("Going to rollback");
          rollbackDBConn(dbConn);
@@ -837,7 +844,7 @@ public class TxnHandler {
          closeDbConn(dbConn);
        }
      } catch (RetryException e) {
- compact(rqst);
+ return compact(rqst);
      }
    }

@@ -850,7 +857,13 @@ public class TxnHandler {
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
- "cq_start, cq_run_as from COMPACTION_QUEUE";
+ "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
+ "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
+ "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
+ //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
+ //to sort so that currently running jobs are at the end of the list (bottom of screen)
+ //and currently running ones are sorted by start time
+ //w/o an order by, the currently running compactions will likely come first (LHS of the union)
          LOG.debug("Going to execute query <" + s + ">");
          ResultSet rs = stmt.executeQuery(s);
          while (rs.next()) {
@@ -862,16 +875,26 @@ public class TxnHandler {
              case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
              case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
              case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
- default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
+ case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
+ case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break;
+ default:
+ //do nothing to handle RU/D if we add another status
            }
            switch (rs.getString(5).charAt(0)) {
              case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
              case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
- default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ default:
+ //do nothing to handle RU/D if we add another status
            }
            e.setWorkerid(rs.getString(6));
            e.setStart(rs.getLong(7));
- e.setRunAs(rs.getString(8));
+ long endTime = rs.getLong(8);
+ if(endTime != -1) {
+ e.setEndTime(endTime);
+ }
+ e.setRunAs(rs.getString(9));
+ e.setHadoopJobId(rs.getString(10));
+ long id = rs.getLong(11);//for debugging
            response.addToCompacts(e);
          }
          LOG.debug("Going to rollback");
@@ -2374,41 +2397,29 @@ public class TxnHandler {
          throw new MetaException(msg);
      }
    }
- /**
- * the caller is expected to retry if this fails
- *
- * @return
- * @throws SQLException
- * @throws MetaException
- */
- private long generateNewExtLockId() throws SQLException, MetaException {
- Connection dbConn = null;
- Statement stmt = null;
- ResultSet rs = null;
- try {
- dbConn = getDbConn(getRequiredIsolationLevel());
- stmt = dbConn.createStatement();
-
- // Get the next lock id.
- String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- throw new MetaException("Transaction tables not properly " +
- "initialized, no record found in next_lock_id");
- }
- long extLockId = rs.getLong(1);
- s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
- LOG.debug("Going to commit.");
- dbConn.commit();
- return extLockId;
+ static String quoteString(String input) {
+ return "'" + input + "'";
+ }
+ static CompactionType dbCompactionType2ThriftType(char dbValue) {
+ switch (dbValue) {
+ case MAJOR_TYPE:
+ return CompactionType.MAJOR;
+ case MINOR_TYPE:
+ return CompactionType.MINOR;
+ default:
+ LOG.warn("Unexpected compaction type " + dbValue);
+ return null;
      }
- finally {
- close(rs, stmt, dbConn);
+ }
+ static Character thriftCompactionType2DbType(CompactionType ct) {
+ switch (ct) {
+ case MAJOR:
+ return MAJOR_TYPE;
+ case MINOR:
+ return MINOR_TYPE;
+ default:
+ LOG.warn("Unexpected compaction type " + ct);
+ return null;
      }
    }
  }
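
A hedged sketch of the end-to-end effect of these TxnHandler changes: compact() now returns the
queue id, and showCompact() surfaces terminal states from COMPLETED_COMPACTIONS (the classes are
existing metastore/thrift types; error handling elided):

  CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
  long id = txnHandler.compact(new CompactionRequest("default", "t", CompactionType.MINOR));
  ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
  for (ShowCompactResponseElement e : rsp.getCompacts()) {
    // state is one of: initiated, working, ready for cleaning, failed, succeeded, attempted
    System.out.println(e.getDbname() + "." + e.getTablename() + ": " + e.getState());
  }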

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 06e0932..ff2c2c1 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -219,7 +219,8 @@ public class TestCompactionTxnHandler {
      assertEquals(0, txnHandler.findReadyToClean().size());

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- assertEquals(0, rsp.getCompactsSize());
+ assertEquals(1, rsp.getCompactsSize());
+ assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    }

    @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
new file mode 100644
index 0000000..a91ca5c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Purges obsolete items from compaction history data
+ */
+public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AcidCompactionHistoryService.class);
+
+ @Override
+ protected long getStartDelayMs() {
+ return 0;
+ }
+ @Override
+ protected long getIntervalMs() {
+ return hiveConf.getTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ @Override
+ protected Runnable getScheduledAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ return new ObsoleteEntryReaper(hiveConf, isAliveCounter);
+ }
+
+ @Override
+ public String getServiceDescription() {
+ return "Removes obsolete entries from Compaction History";
+ }
+
+ private static final class ObsoleteEntryReaper implements Runnable {
+ private final CompactionTxnHandler txnHandler;
+ private final AtomicInteger isAliveCounter;
+ private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ txnHandler = new CompactionTxnHandler(hiveConf);
+ this.isAliveCounter = isAliveCounter;
+ }
+
+ @Override
+ public void run() {
+ try {
+ long startTime = System.currentTimeMillis();
+ txnHandler.purgeCompactionHistory();
+ int count = isAliveCounter.incrementAndGet();
+ LOG.info("History reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + " seconds. isAliveCounter=" + count);
+ }
+ catch(Throwable t) {
+ LOG.error("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t);
+ }
+ }
+ }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index dee7601..96e4d40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -17,17 +17,12 @@
   */
  package org.apache.hadoop.hive.ql.txn;

+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
  import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;

-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;

@@ -35,58 +30,40 @@ import java.util.concurrent.atomic.AtomicInteger;
   * Performs background tasks for Transaction management in Hive.
   * Runs inside Hive Metastore Service.
   */
-public class AcidHouseKeeperService implements HouseKeeperService {
+public class AcidHouseKeeperService extends HouseKeeperServiceBase {
    private static final Logger LOG = LoggerFactory.getLogger(AcidHouseKeeperService.class);
- private ScheduledExecutorService pool = null;
- private final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);
+
    @Override
- public void start(HiveConf hiveConf) throws Exception {
- HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
- if(!mgr.supportsAcid()) {
- LOG.info(AcidHouseKeeperService.class.getName() + " not started since " +
- mgr.getClass().getName() + " does not support Acid.");
- return;//there are no transactions in this case
- }
- pool = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- private final AtomicInteger threadCounter = new AtomicInteger();
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "DeadTxnReaper-" + threadCounter.getAndIncrement());
- }
- });
- TimeUnit tu = TimeUnit.MILLISECONDS;
- pool.scheduleAtFixedRate(new TimedoutTxnReaper(hiveConf, this),
- hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu),
- hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu),
- TimeUnit.MILLISECONDS);
- LOG.info("Started " + this.getClass().getName() + " with delay/interval = " +
- hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu) + "/" +
- hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu) + " " + tu);
+ protected long getStartDelayMs() {
+ return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, TimeUnit.MILLISECONDS);
    }
    @Override
- public void stop() {
- if(pool != null && !pool.isShutdown()) {
- pool.shutdown();
- }
- pool = null;
+ protected long getIntervalMs() {
+ return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ @Override
+ protected Runnable getScheduledAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ return new TimedoutTxnReaper(hiveConf, isAliveCounter);
    }
+
    @Override
    public String getServiceDescription() {
      return "Abort expired transactions";
    }
+
    private static final class TimedoutTxnReaper implements Runnable {
      private final TxnHandler txnHandler;
- private final AcidHouseKeeperService owner;
- private TimedoutTxnReaper(HiveConf hiveConf, AcidHouseKeeperService owner) {
+ private final AtomicInteger isAliveCounter;
+ private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
        txnHandler = new TxnHandler(hiveConf);
- this.owner = owner;
+ this.isAliveCounter = isAliveCounter;
      }
      @Override
      public void run() {
        try {
          long startTime = System.currentTimeMillis();
          txnHandler.performTimeOuts();
- int count = owner.isAliveCounter.incrementAndGet();
+ int count = isAliveCounter.incrementAndGet();
        LOG.info("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + " seconds. isAliveCounter=" + count);
        }
        catch(Throwable t) {
@@ -94,12 +71,4 @@ public class AcidHouseKeeperService implements HouseKeeperService {
        }
      }
    }
-
- /**
- * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1.
- * Starts with {@link java.lang.Integer#MIN_VALUE}
- */
- public int getIsAliveCounter() {
- return isAliveCounter.get();
- }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index b847202..fbf5481 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -189,6 +189,7 @@ public class Cleaner extends CompactorThread {
        if (t == null) {
          // The table was dropped before we got around to cleaning it.
          LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
+ txnHandler.markCleaned(ci);
          return;
        }
        Partition p = null;
@@ -198,6 +199,7 @@ public class Cleaner extends CompactorThread {
            // The partition was dropped before we got around to cleaning it.
            LOG.info("Unable to find partition " + ci.getFullPartitionName() +
                ", assuming it was dropped");
+ txnHandler.markCleaned(ci);
            return;
          }
        }
@@ -223,13 +225,11 @@ public class Cleaner extends CompactorThread {
            }
          });
        }
-
+ txnHandler.markCleaned(ci);
      } catch (Exception e) {
- LOG.error("Caught exception when cleaning, unable to complete cleaning " +
+ LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
            StringUtils.stringifyException(e));
- } finally {
- // We need to clean this out one way or another.
- txnHandler.markCleaned(ci);
+ txnHandler.markFailed(ci);
      }
    }
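
This hunk is the crux of HIVE-12353: markCleaned() used to run in a finally block, so a failed
cleaning was still recorded as cleaned. Schematically, the new control flow is:

  try {
    // ... remove obsolete files ...
    txnHandler.markCleaned(ci);  // only on success: archived with state 's'
  } catch (Exception e) {
    LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
        StringUtils.stringifyException(e));
    txnHandler.markFailed(ci);   // the failure is now recorded with state 'f'
  }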


http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d0f46a..07ac0c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -137,6 +137,10 @@ public class CompactorMR {
     */
    void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
             ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+
+ if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
+ throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
+ }
      JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);

      // Figure out and encode what files we need to read. We do this here (rather than in

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
new file mode 100644
index 0000000..947f17c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class HouseKeeperServiceBase implements HouseKeeperService {
+ private static final Logger LOG = LoggerFactory.getLogger(HouseKeeperServiceBase.class);
+ private ScheduledExecutorService pool = null;
+ protected final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);
+ protected HiveConf hiveConf;
+
+ @Override
+ public void start(HiveConf hiveConf) throws Exception {
+ this.hiveConf = hiveConf;
+ HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
+ if(!mgr.supportsAcid()) {
+ LOG.info(this.getClass().getName() + " not started since " +
+ mgr.getClass().getName() + " does not support Acid.");
+ return;//there are no transactions in this case
+ }
+ pool = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ private final AtomicInteger threadCounter = new AtomicInteger();
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, HouseKeeperServiceBase.this.getClass().getName() + "-" + threadCounter.getAndIncrement());
+ }
+ });
+
+ TimeUnit tu = TimeUnit.MILLISECONDS;
+ pool.scheduleAtFixedRate(getScheduledAction(hiveConf, isAliveCounter), getStartDelayMs(),
+ getIntervalMs(), tu);
+ LOG.info("Started " + this.getClass().getName() + " with delay/interval = " + getStartDelayMs() + "/" +
+ getIntervalMs() + " " + tu);
+ }
+
+ @Override
+ public void stop() {
+ if(pool != null && !pool.isShutdown()) {
+ pool.shutdown();
+ }
+ pool = null;
+ }
+
+ /**
+ * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1.
+ * Starts with {@link java.lang.Integer#MIN_VALUE}
+ */
+ @Override
+ public int getIsAliveCounter() {
+ return isAliveCounter.get();
+ }
+
+ /**
+ * Delay in millis before first run of the task of this service.
+ */
+ protected abstract long getStartDelayMs();
+ /**
+ * Determines how frequently the service runs its task.
+ */
+ protected abstract long getIntervalMs();
+
+ /**
+ * The actual task implementation. Must increment the counter on each iteration.
+ */
+ protected abstract Runnable getScheduledAction(HiveConf hiveConf, AtomicInteger isAliveCounter);
+}
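
For orientation, a minimal hypothetical subclass (the class name is invented): a concrete service
supplies only the timing and the task, while scheduling, stop(), and the liveness counter come
from this base class:

  public class ExampleHouseKeeper extends HouseKeeperServiceBase {
    @Override protected long getStartDelayMs() { return 0; }
    @Override protected long getIntervalMs() { return 60000; } // run every minute
    @Override protected Runnable getScheduledAction(HiveConf hiveConf, final AtomicInteger isAliveCounter) {
      return new Runnable() {
        @Override public void run() {
          // ... periodic work goes here ...
          isAliveCounter.incrementAndGet(); // contract: bump the counter on each run
        }
      };
    }
    @Override public String getServiceDescription() { return "example housekeeping task"; }
  }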

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a8fe57d..2ef06de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -78,7 +78,7 @@ public class Initiator extends CompactorThread {

          // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
          // don't doom the entire thread.
- try {
+ try {//todo: add method to only get current i.e. skip history - more efficient
            ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
            ValidTxnList txns =
                CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
@@ -119,6 +119,11 @@ public class Initiator extends CompactorThread {
                      ci.getFullPartitionName() + " so we will not initiate another compaction");
                  continue;
                }
+ if(txnHandler.checkFailedCompactions(ci)) {
+ //todo: make 'a' state entry in completed_compactions
+ LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since recent attempts to compact it failed; see " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname);
+ continue;
+ }

                // Figure out who we should run the file operations as
                Partition p = resolvePartition(ci);
@@ -134,9 +139,9 @@ public class Initiator extends CompactorThread {
                if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
              } catch (Throwable t) {
                LOG.error("Caught exception while trying to determine if we should compact " +
- ci.getFullPartitionName() + ". Marking clean to avoid repeated failures, " +
+ ci + ". Marking failed to avoid repeated failures, " +
                    "" + StringUtils.stringifyException(t));
- txnHandler.markCleaned(ci);
+ txnHandler.markFailed(ci);
              }
            }

@@ -300,7 +305,7 @@ public class Initiator extends CompactorThread {
      if (ci.partName != null) rqst.setPartitionname(ci.partName);
      rqst.setRunas(runAs);
      LOG.info("Requesting compaction: " + rqst);
- txnHandler.compact(rqst);
+ ci.id = txnHandler.compact(rqst);
    }

    // Because TABLE_NO_AUTO_COMPACT was originally assumed to be NO_AUTO_COMPACT and then was moved

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 045ce63..ce03c8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,7 +17,6 @@
   */
  package org.apache.hadoop.hive.ql.txn.compactor;

-import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.apache.hadoop.hive.common.ValidTxnList;
@@ -70,7 +69,8 @@ public class Worker extends CompactorThread {
        throw new RuntimeException(e);
      }
    }
-
+//todo: this doesn't check if compaction is already running (Initiator does, but we
+// don't go through Initiator for user-initiated compactions)
    @Override
    public void run() {
      do {
@@ -174,9 +174,9 @@ public class Worker extends CompactorThread {
            }
            txnHandler.markCompacted(ci);
          } catch (Exception e) {
- LOG.error("Caught exception while trying to compact " + ci.getFullPartitionName() +
+ LOG.error("Caught exception while trying to compact " + ci +
              ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
- txnHandler.markCleaned(ci);
+ txnHandler.markFailed(ci);
          }
        } catch (Throwable t) {
          LOG.error("Caught an exception in the main loop of compactor worker " + name + ", " +
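
The net effect in both Worker and Initiator is the same: a failed attempt is now recorded as failed instead of being erased as if it had been cleaned. Schematically (a sketch; runCompactionJob is a hypothetical stand-in for the CompactorMR invocation):

  try {
    runCompactionJob(ci);          // hypothetical stand-in for CompactorMR
    txnHandler.markCompacted(ci);  // success: hand the entry to the Cleaner
  } catch (Exception e) {
    // Before HIVE-12353 this path called markCleaned(), which deleted the queue
    // entry (and completed_txn_components rows) as if the compaction had
    // succeeded. Now the queue row moves to COMPLETED_COMPACTIONS with state 'f'.
    txnHandler.markFailed(ci);
  }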

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index b784585..7a1a3d2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -24,10 +24,21 @@ import org.apache.hadoop.fs.FileUtil;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.FileUtils;
  import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
  import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
  import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
  import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
  import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
  import org.apache.hadoop.hive.ql.txn.compactor.Worker;
  import org.junit.After;
  import org.junit.Assert;
@@ -42,6 +53,7 @@ import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Comparator;
  import java.util.List;
+import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;

  /**
@@ -454,6 +466,211 @@ public class TestTxnCommands2 {
      //insert overwrite not supported for ACID tables
    }
    /**
+ * HIVE-12353
+ * @throws Exception
+ */
+ @Test
+ public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+ String tblName = "hive12353";
+ runStatementOnDriver("drop table if exists " + tblName);
+ runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
+ for(int i = 0; i < 5; i++) {
+ //generate enough delta files so that Initiator can trigger auto compaction
+ runStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')");
+ }
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+
+ int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ //create failed compactions
+ for(int i = 0; i < numFailedCompactions; i++) {
+ //each of these should fail
+ txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+ runWorker(hiveConf);
+ }
+ //this should not schedule a new compaction due to prior failures
+ Initiator init = new Initiator();
+ init.setThreadId((int)init.getId());
+ init.setHiveConf(hiveConf);
+ init.init(stop, new AtomicBoolean());
+ init.run();
+
+ CompactionsByState cbs = countCompacts(txnHandler);
+ Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed);
+ Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total);
+
+ hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
+ AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService();
+ runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history
+ cbs = countCompacts(txnHandler);
+ Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed);
+ Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total);
+
+ txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
+ runWorker(hiveConf);//will fail
+ txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+ runWorker(hiveConf);//will fail
+ cbs = countCompacts(txnHandler);
+ Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed);
+ Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total);
+ runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have
+ //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no others, since we only have failed ones here)
+ cbs = countCompacts(txnHandler);
+ Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
+ Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total);
+
+
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
+ txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+ //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated
+ cbs = countCompacts(txnHandler);
+ Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
+ Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated);
+ Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+
+ runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning
+ cbs = countCompacts(txnHandler);
+ Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
+ Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean);
+ Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+
+ runCleaner(hiveConf); // transition to Success state
+ runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes
+ cbs = countCompacts(txnHandler);
+ Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
+ Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
+ Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+ }
+ private static class CompactionsByState {
+ private int attempted;
+ private int failed;
+ private int initiated;
+ private int readyToClean;
+ private int succeeded;
+ private int working;
+ private int total;
+ }
+ private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
+ ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+ CompactionsByState compactionsByState = new CompactionsByState();
+ compactionsByState.total = resp.getCompactsSize();
+ for(ShowCompactResponseElement compact : resp.getCompacts()) {
+ if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
+ compactionsByState.failed++;
+ }
+ else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
+ compactionsByState.readyToClean++;
+ }
+ else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
+ compactionsByState.initiated++;
+ }
+ else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
+ compactionsByState.succeeded++;
+ }
+ else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
+ compactionsByState.working++;
+ }
+ else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
+ compactionsByState.attempted++;
+ }
+ }
+ return compactionsByState;
+ }
+ private static void runWorker(HiveConf hiveConf) throws MetaException {
+ AtomicBoolean stop = new AtomicBoolean(true);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
+ private static void runCleaner(HiveConf hiveConf) throws MetaException {
+ AtomicBoolean stop = new AtomicBoolean(true);
+ Cleaner t = new Cleaner();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
+
+ private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
+ int lastCount = houseKeeperService.getIsAliveCounter();
+ houseKeeperService.start(conf);
+ while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+ try {
+ Thread.sleep(100);//make sure it has run at least once
+ }
+ catch(InterruptedException ex) {
+ //...
+ }
+ }
+ houseKeeperService.stop();
+ }
+
+ /**
+ * HIVE-12352 has details
+ * @throws Exception
+ */
+ @Test
+ public void writeBetweenWorkerAndCleaner() throws Exception {
+ String tblName = "hive12352";
+ runStatementOnDriver("drop table if exists " + tblName);
+ runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+
+ //create some data
+ runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
+ runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
+
+ //run Worker to execute compaction
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+ txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(hiveConf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ //delete something, but make sure txn is rolled back
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+ runStatementOnDriver("delete from " + tblName + " where a = 1");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+ List<String> expected = new ArrayList<>();
+ expected.add("1\tfoo");
+ expected.add("2\tbar");
+ expected.add("3\tblah");
+ Assert.assertEquals("", expected,
+ runStatementOnDriver("select a,b from " + tblName + " order by a"));
+
+ //run Cleaner
+ Cleaner c = new Cleaner();
+ c.setThreadId((int)c.getId());
+ c.setHiveConf(hiveConf);
+ c.init(stop, new AtomicBoolean());
+ c.run();
+
+ //this seems odd, but we want to make sure that CompactionTxnHandler.cleanEmptyAbortedTxns() runs
+ Initiator i = new Initiator();
+ i.setThreadId((int)i.getId());
+ i.setHiveConf(hiveConf);
+ i.init(stop, new AtomicBoolean());
+ i.run();
+
+ //check that aborted operation didn't become committed
+ Assert.assertEquals("", expected,
+ runStatementOnDriver("select a,b from " + tblName + " order by a"));
+ }
+ /**
     * takes raw data and turns it into a string as if from Driver.getResults()
     * sorts rows in dictionary order
     */
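
For reference, the states that countCompacts() tallies form the compactor lifecycle; schematically (the response strings are the ones this patch defines in TxnHandler; the helper is hypothetical, not in the patch):

  // initiated -> working -> ready for cleaning -> succeeded
  //                    \-> failed     (markFailed, from Worker or Initiator)
  // attempted                         (recorded without running a full compaction)
  static boolean isTerminal(String state) {
    return TxnHandler.SUCCEEDED_RESPONSE.equals(state)
        || TxnHandler.FAILED_RESPONSE.equals(state)
        || TxnHandler.ATTEMPTED_RESPONSE.equals(state);
  }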

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index bca5002..899f5a1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -17,6 +17,7 @@
   */
  package org.apache.hadoop.hive.ql.txn.compactor;

+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
  import org.junit.Assert;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@ -71,7 +72,8 @@ public class TestCleaner extends CompactorTest {

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(0, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

      // Check that the files are removed
      List<Path> paths = getDirectories(conf, t, null);
@@ -102,7 +104,8 @@ public class TestCleaner extends CompactorTest {

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(0, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

      // Check that the files are removed
      List<Path> paths = getDirectories(conf, t, p);
@@ -131,7 +134,8 @@ public class TestCleaner extends CompactorTest {

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(0, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

      // Check that the files are removed
      List<Path> paths = getDirectories(conf, t, null);
@@ -169,7 +173,8 @@ public class TestCleaner extends CompactorTest {

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(0, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

      // Check that the files are removed
      List<Path> paths = getDirectories(conf, t, p);
@@ -323,7 +328,8 @@ public class TestCleaner extends CompactorTest {
      // Check there are no compactions requests left.
      rsp = txnHandler.showCompact(new ShowCompactRequest());
      compacts = rsp.getCompacts();
- Assert.assertEquals(0, compacts.size());
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    }

    @Test
@@ -396,7 +402,8 @@ public class TestCleaner extends CompactorTest {
      // Check there are no compactions requests left.
      rsp = txnHandler.showCompact(new ShowCompactRequest());
      compacts = rsp.getCompacts();
- Assert.assertEquals(0, compacts.size());
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    }

    @Test
@@ -421,7 +428,8 @@ public class TestCleaner extends CompactorTest {

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(0, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

      // Check that the files are removed
      List<Path> paths = getDirectories(conf, t, p);
@@ -451,7 +459,8 @@ public class TestCleaner extends CompactorTest {

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(0, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    }

    @Test
@@ -478,7 +487,8 @@ public class TestCleaner extends CompactorTest {

      // Check there are no compactions requests left.
      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(0, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    }
    @Override
    boolean useHive130DeltaDirName() {

http://git-wip-us.apache.org/repos/asf/hive/blob/165430b6/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index fe1d0d3..d0db406 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -17,6 +17,7 @@
   */
  package org.apache.hadoop.hive.ql.txn.compactor;

+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.apache.hadoop.fs.*;
@@ -933,7 +934,8 @@ public class TestWorker extends CompactorTest {

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(0, compacts.size());
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
    }

    @Test
@@ -957,6 +959,7 @@ public class TestWorker extends CompactorTest {

      ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
      List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(0, compacts.size());
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    }
  }


  • Ekoifman at Jan 22, 2016 at 2:39 am
    Repository: hive
    Updated Branches:
       refs/heads/branch-2.0 80f80b0e3 -> e8388ae67


    HIVE-12353 When Compactor fails it calls CompactionTxnHandler.markedCleaned(). it should not. (Eugene Koifman, reviewed by Alan Gates)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e8388ae6
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e8388ae6
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e8388ae6

    Branch: refs/heads/branch-2.0
    Commit: e8388ae67dbe097284c8eb7d7e0be00316b90d5d
    Parents: 80f80b0
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Thu Jan 21 18:39:04 2016 -0800
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Thu Jan 21 18:39:04 2016 -0800

    ----------------------------------------------------------------------
      .../org/apache/hadoop/hive/conf/HiveConf.java | 24 +-
      .../hive/ql/txn/compactor/TestCompactor.java | 57 -----
      .../hadoop/hive/metastore/HiveMetaStore.java | 5 +-
      .../hive/metastore/HouseKeeperService.java | 6 +
      .../hive/metastore/txn/CompactionInfo.java | 56 +++-
      .../metastore/txn/CompactionTxnHandler.java | 256 +++++++++++++++++--
      .../hadoop/hive/metastore/txn/TxnDbUtil.java | 21 +-
      .../hadoop/hive/metastore/txn/TxnHandler.java | 93 ++++---
      .../metastore/txn/TestCompactionTxnHandler.java | 3 +-
      .../hive/ql/txn/AcidHouseKeeperService.java | 65 ++---
      .../hadoop/hive/ql/txn/compactor/Cleaner.java | 10 +-
      .../hive/ql/txn/compactor/CompactorMR.java | 4 +
      .../hadoop/hive/ql/txn/compactor/Initiator.java | 13 +-
      .../hadoop/hive/ql/txn/compactor/Worker.java | 8 +-
      .../apache/hadoop/hive/ql/TestTxnCommands2.java | 217 ++++++++++++++++
      .../hive/ql/txn/compactor/TestCleaner.java | 28 +-
      .../hive/ql/txn/compactor/TestWorker.java | 7 +-
      17 files changed, 678 insertions(+), 195 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    ----------------------------------------------------------------------
    diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    index 33b3f3f..26ba4f0 100644
    --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    @@ -987,6 +987,7 @@ public class HiveConf extends Configuration {
          HIVETESTMODEDUMMYSTATPUB("hive.test.dummystats.publisher", "", "internal variable for test", false),
          HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
          HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. Will mark every ACID transaction aborted", false),
    + HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false),

          HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
              "Merge small files at the end of a map-only job"),
    @@ -1562,11 +1563,32 @@ public class HiveConf extends Configuration {
          HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
              "Number of aborted transactions involving a given table or partition that will trigger\n" +
              "a major compaction."),
    -
    +
    + COMPACTOR_INITIATOR_FAILED_THRESHOLD("hive.compactor.initiator.failed.compacts.threshold", 2,
    + new RangeValidator(1, 20), "Number of consecutive compaction failures (per table/partition) " +
    + "after which automatic compactions will not be scheduled any more. Note that this must be less " +
    + "than hive.compactor.history.retention.failed."),
    +
          HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
              new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
          COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
            "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
    +
    + COMPACTOR_HISTORY_RETENTION_SUCCEEDED("hive.compactor.history.retention.succeeded", 3,
    + new RangeValidator(0, 100), "Determines how many successful compaction records will be " +
    + "retained in compaction history for a given table/partition."),
    +
    + COMPACTOR_HISTORY_RETENTION_FAILED("hive.compactor.history.retention.failed", 3,
    + new RangeValidator(0, 100), "Determines how many failed compaction records will be " +
    + "retained in compaction history for a given table/partition."),
    +
    + COMPACTOR_HISTORY_RETENTION_ATTEMPTED("hive.compactor.history.retention.attempted", 2,
    + new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
    + "retained in compaction history for a given table/partition."),
    +
    + COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m",
    + new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"),
    +
          HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
            new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
          HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    ----------------------------------------------------------------------
    diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    index da367ca..226a1fa 100644
    --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    @@ -790,63 +790,6 @@ public class TestCompactor {
          }
        }

    - /**
    - * HIVE-12352 has details
    - * @throws Exception
    - */
    - @Test
    - public void writeBetweenWorkerAndCleaner() throws Exception {
    - String tblName = "HIVE12352";
    - executeStatementOnDriver("drop table if exists " + tblName, driver);
    - executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
    - " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
    - " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
    -
    - //create some data
    - executeStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')", driver);
    - executeStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3", driver);
    -
    - //run Worker to execute compaction
    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    - txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    - Worker t = new Worker();
    - t.setThreadId((int) t.getId());
    - t.setHiveConf(conf);
    - AtomicBoolean stop = new AtomicBoolean(true);
    - AtomicBoolean looped = new AtomicBoolean();
    - t.init(stop, looped);
    - t.run();
    -
    - //delete something, but make sure txn is rolled back
    - conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
    - executeStatementOnDriver("delete from " + tblName + " where a = 1", driver);
    - conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
    -
    - List<String> expected = new ArrayList<>();
    - expected.add("1\tfoo");
    - expected.add("2\tbar");
    - expected.add("3\tblah");
    - Assert.assertEquals("", expected,
    - execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()"));
    -
    - //run Cleaner
    - Cleaner c = new Cleaner();
    - c.setThreadId((int)c.getId());
    - c.setHiveConf(conf);
    - c.init(stop, new AtomicBoolean());
    - c.run();
    -
    - //this seems odd, but we wan to make sure that to run CompactionTxnHandler.cleanEmptyAbortedTxns()
    - Initiator i = new Initiator();
    - i.setThreadId((int)i.getId());
    - i.setHiveConf(conf);
    - i.init(stop, new AtomicBoolean());
    - i.run();
    -
    - //check that aborted operation didn't become committed
    - Assert.assertEquals("", expected,
    - execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()"));
    - }
        @Test
        public void majorCompactAfterAbort() throws Exception {
          String dbName = "default";

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    index 00602e1..8f99228 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    @@ -6220,7 +6220,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
          if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
            return;
          }
    - Class c = Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService");
    + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
    + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
    + }
    + private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
          //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
    //should be called from it
          HouseKeeperService houseKeeper = (HouseKeeperService)c.newInstance();

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
    index eb4ea93..539ace0 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
    @@ -36,4 +36,10 @@ public interface HouseKeeperService {
         * Returns short description of services this module provides.
         */
        public String getServiceDescription();
    +
    + /**
    + * This is incremented each time the service runs. Can be useful to
    + * check if the service is still alive.
    + */
    + public int getIsAliveCounter();
      }
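
    A sketch of how a caller can use the counter to wait deterministically for at least one housekeeping pass (the same pattern the runHouseKeeperService() test helper above uses; InterruptedException handling omitted):

      int before = houseKeeper.getIsAliveCounter();
      houseKeeper.start(conf);
      while (houseKeeper.getIsAliveCounter() <= before) {
        Thread.sleep(100); // the counter increments once per completed pass
      }
      houseKeeper.stop();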

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    index d3cb7d5..73255d2 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    @@ -19,6 +19,10 @@ package org.apache.hadoop.hive.metastore.txn;

      import org.apache.hadoop.hive.metastore.api.CompactionType;

    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +
      /**
       * Information on a possible or running compaction.
       */
    @@ -27,13 +31,18 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
        public String dbname;
        public String tableName;
        public String partName;
    + char state;
        public CompactionType type;
    + String workerId;
    + long start;
        public String runAs;
        public boolean tooManyAborts = false;
        /**
    - * {@code null} means it wasn't set (e.g. in case of upgrades)
    + * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
         */
    - public Long highestTxnId;
    + public long highestTxnId;
    + byte[] metaInfo;
    + String hadoopJobId;

        private String fullPartitionName = null;
        private String fullTableName = null;
    @@ -44,6 +53,11 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
          this.partName = partName;
          this.type = type;
        }
    + CompactionInfo(long id, String dbname, String tableName, String partName, char state) {
    + this(dbname, tableName, partName, null);
    + this.id = id;
    + this.state = state;
    + }
        CompactionInfo() {}

        public String getFullPartitionName() {
    @@ -82,9 +96,47 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
            "dbname:" + dbname + "," +
            "tableName:" + tableName + "," +
            "partName:" + partName + "," +
    + "state:" + state + "," +
            "type:" + type + "," +
            "runAs:" + runAs + "," +
            "tooManyAborts:" + tooManyAborts + "," +
            "highestTxnId:" + highestTxnId;
        }
    +
    + /**
    + * loads object from a row in Select * from COMPACTION_QUEUE
    + * @param rs ResultSet after call to rs.next()
    + * @throws SQLException
    + */
    + static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLException {
    + CompactionInfo fullCi = new CompactionInfo();
    + fullCi.id = rs.getLong(1);
    + fullCi.dbname = rs.getString(2);
    + fullCi.tableName = rs.getString(3);
    + fullCi.partName = rs.getString(4);
    + fullCi.state = rs.getString(5).charAt(0);//cq_state
    + fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0));
    + fullCi.workerId = rs.getString(7);
    + fullCi.start = rs.getLong(8);
    + fullCi.runAs = rs.getString(9);
    + fullCi.highestTxnId = rs.getLong(10);
    + fullCi.metaInfo = rs.getBytes(11);
    + fullCi.hadoopJobId = rs.getString(12);
    + return fullCi;
    + }
    + static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException {
    + pStmt.setLong(1, ci.id);
    + pStmt.setString(2, ci.dbname);
    + pStmt.setString(3, ci.tableName);
    + pStmt.setString(4, ci.partName);
    + pStmt.setString(5, Character.toString(ci.state));
    + pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type)));
    + pStmt.setString(7, ci.workerId);
    + pStmt.setLong(8, ci.start);
    + pStmt.setLong(9, endTime);
    + pStmt.setString(10, ci.runAs);
    + pStmt.setLong(11, ci.highestTxnId);
    + pStmt.setBytes(12, ci.metaInfo);
    + pStmt.setString(13, ci.hadoopJobId);
    + }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    index 9130322..18b288d 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    @@ -174,16 +174,7 @@ public class CompactionTxnHandler extends TxnHandler {
                info.dbname = rs.getString(2);
                info.tableName = rs.getString(3);
                info.partName = rs.getString(4);
    - switch (rs.getString(5).charAt(0)) {
    - case MAJOR_TYPE:
    - info.type = CompactionType.MAJOR;
    - break;
    - case MINOR_TYPE:
    - info.type = CompactionType.MINOR;
    - break;
    - default:
    - throw new MetaException("Unexpected compaction type " + rs.getString(5));
    - }
    + info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
                // Now, update this record as being worked on by this worker.
                long now = getDbTime(dbConn);
                s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
    @@ -291,8 +282,7 @@ public class CompactionTxnHandler extends TxnHandler {
                  default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
                }
                info.runAs = rs.getString(6);
    - long highestTxnId = rs.getLong(7);
    - info.highestTxnId = rs.wasNull() ? null : highestTxnId;
    + info.highestTxnId = rs.getLong(7);
                rc.add(info);
              }
              LOG.debug("Going to rollback");
    @@ -323,13 +313,19 @@ public class CompactionTxnHandler extends TxnHandler {
          try {
            Connection dbConn = null;
            Statement stmt = null;
    + PreparedStatement pStmt = null;
            ResultSet rs = null;
            try {
    - //do we need serializable? Once we have the HWM as above, no. Before that
    - //it's debatable, but problem described above applies either way
    - //Thus can drop to RC
    - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
              stmt = dbConn.createStatement();
    + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
    + if(rs.next()) {
    + info = CompactionInfo.loadFullFromCompactionQueue(rs);
    + }
    + else {
    + throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
    + }
    + close(rs);
              String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
              LOG.debug("Going to execute update <" + s + ">");
              int updCount = stmt.executeUpdate(s);
    @@ -338,6 +334,10 @@ public class CompactionTxnHandler extends TxnHandler {
                LOG.debug("Going to rollback");
                dbConn.rollback();
              }
    + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
    + info.state = SUCCEEDED_STATE;
    + CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
    + updCount = pStmt.executeUpdate();

              // Remove entries from completed_txn_components as well, so we don't start looking there
              // again but only up to the highest txn ID include in this compaction job.
    @@ -347,7 +347,7 @@ public class CompactionTxnHandler extends TxnHandler {
              if (info.partName != null) {
                s += " and ctc_partition = '" + info.partName + "'";
              }
    - if(info.highestTxnId != null) {
    + if(info.highestTxnId != 0) {
                s += " and ctc_txnid <= " + info.highestTxnId;
              }
              LOG.debug("Going to execute update <" + s + ">");
    @@ -358,7 +358,7 @@ public class CompactionTxnHandler extends TxnHandler {

              s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
                TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
    - info.tableName + "'" + (info.highestTxnId == null ? "" : " and txn_id <= " + info.highestTxnId);
    + info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId);
              if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
              LOG.debug("Going to execute update <" + s + ">");
              rs = stmt.executeQuery(s);
    @@ -406,6 +406,7 @@ public class CompactionTxnHandler extends TxnHandler {
              throw new MetaException("Unable to connect to transaction database " +
                StringUtils.stringifyException(e));
            } finally {
    + closeStmt(pStmt);
              close(rs, stmt, dbConn);
            }
          } catch (RetryException e) {
    @@ -668,6 +669,225 @@ public class CompactionTxnHandler extends TxnHandler {
            setCompactionHighestTxnId(ci, highestTxnId);
          }
        }
    + private static class RetentionCounters {
    + int attemptedRetention = 0;
    + int failedRetention = 0;
    + int succeededRetention = 0;
    + RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) {
    + this.attemptedRetention = attemptedRetention;
    + this.failedRetention = failedRetention;
    + this.succeededRetention = succeededRetention;
    + }
    + }
    + private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) {
    + switch (ci.state) {
    + case ATTEMPTED_STATE:
    + if(--rc.attemptedRetention < 0) {
    + deleteSet.add(ci.id);
    + }
    + break;
    + case FAILED_STATE:
    + if(--rc.failedRetention < 0) {
    + deleteSet.add(ci.id);
    + }
    + break;
    + case SUCCEEDED_STATE:
    + if(--rc.succeededRetention < 0) {
    + deleteSet.add(ci.id);
    + }
    + break;
    + default:
    + //do nothing to handle future RU/D where we may want to add new state types
    + }
    + }
    +
    + /**
    + * For any given compactable entity (partition, table if not partitioned) the history of compactions
    + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
    + * history such that a configurable number of each type of state is present. Any other entries
    + * can be purged. This scheme has advantage of always retaining the last failure/success even if
    + * it's not recent.
    + * @throws MetaException
    + */
    + public void purgeCompactionHistory() throws MetaException {
    + Connection dbConn = null;
    + Statement stmt = null;
    + ResultSet rs = null;
    + List<Long> deleteSet = new ArrayList<>();
    + RetentionCounters rc = null;
    + try {
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
    + /*cc_id is monotonically increasing, so for any entity rows sort in compaction-history order;
    + thus this query groups by entity and, within each group, sorts most recent first*/
    + rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " +
    + "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc");
    + String lastCompactedEntity = null;
    + /*In each group, walk from the most recent entry and count occurrences of each state type. Once
    + * enough entries of a given state have been counted to satisfy the retention policy, delete all
    + * older entries in that state.*/
    + while(rs.next()) {
    + CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
    + if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
    + lastCompactedEntity = ci.getFullPartitionName();
    + rc = new RetentionCounters(conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
    + getFailedCompactionRetention(),
    + conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
    + }
    + checkForDeletion(deleteSet, ci, rc);
    + }
    + close(rs);
    +
    + String baseDeleteSql = "delete from COMPLETED_COMPACTIONS where cc_id IN(";
    + StringBuilder queryStr = new StringBuilder(baseDeleteSql);
    + for(int i = 0; i < deleteSet.size(); i++) {
    + if(i > 0 && i % TIMED_OUT_TXN_ABORT_BATCH_SIZE == 0) {
    + queryStr.setCharAt(queryStr.length() - 1, ')');
    + stmt.executeUpdate(queryStr.toString());
    + dbConn.commit();
    + queryStr = new StringBuilder(baseDeleteSql);
    + }
    + queryStr.append(deleteSet.get(i)).append(',');
    + }
    + if(queryStr.length() > baseDeleteSql.length()) {
    + queryStr.setCharAt(queryStr.length() - 1, ')');
    + int updCnt = stmt.executeUpdate(queryStr.toString());
    + dbConn.commit();
    + }
    + dbConn.commit();
    + } catch (SQLException e) {
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "purgeCompactionHistory()");
    + throw new MetaException("Unable to connect to transaction database " +
    + StringUtils.stringifyException(e));
    + } finally {
    + close(rs, stmt, dbConn);
    + }
    + } catch (RetryException ex) {
    + purgeCompactionHistory();
    + }
    + }
    + /**
    + * This ensures that the number of failed compaction entries retained is at least the failed-compaction
    + * threshold beyond which new compactions stop being scheduled.
    + */
    + public int getFailedCompactionRetention() {
    + int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
    + int failedRetention = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
    + if(failedRetention < failedThreshold) {
    + LOG.warn("Invalid configuration " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.varname +
    + "=" + failedRetention + " < " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname + "=" +
    + failedThreshold + ". Will use " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.varname +
    + "=" + failedThreshold);
    + failedRetention = failedThreshold;
    + }
    + return failedRetention;
    + }
    + /**
    + * Returns {@code true} if there already exists sufficient number of consecutive failures for
    + * this table/partition so that no new automatic compactions will be scheduled.
    + * User initiated compactions don't do this check.
    + *
    + * Do we allow compacting a whole table (when it's partitioned)? No, though perhaps we should.
    + * That would be a meta operation, i.e. first find all partitions for this table (which have
    + * txn info) and schedule each compaction separately. Disallowing it avoids complications in this logic.
    + */
    + public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
    + Connection dbConn = null;
    + Statement stmt = null;
    + ResultSet rs = null;
    + try {
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
    + rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " +
    + "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
    + "CC_TABLE = " + quoteString(ci.tableName) +
    + (ci.partName != null ? " and CC_PARTITION = " + quoteString(ci.partName) : "") +
    + " order by CC_ID desc");
    + int numFailed = 0;
    + int numTotal = 0;
    + int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
    + while(rs.next() && ++numTotal <= failedThreshold) {
    + if(rs.getString(1).charAt(0) == FAILED_STATE) {
    + numFailed++;
    + }
    + else {
    + numFailed--;
    + }
    + }
    + return numFailed == failedThreshold;
    + }
    + catch (SQLException e) {
    + LOG.error("Unable to check failed compactions " + e.getMessage());
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
    + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
    + return false;//weren't able to check
    + } finally {
    + close(rs, stmt, dbConn);
    + }
    + } catch (RetryException e) {
    + return checkFailedCompactions(ci);
    + }
    + }
    + /**
    + * If there is an entry in COMPACTION_QUEUE with ci.id, remove it and
    + * make an entry in COMPLETED_COMPACTIONS with state 'f'.
    + *
    + * todo: what about markCleaned(), which is called when the table has been deleted...
    + */
    + public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
    + //todo: this should take a "comment" parameter to set in CC_META_INFO to provide some context for the failure
    + try {
    + Connection dbConn = null;
    + Statement stmt = null;
    + PreparedStatement pStmt = null;
    + ResultSet rs = null;
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
    + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
    + if(rs.next()) {
    + ci = CompactionInfo.loadFullFromCompactionQueue(rs);
    + String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
    + LOG.debug("Going to execute update <" + s + ">");
    + int updCnt = stmt.executeUpdate(s);
    + }
    + else {
    + throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
    + }
    + close(rs, stmt, null);
    +
    + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
    + ci.state = FAILED_STATE;
    + CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
    + int updCount = pStmt.executeUpdate();
    + LOG.debug("Going to commit");
    + closeStmt(pStmt);
    + dbConn.commit();
    + } catch (SQLException e) {
    + LOG.error("Unable to delete from compaction queue " + e.getMessage());
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + try {
    + checkRetryable(dbConn, e, "markFailed(" + ci + ")");
    + }
    + catch(MetaException ex) {
    + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
    + }
    + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
    + } finally {
    + close(rs, stmt, null);
    + close(null, pStmt, dbConn);
    + }
    + } catch (RetryException e) {
    + markFailed(ci);
    + }
    + }
    +
      }
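
    To make the retention walk concrete: with retention of 2 attempted, 3 failed and 3 succeeded, a per-entity history of "sssfffaaasffss" (oldest first) keeps the newest 2 'a', 3 'f' and 3 's' entries and purges everything older. A self-contained sketch of the counting that checkForDeletion() applies (illustration only, not part of the patch):

      import java.util.ArrayList;
      import java.util.List;

      public class RetentionSketch {
        // Walk most-recent-first; once a state's retention quota is used up,
        // older entries of that state are marked for deletion.
        static List<Integer> toPurge(String history, int keepA, int keepF, int keepS) {
          List<Integer> purge = new ArrayList<>();
          for (int i = history.length() - 1; i >= 0; i--) {
            switch (history.charAt(i)) {
              case 'a': if (--keepA < 0) purge.add(i); break;
              case 'f': if (--keepF < 0) purge.add(i); break;
              case 's': if (--keepS < 0) purge.add(i); break;
              default:  break; // unknown state: retain, like checkForDeletion()
            }
          }
          return purge;
        }
        public static void main(String[] args) {
          System.out.println(toPurge("sssfffaaasffss", 2, 3, 3)); // [6, 4, 3, 2, 1, 0]
        }
      }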



    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    index 2015526..2a7545c 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    @@ -118,10 +118,27 @@ public final class TxnDbUtil {
                " CQ_WORKER_ID varchar(128)," +
                " CQ_START bigint," +
                " CQ_RUN_AS varchar(128)," +
    - " CQ_HIGHEST_TXN_ID bigint)");
    + " CQ_HIGHEST_TXN_ID bigint," +
    + " CQ_META_INFO varchar(2048) for bit data," +
    + " CQ_HADOOP_JOB_ID varchar(32))");

            stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
            stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
    +
    + stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
    + " CC_ID bigint PRIMARY KEY," +
    + " CC_DATABASE varchar(128) NOT NULL," +
    + " CC_TABLE varchar(128) NOT NULL," +
    + " CC_PARTITION varchar(767)," +
    + " CC_STATE char(1) NOT NULL," +
    + " CC_TYPE char(1) NOT NULL," +
    + " CC_WORKER_ID varchar(128)," +
    + " CC_START bigint," +
    + " CC_END bigint," +
    + " CC_RUN_AS varchar(128)," +
    + " CC_HIGHEST_TXN_ID bigint," +
    + " CC_META_INFO varchar(2048) for bit data," +
    + " CC_HADOOP_JOB_ID varchar(32))");

            conn.commit();
          } catch (SQLException e) {
    @@ -161,7 +178,7 @@ public final class TxnDbUtil {
            dropTable(stmt, "NEXT_LOCK_ID");
            dropTable(stmt, "COMPACTION_QUEUE");
            dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
    -
    + dropTable(stmt, "COMPLETED_COMPACTIONS");
            conn.commit();
          } finally {
            closeResources(conn, stmt, null);

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    index 12ee52d..a65551a 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    @@ -64,14 +64,20 @@ import java.util.concurrent.TimeUnit;
      @InterfaceAudience.Private
      @InterfaceStability.Evolving
      public class TxnHandler {
    - // Compactor states
    + // Compactor states (Should really be enum)
        static final public String INITIATED_RESPONSE = "initiated";
        static final public String WORKING_RESPONSE = "working";
        static final public String CLEANING_RESPONSE = "ready for cleaning";
    + static final public String FAILED_RESPONSE = "failed";
    + static final public String SUCCEEDED_RESPONSE = "succeeded";
    + static final public String ATTEMPTED_RESPONSE = "attempted";

        static final protected char INITIATED_STATE = 'i';
        static final protected char WORKING_STATE = 'w';
        static final protected char READY_FOR_CLEANING = 'r';
    + static final char FAILED_STATE = 'f';
    + static final char SUCCEEDED_STATE = 's';
    + static final char ATTEMPTED_STATE = 'a';

        // Compactor types
        static final protected char MAJOR_TYPE = 'a';
    @@ -759,7 +765,7 @@ public class TxnHandler {
          }
        }

    - public void compact(CompactionRequest rqst) throws MetaException {
    + public long compact(CompactionRequest rqst) throws MetaException {
          // Put a compaction request in the queue.
          try {
            Connection dbConn = null;
    @@ -826,6 +832,7 @@ public class TxnHandler {
              stmt.executeUpdate(s);
              LOG.debug("Going to commit");
              dbConn.commit();
    + return id;
            } catch (SQLException e) {
              LOG.debug("Going to rollback");
              rollbackDBConn(dbConn);
    @@ -837,7 +844,7 @@ public class TxnHandler {
              closeDbConn(dbConn);
            }
          } catch (RetryException e) {
    - compact(rqst);
    + return compact(rqst);
          }
        }

    @@ -850,7 +857,13 @@ public class TxnHandler {
              dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
              stmt = dbConn.createStatement();
              String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
    - "cq_start, cq_run_as from COMPACTION_QUEUE";
    + "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
    + "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
    + "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
    + //what I want is "order by cc_end desc, cc_start asc" (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
    + //to sort so that currently running jobs are at the end of the list (bottom of screen)
    + //and the currently running ones are sorted by start time
    + //w/o an order by, currently running compactions will likely be first (LHS of the union)
              LOG.debug("Going to execute query <" + s + ">");
              ResultSet rs = stmt.executeQuery(s);
              while (rs.next()) {
    @@ -862,16 +875,26 @@ public class TxnHandler {
                  case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
                  case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
                  case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
    - default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
    + case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
    + case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break;
    + default:
    + //do nothing to handle RU/D if we add another status
                }
                switch (rs.getString(5).charAt(0)) {
                  case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
                  case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
    - default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
    + default:
    + //do nothing to handle RU/D if we add another status
                }
                e.setWorkerid(rs.getString(6));
                e.setStart(rs.getLong(7));
    - e.setRunAs(rs.getString(8));
    + long endTime = rs.getLong(8);
    + if(endTime != -1) {
    + e.setEndTime(endTime);
    + }
    + e.setRunAs(rs.getString(9));
    + e.setHadoopJobId(rs.getString(10));
    + long id = rs.getLong(11);//for debugging
                response.addToCompacts(e);
              }
              LOG.debug("Going to rollback");
    @@ -2331,41 +2354,29 @@ public class TxnHandler {
              throw new MetaException(msg);
          }
        }
    - /**
    - * the caller is expected to retry if this fails
    - *
    - * @return
    - * @throws SQLException
    - * @throws MetaException
    - */
    - private long generateNewExtLockId() throws SQLException, MetaException {
    - Connection dbConn = null;
    - Statement stmt = null;
    - ResultSet rs = null;
    - try {
    - dbConn = getDbConn(getRequiredIsolationLevel());
    - stmt = dbConn.createStatement();
    -
    - // Get the next lock id.
    - String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
    - LOG.debug("Going to execute query <" + s + ">");
    - rs = stmt.executeQuery(s);
    - if (!rs.next()) {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - throw new MetaException("Transaction tables not properly " +
    - "initialized, no record found in next_lock_id");
    - }
    - long extLockId = rs.getLong(1);
    - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
    - LOG.debug("Going to execute update <" + s + ">");
    - stmt.executeUpdate(s);
    - LOG.debug("Going to commit.");
    - dbConn.commit();
    - return extLockId;
    + static String quoteString(String input) {
    + return "'" + input + "'";
    + }
    + static CompactionType dbCompactionType2ThriftType(char dbValue) {
    + switch (dbValue) {
    + case MAJOR_TYPE:
    + return CompactionType.MAJOR;
    + case MINOR_TYPE:
    + return CompactionType.MINOR;
    + default:
    + LOG.warn("Unexpected compaction type " + dbValue);
    + return null;
          }
    - finally {
    - close(rs, stmt, dbConn);
    + }
    + static Character thriftCompactionType2DbType(CompactionType ct) {
    + switch (ct) {
    + case MAJOR:
    + return MAJOR_TYPE;
    + case MINOR:
    + return MINOR_TYPE;
    + default:
    + LOG.warn("Unexpected compaction type " + ct);
    + return null;
          }
        }
      }

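    A note on the compact() change above: since the method now returns the id of the newly
    queued compaction, the RetryException handler must return the result of the recursive
    call rather than fall through. A minimal sketch of this retry-by-recursion idiom, with
    a hypothetical insertIntoQueue() standing in for the real JDBC work:

        class QueueClientSketch {
          static class RetryException extends Exception {}

          // Hypothetical helper: enqueues the request and returns its id;
          // throws RetryException on a transient database failure.
          long insertIntoQueue(Object rqst) throws RetryException { return 42L; }

          long compact(Object rqst) throws RetryException {
            try {
              return insertIntoQueue(rqst);
            } catch (RetryException e) {
              return compact(rqst);  // must propagate the id from the retried attempt
            }
          }
        }
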
    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    index 06e0932..ff2c2c1 100644
    --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    @@ -219,7 +219,8 @@ public class TestCompactionTxnHandler {
          assertEquals(0, txnHandler.findReadyToClean().size());

          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - assertEquals(0, rsp.getCompactsSize());
    + assertEquals(1, rsp.getCompactsSize());
    + assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    index dee7601..96e4d40 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    @@ -17,17 +17,12 @@
       */
      package org.apache.hadoop.hive.ql.txn;

    +import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.apache.hadoop.hive.conf.HiveConf;
    -import org.apache.hadoop.hive.metastore.HouseKeeperService;
      import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    -import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;

    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.ThreadFactory;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicInteger;

    @@ -35,58 +30,40 @@ import java.util.concurrent.atomic.AtomicInteger;
       * Performs background tasks for Transaction management in Hive.
       * Runs inside Hive Metastore Service.
       */
    -public class AcidHouseKeeperService implements HouseKeeperService {
    +public class AcidHouseKeeperService extends HouseKeeperServiceBase {
        private static final Logger LOG = LoggerFactory.getLogger(AcidHouseKeeperService.class);
    - private ScheduledExecutorService pool = null;
    - private final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);
    +
        @Override
    - public void start(HiveConf hiveConf) throws Exception {
    - HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
    - if(!mgr.supportsAcid()) {
    - LOG.info(AcidHouseKeeperService.class.getName() + " not started since " +
    - mgr.getClass().getName() + " does not support Acid.");
    - return;//there are no transactions in this case
    - }
    - pool = Executors.newScheduledThreadPool(1, new ThreadFactory() {
    - private final AtomicInteger threadCounter = new AtomicInteger();
    - @Override
    - public Thread newThread(Runnable r) {
    - return new Thread(r, "DeadTxnReaper-" + threadCounter.getAndIncrement());
    - }
    - });
    - TimeUnit tu = TimeUnit.MILLISECONDS;
    - pool.scheduleAtFixedRate(new TimedoutTxnReaper(hiveConf, this),
    - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu),
    - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu),
    - TimeUnit.MILLISECONDS);
    - LOG.info("Started " + this.getClass().getName() + " with delay/interval = " +
    - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu) + "/" +
    - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu) + " " + tu);
    + protected long getStartDelayMs() {
    + return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, TimeUnit.MILLISECONDS);
        }
        @Override
    - public void stop() {
    - if(pool != null && !pool.isShutdown()) {
    - pool.shutdown();
    - }
    - pool = null;
    + protected long getIntervalMs() {
    + return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
    + }
    + @Override
    + protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
    + return new TimedoutTxnReaper(hiveConf, isAliveCounter);
        }
    +
        @Override
        public String getServiceDescription() {
          return "Abort expired transactions";
        }
    +
        private static final class TimedoutTxnReaper implements Runnable {
          private final TxnHandler txnHandler;
    - private final AcidHouseKeeperService owner;
    - private TimedoutTxnReaper(HiveConf hiveConf, AcidHouseKeeperService owner) {
    + private final AtomicInteger isAliveCounter;
    + private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
            txnHandler = new TxnHandler(hiveConf);
    - this.owner = owner;
    + this.isAliveCounter = isAliveCounter;
          }
          @Override
          public void run() {
            try {
              long startTime = System.currentTimeMillis();
              txnHandler.performTimeOuts();
    - int count = owner.isAliveCounter.incrementAndGet();
    + int count = isAliveCounter.incrementAndGet();
              LOG.info("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count);
            }
            catch(Throwable t) {
    @@ -94,12 +71,4 @@ public class AcidHouseKeeperService implements HouseKeeperService {
            }
          }
        }
    -
    - /**
    - * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1.
    - * Starts with {@link java.lang.Integer#MIN_VALUE}
    - */
    - public int getIsAliveCounter() {
    - return isAliveCounter.get();
    - }
      }

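    The change above is a template-method extraction: HouseKeeperServiceBase (added elsewhere
    in this patch) owns the scheduling pool and the isAliveCounter, while subclasses supply
    only the timing and the task. A hedged sketch of a new service under this scheme, reusing
    the abstract signatures visible in the diff (the timing values are illustrative):

        import java.util.concurrent.atomic.AtomicInteger;
        import org.apache.hadoop.hive.conf.HiveConf;
        import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;

        public class ExampleHouseKeeperService extends HouseKeeperServiceBase {
          @Override
          protected long getStartDelayMs() { return 1000L; }      // illustrative delay
          @Override
          protected long getIntervalMs() { return 60 * 1000L; }   // illustrative interval
          @Override
          protected Runnable getScheduedAction(HiveConf hiveConf, final AtomicInteger isAliveCounter) {
            return new Runnable() {
              @Override
              public void run() {
                // a real task does its work here, then bumps the counter so callers
                // can observe that a run completed
                isAliveCounter.incrementAndGet();
              }
            };
          }
          @Override
          public String getServiceDescription() {
            return "Example no-op housekeeping task";
          }
        }
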
    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    index b847202..fbf5481 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    @@ -189,6 +189,7 @@ public class Cleaner extends CompactorThread {
            if (t == null) {
              // The table was dropped before we got around to cleaning it.
              LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
    + txnHandler.markCleaned(ci);
              return;
            }
            Partition p = null;
    @@ -198,6 +199,7 @@ public class Cleaner extends CompactorThread {
                // The partition was dropped before we got around to cleaning it.
                LOG.info("Unable to find partition " + ci.getFullPartitionName() +
                    ", assuming it was dropped");
    + txnHandler.markCleaned(ci);
                return;
              }
            }
    @@ -223,13 +225,11 @@ public class Cleaner extends CompactorThread {
                }
              });
            }
    -
    + txnHandler.markCleaned(ci);
          } catch (Exception e) {
    - LOG.error("Caught exception when cleaning, unable to complete cleaning " +
    + LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
                StringUtils.stringifyException(e));
    - } finally {
    - // We need to clean this out one way or another.
    - txnHandler.markCleaned(ci);
    + txnHandler.markFailed(ci);
          }
        }


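    The core of the fix is visible above: previously the finally block called markCleaned()
    unconditionally, so a failed clean was recorded as if it had succeeded. The corrected
    control flow, reduced to its shape (the doClean() stub and TxnHandlerOps interface are
    stand-ins for the real removal logic and CompactionTxnHandler):

        class CleanerSketch {
          interface TxnHandlerOps {              // stand-in for CompactionTxnHandler
            void markCleaned(Object ci);
            void markFailed(Object ci);
          }

          void clean(Object ci, TxnHandlerOps txnHandler) {
            try {
              doClean(ci);                       // may throw on any filesystem/metastore error
              txnHandler.markCleaned(ci);        // reached only on success
            } catch (Exception e) {
              txnHandler.markFailed(ci);         // records an 'f' row in COMPLETED_COMPACTIONS
            }
          }

          void doClean(Object ci) throws Exception {}  // stand-in for the file-removal logic
        }
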
    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    index 7d0f46a..07ac0c2 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    @@ -137,6 +137,10 @@ public class CompactorMR {
         */
        void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
                 ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
    +
    + if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
    + throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
    + }
          JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);

          // Figure out and encode what files we need to read. We do this here (rather than in

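    The hook above is plain configuration-driven fault injection, active only when both
    test flags are set. A sketch of how a test arms and disarms it (mirroring the
    TestTxnCommands2 usage later in this patch):

        import org.apache.hadoop.hive.conf.HiveConf;

        public class FailCompactionExample {
          public static void main(String[] args) {
            HiveConf conf = new HiveConf();
            conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true);
            conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
            // ...run the Worker here: every CompactorMR.run() now throws,
            // producing failed ('f') history records...
            conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
            // ...subsequent compactions proceed normally...
          }
        }
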
    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    index a8fe57d..2ef06de 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    @@ -78,7 +78,7 @@ public class Initiator extends CompactorThread {

              // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
              // don't doom the entire thread.
    - try {
    + try {//todo: add method to only get current i.e. skip history - more efficient
                ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
                ValidTxnList txns =
                    CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
    @@ -119,6 +119,11 @@ public class Initiator extends CompactorThread {
                          ci.getFullPartitionName() + " so we will not initiate another compaction");
                      continue;
                    }
    + if(txnHandler.checkFailedCompactions(ci)) {
    + //todo: make 'a' state entry in completed_compactions
    + LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since recent consecutive attempts to compact it failed (see hive.compactor.initiator.failed.compacts.threshold).");
    + continue;
    + }

                    // Figure out who we should run the file operations as
                    Partition p = resolvePartition(ci);
    @@ -134,9 +139,9 @@ public class Initiator extends CompactorThread {
                    if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
                  } catch (Throwable t) {
                    LOG.error("Caught exception while trying to determine if we should compact " +
    - ci.getFullPartitionName() + ". Marking clean to avoid repeated failures, " +
    + ci + ". Marking failed to avoid repeated failures, " +
                        "" + StringUtils.stringifyException(t));
    - txnHandler.markCleaned(ci);
    + txnHandler.markFailed(ci);
                  }
                }

    @@ -300,7 +305,7 @@ public class Initiator extends CompactorThread {
          if (ci.partName != null) rqst.setPartitionname(ci.partName);
          rqst.setRunas(runAs);
          LOG.info("Requesting compaction: " + rqst);
    - txnHandler.compact(rqst);
    + ci.id = txnHandler.compact(rqst);
        }

        // Because TABLE_NO_AUTO_COMPACT was originally assumed to be NO_AUTO_COMPACT and then was moved

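    checkFailedCompactions(ci) consults the new COMPLETED_COMPACTIONS history and suppresses
    automatic scheduling once the most recent hive.compactor.initiator.failed.compacts.threshold
    attempts for a table/partition have all failed; user-requested compactions still run, since
    they bypass the Initiator. A simplified in-memory sketch of that check (the real
    implementation is SQL-backed):

        import java.util.List;

        class FailedCompactionGate {
          static final char FAILED_STATE = 'f';   // as defined in TxnHandler above

          // history holds the final-state chars for one table/partition, newest first;
          // returns true when the newest `threshold` entries are all failures.
          static boolean tooManyConsecutiveFailures(List<Character> history, int threshold) {
            if (history.size() < threshold) {
              return false;
            }
            for (int i = 0; i < threshold; i++) {
              if (history.get(i) != FAILED_STATE) {
                return false;
              }
            }
            return true;
          }
        }
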
    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    index 045ce63..ce03c8e 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    @@ -17,7 +17,6 @@
       */
      package org.apache.hadoop.hive.ql.txn.compactor;

    -import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.apache.hadoop.hive.common.ValidTxnList;
    @@ -70,7 +69,8 @@ public class Worker extends CompactorThread {
            throw new RuntimeException(e);
          }
        }
    -
    +//todo: this doesn't check if compaction is already running (even though Initiator does,
    +// user-initiated compactions don't go through Initiator)
        @Override
        public void run() {
          do {
    @@ -174,9 +174,9 @@ public class Worker extends CompactorThread {
                }
                txnHandler.markCompacted(ci);
              } catch (Exception e) {
    - LOG.error("Caught exception while trying to compact " + ci.getFullPartitionName() +
    + LOG.error("Caught exception while trying to compact " + ci +
                    ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
    - txnHandler.markCleaned(ci);
    + txnHandler.markFailed(ci);
              }
            } catch (Throwable t) {
              LOG.error("Caught an exception in the main loop of compactor worker " + name + ", " +

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    index b784585..7a1a3d2 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    @@ -24,10 +24,21 @@ import org.apache.hadoop.fs.FileUtil;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.common.FileUtils;
      import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.HouseKeeperService;
    +import org.apache.hadoop.hive.metastore.api.CompactionRequest;
    +import org.apache.hadoop.hive.metastore.api.CompactionType;
    +import org.apache.hadoop.hive.metastore.api.MetaException;
    +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
    +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
    +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
    +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
      import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
    +import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
      import org.apache.hadoop.hive.ql.session.SessionState;
    +import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
      import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
    +import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
      import org.apache.hadoop.hive.ql.txn.compactor.Worker;
      import org.junit.After;
      import org.junit.Assert;
    @@ -42,6 +53,7 @@ import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.Comparator;
      import java.util.List;
    +import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicBoolean;

      /**
    @@ -454,6 +466,211 @@ public class TestTxnCommands2 {
          //insert overwrite not supported for ACID tables
        }
        /**
    + * HIVE-12353
    + * @throws Exception
    + */
    + @Test
    + public void testInitiatorWithMultipleFailedCompactions() throws Exception {
    + String tblName = "hive12353";
    + runStatementOnDriver("drop table if exists " + tblName);
    + runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
    + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
    + " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
    + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
    + for(int i = 0; i < 5; i++) {
    + //generate enough delta files so that Initiator can trigger auto compaction
    + runStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')");
    + }
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
    +
    + int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
    + CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
    + AtomicBoolean stop = new AtomicBoolean(true);
    + //create failed compactions
    + for(int i = 0; i < numFailedCompactions; i++) {
    + //each of these should fail
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    + runWorker(hiveConf);
    + }
    + //this should not schedule a new compaction due to prior failures
    + Initiator init = new Initiator();
    + init.setThreadId((int)init.getId());
    + init.setHiveConf(hiveConf);
    + init.init(stop, new AtomicBoolean());
    + init.run();
    +
    + CompactionsByState cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed);
    + Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total);
    +
    + hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
    + AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService();
    + runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed);
    + Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total);
    +
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
    + runWorker(hiveConf);//will fail
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    + runWorker(hiveConf);//will fail
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed);
    + Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total);
    + runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have
    + //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no others, since we only have failed ones here)
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
    + Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total);
    +
    +
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    + //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
    + Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated);
    + Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
    +
    + runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
    + Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean);
    + Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
    +
    + runCleaner(hiveConf); // transition to Success state
    + runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
    + Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
    + Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
    + }
    + private static class CompactionsByState {
    + private int attempted;
    + private int failed;
    + private int initiated;
    + private int readyToClean;
    + private int succeeded;
    + private int working;
    + private int total;
    + }
    + private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
    + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
    + CompactionsByState compactionsByState = new CompactionsByState();
    + compactionsByState.total = resp.getCompactsSize();
    + for(ShowCompactResponseElement compact : resp.getCompacts()) {
    + if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
    + compactionsByState.failed++;
    + }
    + else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
    + compactionsByState.readyToClean++;
    + }
    + else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
    + compactionsByState.initiated++;
    + }
    + else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
    + compactionsByState.succeeded++;
    + }
    + else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
    + compactionsByState.working++;
    + }
    + else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
    + compactionsByState.attempted++;
    + }
    + }
    + return compactionsByState;
    + }
    + private static void runWorker(HiveConf hiveConf) throws MetaException {
    + AtomicBoolean stop = new AtomicBoolean(true);
    + Worker t = new Worker();
    + t.setThreadId((int) t.getId());
    + t.setHiveConf(hiveConf);
    + AtomicBoolean looped = new AtomicBoolean();
    + t.init(stop, looped);
    + t.run();
    + }
    + private static void runCleaner(HiveConf hiveConf) throws MetaException {
    + AtomicBoolean stop = new AtomicBoolean(true);
    + Cleaner t = new Cleaner();
    + t.setThreadId((int) t.getId());
    + t.setHiveConf(hiveConf);
    + AtomicBoolean looped = new AtomicBoolean();
    + t.init(stop, looped);
    + t.run();
    + }
    +
    + private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
    + int lastCount = houseKeeperService.getIsAliveCounter();
    + houseKeeperService.start(conf);
    + while(houseKeeperService.getIsAliveCounter() <= lastCount) {
    + try {
    + Thread.sleep(100);//make sure it has run at least once
    + }
    + catch(InterruptedException ex) {
    + //...
    + }
    + }
    + houseKeeperService.stop();
    + }
    +
    + /**
    + * HIVE-12352 has details
    + * @throws Exception
    + */
    + @Test
    + public void writeBetweenWorkerAndCleaner() throws Exception {
    + String tblName = "hive12352";
    + runStatementOnDriver("drop table if exists " + tblName);
    + runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
    + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
    + " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
    +
    + //create some data
    + runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
    + runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
    +
    + //run Worker to execute compaction
    + CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    + Worker t = new Worker();
    + t.setThreadId((int) t.getId());
    + t.setHiveConf(hiveConf);
    + AtomicBoolean stop = new AtomicBoolean(true);
    + AtomicBoolean looped = new AtomicBoolean();
    + t.init(stop, looped);
    + t.run();
    +
    + //delete something, but make sure txn is rolled back
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
    + runStatementOnDriver("delete from " + tblName + " where a = 1");
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
    +
    + List<String> expected = new ArrayList<>();
    + expected.add("1\tfoo");
    + expected.add("2\tbar");
    + expected.add("3\tblah");
    + Assert.assertEquals("", expected,
    + runStatementOnDriver("select a,b from " + tblName + " order by a"));
    +
    + //run Cleaner
    + Cleaner c = new Cleaner();
    + c.setThreadId((int)c.getId());
    + c.setHiveConf(hiveConf);
    + c.init(stop, new AtomicBoolean());
    + c.run();
    +
    + //this seems odd, but we want to make sure that CompactionTxnHandler.cleanEmptyAbortedTxns() runs
    + Initiator i = new Initiator();
    + i.setThreadId((int)i.getId());
    + i.setHiveConf(hiveConf);
    + i.init(stop, new AtomicBoolean());
    + i.run();
    +
    + //check that aborted operation didn't become committed
    + Assert.assertEquals("", expected,
    + runStatementOnDriver("select a,b from " + tblName + " order by a"));
    + }
    + /**
         * takes raw data and turns it into a string as if from Driver.getResults()
         * sorts rows in dictionary order
         */

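    Taken together, testInitiatorWithMultipleFailedCompactions() drives the whole lifecycle
    this patch creates. As a reading aid ('f', 's', 'a' are introduced here; 'i', 'w', 'r'
    are the pre-existing initiated/working/ready-for-cleaning states):

        initiated 'i' -> working 'w' -> ready for cleaning 'r' -> succeeded 's'

    A failure at the Worker or Cleaner step (or while the Initiator evaluates a candidate)
    instead ends in failed 'f' via markFailed(), and 'a' (attempted) is reserved for
    Initiator give-ups but, per the todo in Initiator above, is not yet written by this patch.
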
    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    index bca5002..899f5a1 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    @@ -17,6 +17,7 @@
       */
      package org.apache.hadoop.hive.ql.txn.compactor;

    +import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.junit.Assert;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
    @@ -71,7 +72,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, null);
    @@ -102,7 +104,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -131,7 +134,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, null);
    @@ -169,7 +173,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -323,7 +328,8 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          rsp = txnHandler.showCompact(new ShowCompactRequest());
          compacts = rsp.getCompacts();
    - Assert.assertEquals(0, compacts.size());
    + Assert.assertEquals(1, compacts.size());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -396,7 +402,8 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          rsp = txnHandler.showCompact(new ShowCompactRequest());
          compacts = rsp.getCompacts();
    - Assert.assertEquals(0, compacts.size());
    + Assert.assertEquals(1, compacts.size());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -421,7 +428,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -451,7 +459,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -478,7 +487,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }
        @Override
        boolean useHive130DeltaDirName() {

    http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    index fe1d0d3..d0db406 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    @@ -17,6 +17,7 @@
       */
      package org.apache.hadoop.hive.ql.txn.compactor;

    +import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.apache.hadoop.fs.*;
    @@ -933,7 +934,8 @@ public class TestWorker extends CompactorTest {

          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
    - Assert.assertEquals(0, compacts.size());
    + Assert.assertEquals(1, compacts.size());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
        }

        @Test
    @@ -957,6 +959,7 @@ public class TestWorker extends CompactorTest {

          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
    - Assert.assertEquals(0, compacts.size());
    + Assert.assertEquals(1, compacts.size());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }
      }
  • Ekoifman at Jan 22, 2016 at 3:01 am
    Repository: hive
    Updated Branches:
       refs/heads/branch-1 62200ef2f -> aa35e21ef


    HIVE-12353 When Compactor fails it calls CompactionTxnHandler.markedCleaned(). it should not. (Eugene Koifman, reviewed by Alan Gates)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa35e21e
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa35e21e
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa35e21e

    Branch: refs/heads/branch-1
    Commit: aa35e21efce950e897a9f2575930c865facc4dcf
    Parents: 62200ef
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Thu Jan 21 19:01:17 2016 -0800
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Thu Jan 21 19:01:17 2016 -0800

    ----------------------------------------------------------------------
      .../org/apache/hadoop/hive/conf/HiveConf.java | 24 +-
      .../hive/ql/txn/compactor/TestCompactor.java | 57 -----
      .../hadoop/hive/metastore/HiveMetaStore.java | 5 +-
      .../hive/metastore/HouseKeeperService.java | 6 +
      .../hive/metastore/txn/CompactionInfo.java | 56 +++-
      .../metastore/txn/CompactionTxnHandler.java | 256 +++++++++++++++++--
      .../hadoop/hive/metastore/txn/TxnDbUtil.java | 21 +-
      .../hadoop/hive/metastore/txn/TxnHandler.java | 93 ++++---
      .../metastore/txn/TestCompactionTxnHandler.java | 3 +-
      .../ql/txn/AcidCompactionHistoryService.java | 83 ++++++
      .../hive/ql/txn/AcidHouseKeeperService.java | 59 ++---
      .../hadoop/hive/ql/txn/compactor/Cleaner.java | 10 +-
      .../hive/ql/txn/compactor/CompactorMR.java | 4 +
      .../txn/compactor/HouseKeeperServiceBase.java | 92 +++++++
      .../hadoop/hive/ql/txn/compactor/Initiator.java | 13 +-
      .../hadoop/hive/ql/txn/compactor/Worker.java | 7 +-
      .../apache/hadoop/hive/ql/TestTxnCommands2.java | 217 ++++++++++++++++
      .../hive/ql/txn/compactor/TestCleaner.java | 28 +-
      .../hive/ql/txn/compactor/TestWorker.java | 7 +-
      19 files changed, 853 insertions(+), 188 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    ----------------------------------------------------------------------
    diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    index 1b6aff0..1f2677f 100644
    --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    @@ -937,6 +937,7 @@ public class HiveConf extends Configuration {
          HIVETESTMODEDUMMYSTATPUB("hive.test.dummystats.publisher", "", "internal variable for test", false),
          HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
          HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. Will mark every ACID transaction aborted", false),
    + HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false),

          HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
              "Merge small files at the end of a map-only job"),
    @@ -1523,11 +1524,32 @@ public class HiveConf extends Configuration {
          HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
              "Number of aborted transactions involving a given table or partition that will trigger\n" +
              "a major compaction."),
    -
    +
    + COMPACTOR_INITIATOR_FAILED_THRESHOLD("hive.compactor.initiator.failed.compacts.threshold", 2,
    + new RangeValidator(1, 20), "Number of consecutive compaction failures (per table/partition) " +
    + "after which automatic compactions will not be scheduled any more. Note that this must be less " +
    + "than hive.compactor.history.retention.failed."),
    +
          HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
              new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
          COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
            "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
    +
    + COMPACTOR_HISTORY_RETENTION_SUCCEEDED("hive.compactor.history.retention.succeeded", 3,
    + new RangeValidator(0, 100), "Determines how many successful compaction records will be " +
    + "retained in compaction history for a given table/partition."),
    +
    + COMPACTOR_HISTORY_RETENTION_FAILED("hive.compactor.history.retention.failed", 3,
    + new RangeValidator(0, 100), "Determines how many failed compaction records will be " +
    + "retained in compaction history for a given table/partition."),
    +
    + COMPACTOR_HISTORY_RETENTION_ATTEMPTED("hive.compactor.history.retention.attempted", 2,
    + new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
    + "retained in compaction history for a given table/partition."),
    +
    + COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m",
    + new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"),
    +
          HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
            new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
          HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",

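    Note the constraint in the description above: hive.compactor.initiator.failed.compacts.threshold
    must stay below hive.compactor.history.retention.failed, otherwise the history reaper could
    purge failed records before the Initiator has enough of them to count. A sketch of overriding
    the new knobs programmatically (values are illustrative):

        import java.util.concurrent.TimeUnit;
        import org.apache.hadoop.hive.conf.HiveConf;

        public class CompactorHistoryConfig {
          public static void main(String[] args) {
            HiveConf conf = new HiveConf();
            conf.setIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD, 2);
            conf.setIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED, 3);    // must exceed the threshold
            conf.setIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED, 3);
            conf.setIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED, 2);
            conf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 2, TimeUnit.MINUTES);
          }
        }
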
    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    ----------------------------------------------------------------------
    diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    index 59e9674..4336b53 100644
    --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    @@ -772,63 +772,6 @@ public class TestCompactor {
          }
        }

    - /**
    - * HIVE-12352 has details
    - * @throws Exception
    - */
    - @Test
    - public void writeBetweenWorkerAndCleaner() throws Exception {
    - String tblName = "HIVE12352";
    - executeStatementOnDriver("drop table if exists " + tblName, driver);
    - executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
    - " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
    - " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
    -
    - //create some data
    - executeStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')", driver);
    - executeStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3", driver);
    -
    - //run Worker to execute compaction
    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    - txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    - Worker t = new Worker();
    - t.setThreadId((int) t.getId());
    - t.setHiveConf(conf);
    - AtomicBoolean stop = new AtomicBoolean(true);
    - AtomicBoolean looped = new AtomicBoolean();
    - t.init(stop, looped);
    - t.run();
    -
    - //delete something, but make sure txn is rolled back
    - conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
    - executeStatementOnDriver("delete from " + tblName + " where a = 1", driver);
    - conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
    -
    - List<String> expected = new ArrayList<>();
    - expected.add("1\tfoo");
    - expected.add("2\tbar");
    - expected.add("3\tblah");
    - Assert.assertEquals("", expected,
    - execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()"));
    -
    - //run Cleaner
    - Cleaner c = new Cleaner();
    - c.setThreadId((int)c.getId());
    - c.setHiveConf(conf);
    - c.init(stop, new AtomicBoolean());
    - c.run();
    -
    - //this seems odd, but we wan to make sure that to run CompactionTxnHandler.cleanEmptyAbortedTxns()
    - Initiator i = new Initiator();
    - i.setThreadId((int)i.getId());
    - i.setHiveConf(conf);
    - i.init(stop, new AtomicBoolean());
    - i.run();
    -
    - //check that aborted operation didn't become committed
    - Assert.assertEquals("", expected,
    - execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()"));
    - }
        @Test
        public void majorCompactAfterAbort() throws Exception {
          String dbName = "default";

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    index 7f97b6e..60eb7b0 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    @@ -6301,7 +6301,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
          if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
            return;
          }
    - Class c = Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService");
    + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
    + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
    + }
    + private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
          //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
          //should be called form it
          HouseKeeperService houseKeeper = (HouseKeeperService)c.newInstance();

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
    index eb4ea93..539ace0 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
    @@ -36,4 +36,10 @@ public interface HouseKeeperService {
         * Returns short description of services this module provides.
         */
        public String getServiceDescription();
    +
    + /**
    + * This is incremented each time the service's task completes a run. Can be useful to
    + * check if the service is still alive.
    + */
    + public int getIsAliveCounter();
      }

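    The counter gives callers a cheap liveness probe: each completed run increments it, so a
    test can wait for a run deterministically instead of sleeping a fixed interval and hoping
    the task ran. A sketch of that wait, the same pattern as runHouseKeeperService() in
    TestTxnCommands2 above:

        import org.apache.hadoop.hive.conf.HiveConf;
        import org.apache.hadoop.hive.metastore.HouseKeeperService;

        public class HouseKeeperProbe {
          // Starts the service, blocks until at least one run completes, then stops it.
          static void awaitOneRun(HouseKeeperService service, HiveConf conf) throws Exception {
            int before = service.getIsAliveCounter();
            service.start(conf);
            while (service.getIsAliveCounter() <= before) {
              Thread.sleep(100);   // the counter advances once per completed run
            }
            service.stop();
          }
        }
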
    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    index d3cb7d5..73255d2 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    @@ -19,6 +19,10 @@ package org.apache.hadoop.hive.metastore.txn;

      import org.apache.hadoop.hive.metastore.api.CompactionType;

    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +
      /**
       * Information on a possible or running compaction.
       */
    @@ -27,13 +31,18 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
        public String dbname;
        public String tableName;
        public String partName;
    + char state;
        public CompactionType type;
    + String workerId;
    + long start;
        public String runAs;
        public boolean tooManyAborts = false;
        /**
    - * {@code null} means it wasn't set (e.g. in case of upgrades)
    + * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
         */
    - public Long highestTxnId;
    + public long highestTxnId;
    + byte[] metaInfo;
    + String hadoopJobId;

        private String fullPartitionName = null;
        private String fullTableName = null;
    @@ -44,6 +53,11 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
          this.partName = partName;
          this.type = type;
        }
    + CompactionInfo(long id, String dbname, String tableName, String partName, char state) {
    + this(dbname, tableName, partName, null);
    + this.id = id;
    + this.state = state;
    + }
        CompactionInfo() {}

        public String getFullPartitionName() {
    @@ -82,9 +96,47 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
            "dbname:" + dbname + "," +
            "tableName:" + tableName + "," +
            "partName:" + partName + "," +
    + "state:" + state + "," +
            "type:" + type + "," +
            "runAs:" + runAs + "," +
            "tooManyAborts:" + tooManyAborts + "," +
            "highestTxnId:" + highestTxnId;
        }
    +
    + /**
    + * Loads an instance from a full row of COMPACTION_QUEUE (all columns, in schema order).
    + * @param rs ResultSet after call to rs.next()
    + * @throws SQLException
    + */
    + static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLException {
    + CompactionInfo fullCi = new CompactionInfo();
    + fullCi.id = rs.getLong(1);
    + fullCi.dbname = rs.getString(2);
    + fullCi.tableName = rs.getString(3);
    + fullCi.partName = rs.getString(4);
    + fullCi.state = rs.getString(5).charAt(0);//cq_state
    + fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0));
    + fullCi.workerId = rs.getString(7);
    + fullCi.start = rs.getLong(8);
    + fullCi.runAs = rs.getString(9);
    + fullCi.highestTxnId = rs.getLong(10);
    + fullCi.metaInfo = rs.getBytes(11);
    + fullCi.hadoopJobId = rs.getString(12);
    + return fullCi;
    + }
    + static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException {
    + pStmt.setLong(1, ci.id);
    + pStmt.setString(2, ci.dbname);
    + pStmt.setString(3, ci.tableName);
    + pStmt.setString(4, ci.partName);
    + pStmt.setString(5, Character.toString(ci.state));
    + pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type)));
    + pStmt.setString(7, ci.workerId);
    + pStmt.setLong(8, ci.start);
    + pStmt.setLong(9, endTime);
    + pStmt.setString(10, ci.runAs);
    + pStmt.setLong(11, ci.highestTxnId);
    + pStmt.setBytes(12, ci.metaInfo);
    + pStmt.setString(13, ci.hadoopJobId);
    + }
      }

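    These helpers pair up so that markCleaned()/markFailed() can move a queue row into history
    without repeating the thirteen-column list at every call site; the markCleaned() hunk just
    below shows the real usage. A condensed sketch of the round trip (connection setup and
    cleanup elided; assumes it runs inside the txn package, since the helpers are package-private):

        import java.sql.Connection;
        import java.sql.PreparedStatement;
        import java.sql.ResultSet;
        import java.sql.SQLException;
        import java.sql.Statement;

        class ArchiveSketch {
          // Moves queue row `id` into COMPLETED_COMPACTIONS with the given end time and state.
          static void archive(Connection dbConn, long id, long endTime, char finalState) throws SQLException {
            Statement stmt = dbConn.createStatement();
            ResultSet rs = stmt.executeQuery(
                "select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, " +
                "CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID " +
                "from COMPACTION_QUEUE where CQ_ID = " + id);
            if (rs.next()) {
              CompactionInfo ci = CompactionInfo.loadFullFromCompactionQueue(rs);
              ci.state = finalState;   // SUCCEEDED_STATE or FAILED_STATE
              PreparedStatement pStmt = dbConn.prepareStatement(
                  "insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, " +
                  "CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, " +
                  "CC_HADOOP_JOB_ID) values(?,?,?,?,?,?,?,?,?,?,?,?,?)");
              CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, endTime);
              pStmt.executeUpdate();
              stmt.executeUpdate("delete from COMPACTION_QUEUE where CQ_ID = " + id);
            }
            dbConn.commit();
          }
        }
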
    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    index 03cf1d5..cf14b4e 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    @@ -174,16 +174,7 @@ public class CompactionTxnHandler extends TxnHandler {
                info.dbname = rs.getString(2);
                info.tableName = rs.getString(3);
                info.partName = rs.getString(4);
    - switch (rs.getString(5).charAt(0)) {
    - case MAJOR_TYPE:
    - info.type = CompactionType.MAJOR;
    - break;
    - case MINOR_TYPE:
    - info.type = CompactionType.MINOR;
    - break;
    - default:
    - throw new MetaException("Unexpected compaction type " + rs.getString(5));
    - }
    + info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
                // Now, update this record as being worked on by this worker.
                long now = getDbTime(dbConn);
                s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
    @@ -291,8 +282,7 @@ public class CompactionTxnHandler extends TxnHandler {
                  default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
                }
                info.runAs = rs.getString(6);
    - long highestTxnId = rs.getLong(7);
    - info.highestTxnId = rs.wasNull() ? null : highestTxnId;
    + info.highestTxnId = rs.getLong(7);
                rc.add(info);
              }
              LOG.debug("Going to rollback");
    @@ -323,13 +313,19 @@ public class CompactionTxnHandler extends TxnHandler {
          try {
            Connection dbConn = null;
            Statement stmt = null;
    + PreparedStatement pStmt = null;
            ResultSet rs = null;
            try {
    - //do we need serializable? Once we have the HWM as above, no. Before that
    - //it's debatable, but problem described above applies either way
    - //Thus can drop to RC
    - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
              stmt = dbConn.createStatement();
    + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
    + if(rs.next()) {
    + info = CompactionInfo.loadFullFromCompactionQueue(rs);
    + }
    + else {
    + throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
    + }
    + close(rs);
              String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
              LOG.debug("Going to execute update <" + s + ">");
              int updCount = stmt.executeUpdate(s);
    @@ -338,6 +334,10 @@ public class CompactionTxnHandler extends TxnHandler {
                LOG.debug("Going to rollback");
                dbConn.rollback();
              }
    + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
    + info.state = SUCCEEDED_STATE;
    + CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
    + updCount = pStmt.executeUpdate();

              // Remove entries from completed_txn_components as well, so we don't start looking there
              // again but only up to the highest txn ID include in this compaction job.
    @@ -347,7 +347,7 @@ public class CompactionTxnHandler extends TxnHandler {
              if (info.partName != null) {
                s += " and ctc_partition = '" + info.partName + "'";
              }
    - if(info.highestTxnId != null) {
    + if(info.highestTxnId != 0) {
                s += " and ctc_txnid <= " + info.highestTxnId;
              }
              LOG.debug("Going to execute update <" + s + ">");
    @@ -358,7 +358,7 @@ public class CompactionTxnHandler extends TxnHandler {

              s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
                TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
    - info.tableName + "'" + (info.highestTxnId == null ? "" : " and txn_id <= " + info.highestTxnId);
    + info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId);
              if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
              LOG.debug("Going to execute update <" + s + ">");
              rs = stmt.executeQuery(s);
    @@ -406,6 +406,7 @@ public class CompactionTxnHandler extends TxnHandler {
              throw new MetaException("Unable to connect to transaction database " +
                StringUtils.stringifyException(e));
            } finally {
    + closeStmt(pStmt);
              close(rs, stmt, dbConn);
            }
          } catch (RetryException e) {
    @@ -668,6 +669,225 @@ public class CompactionTxnHandler extends TxnHandler {
            setCompactionHighestTxnId(ci, highestTxnId);
          }
        }
    + private static class RetentionCounters {
    + int attemptedRetention = 0;
    + int failedRetention = 0;
    + int succeededRetention = 0;
    + RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) {
    + this.attemptedRetention = attemptedRetention;
    + this.failedRetention = failedRetention;
    + this.succeededRetention = succeededRetention;
    + }
    + }
    + private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) {
    + switch (ci.state) {
    + case ATTEMPTED_STATE:
    + if(--rc.attemptedRetention < 0) {
    + deleteSet.add(ci.id);
    + }
    + break;
    + case FAILED_STATE:
    + if(--rc.failedRetention < 0) {
    + deleteSet.add(ci.id);
    + }
    + break;
    + case SUCCEEDED_STATE:
    + if(--rc.succeededRetention < 0) {
    + deleteSet.add(ci.id);
    + }
    + break;
    + default:
+ //do nothing to handle future RU/D where we may want to add new state types
    + }
    + }
    +
    + /**
    + * For any given compactable entity (partition, table if not partitioned) the history of compactions
    + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
    + * history such that a configurable number of each type of state is present. Any other entries
+ can be purged. This scheme has the advantage of always retaining the last failure/success even if
    + * it's not recent.
    + * @throws MetaException
    + */
    + public void purgeCompactionHistory() throws MetaException {
    + Connection dbConn = null;
    + Statement stmt = null;
    + ResultSet rs = null;
    + List<Long> deleteSet = new ArrayList<>();
    + RetentionCounters rc = null;
    + try {
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
+ /*cc_id is monotonically increasing, so rows for any entity sort in compaction-history order;
+ thus this query groups by entity and, within each group, sorts most recent first*/
    + rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " +
    + "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc");
    + String lastCompactedEntity = null;
+ /*In each group, walk from most recent and count occurrences of each state type. Once you
+ * have counted enough (for each state) to satisfy the retention policy, delete all older
+ * instances of that state.*/
    + while(rs.next()) {
    + CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
    + if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
    + lastCompactedEntity = ci.getFullPartitionName();
    + rc = new RetentionCounters(conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
    + getFailedCompactionRetention(),
    + conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
    + }
    + checkForDeletion(deleteSet, ci, rc);
    + }
    + close(rs);
    +
    + String baseDeleteSql = "delete from COMPLETED_COMPACTIONS where cc_id IN(";
    + StringBuilder queryStr = new StringBuilder(baseDeleteSql);
    + for(int i = 0; i < deleteSet.size(); i++) {
    + if(i > 0 && i % TIMED_OUT_TXN_ABORT_BATCH_SIZE == 0) {
    + queryStr.setCharAt(queryStr.length() - 1, ')');
    + stmt.executeUpdate(queryStr.toString());
    + dbConn.commit();
    + queryStr = new StringBuilder(baseDeleteSql);
    + }
    + queryStr.append(deleteSet.get(i)).append(',');
    + }
    + if(queryStr.length() > baseDeleteSql.length()) {
    + queryStr.setCharAt(queryStr.length() - 1, ')');
    + int updCnt = stmt.executeUpdate(queryStr.toString());
    + dbConn.commit();
    + }
    + dbConn.commit();
    + } catch (SQLException e) {
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "purgeCompactionHistory()");
    + throw new MetaException("Unable to connect to transaction database " +
    + StringUtils.stringifyException(e));
    + } finally {
    + close(rs, stmt, dbConn);
    + }
    + } catch (RetryException ex) {
    + purgeCompactionHistory();
    + }
    + }
    + /**
+ * Ensures that the number of failed compaction entries retained is at least the failed
+ * compaction threshold; otherwise checkFailedCompactions() could never see enough
+ * consecutive failures to stop scheduling new compactions.
    + */
    + public int getFailedCompactionRetention() {
    + int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
    + int failedRetention = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
    + if(failedRetention < failedThreshold) {
    + LOG.warn("Invalid configuration " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname +
    + "=" + failedRetention + " < " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED + "=" +
    + failedRetention + ". Will use " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname +
    + "=" + failedRetention);
    + failedRetention = failedThreshold;
    + }
    + return failedRetention;
    + }
    + /**
+ * Returns {@code true} if there already exists a sufficient number of consecutive failures for
    + * this table/partition so that no new automatic compactions will be scheduled.
    + * User initiated compactions don't do this check.
    + *
+ * Do we allow compacting the whole table (when it's partitioned)? No, though perhaps we should.
+ * That would be a meta operation, i.e. first find all partitions for this table (which have
    + * txn info) and schedule each compaction separately. This avoids complications in this logic.
    + */
    + public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
    + Connection dbConn = null;
    + Statement stmt = null;
    + ResultSet rs = null;
    + try {
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
    + rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " +
    + "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
    + "CC_TABLE = " + quoteString(ci.tableName) +
+ (ci.partName != null ? " and CC_PARTITION = " + quoteString(ci.partName) : "") +
    + " order by CC_ID desc");
    + int numFailed = 0;
    + int numTotal = 0;
    + int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
    + while(rs.next() && ++numTotal <= failedThreshold) {
    + if(rs.getString(1).charAt(0) == FAILED_STATE) {
    + numFailed++;
    + }
    + else {
    + numFailed--;
    + }
    + }
    + return numFailed == failedThreshold;
    + }
    + catch (SQLException e) {
    + LOG.error("Unable to delete from compaction queue " + e.getMessage());
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
    + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
    + return false;//weren't able to check
    + } finally {
    + close(rs, stmt, dbConn);
    + }
    + } catch (RetryException e) {
    + return checkFailedCompactions(ci);
    + }
    + }
    + /**
+ * If there is an entry in COMPACTION_QUEUE with ci.id, remove it and
+ * make an entry in COMPLETED_COMPACTIONS with status 'f'.
+ *
+ * but what about markCleaned(), which is called when the table has been dropped...
    + */
    + public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
+ //todo: this should take "comment" as a parameter to set in CC_META_INFO to provide some context for the failure
    + try {
    + Connection dbConn = null;
    + Statement stmt = null;
    + PreparedStatement pStmt = null;
    + ResultSet rs = null;
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
    + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
    + if(rs.next()) {
    + ci = CompactionInfo.loadFullFromCompactionQueue(rs);
    + String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
    + LOG.debug("Going to execute update <" + s + ">");
    + int updCnt = stmt.executeUpdate(s);
    + }
    + else {
    + throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
    + }
    + close(rs, stmt, null);
    +
    + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
    + ci.state = FAILED_STATE;
    + CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
    + int updCount = pStmt.executeUpdate();
    + LOG.debug("Going to commit");
    + closeStmt(pStmt);
    + dbConn.commit();
    + } catch (SQLException e) {
    + LOG.error("Unable to delete from compaction queue " + e.getMessage());
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + try {
    + checkRetryable(dbConn, e, "markFailed(" + ci + ")");
    + }
    + catch(MetaException ex) {
    + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
    + }
    + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
    + } finally {
    + close(rs, stmt, null);
    + close(null, pStmt, dbConn);
    + }
    + } catch (RetryException e) {
    + markFailed(ci);
    + }
    + }
    +
      }
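
For readers following the purge logic above: a minimal, self-contained sketch of the same retention walk and batched IN-list delete, run against an in-memory list instead of COMPLETED_COMPACTIONS. HistoryEntry, RetentionSketch and the per-state "budgets" are hypothetical names for illustration; only the algorithm mirrors the patch. Walking newest-first lets each budget count down until it goes negative, at which point everything older of that state is collected, and deletes are chunked so no single IN(...) list grows unbounded.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for one COMPLETED_COMPACTIONS row; input is sorted
    // by entity, then newest (highest cc_id) first, like the query in the patch.
    class HistoryEntry {
      final long id;
      final String entity;
      final char state; // 'a' attempted, 'f' failed, 's' succeeded

      HistoryEntry(long id, String entity, char state) {
        this.id = id; this.entity = entity; this.state = state;
      }
    }

    public class RetentionSketch {
      // Keep the N most recent entries of each state per entity; older ones are purged.
      static List<Long> collectPurgeable(List<HistoryEntry> newestFirst,
          int keepAttempted, int keepFailed, int keepSucceeded) {
        List<Long> deleteSet = new ArrayList<>();
        String lastEntity = null;
        int a = 0, f = 0, s = 0;
        for (HistoryEntry e : newestFirst) {
          if (!e.entity.equals(lastEntity)) { // new group: reset the per-state budgets
            lastEntity = e.entity;
            a = keepAttempted; f = keepFailed; s = keepSucceeded;
          }
          switch (e.state) {
            case 'a': if (--a < 0) deleteSet.add(e.id); break;
            case 'f': if (--f < 0) deleteSet.add(e.id); break;
            case 's': if (--s < 0) deleteSet.add(e.id); break;
            default: break; // unknown state: retain, mirroring checkForDeletion()
          }
        }
        return deleteSet;
      }

      // Chunk the ids into bounded IN(...) deletes, as purgeCompactionHistory() does.
      static List<String> buildDeleteBatches(List<Long> ids, int batchSize) {
        List<String> stmts = new ArrayList<>();
        StringBuilder sb = null;
        for (int i = 0; i < ids.size(); i++) {
          if (i % batchSize == 0) {
            if (sb != null) { sb.setCharAt(sb.length() - 1, ')'); stmts.add(sb.toString()); }
            sb = new StringBuilder("delete from COMPLETED_COMPACTIONS where cc_id IN(");
          }
          sb.append(ids.get(i)).append(',');
        }
        if (sb != null) { sb.setCharAt(sb.length() - 1, ')'); stmts.add(sb.toString()); }
        return stmts;
      }

      public static void main(String[] args) {
        List<HistoryEntry> hist = new ArrayList<>();
        long id = 100; // one entity, newest first: s s f f f a
        for (char c : "ssfffa".toCharArray()) {
          hist.add(new HistoryEntry(id--, "default/t/p=1", c));
        }
        List<Long> purge = collectPurgeable(hist, 1, 2, 1);
        System.out.println(purge);                        // [99, 96]
        System.out.println(buildDeleteBatches(purge, 2)); // one IN(...) statement
      }
    }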



    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    index c14b11d..5704b5f 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    @@ -118,10 +118,27 @@ public final class TxnDbUtil {
                " CQ_WORKER_ID varchar(128)," +
                " CQ_START bigint," +
                " CQ_RUN_AS varchar(128)," +
    - " CQ_HIGHEST_TXN_ID bigint)");
    + " CQ_HIGHEST_TXN_ID bigint," +
    + " CQ_META_INFO varchar(2048) for bit data," +
    + " CQ_HADOOP_JOB_ID varchar(32))");

            stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
            stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
    +
    + stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
    + " CC_ID bigint PRIMARY KEY," +
    + " CC_DATABASE varchar(128) NOT NULL," +
    + " CC_TABLE varchar(128) NOT NULL," +
    + " CC_PARTITION varchar(767)," +
    + " CC_STATE char(1) NOT NULL," +
    + " CC_TYPE char(1) NOT NULL," +
    + " CC_WORKER_ID varchar(128)," +
    + " CC_START bigint," +
    + " CC_END bigint," +
    + " CC_RUN_AS varchar(128)," +
    + " CC_HIGHEST_TXN_ID bigint," +
    + " CC_META_INFO varchar(2048) for bit data," +
    + " CC_HADOOP_JOB_ID varchar(32))");

            conn.commit();
          } catch (SQLException e) {
    @@ -161,7 +178,7 @@ public final class TxnDbUtil {
            dropTable(stmt, "NEXT_LOCK_ID");
            dropTable(stmt, "COMPACTION_QUEUE");
            dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
    -
    + dropTable(stmt, "COMPLETED_COMPACTIONS");
            conn.commit();
          } finally {
            closeResources(conn, stmt, null);
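
To see the new schema in action, here is a minimal sketch, assuming Derby's embedded driver is on the classpath, of the queue-to-history move that markCleaned()/markFailed() perform above. The column list is trimmed to a few fields for brevity; table and column names follow the DDL in this patch, everything else is illustrative.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class QueueToHistorySketch {
      public static void main(String[] args) throws SQLException {
        // Embedded, in-memory Derby; requires derby.jar on the classpath.
        Connection conn = DriverManager.getConnection("jdbc:derby:memory:txndb;create=true");
        conn.setAutoCommit(false);
        try (Statement stmt = conn.createStatement()) {
          stmt.execute("CREATE TABLE COMPACTION_QUEUE (CQ_ID bigint PRIMARY KEY, " +
              "CQ_DATABASE varchar(128), CQ_TABLE varchar(128), CQ_STATE char(1))");
          stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (CC_ID bigint PRIMARY KEY, " +
              "CC_DATABASE varchar(128), CC_TABLE varchar(128), CC_STATE char(1), CC_END bigint)");
          stmt.execute("INSERT INTO COMPACTION_QUEUE VALUES (1, 'default', 'hive12353', 'w')");

          // Read the queue row, delete it, then record the terminal state in history.
          long id = 1;
          String db = null, tbl = null;
          try (ResultSet rs = stmt.executeQuery(
              "select CQ_DATABASE, CQ_TABLE from COMPACTION_QUEUE where CQ_ID = " + id)) {
            if (!rs.next()) throw new IllegalStateException("No record with CQ_ID=" + id);
            db = rs.getString(1);
            tbl = rs.getString(2);
          }
          stmt.executeUpdate("delete from COMPACTION_QUEUE where CQ_ID = " + id);
          try (PreparedStatement p = conn.prepareStatement(
              "insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_STATE, CC_END) " +
              "VALUES(?,?,?,?,?)")) {
            p.setLong(1, id);
            p.setString(2, db);
            p.setString(3, tbl);
            p.setString(4, "s"); // 's' succeeded, or 'f' on the markFailed() path
            p.setLong(5, System.currentTimeMillis());
            p.executeUpdate();
          }
          conn.commit(); // the real code rolls back and retries on SQLException
        } finally {
          conn.close();
        }
      }
    }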

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    index a8bb3b8..2411c3e 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    @@ -64,14 +64,20 @@ import java.util.concurrent.TimeUnit;
      @InterfaceAudience.Private
      @InterfaceStability.Evolving
      public class TxnHandler {
    - // Compactor states
+ // Compactor states (should really be an enum)
        static final public String INITIATED_RESPONSE = "initiated";
        static final public String WORKING_RESPONSE = "working";
        static final public String CLEANING_RESPONSE = "ready for cleaning";
    + static final public String FAILED_RESPONSE = "failed";
    + static final public String SUCCEEDED_RESPONSE = "succeeded";
    + static final public String ATTEMPTED_RESPONSE = "attempted";

        static final protected char INITIATED_STATE = 'i';
        static final protected char WORKING_STATE = 'w';
        static final protected char READY_FOR_CLEANING = 'r';
    + static final char FAILED_STATE = 'f';
    + static final char SUCCEEDED_STATE = 's';
    + static final char ATTEMPTED_STATE = 'a';

        // Compactor types
        static final protected char MAJOR_TYPE = 'a';
    @@ -759,7 +765,7 @@ public class TxnHandler {
          }
        }

    - public void compact(CompactionRequest rqst) throws MetaException {
    + public long compact(CompactionRequest rqst) throws MetaException {
          // Put a compaction request in the queue.
          try {
            Connection dbConn = null;
    @@ -826,6 +832,7 @@ public class TxnHandler {
              stmt.executeUpdate(s);
              LOG.debug("Going to commit");
              dbConn.commit();
    + return id;
            } catch (SQLException e) {
              LOG.debug("Going to rollback");
              rollbackDBConn(dbConn);
    @@ -837,7 +844,7 @@ public class TxnHandler {
              closeDbConn(dbConn);
            }
          } catch (RetryException e) {
    - compact(rqst);
    + return compact(rqst);
          }
        }

    @@ -850,7 +857,13 @@ public class TxnHandler {
              dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
              stmt = dbConn.createStatement();
              String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
    - "cq_start, cq_run_as from COMPACTION_QUEUE";
    + "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
    + "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
    + "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
    + //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
+ //to sort so that currently running jobs are at the end of the list (bottom of screen)
+ //and currently running ones are sorted by start time
+ //w/o an order by, currently running compactions will likely come first (LHS of the union)
              LOG.debug("Going to execute query <" + s + ">");
              ResultSet rs = stmt.executeQuery(s);
              while (rs.next()) {
    @@ -862,16 +875,26 @@ public class TxnHandler {
                  case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
                  case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
                  case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
    - default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
    + case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
    + case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break;
    + default:
    + //do nothing to handle RU/D if we add another status
                }
                switch (rs.getString(5).charAt(0)) {
                  case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
                  case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
    - default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
    + default:
    + //do nothing to handle RU/D if we add another status
                }
                e.setWorkerid(rs.getString(6));
                e.setStart(rs.getLong(7));
    - e.setRunAs(rs.getString(8));
    + long endTime = rs.getLong(8);
    + if(endTime != -1) {
    + e.setEndTime(endTime);
    + }
    + e.setRunAs(rs.getString(9));
    + e.setHadoopJobId(rs.getString(10));
    + long id = rs.getLong(11);//for debugging
                response.addToCompacts(e);
              }
              LOG.debug("Going to rollback");
    @@ -2374,41 +2397,29 @@ public class TxnHandler {
              throw new MetaException(msg);
          }
        }
    - /**
    - * the caller is expected to retry if this fails
    - *
    - * @return
    - * @throws SQLException
    - * @throws MetaException
    - */
    - private long generateNewExtLockId() throws SQLException, MetaException {
    - Connection dbConn = null;
    - Statement stmt = null;
    - ResultSet rs = null;
    - try {
    - dbConn = getDbConn(getRequiredIsolationLevel());
    - stmt = dbConn.createStatement();
    -
    - // Get the next lock id.
    - String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
    - LOG.debug("Going to execute query <" + s + ">");
    - rs = stmt.executeQuery(s);
    - if (!rs.next()) {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - throw new MetaException("Transaction tables not properly " +
    - "initialized, no record found in next_lock_id");
    - }
    - long extLockId = rs.getLong(1);
    - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
    - LOG.debug("Going to execute update <" + s + ">");
    - stmt.executeUpdate(s);
    - LOG.debug("Going to commit.");
    - dbConn.commit();
    - return extLockId;
    + static String quoteString(String input) {
    + return "'" + input + "'";
    + }
    + static CompactionType dbCompactionType2ThriftType(char dbValue) {
    + switch (dbValue) {
    + case MAJOR_TYPE:
    + return CompactionType.MAJOR;
    + case MINOR_TYPE:
    + return CompactionType.MINOR;
    + default:
    + LOG.warn("Unexpected compaction type " + dbValue);
    + return null;
          }
    - finally {
    - close(rs, stmt, dbConn);
    + }
    + static Character thriftCompactionType2DbType(CompactionType ct) {
    + switch (ct) {
    + case MAJOR:
    + return MAJOR_TYPE;
    + case MINOR:
    + return MINOR_TYPE;
    + default:
    + LOG.warn("Unexpected compaction type " + ct);
    + return null;
          }
        }
      }
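
A small standalone sketch of the decoding convention showCompact() now follows: unknown state/type codes degrade gracefully instead of throwing, and cc_end = -1 (the sentinel the UNION supplies for queue rows) means the compaction is still running. The class and method names here are hypothetical; only the convention mirrors the patch.

    // Hypothetical decoder mirroring the conventions in showCompact() above.
    public class CompactDecodeSketch {
      static String stateName(char c) {
        switch (c) {
          case 'i': return "initiated";
          case 'w': return "working";
          case 'r': return "ready for cleaning";
          case 'f': return "failed";
          case 's': return "succeeded";
          case 'a': return "attempted";
          default:  return "unknown(" + c + ")"; // tolerate future states instead of throwing
        }
      }

      static String endTime(long raw) {
        // The UNION supplies -1 for COMPACTION_QUEUE rows, which have no end time yet.
        return raw == -1 ? "(still running)" : String.valueOf(raw);
      }

      public static void main(String[] args) {
        System.out.println(stateName('s') + ", end=" + endTime(1453420549000L));
        System.out.println(stateName('w') + ", end=" + endTime(-1));
      }
    }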

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    index 32c3d80..051da60 100644
    --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    @@ -203,7 +203,8 @@ public class TestCompactionTxnHandler {
          assertEquals(0, txnHandler.findReadyToClean().size());

          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - assertEquals(0, rsp.getCompactsSize());
    + assertEquals(1, rsp.getCompactsSize());
    + assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    new file mode 100644
    index 0000000..a91ca5c
    --- /dev/null
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    @@ -0,0 +1,83 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.txn;
    +
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.HouseKeeperService;
    +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
    +import org.apache.hadoop.hive.ql.metadata.Hive;
    +import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Purges obsolete items from compaction history data
    + */
    +public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
    + private static final Logger LOG = LoggerFactory.getLogger(AcidCompactionHistoryService.class);
    +
    + @Override
    + protected long getStartDelayMs() {
    + return 0;
    + }
    + @Override
    + protected long getIntervalMs() {
    + return hiveConf.getTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
    + }
    + @Override
    + protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
    + return new ObsoleteEntryReaper(hiveConf, isAliveCounter);
    + }
    +
    + @Override
    + public String getServiceDescription() {
    + return "Removes obsolete entries from Compaction History";
    + }
    +
    + private static final class ObsoleteEntryReaper implements Runnable {
    + private final CompactionTxnHandler txnHandler;
    + private final AtomicInteger isAliveCounter;
    + private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
    + txnHandler = new CompactionTxnHandler(hiveConf);
    + this.isAliveCounter = isAliveCounter;
    + }
    +
    + @Override
    + public void run() {
    + try {
    + long startTime = System.currentTimeMillis();
    + txnHandler.purgeCompactionHistory();
    + int count = isAliveCounter.incrementAndGet();
    + LOG.info("History reaper reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count);
    + }
    + catch(Throwable t) {
    + LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
    + }
    + }
    + }
    +}

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    index 23a77e6..38151fb 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.metastore.HouseKeeperService;
      import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
      import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
    +import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;

      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
    @@ -35,58 +36,40 @@ import java.util.concurrent.atomic.AtomicInteger;
       * Performs background tasks for Transaction management in Hive.
       * Runs inside Hive Metastore Service.
       */
    -public class AcidHouseKeeperService implements HouseKeeperService {
    +public class AcidHouseKeeperService extends HouseKeeperServiceBase {
        private static final Log LOG = LogFactory.getLog(AcidHouseKeeperService.class);
    - private ScheduledExecutorService pool = null;
    - private AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);
    +
        @Override
    - public void start(HiveConf hiveConf) throws Exception {
    - HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
    - if(!mgr.supportsAcid()) {
    - LOG.info(AcidHouseKeeperService.class.getName() + " not started since " +
    - mgr.getClass().getName() + " does not support Acid.");
    - return;//there are no transactions in this case
    - }
    - pool = Executors.newScheduledThreadPool(1, new ThreadFactory() {
    - private final AtomicInteger threadCounter = new AtomicInteger();
    - @Override
    - public Thread newThread(Runnable r) {
    - return new Thread(r, "DeadTxnReaper-" + threadCounter.getAndIncrement());
    - }
    - });
    - TimeUnit tu = TimeUnit.MILLISECONDS;
    - pool.scheduleAtFixedRate(new TimedoutTxnReaper(hiveConf, this),
    - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu),
    - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu),
    - TimeUnit.MILLISECONDS);
    - LOG.info("Started " + this.getClass().getName() + " with delay/interval = " +
    - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu) + "/" +
    - hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu) + " " + tu);
    + protected long getStartDelayMs() {
    + return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, TimeUnit.MILLISECONDS);
        }
        @Override
    - public void stop() {
    - if(pool != null && !pool.isShutdown()) {
    - pool.shutdown();
    - }
    - pool = null;
    + protected long getIntervalMs() {
    + return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
    + }
    + @Override
    + protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
    + return new TimedoutTxnReaper(hiveConf, isAliveCounter);
        }
    +
        @Override
        public String getServiceDescription() {
          return "Abort expired transactions";
        }
    +
        private static final class TimedoutTxnReaper implements Runnable {
          private final TxnHandler txnHandler;
    - private final AcidHouseKeeperService owner;
    - private TimedoutTxnReaper(HiveConf hiveConf, AcidHouseKeeperService owner) {
    + private final AtomicInteger isAliveCounter;
    + private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
            txnHandler = new TxnHandler(hiveConf);
    - this.owner = owner;
    + this.isAliveCounter = isAliveCounter;
          }
          @Override
          public void run() {
            try {
              long startTime = System.currentTimeMillis();
              txnHandler.performTimeOuts();
    - int count = owner.isAliveCounter.incrementAndGet();
    + int count = isAliveCounter.incrementAndGet();
              LOG.info("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count);
            }
            catch(Throwable t) {
    @@ -94,12 +77,4 @@ public class AcidHouseKeeperService implements HouseKeeperService {
            }
          }
        }
    -
    - /**
    - * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1.
    - * Starts with {@link java.lang.Integer#MIN_VALUE}
    - */
    - public int getIsAliveCounter() {
    - return isAliveCounter.get();
    - }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    index 622bf54..33580fd 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    @@ -189,6 +189,7 @@ public class Cleaner extends CompactorThread {
            if (t == null) {
              // The table was dropped before we got around to cleaning it.
              LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
    + txnHandler.markCleaned(ci);
              return;
            }
            Partition p = null;
    @@ -198,6 +199,7 @@ public class Cleaner extends CompactorThread {
                // The partition was dropped before we got around to cleaning it.
                LOG.info("Unable to find partition " + ci.getFullPartitionName() +
                    ", assuming it was dropped");
    + txnHandler.markCleaned(ci);
                return;
              }
            }
    @@ -223,13 +225,11 @@ public class Cleaner extends CompactorThread {
                }
              });
            }
    -
    + txnHandler.markCleaned(ci);
          } catch (Exception e) {
    - LOG.error("Caught exception when cleaning, unable to complete cleaning " +
    + LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
                StringUtils.stringifyException(e));
    - } finally {
    - // We need to clean this out one way or another.
    - txnHandler.markCleaned(ci);
    + txnHandler.markFailed(ci);
          }
        }
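
The crux of the fix, in schematic form: markCleaned() is now reached only on the success path, while any exception records a failure instead of pretending the entry was cleaned. A minimal sketch with stand-in types (TxnMarks is hypothetical shorthand for the two CompactionTxnHandler calls):

    // TxnMarks is a hypothetical stand-in for the two CompactionTxnHandler calls.
    interface TxnMarks {
      void markCleaned();
      void markFailed();
    }

    public class CleanerFlowSketch {
      static void clean(TxnMarks handler, Runnable removeObsoleteFiles) {
        try {
          removeObsoleteFiles.run();
          handler.markCleaned(); // reached only when cleaning actually succeeded
        } catch (Exception e) {
          handler.markFailed();  // before HIVE-12353 this path also called markCleaned()
        }
      }

      public static void main(String[] args) {
        TxnMarks handler = new TxnMarks() {
          public void markCleaned() { System.out.println("marked cleaned"); }
          public void markFailed()  { System.out.println("marked failed"); }
        };
        clean(handler, () -> { throw new RuntimeException("simulated FS error"); });
        clean(handler, () -> { /* files removed fine */ });
      }
    }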


    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    index 436c36d..e2cc253 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    @@ -136,6 +136,10 @@ public class CompactorMR {
         */
        void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
                 ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
    +
    + if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
    + throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
    + }
          JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);

          // Figure out and encode what files we need to read. We do this here (rather than in

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
    new file mode 100644
    index 0000000..947f17c
    --- /dev/null
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
    @@ -0,0 +1,92 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
    +
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.HouseKeeperService;
    +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +public abstract class HouseKeeperServiceBase implements HouseKeeperService {
    + private static final Logger LOG = LoggerFactory.getLogger(HouseKeeperServiceBase.class);
    + private ScheduledExecutorService pool = null;
    + protected final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);
    + protected HiveConf hiveConf;
    +
    + @Override
    + public void start(HiveConf hiveConf) throws Exception {
    + this.hiveConf = hiveConf;
    + HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
    + if(!mgr.supportsAcid()) {
    + LOG.info(this.getClass().getName() + " not started since " +
    + mgr.getClass().getName() + " does not support Acid.");
    + return;//there are no transactions in this case
    + }
    + pool = Executors.newScheduledThreadPool(1, new ThreadFactory() {
    + private final AtomicInteger threadCounter = new AtomicInteger();
    + @Override
    + public Thread newThread(Runnable r) {
+ return new Thread(r, HouseKeeperServiceBase.this.getClass().getName() + "-" + threadCounter.getAndIncrement());
    + }
    + });
    +
    + TimeUnit tu = TimeUnit.MILLISECONDS;
    + pool.scheduleAtFixedRate(getScheduedAction(hiveConf, isAliveCounter), getStartDelayMs(),
    + getIntervalMs(), tu);
    + LOG.info("Started " + this.getClass().getName() + " with delay/interval = " + getStartDelayMs() + "/" +
    + getIntervalMs() + " " + tu);
    + }
    +
    + @Override
    + public void stop() {
    + if(pool != null && !pool.isShutdown()) {
    + pool.shutdown();
    + }
    + pool = null;
    + }
    +
    + /**
    + * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1.
    + * Starts with {@link java.lang.Integer#MIN_VALUE}
    + */
    + @Override
    + public int getIsAliveCounter() {
    + return isAliveCounter.get();
    + }
    +
    + /**
    + * Delay in millis before first run of the task of this service.
    + */
    + protected abstract long getStartDelayMs();
    + /**
+ * Determines how frequently the service runs its task.
    + */
    + protected abstract long getIntervalMs();
    +
    + /**
    + * The actual task implementation. Must increment the counter on each iteration.
    + */
    + protected abstract Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter);
    +}
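
The refactor above is a textbook template method: the base class owns the executor and the isAliveCounter, while subclasses supply timing and the periodic task. A standalone miniature of the same shape, with the Hive types stripped out (all names here are illustrative); main() waits on the counter the same way runHouseKeeperService() does in the tests below.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // Miniature of the HouseKeeperServiceBase shape: the base owns scheduling and
    // the liveness counter; subclasses supply timing and the periodic task.
    abstract class MiniHouseKeeper {
      private ScheduledExecutorService pool;
      protected final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);

      void start() {
        pool = Executors.newScheduledThreadPool(1);
        pool.scheduleAtFixedRate(getScheduledAction(isAliveCounter),
            getStartDelayMs(), getIntervalMs(), TimeUnit.MILLISECONDS);
      }

      void stop() {
        if (pool != null) pool.shutdown();
        pool = null;
      }

      int getIsAliveCounter() { return isAliveCounter.get(); }

      protected abstract long getStartDelayMs();
      protected abstract long getIntervalMs();
      protected abstract Runnable getScheduledAction(AtomicInteger isAliveCounter);
    }

    public class HistoryReaperSketch extends MiniHouseKeeper {
      protected long getStartDelayMs() { return 0; }
      protected long getIntervalMs() { return 10; }
      protected Runnable getScheduledAction(AtomicInteger counter) {
        // Each run must bump the counter so callers can tell the task is alive.
        return () -> { /* purge work would go here */ counter.incrementAndGet(); };
      }

      public static void main(String[] args) throws InterruptedException {
        MiniHouseKeeper hk = new HistoryReaperSketch();
        int before = hk.getIsAliveCounter();
        hk.start();
        while (hk.getIsAliveCounter() <= before) {
          Thread.sleep(5); // same wait-for-one-run loop as runHouseKeeperService() in the tests
        }
        hk.stop();
        System.out.println("housekeeper ran at least once");
      }
    }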

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    index 7b0c436..c023c27 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    @@ -78,7 +78,7 @@ public class Initiator extends CompactorThread {

              // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
              // don't doom the entire thread.
    - try {
    + try {//todo: add method to only get current i.e. skip history - more efficient
                ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
                ValidTxnList txns =
                    CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
    @@ -119,6 +119,11 @@ public class Initiator extends CompactorThread {
                          ci.getFullPartitionName() + " so we will not initiate another compaction");
                      continue;
                    }
    + if(txnHandler.checkFailedCompactions(ci)) {
    + //todo: make 'a' state entry in completed_compactions
    + LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last 3 attempts to compact it failed.");
    + continue;
    + }

                    // Figure out who we should run the file operations as
                    Partition p = resolvePartition(ci);
    @@ -134,9 +139,9 @@ public class Initiator extends CompactorThread {
                    if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
                  } catch (Throwable t) {
                    LOG.error("Caught exception while trying to determine if we should compact " +
    - ci.getFullPartitionName() + ". Marking clean to avoid repeated failures, " +
    + ci + ". Marking clean to avoid repeated failures, " +
                        "" + StringUtils.stringifyException(t));
    - txnHandler.markCleaned(ci);
    + txnHandler.markFailed(ci);
                  }
                }

    @@ -300,7 +305,7 @@ public class Initiator extends CompactorThread {
          if (ci.partName != null) rqst.setPartitionname(ci.partName);
          rqst.setRunas(runAs);
          LOG.info("Requesting compaction: " + rqst);
    - txnHandler.compact(rqst);
    + ci.id = txnHandler.compact(rqst);
        }

        // Because TABLE_NO_AUTO_COMPACT was originally assumed to be NO_AUTO_COMPACT and then was moved

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    index 6c11ddc..59a765b 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    @@ -69,7 +69,8 @@ public class Worker extends CompactorThread {
            throw new RuntimeException(e);
          }
        }
    -
+//todo: this doesn't check if compaction is already running (even though Initiator does, but we
+// don't go through Initiator for user initiated compactions)
        @Override
        public void run() {
          do {
    @@ -173,9 +174,9 @@ public class Worker extends CompactorThread {
                }
                txnHandler.markCompacted(ci);
              } catch (Exception e) {
    - LOG.error("Caught exception while trying to compact " + ci.getFullPartitionName() +
    + LOG.error("Caught exception while trying to compact " + ci +
                    ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
    - txnHandler.markCleaned(ci);
    + txnHandler.markFailed(ci);
              }
            } catch (Throwable t) {
              LOG.error("Caught an exception in the main loop of compactor worker " + name + ", " +

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    index 5a01695..350365c 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    @@ -24,10 +24,21 @@ import org.apache.hadoop.fs.FileUtil;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.common.FileUtils;
      import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.HouseKeeperService;
    +import org.apache.hadoop.hive.metastore.api.CompactionRequest;
    +import org.apache.hadoop.hive.metastore.api.CompactionType;
    +import org.apache.hadoop.hive.metastore.api.MetaException;
    +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
    +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
    +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
    +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
      import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
    +import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
      import org.apache.hadoop.hive.ql.session.SessionState;
    +import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
      import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
    +import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
      import org.apache.hadoop.hive.ql.txn.compactor.Worker;
      import org.junit.After;
      import org.junit.Assert;
    @@ -42,6 +53,7 @@ import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.Comparator;
      import java.util.List;
    +import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicBoolean;

      /**
    @@ -454,6 +466,211 @@ public class TestTxnCommands2 {
          //insert overwrite not supported for ACID tables
        }
        /**
    + * HIVE-12353
    + * @throws Exception
    + */
    + @Test
    + public void testInitiatorWithMultipleFailedCompactions() throws Exception {
    + String tblName = "hive12353";
    + runStatementOnDriver("drop table if exists " + tblName);
    + runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
    + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
    + " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
    + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
    + for(int i = 0; i < 5; i++) {
    + //generate enough delta files so that Initiator can trigger auto compaction
    + runStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')");
    + }
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
    +
    + int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
    + CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
    + AtomicBoolean stop = new AtomicBoolean(true);
    + //create failed compactions
    + for(int i = 0; i < numFailedCompactions; i++) {
    + //each of these should fail
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    + runWorker(hiveConf);
    + }
    + //this should not schedule a new compaction due to prior failures
    + Initiator init = new Initiator();
    + init.setThreadId((int)init.getId());
    + init.setHiveConf(hiveConf);
    + init.init(stop, new AtomicBoolean());
    + init.run();
    +
    + CompactionsByState cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed);
    + Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total);
    +
    + hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
    + AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService();
    + runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed);
    + Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total);
    +
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
    + runWorker(hiveConf);//will fail
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    + runWorker(hiveConf);//will fail
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed);
    + Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total);
    + runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have
+ //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no others, since we only have failed ones here)
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
    + Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total);
    +
    +
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    + //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
    + Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated);
    + Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
    +
    + runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
    + Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean);
    + Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
    +
    + runCleaner(hiveConf); // transition to Success state
    + runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes
    + cbs = countCompacts(txnHandler);
    + Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
    + Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
    + Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
    + }
    + private static class CompactionsByState {
    + private int attempted;
    + private int failed;
    + private int initiated;
    + private int readyToClean;
    + private int succeeded;
    + private int working;
    + private int total;
    + }
    + private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
    + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
    + CompactionsByState compactionsByState = new CompactionsByState();
    + compactionsByState.total = resp.getCompactsSize();
    + for(ShowCompactResponseElement compact : resp.getCompacts()) {
    + if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
    + compactionsByState.failed++;
    + }
    + else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
    + compactionsByState.readyToClean++;
    + }
    + else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
    + compactionsByState.initiated++;
    + }
    + else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
    + compactionsByState.succeeded++;
    + }
    + else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
    + compactionsByState.working++;
    + }
    + else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
    + compactionsByState.attempted++;
    + }
    + }
    + return compactionsByState;
    + }
    + private static void runWorker(HiveConf hiveConf) throws MetaException {
    + AtomicBoolean stop = new AtomicBoolean(true);
    + Worker t = new Worker();
    + t.setThreadId((int) t.getId());
    + t.setHiveConf(hiveConf);
    + AtomicBoolean looped = new AtomicBoolean();
    + t.init(stop, looped);
    + t.run();
    + }
    + private static void runCleaner(HiveConf hiveConf) throws MetaException {
    + AtomicBoolean stop = new AtomicBoolean(true);
    + Cleaner t = new Cleaner();
    + t.setThreadId((int) t.getId());
    + t.setHiveConf(hiveConf);
    + AtomicBoolean looped = new AtomicBoolean();
    + t.init(stop, looped);
    + t.run();
    + }
    +
    + private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
    + int lastCount = houseKeeperService.getIsAliveCounter();
    + houseKeeperService.start(conf);
    + while(houseKeeperService.getIsAliveCounter() <= lastCount) {
    + try {
    + Thread.sleep(100);//make sure it has run at least once
    + }
    + catch(InterruptedException ex) {
    + //...
    + }
    + }
    + houseKeeperService.stop();
    + }
    +
    + /**
    + * HIVE-12352 has details
    + * @throws Exception
    + */
    + @Test
    + public void writeBetweenWorkerAndCleaner() throws Exception {
    + String tblName = "hive12352";
    + runStatementOnDriver("drop table if exists " + tblName);
    + runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
    + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
    + " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
    +
    + //create some data
    + runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
    + runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
    +
    + //run Worker to execute compaction
    + CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
    + txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
    + Worker t = new Worker();
    + t.setThreadId((int) t.getId());
    + t.setHiveConf(hiveConf);
    + AtomicBoolean stop = new AtomicBoolean(true);
    + AtomicBoolean looped = new AtomicBoolean();
    + t.init(stop, looped);
    + t.run();
    +
    + //delete something, but make sure txn is rolled back
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
    + runStatementOnDriver("delete from " + tblName + " where a = 1");
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
    +
    + List<String> expected = new ArrayList<>();
    + expected.add("1\tfoo");
    + expected.add("2\tbar");
    + expected.add("3\tblah");
    + Assert.assertEquals("", expected,
    + runStatementOnDriver("select a,b from " + tblName + " order by a"));
    +
    + //run Cleaner
    + Cleaner c = new Cleaner();
    + c.setThreadId((int)c.getId());
    + c.setHiveConf(hiveConf);
    + c.init(stop, new AtomicBoolean());
    + c.run();
    +
+ //this seems odd, but we want to make sure that CompactionTxnHandler.cleanEmptyAbortedTxns() runs
    + Initiator i = new Initiator();
    + i.setThreadId((int)i.getId());
    + i.setHiveConf(hiveConf);
    + i.init(stop, new AtomicBoolean());
    + i.run();
    +
    + //check that aborted operation didn't become committed
    + Assert.assertEquals("", expected,
    + runStatementOnDriver("select a,b from " + tblName + " order by a"));
    + }
    + /**
         * takes raw data and turns it into a string as if from Driver.getResults()
         * sorts rows in dictionary order
         */

    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    index 0db732c..913c8bc 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    @@ -17,6 +17,7 @@
       */
      package org.apache.hadoop.hive.ql.txn.compactor;

    +import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.junit.Assert;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
    @@ -71,7 +72,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, null);
    @@ -102,7 +104,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -131,7 +134,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, null);
    @@ -169,7 +173,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -323,7 +328,8 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          rsp = txnHandler.showCompact(new ShowCompactRequest());
          compacts = rsp.getCompacts();
    - Assert.assertEquals(0, compacts.size());
    + Assert.assertEquals(1, compacts.size());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -396,7 +402,8 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          rsp = txnHandler.showCompact(new ShowCompactRequest());
          compacts = rsp.getCompacts();
    - Assert.assertEquals(0, compacts.size());
    + Assert.assertEquals(1, compacts.size());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -421,7 +428,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -451,7 +459,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -478,7 +487,8 @@ public class TestCleaner extends CompactorTest {

          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    - Assert.assertEquals(0, rsp.getCompactsSize());
    + Assert.assertEquals(1, rsp.getCompactsSize());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }
        @Override
        boolean useHive130DeltaDirName() {

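Every TestCleaner hunk above makes the same substitution: where the old code deleted the queue entry outright after cleaning (so showCompact returned nothing), the entry is now retained as compaction history, and the tests assert exactly one record in SUCCEEDED state. A minimal sketch of the new post-Cleaner check, assuming the CompactorTest fixture's txnHandler field used throughout these tests:

    // After the Cleaner pass the request is no longer deleted; it is kept
    // as compaction history, so exactly one record should be visible.
    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    Assert.assertEquals(1, rsp.getCompactsSize());
    // assertEquals yields a clearer failure message than the committed
    // assertTrue(expected.equals(actual)) idiom, with the same semantics.
    Assert.assertEquals(TxnHandler.SUCCEEDED_RESPONSE,
        rsp.getCompacts().get(0).getState());
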
    http://git-wip-us.apache.org/repos/asf/hive/blob/aa35e21e/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    index 245e839..8862402 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
      import org.apache.hadoop.fs.*;
      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.metastore.api.*;
    +import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.apache.hadoop.hive.ql.io.AcidUtils;
      import org.junit.Assert;
      import org.junit.Before;
    @@ -933,7 +934,8 @@ public class TestWorker extends CompactorTest {

          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
    - Assert.assertEquals(0, compacts.size());
    + Assert.assertEquals(1, compacts.size());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
        }

        @Test
    @@ -957,6 +959,7 @@ public class TestWorker extends CompactorTest {

          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
    - Assert.assertEquals(0, compacts.size());
    + Assert.assertEquals(1, compacts.size());
    + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }
      }

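Taken together, the TestWorker assertions exercise the full lifecycle: a request is enqueued, a Worker pass runs, and the request must survive as a SUCCEEDED history record rather than vanish. A self-contained sketch of that flow under the same fixture assumptions; startWorker() here is a hypothetical stand-in for however the harness drives one Worker cycle:

    // enqueue a major compaction request for an illustrative table
    CompactionRequest rqst = new CompactionRequest("default", "some_table", CompactionType.MAJOR);
    txnHandler.compact(rqst);

    startWorker();  // hypothetical helper: runs one Worker pass to completion

    // the request must remain visible as history, in SUCCEEDED state
    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    Assert.assertEquals(1, rsp.getCompactsSize());
    Assert.assertEquals(TxnHandler.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());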