HIVE-11948 Investigate TxnHandler and CompactionTxnHandler to see where we can improve concurrency (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a80841b7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a80841b7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a80841b7

Branch: refs/heads/branch-1
Commit: a80841b73431de470cd0e53fcee3d04ca85ef7f5
Parents: b1c1bf2
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Tue Nov 17 16:43:42 2015 -0800
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Tue Nov 17 16:43:42 2015 -0800

----------------------------------------------------------------------
  .../hive/hcatalog/streaming/TestStreaming.java | 6 +-
  .../metastore/txn/CompactionTxnHandler.java | 125 +++--
  .../hadoop/hive/metastore/txn/TxnHandler.java | 523 ++++++++++++++-----
  .../metastore/txn/TestCompactionTxnHandler.java | 37 --
  .../hive/metastore/txn/TestTxnHandler.java | 10 +-
  .../java/org/apache/hadoop/hive/ql/Driver.java | 2 +-
  6 files changed, 463 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 58cfbaa..806dbdb 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -618,7 +618,7 @@ public class TestStreaming {
    }

    @Test
- public void testHearbeat() throws Exception {
+ public void testHeartbeat() throws Exception {
      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
      DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
      StreamingConnection connection = endPt.newConnection(false, null);
@@ -632,14 +632,14 @@ public class TestStreaming {
      Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size());
      ShowLocksResponseElement lock = response.getLocks().get(0);
      long acquiredAt = lock.getAcquiredat();
- long heartbeatAt = lock.getAcquiredat();
+ long heartbeatAt = lock.getLastheartbeat();
      txnBatch.heartbeat();
      response = msClient.showLocks();
      Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
      lock = response.getLocks().get(0);
      Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
      Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
- ") > old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() > heartbeatAt);
+ ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt);
    }
    @Test
    public void testTransactionBatchEmptyAbort() throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 44ee5c6..eab801a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -56,6 +56,7 @@ public class CompactionTxnHandler extends TxnHandler {
      Connection dbConn = null;
      Set<CompactionInfo> response = new HashSet<CompactionInfo>();
      Statement stmt = null;
+ ResultSet rs = null;
      try {
        try {
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -64,7 +65,7 @@ public class CompactionTxnHandler extends TxnHandler {
          String s = "select distinct ctc_database, ctc_table, " +
            "ctc_partition from COMPLETED_TXN_COMPONENTS";
          LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          while (rs.next()) {
            CompactionInfo info = new CompactionInfo();
            info.dbname = rs.getString(1);
@@ -72,6 +73,7 @@ public class CompactionTxnHandler extends TxnHandler {
            info.partName = rs.getString(3);
            response.add(info);
          }
+ rs.close();

          // Check for aborted txns
          s = "select tc_database, tc_table, tc_partition " +
@@ -97,8 +99,7 @@ public class CompactionTxnHandler extends TxnHandler {
          LOG.error("Unable to connect to transaction database " + e.getMessage());
          checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
        } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
        }
        return response;
      }
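
Several hunks in this file replace pairwise closeStmt(stmt)/closeDbConn(dbConn) calls with a single close(rs, stmt, dbConn). A minimal sketch of what such a helper typically looks like, assuming plain JDBC (this is an illustration, not the actual helper body in TxnHandler):

  import java.sql.Connection;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;

  final class JdbcCleanup {
    // Close in reverse order of acquisition and swallow exceptions, so cleanup
    // never masks the exception that caused the early exit. Nulls are tolerated.
    static void close(ResultSet rs, Statement stmt, Connection dbConn) {
      try { if (rs != null) rs.close(); } catch (SQLException e) { /* log and ignore */ }
      try { if (stmt != null) stmt.close(); } catch (SQLException e) { /* log and ignore */ }
      try { if (dbConn != null) dbConn.close(); } catch (SQLException e) { /* log and ignore */ }
    }
  }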
@@ -118,7 +119,7 @@ public class CompactionTxnHandler extends TxnHandler {
        Connection dbConn = null;
        Statement stmt = null;
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
          LOG.debug("Going to execute update <" + s + ">");
@@ -153,46 +154,58 @@ public class CompactionTxnHandler extends TxnHandler {
    public CompactionInfo findNextToCompact(String workerId) throws MetaException {
      try {
        Connection dbConn = null;
- CompactionInfo info = new CompactionInfo();
-
        Statement stmt = null;
+ ResultSet rs = null;
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          String s = "select cq_id, cq_database, cq_table, cq_partition, " +
            "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
          LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          if (!rs.next()) {
            LOG.debug("No compactions found ready to compact");
            dbConn.rollback();
            return null;
          }
- info.id = rs.getLong(1);
- info.dbname = rs.getString(2);
- info.tableName = rs.getString(3);
- info.partName = rs.getString(4);
- switch (rs.getString(5).charAt(0)) {
- case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
- case MINOR_TYPE: info.type = CompactionType.MINOR; break;
- default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
- }
-
- // Now, update this record as being worked on by this worker.
- long now = getDbTime(dbConn);
- s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
- "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
- LOG.debug("Going to execute update <" + s + ">");
- int updCount = stmt.executeUpdate(s);
- if (updCount != 1) {
+ do {
+ CompactionInfo info = new CompactionInfo();
+ info.id = rs.getLong(1);
+ info.dbname = rs.getString(2);
+ info.tableName = rs.getString(3);
+ info.partName = rs.getString(4);
+ switch (rs.getString(5).charAt(0)) {
+ case MAJOR_TYPE:
+ info.type = CompactionType.MAJOR;
+ break;
+ case MINOR_TYPE:
+ info.type = CompactionType.MINOR;
+ break;
+ default:
+ throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ }
+ // Now, update this record as being worked on by this worker.
+ long now = getDbTime(dbConn);
+ s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
+ "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
+ " AND cq_state='" + INITIATED_STATE + "'";
+ LOG.debug("Going to execute update <" + s + ">");
+ int updCount = stmt.executeUpdate(s);
+ if(updCount == 1) {
+ dbConn.commit();
+ return info;
+ }
+ if(updCount == 0) {
+ LOG.debug("Another Worker picked up " + info);
+ continue;
+ }
            LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
- info + ". updCnt=" + updCount);
- LOG.debug("Going to rollback");
+ info + ". updCnt=" + updCount + ".");
            dbConn.rollback();
- }
- LOG.debug("Going to commit");
- dbConn.commit();
- return info;
+ return null;
+ } while( rs.next());
+ dbConn.rollback();
+ return null;
        } catch (SQLException e) {
          LOG.error("Unable to select next element for compaction, " + e.getMessage());
          LOG.debug("Going to rollback");
@@ -201,8 +214,7 @@ public class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
        return findNextToCompact(workerId);
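
The rewritten findNextToCompact() above swaps SERIALIZABLE isolation for an optimistic claim: scan candidates at READ_COMMITTED, then try to flip one row's state with an UPDATE whose WHERE clause re-checks cq_state, treating updCount == 0 as losing the race to another Worker. A condensed, hypothetical sketch of just that pattern (error handling and the CompactionInfo plumbing omitted):

  import java.sql.Connection;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;

  final class OptimisticClaim {
    // Returns the claimed queue id, or -1 if every candidate was taken by another worker.
    static long claimNext(Connection dbConn, String workerId) throws SQLException {
      try (Statement stmt = dbConn.createStatement();
           ResultSet rs = stmt.executeQuery(
               "select cq_id from COMPACTION_QUEUE where cq_state = 'i'")) {
        while (rs.next()) {
          long id = rs.getLong(1);
          // The repeated cq_state = 'i' predicate makes this a compare-and-set:
          // exactly one concurrent worker can move the row from 'i' to 'w'.
          try (Statement upd = dbConn.createStatement()) {
            int updCount = upd.executeUpdate(
                "update COMPACTION_QUEUE set cq_state = 'w', cq_worker_id = '" + workerId +
                "' where cq_id = " + id + " and cq_state = 'i'");
            if (updCount == 1) {
              dbConn.commit();
              return id;
            }
            // updCount == 0: lost the race; try the next candidate.
          }
        }
        dbConn.rollback();
        return -1;
      }
    }
  }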
@@ -219,7 +231,7 @@ public class CompactionTxnHandler extends TxnHandler {
        Connection dbConn = null;
        Statement stmt = null;
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
            "cq_worker_id = null where cq_id = " + info.id;
@@ -240,8 +252,8 @@ public class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
- closeDbConn(dbConn);
          closeStmt(stmt);
+ closeDbConn(dbConn);
        }
      } catch (RetryException e) {
        markCompacted(info);
@@ -258,6 +270,7 @@ public class CompactionTxnHandler extends TxnHandler {
      List<CompactionInfo> rc = new ArrayList<CompactionInfo>();

      Statement stmt = null;
+ ResultSet rs = null;
      try {
        try {
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -265,7 +278,7 @@ public class CompactionTxnHandler extends TxnHandler {
          String s = "select cq_id, cq_database, cq_table, cq_partition, " +
            "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
          LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          while (rs.next()) {
            CompactionInfo info = new CompactionInfo();
            info.id = rs.getLong(1);
@@ -291,8 +304,7 @@ public class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
        return findReadyToClean();
@@ -303,23 +315,28 @@ public class CompactionTxnHandler extends TxnHandler {
     * This will remove an entry from the queue after
     * it has been compacted.
     *
- * todo: possibly a problem? Worker will start with DB in state X (wrt this partition).
+ * todo: Worker will start with DB in state X (wrt this partition).
     * While it's working, more txns will happen against the partition it's compacting;
     * then this will delete state up to X and everything since. There may be new delta files created
     * between compaction starting and cleaning. These will not be compacted until more
     * transactions happen. So this ideally should only delete
     * up to the TXN_ID that was compacted (i.e. the HWM in Worker?). Then this can also run
- * at READ_COMMITTED
+ * at READ_COMMITTED. So this means we'd want to store HWM in COMPACTION_QUEUE when
+ * Worker picks up the job.
     *
     * Also, by using this method when Worker fails, we prevent future compactions from
- * running until more data is written to tale or compaction is invoked explicitly
+ * running until more data is written to table or compaction is invoked explicitly
     * @param info info on the compaction entry to remove
     */
    public void markCleaned(CompactionInfo info) throws MetaException {
      try {
        Connection dbConn = null;
        Statement stmt = null;
+ ResultSet rs = null;
        try {
+ //do we need serializable? Once we have the HWM as above, no. Before that
+ //it's debatable, but the problem described above applies either way;
+ //Thus can drop to RC
          dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
          stmt = dbConn.createStatement();
          String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
@@ -344,19 +361,20 @@ public class CompactionTxnHandler extends TxnHandler {
              "marking compaction entry as clean!");
          }

-
+ //todo: add distinct in query
          s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
            TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
            info.tableName + "'";
          if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
          LOG.debug("Going to execute update <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          Set<Long> txnids = new HashSet<Long>();
          while (rs.next()) txnids.add(rs.getLong(1));
          if (txnids.size() > 0) {

            // Remove entries from txn_components, as there may be aborted txn components
            StringBuilder buf = new StringBuilder();
+ //todo: add a safeguard to make sure IN clause is not too large; break up by txn id
            buf.append("delete from TXN_COMPONENTS where tc_txnid in (");
            boolean first = true;
            for (long id : txnids) {
@@ -394,8 +412,7 @@ public class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
        markCleaned(info);
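
The "break up by txn id" todo above is the usual chunk-the-id-list fix for oversized IN clauses. A hedged sketch of that safeguard (the batch size of 1000 is illustrative; real limits are DB-specific):

  import java.sql.Connection;
  import java.sql.SQLException;
  import java.sql.Statement;
  import java.util.List;

  final class InClauseBatcher {
    private static final int BATCH = 1000; // illustrative cap, not a documented limit

    // Issue one DELETE per chunk so no single statement exceeds IN-list limits.
    static void deleteTxnComponents(Connection dbConn, List<Long> txnids) throws SQLException {
      try (Statement stmt = dbConn.createStatement()) {
        for (int i = 0; i < txnids.size(); i += BATCH) {
          List<Long> chunk = txnids.subList(i, Math.min(i + BATCH, txnids.size()));
          StringBuilder buf = new StringBuilder("delete from TXN_COMPONENTS where tc_txnid in (");
          for (int j = 0; j < chunk.size(); j++) {
            if (j > 0) buf.append(", ");
            buf.append(chunk.get(j));
          }
          buf.append(')');
          stmt.executeUpdate(buf.toString());
        }
      }
    }
  }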
@@ -409,14 +426,17 @@ public class CompactionTxnHandler extends TxnHandler {
      try {
        Connection dbConn = null;
        Statement stmt = null;
+ ResultSet rs = null;
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ //Aborted is a terminal state, so nothing about the txn can change
+ //after that, so READ COMMITTED is sufficient.
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          String s = "select txn_id from TXNS where " +
            "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
            "txn_state = '" + TXN_ABORTED + "'";
          LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          Set<Long> txnids = new HashSet<Long>();
          while (rs.next()) txnids.add(rs.getLong(1));
          if (txnids.size() > 0) {
@@ -443,8 +463,7 @@ public class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
        cleanEmptyAbortedTxns();
@@ -465,7 +484,7 @@ public class CompactionTxnHandler extends TxnHandler {
        Connection dbConn = null;
        Statement stmt = null;
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
            + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
@@ -485,8 +504,8 @@ public class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
- closeDbConn(dbConn);
          closeStmt(stmt);
+ closeDbConn(dbConn);
        }
      } catch (RetryException e) {
        revokeFromLocalWorkers(hostname);
@@ -507,7 +526,7 @@ public class CompactionTxnHandler extends TxnHandler {
        Connection dbConn = null;
        Statement stmt = null;
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          long latestValidStart = getDbTime(dbConn) - timeout;
          stmt = dbConn.createStatement();
          String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
@@ -528,8 +547,8 @@ public class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
- closeDbConn(dbConn);
          closeStmt(stmt);
+ closeDbConn(dbConn);
        }
      } catch (RetryException e) {
        revokeTimedoutWorkers(timeout);

http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0b3d565..15b747d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -52,6 +52,11 @@ import java.util.concurrent.TimeUnit;
   * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
   * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
   * so keeping the format consistent makes grep'ing the logs much easier.
+ *
+ * Note on HIVE_LOCKS.hl_last_heartbeat.
+ * For locks that are part of a transaction, we set this to 0 (we would rather set it to NULL, but
+ * currently the DB schema has this column NOT NULL) and only update/read the heartbeat from the
+ * corresponding transaction in TXNS.
   */
  public class TxnHandler {
    // Compactor states
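
This note is the invariant behind the TestStreaming change earlier in this patch: a lock owned by a transaction keeps hl_last_heartbeat = 0, and liveness is judged solely by TXNS.txn_last_heartbeat. A hypothetical sketch of the resulting two-way dispatch (condensed from the heartbeat paths in this patch; 'o' stands for the open-txn state):

  import java.sql.Connection;
  import java.sql.SQLException;
  import java.sql.Statement;

  final class HeartbeatDispatch {
    // Heartbeat either the transaction or the free-standing lock, never both.
    static void heartbeat(Connection dbConn, long txnid, long extLockId, long now)
        throws SQLException {
      try (Statement stmt = dbConn.createStatement()) {
        if (txnid > 0) {
          // Txn locks ride on the txn's heartbeat; their HIVE_LOCKS rows stay at 0.
          stmt.executeUpdate("update TXNS set txn_last_heartbeat = " + now +
              " where txn_id = " + txnid + " and txn_state = 'o'");
        } else {
          stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = " + now +
              " where hl_lock_ext_id = " + extLockId + " and hl_txnid = 0");
        }
        dbConn.commit();
      }
    }
  }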
@@ -150,12 +155,20 @@ public class TxnHandler {
        // subsequently shows up in the open list that's ok.
        Connection dbConn = null;
        Statement stmt = null;
+ ResultSet rs = null;
        try {
+ /**
+ * This method can run at READ_COMMITTED as long as
+ * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
+ * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
+ * adding corresponding entries into TXNS. The reason is that any txnid below HWM
+ * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
+ */
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          String s = "select ntxn_next - 1 from NEXT_TXN_ID";
          LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          if (!rs.next()) {
            throw new MetaException("Transaction tables not properly " +
              "initialized, no record found in next_txn_id");
@@ -165,7 +178,7 @@ public class TxnHandler {
            throw new MetaException("Transaction tables not properly " +
              "initialized, null record found in next_txn_id");
          }
-
+ close(rs);
          List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
          //need the WHERE clause below to ensure consistent results with READ_COMMITTED
          s = "select txn_id, txn_state, txn_user, txn_host from TXNS where txn_id <= " + hwm;
@@ -199,8 +212,7 @@ public class TxnHandler {
          throw new MetaException("Unable to select from transaction database: " + getMessage(e)
            + StringUtils.stringifyException(e));
        } finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
        return getOpenTxnsInfo();
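
The comment above is the invariant that lets this method drop to READ_COMMITTED: read the high-water mark from NEXT_TXN_ID first, then scan TXNS only up to that mark, so txns opened between the two statements are ignored entirely rather than half-observed. A condensed sketch of the pattern (illustrative; the real method also distinguishes open from aborted txns):

  import java.sql.Connection;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;
  import java.util.HashSet;
  import java.util.Set;

  final class HwmSnapshot {
    static Set<Long> openTxnsBelowHwm(Connection dbConn) throws SQLException {
      Set<Long> open = new HashSet<Long>();
      try (Statement stmt = dbConn.createStatement()) {
        long hwm;
        try (ResultSet rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID")) {
          if (!rs.next()) throw new SQLException("NEXT_TXN_ID not initialized");
          hwm = rs.getLong(1);
        }
        // "txn_id <= hwm" makes two READ_COMMITTED reads act like one snapshot:
        // any txn opened after the hwm was read is simply out of scope.
        try (ResultSet rs = stmt.executeQuery("select txn_id from TXNS where txn_id <= " + hwm)) {
          while (rs.next()) open.add(rs.getLong(1));
        }
      }
      return open;
    }
  }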
@@ -215,12 +227,16 @@ public class TxnHandler {
        // subsequently shows up in the open list that's ok.
        Connection dbConn = null;
        Statement stmt = null;
+ ResultSet rs = null;
        try {
+ /**
+ * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
+ */
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          String s = "select ntxn_next - 1 from NEXT_TXN_ID";
          LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          if (!rs.next()) {
            throw new MetaException("Transaction tables not properly " +
              "initialized, no record found in next_txn_id");
@@ -230,7 +246,7 @@ public class TxnHandler {
            throw new MetaException("Transaction tables not properly " +
              "initialized, null record found in next_txn_id");
          }
-
+ close(rs);
          Set<Long> openList = new HashSet<Long>();
          //need the WHERE clause below to ensure consistent results with READ_COMMITTED
          s = "select txn_id from TXNS where txn_id <= " + hwm;
@@ -249,8 +265,7 @@ public class TxnHandler {
          throw new MetaException("Unable to select from transaction database, "
            + StringUtils.stringifyException(e));
        } finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
        return getOpenTxns();
@@ -284,17 +299,35 @@ public class TxnHandler {
      try {
        Connection dbConn = null;
        Statement stmt = null;
+ ResultSet rs = null;
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ /**
+ * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
+ * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
+ * Also, advancing the counter must work when multiple metastores are running, thus either
+ * SELECT ... FOR UPDATE is used or SERIALIZABLE isolation. The former is preferred since it prevents
+ * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
+ *
+ * In the current design, there can be several metastore instances running in a given Warehouse.
+ * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example,
+ * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
+ * Now the same client will start another transaction, except it ends up on MS2 and may get
+ * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot
+ * on read will think the version of the row from transaction ID 500 is the latest one.
+ *
+ * Longer term we can consider running Active-Passive MS (at least wrt ACID operations). This
+ * setup could support a write-through cache for added performance.
+ */
+ dbConn = getDbConn(getRequiredIsolationLevel());
          // Make sure the user has not requested an insane amount of txns.
          int maxTxns = HiveConf.getIntVar(conf,
            HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
          if (numTxns > maxTxns) numTxns = maxTxns;

          stmt = dbConn.createStatement();
- String s = "select ntxn_next from NEXT_TXN_ID";
+ String s = addForUpdateClause(dbConn, "select ntxn_next from NEXT_TXN_ID");
          LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          if (!rs.next()) {
            throw new MetaException("Transaction database not properly " +
              "configured, can't find next transaction id.");
@@ -312,10 +345,11 @@ public class TxnHandler {
          List<Long> txnIds = new ArrayList<Long>(numTxns);
          for (long i = first; i < first + numTxns; i++) {
            ps.setLong(1, i);
+ //todo: this would be more efficient with a single insert with multiple rows in values()
+ //need to add a safeguard to not exceed the DB's capabilities.
            ps.executeUpdate();
            txnIds.add(i);
          }
-
          LOG.debug("Going to commit");
          dbConn.commit();
          return new OpenTxnsResponse(txnIds);
@@ -326,8 +360,7 @@ public class TxnHandler {
          throw new MetaException("Unable to select from transaction database "
            + StringUtils.stringifyException(e));
        } finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
        return openTxns(rqst);
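
The comment above reduces to: the counter bump and the TXNS inserts must be one atomic unit, and a SELECT ... FOR UPDATE row lock is preferred over SERIALIZABLE because concurrent metastores then queue on the NEXT_TXN_ID row instead of aborting each other on write-write conflict. A hedged sketch of just the allocation step (the real method also inserts the TXNS rows before committing):

  import java.sql.Connection;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;

  final class TxnIdAllocator {
    // Atomically reserve numTxns consecutive ids; FOR UPDATE serializes all
    // allocators on the single NEXT_TXN_ID row (assumes a DB that supports it).
    static long reserve(Connection dbConn, int numTxns) throws SQLException {
      try (Statement stmt = dbConn.createStatement()) {
        long first;
        try (ResultSet rs = stmt.executeQuery("select ntxn_next from NEXT_TXN_ID for update")) {
          if (!rs.next()) throw new SQLException("NEXT_TXN_ID not initialized");
          first = rs.getLong(1);
        }
        stmt.executeUpdate("update NEXT_TXN_ID set ntxn_next = " + (first + numTxns));
        dbConn.commit(); // releases the row lock; ids [first, first + numTxns) are ours
        return first;
      }
    }
  }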
@@ -369,6 +402,24 @@ public class TxnHandler {
        Connection dbConn = null;
        Statement stmt = null;
        try {
+ /**
+ * This has to run at SERIALIZABLE to ensure no concurrent attempt to acquire locks (insert into HIVE_LOCKS)
+ * can happen. Otherwise we may end up with orphaned locks. lock() and commitTxn() should not
+ * normally run concurrently (for the same txn) but could due to bugs in the client, which could then
+ * (w/o SERIALIZABLE) corrupt internal transaction manager state. This also competes with abortTxn().
+ *
+ * Sketch of an improvement:
+ * Introduce a new transaction state in TXNS, state 'c'. This is a transient Committed state.
+ * commitTxn() would mark the txn 'c' in TXNS in an independent txn. Other operation like
+ * lock(), heartbeat(), etc would raise errors for txn in 'c' state and getOpenTxns(), etc would
+ * treat 'c' txn as 'open'. Then this method could run in READ COMMITTED since the
+ * entry for this txn in TXNS in 'c' acts like a monitor.
+ * Since the move to 'c' state is in one txn (to make it visible) and the rest of the
+ * operations in another (could even be made separate txns), there is a possibility of failure
+ * between the 2. Thus the AcidHouseKeeper logic to timeout txns should apply to 'c' state txns.
+ *
+ * Or perhaps: "select * from TXNS where txn_id = " + txnid + " for update"
+ */
          dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
          stmt = dbConn.createStatement();
          // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to
@@ -423,7 +474,7 @@ public class TxnHandler {
        Connection dbConn = null;
        try {
          dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- return lock(dbConn, rqst, true);
+ return lock(dbConn, rqst);
        } catch (SQLException e) {
          LOG.debug("Going to rollback");
          rollbackDBConn(dbConn);
@@ -438,48 +489,49 @@ public class TxnHandler {
      }
    }

- public LockResponse lockNoWait(LockRequest rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException {
- try {
- Connection dbConn = null;
- try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- return lock(dbConn, rqst, false);
- } catch (SQLException e) {
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "lockNoWait(" + rqst + ")");
- throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
- } finally {
- closeDbConn(dbConn);
- }
- } catch (RetryException e) {
- return lockNoWait(rqst);
- }
- }
-
+ /**
+ * Why doesn't this get a txnid as parameter? The caller should either know the txnid or know there isn't one.
+ * Either way getTxnIdFromLockId() will not be needed. This would be a Thrift change.
+ *
+ * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(),
+ * in practice more often)
+ * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB.
+ *
+ * Clients that operate in blocking mode can't heartbeat a lock until the lock is acquired.
+ * We should make CheckLockRequest include the timestamp of the last request to skip unnecessary heartbeats. Thrift change.
+ *
+ * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking
+ * against doesn't move from W to A in another txn) but this method can heartbeat in
+ * separate txn at READ_COMMITTED.
+ */
    public LockResponse checkLock(CheckLockRequest rqst)
      throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
      try {
        Connection dbConn = null;
+ long extLockId = rqst.getLockid();
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- long extLockId = rqst.getLockid();
-
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          // Heartbeat on the lockid first, to assure that our lock is still valid.
          // Then look up the lock info (hopefully in the cache). If these locks
          // are associated with a transaction then heartbeat on that as well.
- heartbeatLock(dbConn, extLockId);
- long txnid = getTxnIdFromLockId(dbConn, extLockId);
- if (txnid > 0) heartbeatTxn(dbConn, txnid);
- return checkLock(dbConn, extLockId, true);
+ Long txnid = getTxnIdFromLockId(dbConn, extLockId);
+ if(txnid == null) {
+ throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+ }
+ if (txnid > 0) {
+ heartbeatTxn(dbConn, txnid);
+ }
+ else {
+ heartbeatLock(dbConn, extLockId);
+ }
+ closeDbConn(dbConn); //release the READ_COMMITTED connection before escalating
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ return checkLock(dbConn, extLockId);
        } catch (SQLException e) {
          LOG.debug("Going to rollback");
          rollbackDBConn(dbConn);
          checkRetryable(dbConn, e, "checkLock(" + rqst + " )");
          throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
+ JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
        }
@@ -489,39 +541,56 @@ public class TxnHandler {

    }

+ /**
+ * This would have been made simpler if all locks were associated with a txn. Then only the txn needs to
+ * be heartbeated, committed, etc.; no need for the client to track individual locks.
+ */
    public void unlock(UnlockRequest rqst)
      throws NoSuchLockException, TxnOpenException, MetaException {
      try {
        Connection dbConn = null;
        Statement stmt = null;
+ long extLockId = rqst.getLockid();
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- // Odd as it seems, we need to heartbeat first because this touches the
- // lock table and assures that our locks our still valid. If they are
- // not, this will throw an exception and the heartbeat will fail.
- long extLockId = rqst.getLockid();
- heartbeatLock(dbConn, extLockId);
- long txnid = getTxnIdFromLockId(dbConn, extLockId);
- // If there is a valid txnid, throw an exception,
- // as locks associated with transactions should be unlocked only when the
- // transaction is committed or aborted.
- if (txnid > 0) {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- String msg = "Unlocking locks associated with transaction" +
- " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " +
- "transaction " + JavaUtils.txnIdToString(txnid);
- LOG.error(msg);
- throw new TxnOpenException(msg);
- }
+ /**
+ * This method is logically like commit for read-only auto commit queries.
+ * READ_COMMITTED since this only has 1 delete statement and no new entries with the
+ * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are
+ * created in a single atomic operation.
+ * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)}
+ * but hl_lock_ext_id is not known until that method returns.
+ * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}
+ * but using SERIALIZABLE doesn't materially change the interaction.
+ * If "delete" stmt misses, additional logic is best effort to produce meaningful error msg.
+ */
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
- String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId;
+ //hl_txnid <> 0 means it's associated with a transaction
+ String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND hl_txnid = 0";
          LOG.debug("Going to execute update <" + s + ">");
          int rc = stmt.executeUpdate(s);
          if (rc < 1) {
            LOG.debug("Going to rollback");
            dbConn.rollback();
- throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+ Long txnid = getTxnIdFromLockId(dbConn, extLockId);
+ if(txnid == null) {
+ LOG.error("No lock found for unlock(" + rqst + ")");
+ throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+ }
+ if(txnid != 0) {
+ String msg = "Unlocking locks associated with transaction" +
+ " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " +
+ "transaction " + JavaUtils.txnIdToString(txnid);
+ LOG.error(msg);
+ throw new TxnOpenException(msg);
+ }
+ if(txnid == 0) {
+ //we didn't see this lock when running DELETE stmt above but now it showed up
+ //so the "should never happen" case happened...
+ String msg = "Found lock " + JavaUtils.lockIdToString(extLockId) + " with " + JavaUtils.txnIdToString(txnid);
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
          }
          LOG.debug("Going to commit");
          dbConn.commit();
@@ -530,7 +599,7 @@ public class TxnHandler {
          rollbackDBConn(dbConn);
          checkRetryable(dbConn, e, "unlock(" + rqst + ")");
          throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
+ JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
        } finally {
          closeStmt(stmt);
          closeDbConn(dbConn);
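
The rewritten unlock() above is a guarded delete: the AND hl_txnid = 0 predicate makes the statement a no-op for txn-owned locks, and only on a miss does it re-query to choose the right error. A compressed, hypothetical sketch of the shape (exception types simplified):

  import java.sql.Connection;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;

  final class GuardedUnlock {
    static void unlock(Connection dbConn, long extLockId) throws SQLException {
      try (Statement stmt = dbConn.createStatement()) {
        int rc = stmt.executeUpdate("delete from HIVE_LOCKS where hl_lock_ext_id = " +
            extLockId + " and hl_txnid = 0");
        if (rc >= 1) {
          dbConn.commit();
          return;
        }
        dbConn.rollback();
        // Miss: diagnose after the fact. Either the lock never existed, or it is
        // owned by a transaction and must be released via commit/abort instead.
        try (ResultSet rs = stmt.executeQuery(
            "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + extLockId)) {
          if (!rs.next()) {
            throw new SQLException("No such lock " + extLockId);
          }
          throw new SQLException("Lock " + extLockId + " belongs to txn " + rs.getLong(1));
        }
      }
    }
  }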
@@ -615,6 +684,10 @@ public class TxnHandler {
      }
    }

+ /**
+ * {@code ids} should only have txnid or lockid but not both, ideally.
+ * Currently DBTxnManager.heartbeat() enforces this.
+ */
    public void heartbeat(HeartbeatRequest ids)
      throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
      try {
@@ -647,9 +720,17 @@ public class TxnHandler {
        rsp.setNosuch(nosuch);
        rsp.setAborted(aborted);
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ /**
+ * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)}
+ * only has 1 update statement in it and
+ * we only update existing txns, i.e. nothing can add additional txns that this operation
+ * would care about (which would have required SERIALIZABLE)
+ */
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
            try {
+ //todo: this is an expensive call: at least 2 update queries per txn.
+ //Is this really worth it?
              heartbeatTxn(dbConn, txn);
            } catch (NoSuchTxnException e) {
              nosuch.add(txn);
@@ -678,11 +759,11 @@ public class TxnHandler {
        Connection dbConn = null;
        Statement stmt = null;
        try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(getRequiredIsolationLevel());
          stmt = dbConn.createStatement();

          // Get the id for the next entry in the queue
- String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID";
+ String s = addForUpdateClause(dbConn, "select ncq_next from NEXT_COMPACTION_QUEUE_ID");
          LOG.debug("going to execute query <" + s + ">");
          ResultSet rs = stmt.executeQuery(s);
          if (!rs.next()) {
@@ -1293,19 +1374,31 @@ public class TxnHandler {
      }
    }

+ private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
+ return abortTxns(dbConn, txnids, -1);
+ }
    /**
- * Abort a group of txns
+ * TODO: expose this as an operation to the client. Useful for the streaming API to abort all remaining
+ * transactions in a batch on IOExceptions.
     * @param dbConn An active connection
     * @param txnids list of transactions to abort
+ * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't abort txns which were
+ * heartbeated after the #performTimeOuts() select but before this operation.
     * @return Number of aborted transactions
     * @throws SQLException
     */
- private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
+ private int abortTxns(Connection dbConn, List<Long> txnids, long max_heartbeat) throws SQLException {
      Statement stmt = null;
      int updateCnt = 0;
      if (txnids.isEmpty()) {
        return 0;
      }
+ if(Connection.TRANSACTION_SERIALIZABLE != dbConn.getTransactionIsolation()) {
+ /** Running this at SERIALIZABLE prevents new locks being added for these txnids concurrently,
+ * which would cause them to become orphaned.
+ */
+ throw new IllegalStateException("Expected SERIALIZABLE isolation. Found " + dbConn.getTransactionIsolation());
+ }
      try {
        stmt = dbConn.createStatement();

@@ -1321,6 +1414,8 @@ public class TxnHandler {
        LOG.debug("Going to execute update <" + buf.toString() + ">");
        stmt.executeUpdate(buf.toString());

+ //todo: seems like we should do this first and if it misses, don't bother with
+ //delete from HIVE_LOCKS since it will be rolled back
        buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED +
          "' where txn_state = '" + TXN_OPEN + "' and txn_id in (");
        first = true;
@@ -1330,6 +1425,9 @@ public class TxnHandler {
          buf.append(id);
        }
        buf.append(')');
+ if(max_heartbeat > 0) {
+ buf.append(" and txn_last_heartbeat < ").append(max_heartbeat);
+ }
        LOG.debug("Going to execute update <" + buf.toString() + ">");
        updateCnt = stmt.executeUpdate(buf.toString());

@@ -1340,22 +1438,33 @@ public class TxnHandler {
    }

    /**
+ * Isolation Level Notes:
+ * Run at SERIALIZABLE to make sure no one is adding new locks while we are checking conflicts here.
+ *
+ * Ramblings:
+ * We could perhaps get away with writing to TXN_COMPONENTS + HIVE_LOCKS in 1 txn@RC
+ * since this is just in Wait state.
+ * (Then we'd need to ensure that in !wait case we don't rely on rollback and again in case of
+ * failure, the W locks will timeout if failure does not propagate to client in some way, or it
+ * will and client will Abort).
+ * Actually, whether we can do this depends on what happens when you try to get a lock and notice
+ * a conflicting lock in W mode: do we wait in this case? If so it's a problem, because while you
+ * are checking new locks someone may insert new W locks that you don't see...
+ * On the other hand, this attempts to be 'fair', i.e. process locks in order, so could we assume
+ * that additional W locks will have higher IDs????
+ *
+ * We can use Select for Update to generate the next LockID. In fact we can easily do this in a separate txn.
+ * This avoids contention on NEXT_LOCK_ID. The rest of the logic will still need to be done at SERIALIZABLE, I think,
+ * but it will not be updating the same row from 2 DB connections.
+ *
     * Request a lock
     * @param dbConn database connection
     * @param rqst lock information
- * @param wait whether to wait for this lock. The function will return immediately one way or
- * another. If true and the lock could not be acquired the response will have a
- * state of WAITING. The caller will then need to poll using
- * {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}. If
- * false and the lock could not be acquired, then the response will have a state
- * of NOT_ACQUIRED. The caller will need to call
- * {@link #lockNoWait(org.apache.hadoop.hive.metastore.api.LockRequest)} again to
- * attempt another lock.
     * @return information on whether the lock was acquired.
     * @throws NoSuchTxnException
     * @throws TxnAbortedException
     */
- private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait)
+ private LockResponse lock(Connection dbConn, LockRequest rqst)
      throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
      // We want to minimize the number of concurrent lock requests being issued. If we do not we
      // get a large number of deadlocks in the database, since this method has to both clean
@@ -1368,13 +1477,25 @@ public class TxnHandler {
      // etc.) that should not interfere with this one.
      synchronized (lockLock) {
        Statement stmt = null;
+ ResultSet rs = null;
        try {
+ long txnid = rqst.getTxnid();
+ if (txnid > 0) {
+ // Heartbeat the transaction so we know it is valid and we avoid it timing out while we
+ // are locking.
+ heartbeatTxn(dbConn, txnid);
+ }
          stmt = dbConn.createStatement();

- // Get the next lock id.
- String s = "select nl_next from NEXT_LOCK_ID";
+ /** Get the next lock id.
+ * This has to be atomic with adding entries to HIVE_LOCKS (1st added in W state) to prevent a race.
+ * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7,
+ * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks
+ * and adds its W locks, but it won't see locks from 8 since, to be 'fair', {@link #checkLock(java.sql.Connection, long)}
+ * doesn't block on locks acquired later than the one it's checking.*/
+ String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
          LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
          if (!rs.next()) {
            LOG.debug("Going to rollback");
            dbConn.rollback();
@@ -1385,18 +1506,19 @@ public class TxnHandler {
          s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
          LOG.debug("Going to execute update <" + s + ">");
          stmt.executeUpdate(s);
- LOG.debug("Going to commit.");
- dbConn.commit();

- long txnid = rqst.getTxnid();
          if (txnid > 0) {
- // Heartbeat the transaction so we know it is valid and we avoid it timing out while we
- // are locking.
- heartbeatTxn(dbConn, txnid);
-
            // For each component in this lock request,
            // add an entry to the txn_components table
            // This must be done before HIVE_LOCKS is accessed
+
+ //Isolation note:
+ //the !wait option is not actually used anywhere. W/o that,
+ //if we make CompactionTxnHandler.markCleaned() not delete anything above a certain txn_id,
+ //then there is no reason why this insert into TXN_COMPONENTS needs to run at SERIALIZABLE.
+ //
+ // Again, w/o the !wait option, insert into HIVE_LOCKS should be OK at READ_COMMITTED as long
+ //as check lock is at serializable (or any other way to make sure it's exclusive)
            for (LockComponent lc : rqst.getComponent()) {
              String dbName = lc.getDbname();
              String tblName = lc.getTablename();
@@ -1429,34 +1551,42 @@ public class TxnHandler {
              " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
              "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
              " values (" + extLockId + ", " +
- + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
+ + intLockId + "," + txnid + ", '" +
              dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
              + ", " + (partName == null ? "null" : "'" + partName + "'") +
- ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" +
+ ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " +
+ //for locks associated with a txn, we always heartbeat txn and timeout based on that
+ (isValidTxn(txnid) ? 0 : now) + ", '" +
              rqst.getUser() + "', '" + rqst.getHostname() + "')";
            LOG.debug("Going to execute update <" + s + ">");
            stmt.executeUpdate(s);
          }
- LockResponse rsp = checkLock(dbConn, extLockId, wait);
- if (!wait && rsp.getState() != LockState.ACQUIRED) {
- LOG.debug("Lock not acquired, going to rollback");
- dbConn.rollback();
- rsp = new LockResponse();
- rsp.setState(LockState.NOT_ACQUIRED);
- }
- return rsp;
+ /**to make txns shorter we could commit here and start a new txn for checkLock. This would
+ * require moving checkRetryable() down into here. Could we then run the part before this
+ * commit at READ_COMMITTED?*/
+ return checkLock(dbConn, extLockId);
        } catch (NoSuchLockException e) {
          // This should never happen, as we just added the lock id
          throw new MetaException("Couldn't find a lock we just created!");
        } finally {
+ close(rs);
          closeStmt(stmt);
        }
      }
    }
-
+ private static boolean isValidTxn(long txnId) {
+ return txnId != 0;
+ }
+ /**
+ * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take
+ * all locks for a given extLockId or none. It would be more efficient to update state on all locks
+ * at once. Semantics are the same since this is all part of the same txn@serializable.
+ *
+ * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
+ * hl_lock_ext_id by only checking earlier locks.
+ */
    private LockResponse checkLock(Connection dbConn,
- long extLockId,
- boolean alwaysCommit)
+ long extLockId)
      throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
      List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
      LockResponse response = new LockResponse();
@@ -1609,19 +1739,15 @@ public class TxnHandler {
              case WAIT:
                if(!ignoreConflict(info, locks[i])) {
                  wait(dbConn, save);
- if (alwaysCommit) {
- // In the case where lockNoWait has been called we don't want to commit because
- // it's going to roll everything back. In every other case we want to commit here.
- LOG.debug("Going to commit");
- dbConn.commit();
- }
+ LOG.debug("Going to commit");
+ dbConn.commit();
                  response.setState(LockState.WAITING);
                  LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")");
                  return response;
                }
                //fall through to ACQUIRE
              case ACQUIRE:
- acquire(dbConn, stmt, extLockId, info.intLockId);
+ acquire(dbConn, stmt, extLockId, info);
                acquired = true;
                break;
              case KEEP_LOOKING:
@@ -1633,7 +1759,7 @@ public class TxnHandler {

          // If we've arrived here and we have not already acquired, it means there's nothing in the
          // way of the lock, so acquire the lock.
- if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId);
+ if (!acquired) acquire(dbConn, stmt, extLockId, info);
        }

        // We acquired all of the locks, so commit and return acquired.
@@ -1677,26 +1803,31 @@ public class TxnHandler {
      dbConn.rollback(save);
    }

- private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId)
+ private void acquire(Connection dbConn, Statement stmt, long extLockId, LockInfo lockInfo)
      throws SQLException, NoSuchLockException, MetaException {
      long now = getDbTime(dbConn);
      String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
- "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
- extLockId + " and hl_lock_int_id = " + intLockId;
+ //if lock is part of txn, heartbeat info is in txn record
+ "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) +
+ ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+ extLockId + " and hl_lock_int_id = " + lockInfo.intLockId;
      LOG.debug("Going to execute update <" + s + ">");
      int rc = stmt.executeUpdate(s);
      if (rc < 1) {
        LOG.debug("Going to rollback");
        dbConn.rollback();
        throw new NoSuchLockException("No such lock: (" + JavaUtils.lockIdToString(extLockId) + "," +
- + intLockId + ")");
+ + lockInfo.intLockId + ") " + JavaUtils.txnIdToString(lockInfo.txnId));
      }
      // We update the database, but we don't commit because there may be other
      // locks together with this, and we only want to acquire one if we can
      // acquire all.
    }

- // Heartbeats on the lock table. This commits, so do not enter it with any state
+ /**
+ * Heartbeats on the lock table. This commits, so do not enter it with any state.
+ * Should not be called on a lock that belongs to a transaction.
+ */
    private void heartbeatLock(Connection dbConn, long extLockId)
      throws NoSuchLockException, SQLException, MetaException {
      // If the lock id is 0, then there are no locks in this heartbeat
@@ -1731,7 +1862,6 @@ public class TxnHandler {
      try {
        stmt = dbConn.createStatement();
        long now = getDbTime(dbConn);
- ensureValidTxn(dbConn, txnid, stmt);
        String s = "update TXNS set txn_last_heartbeat = " + now +
          " where txn_id = " + txnid + " and txn_state = '" + TXN_OPEN + "'";
        LOG.debug("Going to execute update <" + s + ">");
@@ -1742,10 +1872,6 @@ public class TxnHandler {
          dbConn.rollback();
          throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid));
        }
- //update locks for this txn to the same heartbeat
- s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
        LOG.debug("Going to commit");
        dbConn.commit();
      } finally {
@@ -1760,6 +1886,7 @@ public class TxnHandler {
      LOG.debug("Going to execute query <" + s + ">");
      ResultSet rs = stmt.executeQuery(s);
      if (!rs.next()) {
+ //todo: add LIMIT 1 instead of count - should be more efficient
        s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid;
        ResultSet rs2 = stmt.executeQuery(s);
        boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0;
@@ -1775,28 +1902,28 @@ public class TxnHandler {
        LOG.debug("Going to rollback");
        dbConn.rollback();
        throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) +
- " already aborted");//todo: add time of abort, which is not currently tracked
+ " already aborted");//todo: add time of abort, which is not currently tracked. Requires schema change
      }
    }

- // NEVER call this function without first calling heartbeat(long, long)
- private long getTxnIdFromLockId(Connection dbConn, long extLockId)
+ private Long getTxnIdFromLockId(Connection dbConn, long extLockId)
      throws NoSuchLockException, MetaException, SQLException {
      Statement stmt = null;
+ ResultSet rs = null;
      try {
        stmt = dbConn.createStatement();
        String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
          extLockId;
        LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
        if (!rs.next()) {
- throw new MetaException("This should never happen! We already " +
- "checked the lock existed but now we can't find it!");
+ return null;
        }
        long txnid = rs.getLong(1);
- LOG.debug("Return " + JavaUtils.txnIdToString(rs.wasNull() ? -1 : txnid));
- return (rs.wasNull() ? -1 : txnid);
+ LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(txnid));
+ return txnid;
      } finally {
+ close(rs);
        closeStmt(stmt);
      }
    }
@@ -1832,14 +1959,13 @@ public class TxnHandler {
    // for read-only autoCommit=true statements. This does a commit,
    // and thus should be done before any calls to heartbeat that will leave
    // open transactions.
- private void timeOutLocks(Connection dbConn) {
+ private void timeOutLocks(Connection dbConn, long now) {
      Statement stmt = null;
      try {
- long now = getDbTime(dbConn);
        stmt = dbConn.createStatement();
        // Remove any timed out locks from the table.
        String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
- (now - timeout) + " and (hl_txnid = 0 or hl_txnid is NULL)";//when txnid is > 0, the lock is
+ (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is
        //associated with a txn and is handled by performTimeOuts()
        //want to avoid expiring locks for a txn w/o expiring the txn itself
        LOG.debug("Going to execute update <" + s + ">");
@@ -1891,6 +2017,8 @@ public class TxnHandler {
      }
    }
    /**
+ * Isolation Level Notes
+ * Plain: RC is OK
     * This will find transactions that have timed out and abort them.
     * Will also delete locks which are not associated with a transaction and have timed out
     * Tries to keep transactions (against metastore db) small to reduce lock contention.
@@ -1900,9 +2028,19 @@ public class TxnHandler {
      Statement stmt = null;
      ResultSet rs = null;
      try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ //We currently commit after selecting the TXNS to abort. So whether SERIALIZABLE or
+ //READ_COMMITTED, the effect is the same. We could use FOR UPDATE on the Select from TXNS
+ //and do the whole performTimeOuts() in a single huge transaction, but the only benefit
+ //would be to make sure someone cannot heartbeat one of these txns at the same time.
+ //The attempt to heartbeat would block and fail immediately after it's unblocked.
+ //With the current (RC + multiple txns) implementation it is possible for someone to send a
+ //heartbeat at the very end of the expire interval, just after the Select from TXNS
+ //is made, in which case the heartbeat will succeed but the txn will still be Aborted.
+ //Solving this corner case is not worth the perf penalty. The client should heartbeat in a
+ //timely way.
        long now = getDbTime(dbConn);
- timeOutLocks(dbConn);
+ timeOutLocks(dbConn, now);
        while(true) {
          stmt = dbConn.createStatement();
          String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN +
@@ -1923,16 +2061,26 @@ public class TxnHandler {
              timedOutTxns.add(currentBatch);
            }
          } while(rs.next());
- close(rs, stmt, null);
          dbConn.commit();
+ close(rs, stmt, dbConn);
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ int numTxnsAborted = 0;
          for(List<Long> batchToAbort : timedOutTxns) {
- abortTxns(dbConn, batchToAbort);
- dbConn.commit();
- //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
- LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString());
+ if(abortTxns(dbConn, batchToAbort, now - timeout) == batchToAbort.size()) {
+ dbConn.commit();
+ numTxnsAborted += batchToAbort.size();
+ //todo: add a TXNS.COMMENT field and set it to 'aborted by system due to timeout'
+ LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString());
+ }
+ else {
+ //could not abort all txns in this batch - this may happen because in parallel with this
+ //operation there was activity on one of the txns in this batch (commit/abort/heartbeat)
+ //This is not likely, but may happen if the client experiences a long pause between heartbeats, or
+ //unusually long/extreme pauses between heartbeat() calls and other logic in checkLock(),
+ //lock(), etc.
+ dbConn.rollback();
+ }
          }
- int numTxnsAborted = (timedOutTxns.size() - 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE +
- timedOutTxns.get(timedOutTxns.size() - 1).size();
          LOG.info("Aborted " + numTxnsAborted + " transactions due to timeout");
        }
      } catch (SQLException ex) {
@@ -2110,4 +2258,97 @@ public class TxnHandler {
    private static String getMessage(SQLException ex) {
      return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
    }
+ /**
+ * Returns one of {@link java.sql.Connection#TRANSACTION_SERIALIZABLE}, TRANSACTION_READ_COMMITTED, etc.
+ * Different DBs support different concurrency management options. This class relies on SELECT ... FOR UPDATE
+ * functionality. Where that is not available, SERIALIZABLE isolation is used.
+ * This method must always agree with {@link #addForUpdateClause(java.sql.Connection, String)}, in that
+ * if FOR UPDATE is not available, must run operation at SERIALIZABLE.
+ */
+ private int getRequiredIsolationLevel() throws MetaException, SQLException {
+ if(dbProduct == null) {
+ Connection tmp = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ determineDatabaseProduct(tmp);
+ closeDbConn(tmp);
+ }
+ switch (dbProduct) {
+ case DERBY:
+ return Connection.TRANSACTION_SERIALIZABLE;
+ case MYSQL:
+ case ORACLE:
+ case POSTGRES:
+ case SQLSERVER:
+ return Connection.TRANSACTION_READ_COMMITTED;
+ default:
+ String msg = "Unrecognized database product name <" + dbProduct + ">";
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
+ }
+ /**
+ * Given a {@code selectStatement}, decorate it with FOR UPDATE or a semantically equivalent
+ * construct. If the DB doesn't support it, return the original select. This method must always
+ * agree with {@link #getRequiredIsolationLevel()}.
+ */
+ private String addForUpdateClause(Connection dbConn, String selectStatement) throws MetaException {
+ DatabaseProduct prod = determineDatabaseProduct(dbConn);
+ switch (prod) {
+ case DERBY:
+ //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
+ //sadly in Derby, FOR UPDATE doesn't mean what it should
+ return selectStatement;
+ case MYSQL:
+ //http://dev.mysql.com/doc/refman/5.7/en/select.html
+ case ORACLE:
+ //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html
+ case POSTGRES:
+ //http://www.postgresql.org/docs/9.0/static/sql-select.html
+ return selectStatement + " for update";
+ case SQLSERVER:
+ //https://msdn.microsoft.com/en-us/library/ms189499.aspx
+ //https://msdn.microsoft.com/en-us/library/ms187373.aspx
+ return selectStatement + " with(updlock)";
+ default:
+ String msg = "Unrecognized database product name <" + prod + ">";
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
+ }
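
Condensed, the invariant the two javadocs describe is: no usable row-level FOR UPDATE implies SERIALIZABLE. A toy standalone restatement of that pairing (the Dialect enum here is an illustrative stand-in, not the DatabaseProduct used by TxnHandler):

import java.sql.Connection;

//Toy restatement of the invariant above: a dialect with no usable row lock
//(Derby) must run at SERIALIZABLE; all others lock the row at READ_COMMITTED.
enum Dialect { DERBY, MYSQL, ORACLE, POSTGRES, SQLSERVER }

final class ForUpdateDialectSketch {
  static int isolation(Dialect d) {
    return d == Dialect.DERBY ? Connection.TRANSACTION_SERIALIZABLE
                              : Connection.TRANSACTION_READ_COMMITTED;
  }
  static String decorate(Dialect d, String select) {
    switch (d) {
      case DERBY:     return select;                    // FOR UPDATE is a no-op here
      case SQLSERVER: return select + " with(updlock)";
      default:        return select + " for update";    // MySQL, Oracle, Postgres
    }
  }
}
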
+ /**
+ * The caller is expected to retry if this fails.
+ *
+ * @return the next external lock id
+ * @throws SQLException
+ * @throws MetaException
+ */
+ private long generateNewExtLockId() throws SQLException, MetaException {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(getRequiredIsolationLevel());
+ stmt = dbConn.createStatement();
+
+ // Get the next lock id.
+ String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ throw new MetaException("Transaction tables not properly " +
+ "initialized, no record found in next_lock_id");
+ }
+ long extLockId = rs.getLong(1);
+ s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ LOG.debug("Going to commit.");
+ dbConn.commit();
+ return extLockId;
+ }
+ finally {
+ close(rs, stmt, dbConn);
+ }
+ }
  }
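
Per its javadoc, generateNewExtLockId() pushes retry responsibility onto the caller: Derby may abort one victim of a SERIALIZABLE conflict, and other DBs can hit lock-wait timeouts on the NEXT_LOCK_ID row. A hedged sketch of that caller-side loop, meant to sit alongside the method above; the cap of 10 attempts is an illustrative assumption, not something the patch defines:

//Sketch of the caller-side retry the javadoc asks for; the retry limit is
//an illustrative assumption, not patch code.
private long generateNewExtLockIdWithRetry() throws MetaException {
  for (int attempt = 1; ; attempt++) {
    try {
      return generateNewExtLockId();
    } catch (SQLException e) {
      //e.g. Derby chose this txn as the victim of a serialization conflict
      if (attempt >= 10) {
        throw new MetaException("Unable to allocate lock id: " + getMessage(e));
      }
    }
  }
}
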

http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index ad99427..32c3d80 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -264,43 +264,6 @@ public class TestCompactionTxnHandler {
    }

    @Test
- public void testLockNoWait() throws Exception {
- // Test that we can acquire the lock alone
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lockNoWait(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.unlock(new UnlockRequest(res.getLockid()));
-
- // test that another lock blocks it
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertEquals(LockState.ACQUIRED, res.getState());
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lockNoWait(req);
- assertEquals(LockState.NOT_ACQUIRED, res.getState());
- assertEquals(1, TxnDbUtil.findNumCurrentLocks());
- }
-
- @Test
    public void testFindPotentialCompactions() throws Exception {
      // Test that committing unlocks
      long txnid = openTxn();

http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 6461435..e53daae 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1067,8 +1067,8 @@ public class TestTxnHandler {
          assertNull(lock.getPartname());
          assertEquals(LockState.ACQUIRED, lock.getState());
          assertEquals(LockType.EXCLUSIVE, lock.getType());
- assertTrue(begining <= lock.getLastheartbeat() &&
- System.currentTimeMillis() >= lock.getLastheartbeat());
+ assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
+ lock.getTxnid() != 0);
          assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
              + " and " + System.currentTimeMillis(),
              begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
@@ -1082,8 +1082,8 @@ public class TestTxnHandler {
          assertNull(lock.getPartname());
          assertEquals(LockState.WAITING, lock.getState());
          assertEquals(LockType.SHARED_READ, lock.getType());
- assertTrue(begining <= lock.getLastheartbeat() &&
- System.currentTimeMillis() >= lock.getLastheartbeat());
+ assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
+ lock.getTxnid() != 0);
          assertEquals(0, lock.getAcquiredat());
          assertEquals("me", lock.getUser());
          assertEquals("localhost", lock.getHostname());
@@ -1095,7 +1095,7 @@ public class TestTxnHandler {
          assertEquals("yourpartition", lock.getPartname());
          assertEquals(LockState.ACQUIRED, lock.getState());
          assertEquals(LockType.SHARED_WRITE, lock.getType());
- assertTrue(begining <= lock.getLastheartbeat() &&
+ assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
              System.currentTimeMillis() >= lock.getLastheartbeat());
          assertTrue(begining <= lock.getAcquiredat() &&
              System.currentTimeMillis() >= lock.getAcquiredat());

http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 33c6ab5..c134653 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1028,7 +1028,7 @@ public class Driver implements CommandProcessor {
          // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction}
          //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
          //for each statement.
- recordValidTxns();
+ recordValidTxns(); //todo: we should only need to do this for an RO query if it has ACID resources in it.
        }

        return 0;
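
The todo above suggests the valid-txn snapshot is only needed when a read-only query actually reads ACID tables. A hypothetical shape of that guard; both booleans are assumed to be derivable from the query plan and do not exist in Driver as written:

//Hypothetical sketch of the todo: readOnlyQuery and readsAcidTables are
//assumed inputs, not existing Driver fields.
if (!readOnlyQuery || readsAcidTables) {
  recordValidTxns();
}
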
