Repository: hive
Updated Branches:
   refs/heads/master 3726ce590 -> 4959ff5bb


HIVE-13691 No record with CQ_ID=0 found in COMPACTION_QUEUE (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4959ff5b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4959ff5b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4959ff5b

Branch: refs/heads/master
Commit: 4959ff5bb7dc590e21f680b9d9be0f2270414309
Parents: 3726ce5
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Wed May 18 10:36:45 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Wed May 18 10:36:45 2016 -0700

----------------------------------------------------------------------
  .../metastore/txn/CompactionTxnHandler.java | 27 +++++--
  .../hadoop/hive/metastore/txn/TxnHandler.java | 36 +++++----
  .../hadoop/hive/ql/txn/compactor/Initiator.java | 5 +-
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 84 +++++++++++++-------
  4 files changed, 99 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index ab7da68..d2d6462 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -691,7 +691,7 @@ class CompactionTxnHandler extends TxnHandler {
    }

    /**
- * For any given compactable entity (partition, table if not partitioned) the history of compactions
+ * For any given compactable entity (partition; table if not partitioned) the history of compactions
     * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
     * history such that a configurable number of each type of state is present. Any other entries
     * can be purged. This scheme has advantage of always retaining the last failure/success even if
@@ -793,7 +793,7 @@ class CompactionTxnHandler extends TxnHandler {
            "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
            "CC_TABLE = " + quoteString(ci.tableName) +
            (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") +
- " order by CC_ID desc");
+ " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
          int numFailed = 0;
          int numTotal = 0;
          int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
@@ -824,8 +824,8 @@ class CompactionTxnHandler extends TxnHandler {
    /**
     * If there is an entry in compaction_queue with ci.id, remove it
     * Make entry in completed_compactions with status 'f'.
- *
- * but what abount markCleaned() which is called when table is had been deleted...
+ * If there is no entry in compaction_queue, it means Initiator failed to even schedule a compaction,
+ * which we record as ATTEMPTED_STATE entry in history.
     */
    public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
      //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure
@@ -845,12 +845,27 @@ class CompactionTxnHandler extends TxnHandler {
            int updCnt = stmt.executeUpdate(s);
          }
          else {
- throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+ if(ci.id > 0) {
+ //the record with valid CQ_ID has disappeared - this is a sign of something wrong
+ throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+ }
+ }
+ if(ci.id == 0) {
+ //The failure occurred before we even made an entry in COMPACTION_QUEUE
+ //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+ ci.id = generateCompactionQueueId(stmt);
+ //mostly this indicates that the Initiator is paying attention to some table even though
+ //compactions are not happening.
+ ci.state = ATTEMPTED_STATE;
+ //this is not strictly accurate, but 'type' cannot be null.
+ ci.type = CompactionType.MINOR;
+ }
+ else {
+ ci.state = FAILED_STATE;
          }
          close(rs, stmt, null);

          pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
- ci.state = FAILED_STATE;
          CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
          int updCount = pStmt.executeUpdate();
          LOG.debug("Going to commit");

http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index f061767..bc818e0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -18,6 +18,7 @@
  package org.apache.hadoop.hive.metastore.txn;

  import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Service;
  import com.jolbox.bonecp.BoneCPConfig;
  import com.jolbox.bonecp.BoneCPDataSource;
  import org.apache.commons.dbcp.ConnectionFactory;
@@ -1252,6 +1253,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
      }
    }

+ long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException {
+ // Get the id for the next entry in the queue
+ String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
+ LOG.debug("going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new IllegalStateException("Transaction tables not properly initiated, " +
+ "no record found in next_compaction_queue_id");
+ }
+ long id = rs.getLong(1);
+ s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ return id;
+ }
    public long compact(CompactionRequest rqst) throws MetaException {
      // Put a compaction request in the queue.
      try {
@@ -1261,21 +1277,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          lockInternal();
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
-
- // Get the id for the next entry in the queue
- String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
- LOG.debug("going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- if (!rs.next()) {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- throw new MetaException("Transaction tables not properly initiated, " +
- "no record found in next_compaction_queue_id");
- }
- long id = rs.getLong(1);
- s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
+
+ long id = generateCompactionQueueId(stmt);

          StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
            "cq_table, ");
@@ -1315,7 +1318,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            buf.append(rqst.getRunas());
          }
          buf.append("')");
- s = buf.toString();
+ String s = buf.toString();
          LOG.debug("Going to execute update <" + s + ">");
          stmt.executeUpdate(s);
          LOG.debug("Going to commit");
@@ -1366,6 +1369,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
              case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
              case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
              case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break;
+ case ATTEMPTED_STATE: e.setState(ATTEMPTED_RESPONSE); break;
              default:
                //do nothing to handle RU/D if we add another status
            }
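The refactor above extracts the id allocation into generateCompactionQueueId() so that markFailed() can obtain a CC_ID even when no COMPACTION_QUEUE entry was ever made. Below is a simplified, standalone JDBC sketch of that select-then-increment pattern; the real method adds a dialect-specific FOR UPDATE suffix via addForUpdateClause() and runs inside the caller's transaction:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Standalone sketch of the id-allocation pattern factored out by this commit.
public class QueueIdSketch {
  static long nextCompactionQueueId(Connection dbConn) throws SQLException {
    try (Statement stmt = dbConn.createStatement();
         ResultSet rs = stmt.executeQuery("select ncq_next from NEXT_COMPACTION_QUEUE_ID")) {
      if (!rs.next()) {
        throw new IllegalStateException(
            "Transaction tables not properly initiated, no record found in next_compaction_queue_id");
      }
      long id = rs.getLong(1);
      // bump the counter for the next caller; the caller owns commit/rollback of the transaction
      stmt.executeUpdate("update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1));
      return id;
    }
  }
}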

http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 949cbd5..a55fa1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -126,8 +126,9 @@ public class Initiator extends CompactorThread {
                  continue;
                }
                if(txnHandler.checkFailedCompactions(ci)) {
- //todo: make 'a' state entry in completed_compactions
- LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last 3 attempts to compact it failed.");
+ LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last "
+ + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed.");
+ txnHandler.markFailed(ci);
                  continue;
                }
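With this change the Initiator not only skips an entity whose recent compactions keep failing but also records that decision through markFailed(), which (since ci.id is 0 at this point) shows up as an 'attempted' entry in the history. A rough sketch of the guard, with an invented interface standing in for TxnStore:

public class InitiatorGuardSketch {
  // Invented stand-in for the relevant TxnStore calls used by the Initiator loop.
  interface TxnStoreLike {
    boolean checkFailedCompactions(String entity); // true once the failure threshold is reached
    void markFailed(String entity);                // records the skip as an 'attempted' history row
  }

  static boolean shouldInitiate(TxnStoreLike store, String entity, int failedThreshold) {
    if (store.checkFailedCompactions(entity)) {
      System.out.println("Will not initiate compaction for " + entity + " since last "
          + failedThreshold + " attempts to compact it failed.");
      store.markFailed(entity);
      return false;
    }
    return true;
  }
}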


http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 903337d..d80a03e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -492,6 +492,15 @@ public class TestTxnCommands2 {
      Assert.assertEquals("Insert overwrite partition failed", stringifyValues(updatedData), rs2);
      //insert overwrite not supported for ACID tables
    }
+ private static void checkCompactionState(CompactionsByState expected, CompactionsByState actual) {
+ Assert.assertEquals(TxnStore.ATTEMPTED_RESPONSE, expected.attempted, actual.attempted);
+ Assert.assertEquals(TxnStore.FAILED_RESPONSE, expected.failed, actual.failed);
+ Assert.assertEquals(TxnStore.INITIATED_RESPONSE, expected.initiated, actual.initiated);
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE, expected.readyToClean, actual.readyToClean);
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, expected.succeeded, actual.succeeded);
+ Assert.assertEquals(TxnStore.WORKING_RESPONSE, expected.working, actual.working);
+ Assert.assertEquals("total", expected.total, actual.total);
+ }
    /**
     * HIVE-12353
     * @throws Exception
@@ -519,58 +528,60 @@ public class TestTxnCommands2 {
        txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
        runWorker(hiveConf);
      }
- //this should not schedule a new compaction due to prior failures
+ //this should not schedule a new compaction due to prior failures, but will create Attempted entry
      Initiator init = new Initiator();
      init.setThreadId((int)init.getId());
      init.setHiveConf(hiveConf);
      init.init(stop, new AtomicBoolean());
      init.run();
-
- CompactionsByState cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed);
- Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total);
+ int numAttemptedCompactions = 1;
+ checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));

      hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
      AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService();
      runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed);
- Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total);
+ checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));

      txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
      runWorker(hiveConf);//will fail
      txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
      runWorker(hiveConf);//will fail
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed);
- Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total);
+ init.run();
+ numAttemptedCompactions++;
+ init.run();
+ numAttemptedCompactions++;
+ checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions + 2,0,0,0,0,numFailedCompactions + 2 + numAttemptedCompactions), countCompacts(txnHandler));
+
      runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have
      //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here)
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
- Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total);
-
+ checkCompactionState(new CompactionsByState(
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,0,0,
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)), countCompacts(txnHandler));

      hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
      txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
- //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
- Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated);
- Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+ //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated (explicitly by user)
+ checkCompactionState(new CompactionsByState(
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),1,0,0,0,
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));

      runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
- Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean);
- Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
-
+ checkCompactionState(new CompactionsByState(
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,1,0,0,
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
+
      runCleaner(hiveConf); // transition to Success state
      runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
- Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
- Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+ checkCompactionState(new CompactionsByState(
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,1,0,
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
    }

    /**
@@ -624,6 +635,18 @@ public class TestTxnCommands2 {
      private int succeeded;
      private int working;
      private int total;
+ CompactionsByState() {
+ this(0,0,0,0,0,0,0);
+ }
+ CompactionsByState(int attempted, int failed, int initiated, int readyToClean, int succeeded, int working, int total) {
+ this.attempted = attempted;
+ this.failed = failed;
+ this.initiated = initiated;
+ this.readyToClean = readyToClean;
+ this.succeeded = succeeded;
+ this.working = working;
+ this.total = total;
+ }
    }
    private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
      ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
@@ -648,6 +671,9 @@ public class TestTxnCommands2 {
        else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
          compactionsByState.attempted++;
        }
+ else {
+ throw new IllegalStateException("Unexpected state: " + compact.getState());
+ }
      }
      return compactionsByState;
    }
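The test refactor replaces the repeated per-field assertions with a CompactionsByState value object compared in one call to checkCompactionState(). A self-contained sketch of the same tally-and-compare idea; the state strings below are placeholders, not the actual TxnStore.*_RESPONSE constants:

import java.util.Arrays;
import java.util.List;

public class CompactionTallySketch {
  static final class Tally {
    int attempted, failed, initiated, readyToClean, succeeded, working, total;
  }

  // Tally states into one object so a test can compare everything at once,
  // mirroring the countCompacts()/checkCompactionState() pattern in the diff.
  static Tally count(List<String> states) {
    Tally t = new Tally();
    for (String s : states) {
      switch (s) {
        case "attempted":          t.attempted++;    break;
        case "failed":             t.failed++;       break;
        case "initiated":          t.initiated++;    break;
        case "ready for cleaning": t.readyToClean++; break;
        case "succeeded":          t.succeeded++;    break;
        case "working":            t.working++;      break;
        default: throw new IllegalStateException("Unexpected state: " + s);
      }
      t.total++;
    }
    return t;
  }

  public static void main(String[] args) {
    Tally t = count(Arrays.asList("attempted", "failed", "failed", "succeeded"));
    System.out.println(t.attempted + " attempted, " + t.failed + " failed, " + t.total + " total");
  }
}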
