FAQ
Author: ekoifman
Date: Sat Jan 17 02:54:40 2015
New Revision: 1652558

URL: http://svn.apache.org/r1652558
Log:
HIVE-9390 Enhance retry logic wrt DB access in TxnHandler (Eugene Koifman reviewed by Alan Gates)

Modified:
     hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
     hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
     hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
     hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1652558&r1=1652557&r2=1652558&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Sat Jan 17 02:54:40 2015
@@ -5377,126 +5377,74 @@ public class HiveMetaStore extends Thrif
      // Transaction and locking methods
      @Override
      public GetOpenTxnsResponse get_open_txns() throws TException {
- try {
- return getTxnHandler().getOpenTxns();
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().getOpenTxns();
      }

      // Transaction and locking methods
      @Override
      public GetOpenTxnsInfoResponse get_open_txns_info() throws TException {
- try {
- return getTxnHandler().getOpenTxnsInfo();
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().getOpenTxnsInfo();
      }

      @Override
      public OpenTxnsResponse open_txns(OpenTxnRequest rqst) throws TException {
- try {
- return getTxnHandler().openTxns(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().openTxns(rqst);
      }

      @Override
      public void abort_txn(AbortTxnRequest rqst) throws NoSuchTxnException, TException {
- try {
- getTxnHandler().abortTxn(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().abortTxn(rqst);
      }

      @Override
      public void commit_txn(CommitTxnRequest rqst)
          throws NoSuchTxnException, TxnAbortedException, TException {
- try {
- getTxnHandler().commitTxn(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().commitTxn(rqst);
      }

      @Override
      public LockResponse lock(LockRequest rqst)
          throws NoSuchTxnException, TxnAbortedException, TException {
- try {
- return getTxnHandler().lock(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().lock(rqst);
      }

      @Override
      public LockResponse check_lock(CheckLockRequest rqst)
          throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, TException {
- try {
- return getTxnHandler().checkLock(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().checkLock(rqst);
      }

      @Override
      public void unlock(UnlockRequest rqst)
          throws NoSuchLockException, TxnOpenException, TException {
- try {
- getTxnHandler().unlock(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().unlock(rqst);
      }

      @Override
      public ShowLocksResponse show_locks(ShowLocksRequest rqst) throws TException {
- try {
- return getTxnHandler().showLocks(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().showLocks(rqst);
      }

      @Override
      public void heartbeat(HeartbeatRequest ids)
          throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, TException {
- try {
- getTxnHandler().heartbeat(ids);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().heartbeat(ids);
      }

      @Override
      public HeartbeatTxnRangeResponse heartbeat_txn_range(HeartbeatTxnRangeRequest rqst)
        throws TException {
- try {
- return getTxnHandler().heartbeatTxnRange(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().heartbeatTxnRange(rqst);
      }

      @Override
      public void compact(CompactionRequest rqst) throws TException {
- try {
- getTxnHandler().compact(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().compact(rqst);
      }

      @Override
      public ShowCompactResponse show_compact(ShowCompactRequest rqst) throws TException {
- try {
- return getTxnHandler().showCompact(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().showCompact(rqst);
      }

      @Override

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1652558&r1=1652557&r2=1652558&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Sat Jan 17 02:54:40 2015
@@ -52,51 +52,58 @@ public class CompactionTxnHandler extend
     * or runAs set since these are only potential compactions not actual ones.
     */
    public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Connection dbConn = null;
      Set<CompactionInfo> response = new HashSet<CompactionInfo>();
      Statement stmt = null;
      try {
- stmt = dbConn.createStatement();
- // Check for completed transactions
- String s = "select distinct ctc_database, ctc_table, " +
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ // Check for completed transactions
+ String s = "select distinct ctc_database, ctc_table, " +
            "ctc_partition from COMPLETED_TXN_COMPONENTS";
- LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- while (rs.next()) {
- CompactionInfo info = new CompactionInfo();
- info.dbname = rs.getString(1);
- info.tableName = rs.getString(2);
- info.partName = rs.getString(3);
- response.add(info);
- }
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ response.add(info);
+ }

- // Check for aborted txns
- s = "select tc_database, tc_table, tc_partition " +
+ // Check for aborted txns
+ s = "select tc_database, tc_table, tc_partition " +
            "from TXNS, TXN_COMPONENTS " +
            "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
            "group by tc_database, tc_table, tc_partition " +
            "having count(*) > " + maxAborted;

- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- while (rs.next()) {
- CompactionInfo info = new CompactionInfo();
- info.dbname = rs.getString(1);
- info.tableName = rs.getString(2);
- info.partName = rs.getString(3);
- info.tooManyAborts = true;
- response.add(info);
- }
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ info.tooManyAborts = true;
+ response.add(info);
+ }

- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e) {
- LOG.error("Unable to connect to transaction database " + e.getMessage());
- } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database " + e.getMessage());
+ checkRetryable(dbConn, e, "findPotentialCompactions");
+ } finally {
+ closeDbConn(dbConn);
+ closeStmt(stmt);
+ }
+ return response;
+ }
+ catch (RetryException e) {
+ return findPotentialCompactions(maxAborted);
      }
- return response;
    }

    /**
@@ -107,35 +114,31 @@ public class CompactionTxnHandler extend
     */
    public void setRunAs(long cq_id, String user) throws MetaException {
      try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
        Statement stmt = null;
        try {
- stmt = dbConn.createStatement();
- String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
- LOG.debug("Going to execute update <" + s + ">");
- if (stmt.executeUpdate(s) != 1) {
- LOG.error("Unable to update compaction record");
- LOG.debug("Going to rollback");
- dbConn.rollback();
- }
- LOG.debug("Going to commit");
- dbConn.commit();
- } catch (SQLException e) {
- LOG.error("Unable to update compaction queue, " + e.getMessage());
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "setRunAs");
- } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
- }
- } catch (DeadlockException e) {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
+ LOG.debug("Going to execute update <" + s + ">");
+ if (stmt.executeUpdate(s) != 1) {
+ LOG.error("Unable to update compaction record");
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to update compaction queue, " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "setRunAs");
+ } finally {
+ closeDbConn(dbConn);
+ closeStmt(stmt);
+ }
+ } catch (RetryException e) {
        setRunAs(cq_id, user);
- } finally {
- deadlockCnt = 0;
      }
    }

@@ -147,14 +150,15 @@ public class CompactionTxnHandler extend
     */
    public CompactionInfo findNextToCompact(String workerId) throws MetaException {
      try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
        CompactionInfo info = new CompactionInfo();

        Statement stmt = null;
        try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
          stmt = dbConn.createStatement();
          String s = "select cq_id, cq_database, cq_table, cq_partition, " +
- "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
+ "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
          LOG.debug("Going to execute query <" + s + ">");
          ResultSet rs = stmt.executeQuery(s);
          if (!rs.next()) {
@@ -175,7 +179,7 @@ public class CompactionTxnHandler extend
          // Now, update this record as being worked on by this worker.
          long now = getDbTime(dbConn);
          s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
- "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
+ "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
          LOG.debug("Going to execute update <" + s + ">");
          if (stmt.executeUpdate(s) != 1) {
            LOG.error("Unable to update compaction record");
@@ -187,38 +191,34 @@ public class CompactionTxnHandler extend
          return info;
        } catch (SQLException e) {
          LOG.error("Unable to select next element for compaction, " + e.getMessage());
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "findNextToCompact");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findNextToCompact");
          throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
          closeStmt(stmt);
        }
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
        return findNextToCompact(workerId);
- } finally {
- deadlockCnt = 0;
      }
    }

    /**
     * This will mark an entry in the queue as compacted
     * and put it in the ready to clean state.
- * @param info info on the compaciton entry to mark as compacted.
+ * @param info info on the compaction entry to mark as compacted.
     */
    public void markCompacted(CompactionInfo info) throws MetaException {
      try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
        Statement stmt = null;
        try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
          stmt = dbConn.createStatement();
          String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
- "cq_worker_id = null where cq_id = " + info.id;
+ "cq_worker_id = null where cq_id = " + info.id;
          LOG.debug("Going to execute update <" + s + ">");
          if (stmt.executeUpdate(s) != 1) {
            LOG.error("Unable to update compaction record");
@@ -228,23 +228,18 @@ public class CompactionTxnHandler extend
          LOG.debug("Going to commit");
          dbConn.commit();
        } catch (SQLException e) {
- try {
- LOG.error("Unable to update compaction queue " + e.getMessage());
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "markCompacted");
+ LOG.error("Unable to update compaction queue " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "markCompacted");
          throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
          closeStmt(stmt);
        }
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
        markCompacted(info);
- } finally {
- deadlockCnt = 0;
      }
    }

@@ -254,45 +249,48 @@ public class CompactionTxnHandler extend
     * @return information on the entry in the queue.
     */
    public List<CompactionInfo> findReadyToClean() throws MetaException {
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Connection dbConn = null;
      List<CompactionInfo> rc = new ArrayList<CompactionInfo>();

      Statement stmt = null;
      try {
- stmt = dbConn.createStatement();
- String s = "select cq_id, cq_database, cq_table, cq_partition, " +
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "select cq_id, cq_database, cq_table, cq_partition, " +
            "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
- LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- while (rs.next()) {
- CompactionInfo info = new CompactionInfo();
- info.id = rs.getLong(1);
- info.dbname = rs.getString(2);
- info.tableName = rs.getString(3);
- info.partName = rs.getString(4);
- switch (rs.getString(5).charAt(0)) {
- case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
- case MINOR_TYPE: info.type = CompactionType.MINOR; break;
- default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.id = rs.getLong(1);
+ info.dbname = rs.getString(2);
+ info.tableName = rs.getString(3);
+ info.partName = rs.getString(4);
+ switch (rs.getString(5).charAt(0)) {
+ case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
+ case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+ default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ }
+ info.runAs = rs.getString(6);
+ rc.add(info);
          }
- info.runAs = rs.getString(6);
- rc.add(info);
- }
- LOG.debug("Going to rollback");
- dbConn.rollback();
- return rc;
- } catch (SQLException e) {
- LOG.error("Unable to select next element for cleaning, " + e.getMessage());
- try {
          LOG.debug("Going to rollback");
          dbConn.rollback();
- } catch (SQLException e1) {
- }
- throw new MetaException("Unable to connect to transaction database " +
+ return rc;
+ } catch (SQLException e) {
+ LOG.error("Unable to select next element for cleaning, " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findReadyToClean");
+ throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
- } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ } finally {
+ closeDbConn(dbConn);
+ closeStmt(stmt);
+ }
+ } catch (RetryException e) {
+ return findReadyToClean();
      }
    }

@@ -303,9 +301,10 @@ public class CompactionTxnHandler extend
     */
    public void markCleaned(CompactionInfo info) throws MetaException {
      try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
        Statement stmt = null;
        try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
          stmt = dbConn.createStatement();
          String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
          LOG.debug("Going to execute update <" + s + ">");
@@ -318,20 +317,20 @@ public class CompactionTxnHandler extend
          // Remove entries from completed_txn_components as well, so we don't start looking there
          // again.
          s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " +
- "ctc_table = '" + info.tableName + "'";
+ "ctc_table = '" + info.tableName + "'";
          if (info.partName != null) {
            s += " and ctc_partition = '" + info.partName + "'";
          }
          LOG.debug("Going to execute update <" + s + ">");
          if (stmt.executeUpdate(s) < 1) {
            LOG.error("Expected to remove at least one row from completed_txn_components when " +
- "marking compaction entry as clean!");
+ "marking compaction entry as clean!");
          }


          s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
- TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
- info.tableName + "'";
+ TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
+ info.tableName + "'";
          if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
          LOG.debug("Going to execute update <" + s + ">");
          ResultSet rs = stmt.executeQuery(s);
@@ -371,23 +370,18 @@ public class CompactionTxnHandler extend
          LOG.debug("Going to commit");
          dbConn.commit();
        } catch (SQLException e) {
- try {
- LOG.error("Unable to delete from compaction queue " + e.getMessage());
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "markCleaned");
+ LOG.error("Unable to delete from compaction queue " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "markCleaned");
          throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
          closeStmt(stmt);
        }
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
        markCleaned(info);
- } finally {
- deadlockCnt = 0;
      }
    }

@@ -396,13 +390,14 @@ public class CompactionTxnHandler extend
     */
    public void cleanEmptyAbortedTxns() throws MetaException {
      try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
        Statement stmt = null;
        try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
          stmt = dbConn.createStatement();
          String s = "select txn_id from TXNS where " +
- "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
- "txn_state = '" + TXN_ABORTED + "'";
+ "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
+ "txn_state = '" + TXN_ABORTED + "'";
          LOG.debug("Going to execute query <" + s + ">");
          ResultSet rs = stmt.executeQuery(s);
          Set<Long> txnids = new HashSet<Long>();
@@ -425,21 +420,16 @@ public class CompactionTxnHandler extend
        } catch (SQLException e) {
          LOG.error("Unable to delete from txns table " + e.getMessage());
          LOG.debug("Going to rollback");
- try {
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "cleanEmptyAbortedTxns");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "cleanEmptyAbortedTxns");
          throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
          closeStmt(stmt);
        }
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
        cleanEmptyAbortedTxns();
- } finally {
- deadlockCnt = 0;
      }
    }

@@ -454,13 +444,14 @@ public class CompactionTxnHandler extend
     */
    public void revokeFromLocalWorkers(String hostname) throws MetaException {
      try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
        Statement stmt = null;
        try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
          stmt = dbConn.createStatement();
          String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
- + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
- + hostname + "%'";
+ + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
+ + hostname + "%'";
          LOG.debug("Going to execute update <" + s + ">");
          // It isn't an error if the following returns no rows, as the local workers could have died
          // with nothing assigned to them.
@@ -468,24 +459,19 @@ public class CompactionTxnHandler extend
          LOG.debug("Going to commit");
          dbConn.commit();
        } catch (SQLException e) {
- try {
- LOG.error("Unable to change dead worker's records back to initiated state " +
- e.getMessage());
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "revokeFromLocalWorkers");
+ LOG.error("Unable to change dead worker's records back to initiated state " +
+ e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "revokeFromLocalWorkers");
          throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
          closeStmt(stmt);
        }
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
        revokeFromLocalWorkers(hostname);
- } finally {
- deadlockCnt = 0;
      }
    }

@@ -500,14 +486,15 @@ public class CompactionTxnHandler extend
     */
    public void revokeTimedoutWorkers(long timeout) throws MetaException {
      try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- long latestValidStart = getDbTime(dbConn) - timeout;
+ Connection dbConn = null;
        Statement stmt = null;
        try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ long latestValidStart = getDbTime(dbConn) - timeout;
          stmt = dbConn.createStatement();
          String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
- + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
- + latestValidStart;
+ + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
+ + latestValidStart;
          LOG.debug("Going to execute update <" + s + ">");
          // It isn't an error if the following returns no rows, as the local workers could have died
          // with nothing assigned to them.
@@ -515,24 +502,19 @@ public class CompactionTxnHandler extend
          LOG.debug("Going to commit");
          dbConn.commit();
        } catch (SQLException e) {
- try {
- LOG.error("Unable to change dead worker's records back to initiated state " +
- e.getMessage());
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "revokeTimedoutWorkers");
+ LOG.error("Unable to change dead worker's records back to initiated state " +
+ e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "revokeTimedoutWorkers");
          throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
          closeStmt(stmt);
        }
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
        revokeTimedoutWorkers(timeout);
- } finally {
- deadlockCnt = 0;
      }
    }

@@ -543,53 +525,55 @@ public class CompactionTxnHandler extend
     * @throws MetaException
     */
    public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Connection dbConn = null;
      Statement stmt = null;
      ResultSet rs = null;
      try {
- String quote = getIdentifierQuoteString(dbConn);
- stmt = dbConn.createStatement();
- StringBuilder bldr = new StringBuilder();
- bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ String quote = getIdentifierQuoteString(dbConn);
+ stmt = dbConn.createStatement();
+ StringBuilder bldr = new StringBuilder();
+ bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
            .append(" FROM ")
            .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS"))
- .append(quote)
+ .append(quote)
            .append(" WHERE ")
            .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname)
- .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
- .append(" = '").append(ci.tableName).append("'");
- if (ci.partName != null) {
- bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
+ .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
+ .append(" = '").append(ci.tableName).append("'");
+ if (ci.partName != null) {
+ bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
              .append(ci.partName).append("'");
- }
- String s = bldr.toString();
+ }
+ String s = bldr.toString();

        /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" :
            "PART_COL_STATS")
           + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
          + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/
- LOG.debug("Going to execute <" + s + ">");
- rs = stmt.executeQuery(s);
- List<String> columns = new ArrayList<String>();
- while(rs.next()) {
- columns.add(rs.getString(1));
- }
- LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
- (ci.partName == null ? "" : "/" + ci.partName));
- dbConn.commit();
- return columns;
- } catch (SQLException e) {
- try {
+ LOG.debug("Going to execute <" + s + ">");
+ rs = stmt.executeQuery(s);
+ List<String> columns = new ArrayList<String>();
+ while (rs.next()) {
+ columns.add(rs.getString(1));
+ }
+ LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
+ (ci.partName == null ? "" : "/" + ci.partName));
+ dbConn.commit();
+ return columns;
+ } catch (SQLException e) {
          LOG.error("Failed to find columns to analyze stats on for " + ci.tableName +
- (ci.partName == null ? "" : "/" + ci.partName), e);
- dbConn.rollback();
- } catch (SQLException e1) {
- //nothing we can do here
+ (ci.partName == null ? "" : "/" + ci.partName), e);
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findColumnsWithStats");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
        }
- throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
- } finally {
- close(rs, stmt, dbConn);
+ } catch (RetryException ex) {
+ return findColumnsWithStats(ci);
      }
    }
  }

Search Discussions

  • Ekoifman at Jan 17, 2015 at 2:54 am
    Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1652558&r1=1652557&r2=1652558&view=diff
    ==============================================================================
    --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
    +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Sat Jan 17 02:54:40 2015
    @@ -77,7 +77,7 @@ public class TxnHandler {
        static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());

        static private DataSource connPool;
    - private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock
    + private final static Object lockLock = new Object(); // Random object to lock on for the lock
        // method

        /**
    @@ -87,10 +87,13 @@ public class TxnHandler {
        protected HiveConf conf;
        protected DatabaseProduct dbProduct;

    - // Transaction timeout, in milliseconds.
    + // (End user) Transaction timeout, in milliseconds.
        private long timeout;

        private String identifierQuoteString; // quotes to use for quoting tables, where necessary
    + private final long retryInterval;
    + private final int retryLimit;
    + private int retryNum;

        // DEADLOCK DETECTION AND HANDLING
        // A note to developers of this class. ALWAYS access HIVE_LOCKS before TXNS to avoid deadlock
    @@ -125,113 +128,122 @@ public class TxnHandler {
          timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
          deadlockCnt = 0;
          buildJumpTable();
    + retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS);
    + retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
    +
        }

        public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
    - // We need to figure out the current transaction number and the list of
    - // open transactions. To avoid needing a transaction on the underlying
    - // database we'll look at the current transaction number first. If it
    - // subsequently shows up in the open list that's ok.
    - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    - Statement stmt = null;
          try {
    - stmt = dbConn.createStatement();
    - String s = "select ntxn_next - 1 from NEXT_TXN_ID";
    - LOG.debug("Going to execute query <" + s + ">");
    - ResultSet rs = stmt.executeQuery(s);
    - if (!rs.next()) {
    - throw new MetaException("Transaction tables not properly " +
    + // We need to figure out the current transaction number and the list of
    + // open transactions. To avoid needing a transaction on the underlying
    + // database we'll look at the current transaction number first. If it
    + // subsequently shows up in the open list that's ok.
    + Connection dbConn = null;
    + Statement stmt = null;
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
    + String s = "select ntxn_next - 1 from NEXT_TXN_ID";
    + LOG.debug("Going to execute query <" + s + ">");
    + ResultSet rs = stmt.executeQuery(s);
    + if (!rs.next()) {
    + throw new MetaException("Transaction tables not properly " +
                  "initialized, no record found in next_txn_id");
    - }
    - long hwm = rs.getLong(1);
    - if (rs.wasNull()) {
    - throw new MetaException("Transaction tables not properly " +
    + }
    + long hwm = rs.getLong(1);
    + if (rs.wasNull()) {
    + throw new MetaException("Transaction tables not properly " +
                  "initialized, null record found in next_txn_id");
    - }
    -
    - List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
    - s = "select txn_id, txn_state, txn_user, txn_host from TXNS";
    - LOG.debug("Going to execute query<" + s + ">");
    - rs = stmt.executeQuery(s);
    - while (rs.next()) {
    - char c = rs.getString(2).charAt(0);
    - TxnState state;
    - switch (c) {
    - case TXN_ABORTED:
    - state = TxnState.ABORTED;
    - break;
    + }

    - case TXN_OPEN:
    - state = TxnState.OPEN;
    - break;
    + List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
    + s = "select txn_id, txn_state, txn_user, txn_host from TXNS";
    + LOG.debug("Going to execute query<" + s + ">");
    + rs = stmt.executeQuery(s);
    + while (rs.next()) {
    + char c = rs.getString(2).charAt(0);
    + TxnState state;
    + switch (c) {
    + case TXN_ABORTED:
    + state = TxnState.ABORTED;
    + break;
    +
    + case TXN_OPEN:
    + state = TxnState.OPEN;
    + break;

    - default:
    - throw new MetaException("Unexpected transaction state " + c +
    + default:
    + throw new MetaException("Unexpected transaction state " + c +
                      " found in txns table");
    + }
    + txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
              }
    - txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
    - }
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - return new GetOpenTxnsInfoResponse(hwm, txnInfo);
    - } catch (SQLException e) {
    - try {
              LOG.debug("Going to rollback");
              dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - throw new MetaException("Unable to select from transaction database, "
    + return new GetOpenTxnsInfoResponse(hwm, txnInfo);
    + } catch (SQLException e) {
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "getOpenTxnsInfo");
    + throw new MetaException("Unable to select from transaction database: " + getMessage(e)
                + StringUtils.stringifyException(e));
    - } finally {
    - closeStmt(stmt);
    - closeDbConn(dbConn);
    + } finally {
    + closeStmt(stmt);
    + closeDbConn(dbConn);
    + }
    + } catch (RetryException e) {
    + return getOpenTxnsInfo();
          }
        }

        public GetOpenTxnsResponse getOpenTxns() throws MetaException {
    - // We need to figure out the current transaction number and the list of
    - // open transactions. To avoid needing a transaction on the underlying
    - // database we'll look at the current transaction number first. If it
    - // subsequently shows up in the open list that's ok.
    - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    - Statement stmt = null;
          try {
    - timeOutTxns(dbConn);
    - stmt = dbConn.createStatement();
    - String s = "select ntxn_next - 1 from NEXT_TXN_ID";
    - LOG.debug("Going to execute query <" + s + ">");
    - ResultSet rs = stmt.executeQuery(s);
    - if (!rs.next()) {
    - throw new MetaException("Transaction tables not properly " +
    + // We need to figure out the current transaction number and the list of
    + // open transactions. To avoid needing a transaction on the underlying
    + // database we'll look at the current transaction number first. If it
    + // subsequently shows up in the open list that's ok.
    + Connection dbConn = null;
    + Statement stmt = null;
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + timeOutTxns(dbConn);
    + stmt = dbConn.createStatement();
    + String s = "select ntxn_next - 1 from NEXT_TXN_ID";
    + LOG.debug("Going to execute query <" + s + ">");
    + ResultSet rs = stmt.executeQuery(s);
    + if (!rs.next()) {
    + throw new MetaException("Transaction tables not properly " +
                  "initialized, no record found in next_txn_id");
    - }
    - long hwm = rs.getLong(1);
    - if (rs.wasNull()) {
    - throw new MetaException("Transaction tables not properly " +
    + }
    + long hwm = rs.getLong(1);
    + if (rs.wasNull()) {
    + throw new MetaException("Transaction tables not properly " +
                  "initialized, null record found in next_txn_id");
    - }
    + }

    - Set<Long> openList = new HashSet<Long>();
    - s = "select txn_id from TXNS";
    - LOG.debug("Going to execute query<" + s + ">");
    - rs = stmt.executeQuery(s);
    - while (rs.next()) {
    - openList.add(rs.getLong(1));
    - }
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - return new GetOpenTxnsResponse(hwm, openList);
    - } catch (SQLException e) {
    - try {
    + Set<Long> openList = new HashSet<Long>();
    + s = "select txn_id from TXNS";
    + LOG.debug("Going to execute query<" + s + ">");
    + rs = stmt.executeQuery(s);
    + while (rs.next()) {
    + openList.add(rs.getLong(1));
    + }
              LOG.debug("Going to rollback");
              dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - throw new MetaException("Unable to select from transaction database, "
    + return new GetOpenTxnsResponse(hwm, openList);
    + } catch (SQLException e) {
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "getOpenTxns");
    + throw new MetaException("Unable to select from transaction database, "
                + StringUtils.stringifyException(e));
    - } finally {
    - closeStmt(stmt);
    - closeDbConn(dbConn);
    + } finally {
    + closeStmt(stmt);
    + closeDbConn(dbConn);
    + }
    + } catch (RetryException e) {
    + return getOpenTxns();
          }
        }

    @@ -259,12 +271,13 @@ public class TxnHandler {
        public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
          int numTxns = rqst.getNum_txns();
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            Statement stmt = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              // Make sure the user has not requested an insane amount of txns.
              int maxTxns = HiveConf.getIntVar(conf,
    - HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
    + HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
              if (numTxns > maxTxns) numTxns = maxTxns;

              stmt = dbConn.createStatement();
    @@ -273,7 +286,7 @@ public class TxnHandler {
              ResultSet rs = stmt.executeQuery(s);
              if (!rs.next()) {
                throw new MetaException("Transaction database not properly " +
    - "configured, can't find next transaction id.");
    + "configured, can't find next transaction id.");
              }
              long first = rs.getLong(1);
              s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
    @@ -281,8 +294,8 @@ public class TxnHandler {
              stmt.executeUpdate(s);
              long now = getDbTime(dbConn);
              s = "insert into TXNS (txn_id, txn_state, txn_started, " +
    - "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " +
    - now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')";
    + "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " +
    + now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')";
              LOG.debug("Going to prepare statement <" + s + ">");
              PreparedStatement ps = dbConn.prepareStatement(s);
              List<Long> txnIds = new ArrayList<Long>(numTxns);
    @@ -296,30 +309,26 @@ public class TxnHandler {
              dbConn.commit();
              return new OpenTxnsResponse(txnIds);
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "openTxns");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "openTxns");
              throw new MetaException("Unable to select from transaction database "
                + StringUtils.stringifyException(e));
            } finally {
              closeStmt(stmt);
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            return openTxns(rqst);
    - } finally {
    - deadlockCnt = 0;
          }
        }

        public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
          long txnid = rqst.getTxnid();
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              List<Long> txnids = new ArrayList<Long>(1);
              txnids.add(txnid);
              if (abortTxns(dbConn, txnids) != 1) {
    @@ -331,31 +340,27 @@ public class TxnHandler {
              LOG.debug("Going to commit");
              dbConn.commit();
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "abortTxn");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "abortTxn");
              throw new MetaException("Unable to update transaction database "
                + StringUtils.stringifyException(e));
            } finally {
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            abortTxn(rqst);
    - } finally {
    - deadlockCnt = 0;
          }
        }

        public void commitTxn(CommitTxnRequest rqst)
    - throws NoSuchTxnException, TxnAbortedException, MetaException {
    + throws NoSuchTxnException, TxnAbortedException, MetaException {
          long txnid = rqst.getTxnid();
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            Statement stmt = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              stmt = dbConn.createStatement();
              // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to
              // commit it, but it does two things. One, it makes sure the transaction is still valid.
    @@ -367,11 +372,11 @@ public class TxnHandler {
              // Move the record from txn_components into completed_txn_components so that the compactor
              // knows where to look to compact.
              String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
    - "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
    + "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
              LOG.debug("Going to execute insert <" + s + ">");
              if (stmt.executeUpdate(s) < 1) {
                LOG.warn("Expected to move at least one record from txn_components to " +
    - "completed_txn_components when committing txn!");
    + "completed_txn_components when committing txn!");
              }

              // Always access TXN_COMPONENTS before HIVE_LOCKS;
    @@ -388,80 +393,68 @@ public class TxnHandler {
              LOG.debug("Going to commit");
              dbConn.commit();
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "commitTxn");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "commitTxn");
              throw new MetaException("Unable to update transaction database "
                + StringUtils.stringifyException(e));
            } finally {
              closeStmt(stmt);
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            commitTxn(rqst);
    - } finally {
    - deadlockCnt = 0;
          }
        }

        public LockResponse lock(LockRequest rqst)
    - throws NoSuchTxnException, TxnAbortedException, MetaException {
    + throws NoSuchTxnException, TxnAbortedException, MetaException {
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              return lock(dbConn, rqst, true);
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "lock");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "lock");
              throw new MetaException("Unable to update transaction database " +
    - StringUtils.stringifyException(e));
    + StringUtils.stringifyException(e));
            } finally {
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            return lock(rqst);
    - } finally {
    - deadlockCnt = 0;
          }
        }

        public LockResponse lockNoWait(LockRequest rqst)
    - throws NoSuchTxnException, TxnAbortedException, MetaException {
    + throws NoSuchTxnException, TxnAbortedException, MetaException {
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              return lock(dbConn, rqst, false);
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "lockNoWait");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "lockNoWait");
              throw new MetaException("Unable to update transaction database " +
    - StringUtils.stringifyException(e));
    + StringUtils.stringifyException(e));
            } finally {
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            return lockNoWait(rqst);
    - } finally {
    - deadlockCnt = 0;
          }
        }

        public LockResponse checkLock(CheckLockRequest rqst)
    - throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
    + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              long extLockId = rqst.getLockid();
              // Clean up timed out locks
              timeOutLocks(dbConn);
    @@ -474,31 +467,27 @@ public class TxnHandler {
              if (txnid > 0) heartbeatTxn(dbConn, txnid);
              return checkLock(dbConn, extLockId, true);
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "checkLock");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "checkLock");
              throw new MetaException("Unable to update transaction database " +
    - StringUtils.stringifyException(e));
    + StringUtils.stringifyException(e));
            } finally {
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            return checkLock(rqst);
    - } finally {
    - deadlockCnt = 0;
          }

        }

        public void unlock(UnlockRequest rqst)
    - throws NoSuchLockException, TxnOpenException, MetaException {
    + throws NoSuchLockException, TxnOpenException, MetaException {
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            Statement stmt = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              // Odd as it seems, we need to heartbeat first because this touches the
              // lock table and assures that our locks our still valid. If they are
              // not, this will throw an exception and the heartbeat will fail.
    @@ -512,8 +501,8 @@ public class TxnHandler {
                LOG.debug("Going to rollback");
                dbConn.rollback();
                String msg = "Unlocking locks associated with transaction" +
    - " not permitted. Lockid " + extLockId + " is associated with " +
    - "transaction " + txnid;
    + " not permitted. Lockid " + extLockId + " is associated with " +
    + "transaction " + txnid;
                LOG.error(msg);
                throw new TxnOpenException(msg);
              }
    @@ -529,97 +518,96 @@ public class TxnHandler {
              LOG.debug("Going to commit");
              dbConn.commit();
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "unlock");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "unlock");
              throw new MetaException("Unable to update transaction database " +
    - StringUtils.stringifyException(e));
    + StringUtils.stringifyException(e));
            } finally {
              closeStmt(stmt);
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            unlock(rqst);
    - } finally {
    - deadlockCnt = 0;
          }
        }

        public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    - ShowLocksResponse rsp = new ShowLocksResponse();
    - List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
    - Statement stmt = null;
          try {
    - stmt = dbConn.createStatement();
    + Connection dbConn = null;
    + ShowLocksResponse rsp = new ShowLocksResponse();
    + List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
    + Statement stmt = null;
    + try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();

    - String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
    + String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
                "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS";
    - LOG.debug("Doing to execute query <" + s + ">");
    - ResultSet rs = stmt.executeQuery(s);
    - while (rs.next()) {
    - ShowLocksResponseElement e = new ShowLocksResponseElement();
    - e.setLockid(rs.getLong(1));
    - long txnid = rs.getLong(2);
    - if (!rs.wasNull()) e.setTxnid(txnid);
    - e.setDbname(rs.getString(3));
    - e.setTablename(rs.getString(4));
    - String partition = rs.getString(5);
    - if (partition != null) e.setPartname(partition);
    - switch (rs.getString(6).charAt(0)) {
    - case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
    - case LOCK_WAITING: e.setState(LockState.WAITING); break;
    - default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0));
    - }
    - switch (rs.getString(7).charAt(0)) {
    - case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
    - case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
    - case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
    - default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0));
    - }
    - e.setLastheartbeat(rs.getLong(8));
    - long acquiredAt = rs.getLong(9);
    - if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
    - e.setUser(rs.getString(10));
    - e.setHostname(rs.getString(11));
    - elems.add(e);
    - }
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e) {
    - throw new MetaException("Unable to select from transaction database " +
    + LOG.debug("Doing to execute query <" + s + ">");
    + ResultSet rs = stmt.executeQuery(s);
    + while (rs.next()) {
    + ShowLocksResponseElement e = new ShowLocksResponseElement();
    + e.setLockid(rs.getLong(1));
    + long txnid = rs.getLong(2);
    + if (!rs.wasNull()) e.setTxnid(txnid);
    + e.setDbname(rs.getString(3));
    + e.setTablename(rs.getString(4));
    + String partition = rs.getString(5);
    + if (partition != null) e.setPartname(partition);
    + switch (rs.getString(6).charAt(0)) {
    + case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
    + case LOCK_WAITING: e.setState(LockState.WAITING); break;
    + default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0));
    + }
    + switch (rs.getString(7).charAt(0)) {
    + case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
    + case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
    + case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
    + default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0));
    + }
    + e.setLastheartbeat(rs.getLong(8));
    + long acquiredAt = rs.getLong(9);
    + if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
    + e.setUser(rs.getString(10));
    + e.setHostname(rs.getString(11));
    + elems.add(e);
    + }
    + LOG.debug("Going to rollback");
    + dbConn.rollback();
    + } catch (SQLException e) {
    + checkRetryable(dbConn, e, "showLocks");
    + throw new MetaException("Unable to select from transaction database " +
                StringUtils.stringifyException(e));
    - } finally {
    - closeStmt(stmt);
    - closeDbConn(dbConn);
    + } finally {
    + closeStmt(stmt);
    + closeDbConn(dbConn);
    + }
    + rsp.setLocks(elems);
    + return rsp;
    + } catch (RetryException e) {
    + return showLocks(rqst);
          }
    - rsp.setLocks(elems);
    - return rsp;
        }

        public void heartbeat(HeartbeatRequest ids)
    - throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
    + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              heartbeatLock(dbConn, ids.getLockid());
              heartbeatTxn(dbConn, ids.getTxnid());
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "heartbeat");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "heartbeat");
              throw new MetaException("Unable to select from transaction database " +
    - StringUtils.stringifyException(e));
    + StringUtils.stringifyException(e));
            } finally {
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            heartbeat(ids);
          } finally {
            deadlockCnt = 0;
    @@ -627,15 +615,16 @@ public class TxnHandler {
        }

        public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
    - throws MetaException {
    + throws MetaException {
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse();
            Set<Long> nosuch = new HashSet<Long>();
            Set<Long> aborted = new HashSet<Long>();
            rsp.setNosuch(nosuch);
            rsp.setAborted(aborted);
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
                try {
                  heartbeatTxn(dbConn, txn);
    @@ -647,18 +636,15 @@ public class TxnHandler {
              }
              return rsp;
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "heartbeatTxnRange");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "heartbeatTxnRange");
              throw new MetaException("Unable to select from transaction database " +
    - StringUtils.stringifyException(e));
    + StringUtils.stringifyException(e));
            } finally {
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            return heartbeatTxnRange(rqst);
          }
        }
    @@ -666,9 +652,10 @@ public class TxnHandler {
        public void compact(CompactionRequest rqst) throws MetaException {
          // Put a compaction request in the queue.
          try {
    - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Connection dbConn = null;
            Statement stmt = null;
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
              stmt = dbConn.createStatement();

              // Get the id for the next entry in the queue
    @@ -679,7 +666,7 @@ public class TxnHandler {
                LOG.debug("Going to rollback");
                dbConn.rollback();
                throw new MetaException("Transaction tables not properly initiated, " +
    - "no record found in next_compaction_queue_id");
    + "no record found in next_compaction_queue_id");
              }
              long id = rs.getLong(1);
              s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
    @@ -687,7 +674,7 @@ public class TxnHandler {
              stmt.executeUpdate(s);

              StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
    - "cq_table, ");
    + "cq_table, ");
              String partName = rqst.getPartitionname();
              if (partName != null) buf.append("cq_partition, ");
              buf.append("cq_state, cq_type");
    @@ -730,71 +717,69 @@ public class TxnHandler {
              LOG.debug("Going to commit");
              dbConn.commit();
            } catch (SQLException e) {
    - try {
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - detectDeadlock(dbConn, e, "compact");
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "compact");
              throw new MetaException("Unable to select from transaction database " +
    - StringUtils.stringifyException(e));
    + StringUtils.stringifyException(e));
            } finally {
              closeStmt(stmt);
              closeDbConn(dbConn);
            }
    - } catch (DeadlockException e) {
    + } catch (RetryException e) {
            compact(rqst);
    - } finally {
    - deadlockCnt = 0;
          }
        }

        public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
          ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>());
    - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + Connection dbConn = null;
          Statement stmt = null;
          try {
    - stmt = dbConn.createStatement();
    - String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
    - "cq_start, cq_run_as from COMPACTION_QUEUE";
    - LOG.debug("Going to execute query <" + s + ">");
    - ResultSet rs = stmt.executeQuery(s);
    - while (rs.next()) {
    - ShowCompactResponseElement e = new ShowCompactResponseElement();
    - e.setDbname(rs.getString(1));
    - e.setTablename(rs.getString(2));
    - e.setPartitionname(rs.getString(3));
    - switch (rs.getString(4).charAt(0)) {
    - case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
    - case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
    - case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
    - default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
    - }
    - switch (rs.getString(5).charAt(0)) {
    - case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
    - case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
    - default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
    - }
    - e.setWorkerid(rs.getString(6));
    - e.setStart(rs.getLong(7));
    - e.setRunAs(rs.getString(8));
    - response.addToCompacts(e);
    - }
    - LOG.debug("Going to rollback");
    - dbConn.rollback();
    - } catch (SQLException e) {
    - LOG.debug("Going to rollback");
            try {
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
    + String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
    + "cq_start, cq_run_as from COMPACTION_QUEUE";
    + LOG.debug("Going to execute query <" + s + ">");
    + ResultSet rs = stmt.executeQuery(s);
    + while (rs.next()) {
    + ShowCompactResponseElement e = new ShowCompactResponseElement();
    + e.setDbname(rs.getString(1));
    + e.setTablename(rs.getString(2));
    + e.setPartitionname(rs.getString(3));
    + switch (rs.getString(4).charAt(0)) {
    + case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
    + case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
    + case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
    + default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
    + }
    + switch (rs.getString(5).charAt(0)) {
    + case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
    + case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
    + default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
    + }
    + e.setWorkerid(rs.getString(6));
    + e.setStart(rs.getLong(7));
    + e.setRunAs(rs.getString(8));
    + response.addToCompacts(e);
    + }
    + LOG.debug("Going to rollback");
              dbConn.rollback();
    - } catch (SQLException e1) {
    - }
    - throw new MetaException("Unable to select from transaction database " +
    + } catch (SQLException e) {
    + LOG.debug("Going to rollback");
    + rollbackDBConn(dbConn);
    + checkRetryable(dbConn, e, "showCompact");
    + throw new MetaException("Unable to select from transaction database " +
                StringUtils.stringifyException(e));
    - } finally {
    - closeStmt(stmt);
    - closeDbConn(dbConn);
    + } finally {
    + closeStmt(stmt);
    + closeDbConn(dbConn);
    + }
    + return response;
    + } catch (RetryException e) {
    + return showCompact(rqst);
          }
    - return response;
        }

        /**
    @@ -828,7 +813,7 @@ public class TxnHandler {
          return previous_timeout;
        }

    - protected class DeadlockException extends Exception {
    + protected class RetryException extends Exception {

        }

    @@ -839,26 +824,28 @@ public class TxnHandler {
         * @return db connection
         * @throws MetaException if the connection cannot be obtained
         */
    - protected Connection getDbConn(int isolationLevel) throws MetaException {
    + protected Connection getDbConn(int isolationLevel) throws SQLException {
    + Connection dbConn = connPool.getConnection();
    + dbConn.setAutoCommit(false);
    + dbConn.setTransactionIsolation(isolationLevel);
    + return dbConn;
    + }
    +
    + void rollbackDBConn(Connection dbConn) {
          try {
    - Connection dbConn = connPool.getConnection();
    - dbConn.setAutoCommit(false);
    - dbConn.setTransactionIsolation(isolationLevel);
    - return dbConn;
    + if (dbConn != null) dbConn.rollback();
          } catch (SQLException e) {
    - String msg = "Unable to get jdbc connection from pool, " + e.getMessage();
    - throw new MetaException(msg);
    + LOG.warn("Failed to rollback db connection " + getMessage(e));
          }
        }
    -
        protected void closeDbConn(Connection dbConn) {
          try {
            if (dbConn != null) dbConn.close();
          } catch (SQLException e) {
    - LOG.warn("Failed to close db connection " + e.getMessage());
    + LOG.warn("Failed to close db connection " + getMessage(e));
          }
        }
    -
    +
        /**
         * Close statement instance.
         * @param stmt statement instance.
    @@ -867,7 +854,7 @@ public class TxnHandler {
          try {
            if (stmt != null) stmt.close();
          } catch (SQLException e) {
    - LOG.warn("Failed to close statement " + e.getMessage());
    + LOG.warn("Failed to close statement " + getMessage(e));
          }
        }

    @@ -882,7 +869,7 @@ public class TxnHandler {
            }
          }
          catch(SQLException ex) {
    - LOG.warn("Failed to close statement " + ex.getMessage());
    + LOG.warn("Failed to close statement " + getMessage(ex));
          }
        }

    @@ -895,18 +882,18 @@ public class TxnHandler {
          closeDbConn(dbConn);
        }
        /**
    - * Determine if an exception was a deadlock. Unfortunately there is no standard way to do
    + * Determine if an exception was such that it makse sense to retry. Unfortunately there is no standard way to do
         * this, so we have to inspect the error messages and catch the telltale signs for each
         * different database.
         * @param conn database connection
         * @param e exception that was thrown.
         * @param caller name of the method calling this
    - * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock
    + * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when deadlock
         * detected and retry count has not been exceeded.
         */
    - protected void detectDeadlock(Connection conn,
    + protected void checkRetryable(Connection conn,
                                      SQLException e,
    - String caller) throws DeadlockException, MetaException {
    + String caller) throws RetryException, MetaException {

          // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
          // to test these changes.
    @@ -919,19 +906,41 @@ public class TxnHandler {
            determineDatabaseProduct(conn);
          }
          if (e instanceof SQLTransactionRollbackException ||
    - ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
    - dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
    - (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
    - (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")
    - || e.getMessage().contains("can't serialize access for this transaction")))) {
    + ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
    + dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
    + (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
    + (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")
    + || e.getMessage().contains("can't serialize access for this transaction")))) {
            if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
              LOG.warn("Deadlock detected in " + caller + ", trying again.");
    - throw new DeadlockException();
    + throw new RetryException();
            } else {
              LOG.error("Too many repeated deadlocks in " + caller + ", giving up.");
              deadlockCnt = 0;
            }
          }
    + else if(isRetryable(e)) {
    + //in MSSQL this means Communication Link Failure
    + if(retryNum++ < retryLimit) {
    + try {
    + Thread.sleep(retryInterval);
    + }
    + catch(InterruptedException ex) {
    + //
    + }
    + LOG.warn("Retryable error detected in " + caller + ", trying again: " + getMessage(e));
    + throw new RetryException();
    + }
    + else {
    + LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
    + retryNum = 0;
    + }
    + }
    + else {
    + //if here, we got something that will propagate the error (rather than retry), so reset counters
    + deadlockCnt = 0;
    + retryNum = 0;
    + }
        }

        /**
    @@ -1073,10 +1082,10 @@ public class TxnHandler {
          @Override
          public String toString() {
            return "extLockId:" + Long.toString(extLockId) + " intLockId:" +
    - intLockId + " txnId:" + Long.toString
    - (txnId) + " db:" + db + " table:" + table + " partition:" +
    - partition + " state:" + (state == null ? "null" : state.toString())
    - + " type:" + (type == null ? "null" : type.toString());
    + intLockId + " txnId:" + Long.toString
    + (txnId) + " db:" + db + " table:" + table + " partition:" +
    + partition + " state:" + (state == null ? "null" : state.toString())
    + + " type:" + (type == null ? "null" : type.toString());
          }
        }

    @@ -1088,11 +1097,11 @@ public class TxnHandler {
          public int compare(LockInfo info1, LockInfo info2) {
            // We sort by state (acquired vs waiting) and then by extLockId.
            if (info1.state == LockState.ACQUIRED &&
    - info2.state != LockState .ACQUIRED) {
    + info2.state != LockState .ACQUIRED) {
              return -1;
            }
            if (info1.state != LockState.ACQUIRED &&
    - info2.state == LockState .ACQUIRED) {
    + info2.state == LockState .ACQUIRED) {
              return 1;
            }
            if (info1.extLockId < info2.extLockId) {
    @@ -1124,7 +1133,7 @@ public class TxnHandler {

        private void checkQFileTestHack() {
          boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST) ||
    - HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
    + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
          if (hackOn) {
            LOG.info("Hacking in canned values for transaction manager");
            // Set up the transaction/locking db in the derby metastore
    @@ -1135,7 +1144,7 @@ public class TxnHandler {
              // We may have already created the tables and thus don't need to redo it.
              if (!e.getMessage().contains("already exists")) {
                throw new RuntimeException("Unable to set up transaction database for" +
    - " testing: " + e.getMessage());
    + " testing: " + e.getMessage());
              }
            }
          }
    @@ -1153,7 +1162,7 @@ public class TxnHandler {
          int updateCnt = 0;
          try {
            stmt = dbConn.createStatement();
    -
    +
            // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
            StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in (");
            boolean first = true;
    @@ -1165,7 +1174,7 @@ public class TxnHandler {
            buf.append(')');
            LOG.debug("Going to execute update <" + buf.toString() + ">");
            stmt.executeUpdate(buf.toString());
    -
    +
            buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in (");
            first = true;
            for (Long id : txnids) {
    @@ -1176,7 +1185,7 @@ public class TxnHandler {
            buf.append(')');
            LOG.debug("Going to execute update <" + buf.toString() + ">");
            updateCnt = stmt.executeUpdate(buf.toString());
    -
    +
            LOG.debug("Going to commit");
            dbConn.commit();
          } finally {
    @@ -1202,7 +1211,7 @@ public class TxnHandler {
         * @throws TxnAbortedException
         */
        private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait)
    - throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
    + throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
          // We want to minimize the number of concurrent lock requests being issued. If we do not we
          // get a large number of deadlocks in the database, since this method has to both clean
          // timedout locks and insert new locks. This synchronization barrier will not eliminiate all
    @@ -1227,7 +1236,7 @@ public class TxnHandler {
                LOG.debug("Going to rollback");
                dbConn.rollback();
                throw new MetaException("Transaction tables not properly " +
    - "initialized, no record found in next_lock_id");
    + "initialized, no record found in next_lock_id");
              }
              long extLockId = rs.getLong(1);
              s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
    @@ -1252,8 +1261,8 @@ public class TxnHandler {
                  s = "insert into TXN_COMPONENTS " +
                    "(tc_txnid, tc_database, tc_table, tc_partition) " +
                    "values (" + txnid + ", '" + dbName + "', " +
    - (tblName == null ? "null" : "'" + tblName + "'") + ", " +
    - (partName == null ? "null" : "'" + partName + "'") + ")";
    + (tblName == null ? "null" : "'" + tblName + "'") + ", " +
    + (partName == null ? "null" : "'" + partName + "'") + ")";
                  LOG.debug("Going to execute update <" + s + ">");
                  stmt.executeUpdate(s);
                }
    @@ -1275,13 +1284,13 @@ public class TxnHandler {
                long now = getDbTime(dbConn);
                s = "insert into HIVE_LOCKS " +
                  " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
    - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
    - " values (" + extLockId + ", " +
    - + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
    - dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
    - + ", " + (partName == null ? "null" : "'" + partName + "'") +
    - ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" +
    - rqst.getUser() + "', '" + rqst.getHostname() + "')";
    + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
    + " values (" + extLockId + ", " +
    + + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
    + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
    + + ", " + (partName == null ? "null" : "'" + partName + "'") +
    + ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" +
    + rqst.getUser() + "', '" + rqst.getHostname() + "')";
                LOG.debug("Going to execute update <" + s + ">");
                stmt.executeUpdate(s);
              }
    @@ -1305,7 +1314,7 @@ public class TxnHandler {
        private LockResponse checkLock(Connection dbConn,
                                       long extLockId,
                                       boolean alwaysCommit)
    - throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
    + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
          List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);
          LockResponse response = new LockResponse();
          response.setLockid(extLockId);
    @@ -1313,8 +1322,8 @@ public class TxnHandler {
          LOG.debug("Setting savepoint");
          Savepoint save = dbConn.setSavepoint();
          StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
    - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
    - "hl_lock_type from HIVE_LOCKS where hl_db in (");
    + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
    + "hl_lock_type from HIVE_LOCKS where hl_db in (");

          Set<String> strings = new HashSet<String>(locksBeingChecked.size());
          for (LockInfo info : locksBeingChecked) {
    @@ -1430,7 +1439,7 @@ public class TxnHandler {
                // lock the whole database and we need to check it. Otherwise,
                // check if they are operating on the same table, if not, move on.
                if (locks[index].table != null && locks[i].table != null
    - && !locks[index].table.equals(locks[i].table)) {
    + && !locks[index].table.equals(locks[i].table)) {
                  continue;
                }

    @@ -1438,30 +1447,30 @@ public class TxnHandler {
                // lock the whole table and we need to check it. Otherwise,
                // check if they are operating on the same partition, if not, move on.
                if (locks[index].partition != null && locks[i].partition != null
    - && !locks[index].partition.equals(locks[i].partition)) {
    + && !locks[index].partition.equals(locks[i].partition)) {
                  continue;
                }

                // We've found something that matches what we're trying to lock,
                // so figure out if we can lock it too.
                switch (jumpTable.get(locks[index].type).get(locks[i].type).get
    - (locks[i].state)) {
    - case ACQUIRE:
    - acquire(dbConn, stmt, extLockId, info.intLockId);
    - acquired = true;
    - break;
    - case WAIT:
    - wait(dbConn, save);
    - if (alwaysCommit) {
    - // In the case where lockNoWait has been called we don't want to commit because
    - // it's going to roll everything back. In every other case we want to commit here.
    - LOG.debug("Going to commit");
    - dbConn.commit();
    - }
    - response.setState(LockState.WAITING);
    - return response;
    - case KEEP_LOOKING:
    - continue;
    + (locks[i].state)) {
    + case ACQUIRE:
    + acquire(dbConn, stmt, extLockId, info.intLockId);
    + acquired = true;
    + break;
    + case WAIT:
    + wait(dbConn, save);
    + if (alwaysCommit) {
    + // In the case where lockNoWait has been called we don't want to commit because
    + // it's going to roll everything back. In every other case we want to commit here.
    + LOG.debug("Going to commit");
    + dbConn.commit();
    + }
    + response.setState(LockState.WAITING);
    + return response;
    + case KEEP_LOOKING:
    + continue;
                }
                if (acquired) break; // We've acquired this lock component,
                // so get out of the loop and look at the next component.
    @@ -1494,18 +1503,18 @@ public class TxnHandler {
        }

        private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId)
    - throws SQLException, NoSuchLockException, MetaException {
    + throws SQLException, NoSuchLockException, MetaException {
          long now = getDbTime(dbConn);
          String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
    - "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
    - extLockId + " and hl_lock_int_id = " + intLockId;
    + "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
    + extLockId + " and hl_lock_int_id = " + intLockId;
          LOG.debug("Going to execute update <" + s + ">");
          int rc = stmt.executeUpdate(s);
          if (rc < 1) {
            LOG.debug("Going to rollback");
            dbConn.rollback();
            throw new NoSuchLockException("No such lock: (" + extLockId + "," +
    - + intLockId + ")");
    + + intLockId + ")");
          }
          // We update the database, but we don't commit because there may be other
          // locks together with this, and we only want to acquire one if we can
    @@ -1514,7 +1523,7 @@ public class TxnHandler {

        // Heartbeats on the lock table. This commits, so do not enter it with any state
        private void heartbeatLock(Connection dbConn, long extLockId)
    - throws NoSuchLockException, SQLException, MetaException {
    + throws NoSuchLockException, SQLException, MetaException {
          // If the lock id is 0, then there are no locks in this heartbeat
          if (extLockId == 0) return;
          Statement stmt = null;
    @@ -1523,7 +1532,7 @@ public class TxnHandler {
            long now = getDbTime(dbConn);

            String s = "update HIVE_LOCKS set hl_last_heartbeat = " +
    - now + " where hl_lock_ext_id = " + extLockId;
    + now + " where hl_lock_ext_id = " + extLockId;
            LOG.debug("Going to execute update <" + s + ">");
            int rc = stmt.executeUpdate(s);
            if (rc < 1) {
    @@ -1540,7 +1549,7 @@ public class TxnHandler {

        // Heartbeats on the txn table. This commits, so do not enter it with any state
        private void heartbeatTxn(Connection dbConn, long txnid)
    - throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException {
    + throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException {
          // If the txnid is 0, then there are no transactions in this heartbeat
          if (txnid == 0) return;
          Statement stmt = null;
    @@ -1560,10 +1569,10 @@ public class TxnHandler {
              LOG.debug("Going to rollback");
              dbConn.rollback();
              throw new TxnAbortedException("Transaction " + txnid +
    - " already aborted");
    + " already aborted");
            }
            s = "update TXNS set txn_last_heartbeat = " + now +
    - " where txn_id = " + txnid;
    + " where txn_id = " + txnid;
            LOG.debug("Going to execute update <" + s + ">");
            stmt.executeUpdate(s);
            LOG.debug("Going to commit");
    @@ -1575,17 +1584,17 @@ public class TxnHandler {

        // NEVER call this function without first calling heartbeat(long, long)
        private long getTxnIdFromLockId(Connection dbConn, long extLockId)
    - throws NoSuchLockException, MetaException, SQLException {
    + throws NoSuchLockException, MetaException, SQLException {
          Statement stmt = null;
          try {
            stmt = dbConn.createStatement();
            String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
    - extLockId;
    + extLockId;
            LOG.debug("Going to execute query <" + s + ">");
            ResultSet rs = stmt.executeQuery(s);
            if (!rs.next()) {
              throw new MetaException("This should never happen! We already " +
    - "checked the lock existed but now we can't find it!");
    + "checked the lock existed but now we can't find it!");
            }
            long txnid = rs.getLong(1);
            LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid));
    @@ -1597,13 +1606,13 @@ public class TxnHandler {

        // NEVER call this function without first calling heartbeat(long, long)
        private List<LockInfo> getLockInfoFromLockId(Connection dbConn, long extLockId)
    - throws NoSuchLockException, MetaException, SQLException {
    + throws NoSuchLockException, MetaException, SQLException {
          Statement stmt = null;
          try {
            stmt = dbConn.createStatement();
            String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
    - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
    - "hl_lock_ext_id = " + extLockId;
    + "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
    + "hl_lock_ext_id = " + extLockId;
            LOG.debug("Going to execute query <" + s + ">");
            ResultSet rs = stmt.executeQuery(s);
            boolean sawAtLeastOne = false;
    @@ -1614,7 +1623,7 @@ public class TxnHandler {
            }
            if (!sawAtLeastOne) {
              throw new MetaException("This should never happen! We already " +
    - "checked the lock existed but now we can't find it!");
    + "checked the lock existed but now we can't find it!");
            }
            return ourLockInfo;
          } finally {
    @@ -1632,7 +1641,7 @@ public class TxnHandler {
            stmt = dbConn.createStatement();
            // Remove any timed out locks from the table.
            String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
    - (now - timeout);
    + (now - timeout);
            LOG.debug("Going to execute update <" + s + ">");
            stmt.executeUpdate(s);
            LOG.debug("Going to commit");
    @@ -1652,7 +1661,7 @@ public class TxnHandler {
            stmt = dbConn.createStatement();
            // Abort any timed out locks from the table.
            String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN +
    - "' and txn_last_heartbeat < " + (now - timeout);
    + "' and txn_last_heartbeat < " + (now - timeout);
            LOG.debug("Going to execute query <" + s + ">");
            ResultSet rs = stmt.executeQuery(s);
            List<Long> deadTxns = new ArrayList<Long>();
    @@ -1675,12 +1684,12 @@ public class TxnHandler {
          String passwd;
          try {
            passwd = ShimLoader.getHadoopShims().getPassword(conf,
    - HiveConf.ConfVars.METASTOREPWD.varname);
    + HiveConf.ConfVars.METASTOREPWD.varname);
          } catch (IOException err) {
            throw new SQLException("Error getting metastore password", err);
          }
          String connectionPooler = HiveConf.getVar(conf,
    - HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase();
    + HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase();

          if ("bonecp".equals(connectionPooler)) {
            BoneCPConfig config = new BoneCPConfig();
    @@ -1696,22 +1705,22 @@ public class TxnHandler {
            // This doesn't get used, but it's still necessary, see
            // http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup
            PoolableConnectionFactory poolConnFactory =
    - new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
    + new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
            connPool = new PoolingDataSource(objectPool);
          } else {
            throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler);
          }
        }

    - private static synchronized void buildJumpTable() {
    + private static synchronized void buildJumpTable() {
          if (jumpTable != null) return;

          jumpTable =
    - new HashMap<LockType, Map<LockType, Map<LockState, LockAction>>>(3);
    + new HashMap<LockType, Map<LockType, Map<LockState, LockAction>>>(3);

          // SR: Lock we are trying to acquire is shared read
          Map<LockType, Map<LockState, LockAction>> m =
    - new HashMap<LockType, Map<LockState, LockAction>>(3);
    + new HashMap<LockType, Map<LockState, LockAction>>(3);
          jumpTable.put(LockType.SHARED_READ, m);

          // SR.SR: Lock we are examining is shared read
    @@ -1743,7 +1752,7 @@ public class TxnHandler {
          // that something is blocking it that would not block a read.
          m2.put(LockState.WAITING, LockAction.KEEP_LOOKING);

    - // SR.E: Lock we are examining is exclusive
    + // SR.E: Lock we are examining is exclusive
          m2 = new HashMap<LockState, LockAction>(2);
          m.put(LockType.EXCLUSIVE, m2);

    @@ -1777,7 +1786,7 @@ public class TxnHandler {
          m2.put(LockState.ACQUIRED, LockAction.WAIT);
          m2.put(LockState.WAITING, LockAction.WAIT);

    - // SW.E: Lock we are examining is exclusive
    + // SW.E: Lock we are examining is exclusive
          m2 = new HashMap<LockState, LockAction>(2);
          m.put(LockType.EXCLUSIVE, m2);

    @@ -1805,7 +1814,7 @@ public class TxnHandler {
          m2.put(LockState.ACQUIRED, LockAction.WAIT);
          m2.put(LockState.WAITING, LockAction.WAIT);

    - // E.E: Lock we are examining is exclusive
    + // E.E: Lock we are examining is exclusive
          m2 = new HashMap<LockState, LockAction>(2);
          m.put(LockType.EXCLUSIVE, m2);

    @@ -1813,4 +1822,20 @@ public class TxnHandler {
          m2.put(LockState.ACQUIRED, LockAction.WAIT);
          m2.put(LockState.WAITING, LockAction.WAIT);
        }
    + /**
    + * Returns true if {@code ex} should be retried
    + */
    + private static boolean isRetryable(Exception ex) {
    + if(ex instanceof SQLException) {
    + SQLException sqlException = (SQLException)ex;
    + if("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
    + //in MSSQL this means Communication Link Failure
    + return true;
    + }
    + }
    + return false;
    + }
    + private static String getMessage(SQLException ex) {
    + return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
    + }
      }

    Modified: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java?rev=1652558&r1=1652557&r2=1652558&view=diff
    ==============================================================================
    --- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (original)
    +++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java Sat Jan 17 02:54:40 2015
    @@ -1124,11 +1124,11 @@ public class TestTxnHandler {
                      LOG.debug("no exception, no deadlock");
                    } catch (SQLException e) {
                      try {
    - txnHandler.detectDeadlock(conn1, e, "thread t1");
    + txnHandler.checkRetryable(conn1, e, "thread t1");
                        LOG.debug("Got an exception, but not a deadlock, SQLState is " +
                            e.getSQLState() + " class of exception is " + e.getClass().getName() +
                            " msg is <" + e.getMessage() + ">");
    - } catch (TxnHandler.DeadlockException de) {
    + } catch (TxnHandler.RetryException de) {
                        LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
                            "exception is " + e.getClass().getName() + " msg is <" + e
                            .getMessage() + ">");
    @@ -1154,11 +1154,11 @@ public class TestTxnHandler {
                      LOG.debug("no exception, no deadlock");
                    } catch (SQLException e) {
                      try {
    - txnHandler.detectDeadlock(conn2, e, "thread t2");
    + txnHandler.checkRetryable(conn2, e, "thread t2");
                        LOG.debug("Got an exception, but not a deadlock, SQLState is " +
                            e.getSQLState() + " class of exception is " + e.getClass().getName() +
                            " msg is <" + e.getMessage() + ">");
    - } catch (TxnHandler.DeadlockException de) {
    + } catch (TxnHandler.RetryException de) {
                        LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
                            "exception is " + e.getClass().getName() + " msg is <" + e
                            .getMessage() + ">");

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedJan 17, '15 at 2:54a
activeJan 17, '15 at 2:54a
posts2
users1
websitehive.apache.org

1 user in discussion

Ekoifman: 2 posts

People

Translate

site design / logo © 2021 Grokbase