Repository: hive
Updated Branches:
   refs/heads/branch-1.2 7a8eb62db -> 0e380c71c


HIVE-10521 - TxnHandler.timeOutTxns only times out some of the expired transactions (Alan Gates via Eugene Koifman)
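
Context for the change: before this patch, timeOutTxns() collected at most 20 expired transaction ids in a single pass and aborted only those, so any larger backlog of timed-out transactions was left open. The patch instead drains the expired-transaction result set in batches of TIMED_OUT_TXN_ABORT_BATCH_SIZE (100), aborts each batch, and commits once at the end, with SQLExceptions rolled back and routed through checkRetryable(). The standalone Java sketch below restates that batching pattern only; the SQL text, table and column names, and the abortBatch() helper are illustrative stand-ins rather than the exact TxnHandler code, while the constant and the drain-then-commit loop mirror the diff further down.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class TimedOutTxnSketch {

  // Same batch-size constant the patch introduces in TxnHandler.
  private static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100;

  // Abort every open transaction whose last heartbeat is older than 'timeout' ms,
  // draining the result set in fixed-size batches before a single commit.
  static void timeOutTxns(Connection dbConn, long now, long timeout) throws SQLException {
    try (Statement stmt = dbConn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "select txn_id from TXNS where txn_state = 'o' and txn_last_heartbeat < "
                 + (now - timeout))) {            // illustrative query, not the verbatim Hive SQL
      List<Long> deadTxns = new ArrayList<Long>();
      do {
        deadTxns.clear();
        // Cap each batch so the generated update/delete statements stay small.
        for (int i = 0; i < TIMED_OUT_TXN_ABORT_BATCH_SIZE && rs.next(); i++) {
          deadTxns.add(rs.getLong(1));
        }
        if (!deadTxns.isEmpty()) {
          abortBatch(dbConn, deadTxns);           // stands in for TxnHandler.abortTxns(dbConn, deadTxns)
        }
      } while (!deadTxns.isEmpty());              // keep going until the result set is exhausted
      dbConn.commit();                            // one commit after all batches
    } catch (SQLException e) {
      dbConn.rollback();                          // the real code also consults checkRetryable() here
      throw e;
    }
  }

  // Placeholder for TxnHandler.abortTxns(): per the patch, each batch becomes a single
  // bounded "update ... where txn_id in (...)" style statement instead of one huge one.
  private static void abortBatch(Connection dbConn, List<Long> txnIds) throws SQLException {
  }
}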


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e380c71
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e380c71
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e380c71

Branch: refs/heads/branch-1.2
Commit: 0e380c71c42b4ea9d26c5898caffdf151c5c422f
Parents: 7a8eb62
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Wed May 6 19:30:47 2015 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Wed May 6 19:30:47 2015 -0700

----------------------------------------------------------------------
  .../hadoop/hive/metastore/txn/TxnHandler.java | 35 ++++++++++++------
  .../hive/metastore/txn/TestTxnHandler.java | 39 +++++++++++++++-----
  2 files changed, 53 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0e380c71/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 704c3ed..7c3b55c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -75,6 +75,7 @@ public class TxnHandler {
    static final protected char LOCK_SEMI_SHARED = 'w';

    static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
+  static final private int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100;
    static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());

    static private DataSource connPool;
@@ -130,7 +131,8 @@ public class TxnHandler {
      timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
      deadlockCnt = 0;
      buildJumpTable();
-    retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS);
+    retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL,
+        TimeUnit.MILLISECONDS);
      retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
      deadlockRetryInterval = retryInterval / 10;

@@ -334,9 +336,7 @@ public class TxnHandler {
        Connection dbConn = null;
        try {
          dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-      List<Long> txnids = new ArrayList<Long>(1);
-      txnids.add(txnid);
-      if (abortTxns(dbConn, txnids) != 1) {
+      if (abortTxns(dbConn, Collections.singletonList(txnid)) != 1) {
            LOG.debug("Going to rollback");
            dbConn.rollback();
            throw new NoSuchTxnException("No such transaction: " + txnid);
@@ -1321,8 +1321,6 @@ public class TxnHandler {
        LOG.debug("Going to execute update <" + buf.toString() + ">");
        updateCnt = stmt.executeUpdate(buf.toString());

-      LOG.debug("Going to commit");
-      dbConn.commit();
      } finally {
        closeStmt(stmt);
      }
@@ -1818,10 +1816,10 @@ public class TxnHandler {
      }
    }

-  // Abort timed out transactions. This calls abortTxn(), which does a commit,
+  // Abort timed out transactions. This does a commit,
    // and thus should be done before any calls to heartbeat that will leave
    // open transactions on the underlying database.
-  private void timeOutTxns(Connection dbConn) throws SQLException, MetaException {
+  private void timeOutTxns(Connection dbConn) throws SQLException, MetaException, RetryException {
      long now = getDbTime(dbConn);
      Statement stmt = null;
      try {
@@ -1834,10 +1832,23 @@ public class TxnHandler {
        List<Long> deadTxns = new ArrayList<Long>();
        // Limit the number of timed out transactions we do in one pass to keep from generating a
        // huge delete statement
-      for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1));
-      // We don't care whether all of the transactions get deleted or not,
-      // if some didn't it most likely means someone else deleted them in the interum
-      if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns);
+      do {
+        deadTxns.clear();
+        for (int i = 0; i < TIMED_OUT_TXN_ABORT_BATCH_SIZE && rs.next(); i++) {
+          deadTxns.add(rs.getLong(1));
+        }
+        // We don't care whether all of the transactions get deleted or not,
+        // if some didn't it most likely means someone else deleted them in the interum
+        if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns);
+      } while (deadTxns.size() > 0);
+      LOG.debug("Going to commit");
+      dbConn.commit();
+    } catch (SQLException e) {
+      LOG.debug("Going to rollback");
+      rollbackDBConn(dbConn);
+      checkRetryable(dbConn, e, "abortTxn");
+      throw new MetaException("Unable to update transaction database "
+          + StringUtils.stringifyException(e));
      } finally {
        closeStmt(stmt);
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/0e380c71/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index d4266e1..f478184 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -937,16 +937,16 @@ public class TestTxnHandler {
    @Test
    public void testLockTimeout() throws Exception {
      long timeout = txnHandler.setTimeout(1);
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    Thread.currentThread().sleep(10);
      try {
+      LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+      comp.setTablename("mytable");
+      comp.setPartitionname("mypartition");
+      List<LockComponent> components = new ArrayList<LockComponent>(1);
+      components.add(comp);
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      LockResponse res = txnHandler.lock(req);
+      assertTrue(res.getState() == LockState.ACQUIRED);
+      Thread.currentThread().sleep(10);
        txnHandler.checkLock(new CheckLockRequest(res.getLockid()));
        fail("Told there was a lock, when it should have timed out.");
      } catch (NoSuchLockException e) {
@@ -956,6 +956,27 @@ public class TestTxnHandler {
    }

    @Test
+  public void testRecoverManyTimeouts() throws Exception {
+    long timeout = txnHandler.setTimeout(1);
+    try {
+      txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost"));
+      Thread.currentThread().sleep(10);
+      txnHandler.getOpenTxns();
+      GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo();
+      int numAborted = 0;
+      for (TxnInfo txnInfo : rsp.getOpen_txns()) {
+        assertEquals(TxnState.ABORTED, txnInfo.getState());
+        numAborted++;
+      }
+      assertEquals(503, numAborted);
+    } finally {
+      txnHandler.setTimeout(timeout);
+    }
+
+
+  }
+
+  @Test
    public void testHeartbeatNoLock() throws Exception {
      HeartbeatRequest h = new HeartbeatRequest();
      h.setLockid(29389839L);
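
As a quick check on the numbers in the test above: testRecoverManyTimeouts opens 503 transactions against a 1 ms timeout, so with TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100 the new loop in timeOutTxns() must drain ceil(503 / 100) = 6 batches (five full batches of 100 plus a final batch of 3) before committing, whereas the previous single pass capped at 20 ids would have aborted only a fraction of them. The final assertEquals(503, numAborted) passes only if every batch is processed.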
