Repository: hive
Updated Branches:
   refs/heads/master 1e49e873f -> 8f65fabc9


HIVE-11388 - Allow ACID Compactor components to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f65fabc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f65fabc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f65fabc

Branch: refs/heads/master
Commit: 8f65fabc965d19b2fa1c06c59d5c0fcdd733a106
Parents: 1e49e87
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Wed Mar 23 11:58:40 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Wed Mar 23 11:59:13 2016 -0700

----------------------------------------------------------------------
  .../apache/hadoop/hive/common/ServerUtils.java | 11 ++
  .../deployers/config/hive/hive-site.mysql.xml | 24 ++-
  .../hive/metastore/txn/CompactionInfo.java | 4 +
  .../metastore/txn/CompactionTxnHandler.java | 7 +-
  .../hadoop/hive/metastore/txn/TxnDbUtil.java | 8 +
  .../hadoop/hive/metastore/txn/TxnHandler.java | 169 ++++++++++++++++++-
  .../hadoop/hive/metastore/txn/TxnStore.java | 33 +++-
  .../hadoop/hive/metastore/txn/TxnUtils.java | 4 +-
  .../metastore/txn/ValidCompactorTxnList.java | 2 +-
  .../hive/metastore/txn/TestTxnHandler.java | 94 +++++++++++
  .../ql/txn/AcidCompactionHistoryService.java | 7 +
  .../hive/ql/txn/AcidHouseKeeperService.java | 7 +
  .../hadoop/hive/ql/txn/compactor/Cleaner.java | 62 ++++++-
  .../hadoop/hive/ql/txn/compactor/Initiator.java | 19 ++-
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 7 +-
  .../hive/ql/lockmgr/TestDbTxnManager.java | 6 +
  16 files changed, 437 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index b44f92f..af4e9b2 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -65,5 +65,16 @@ public class ServerUtils {
      }
      return serverIPAddress;
    }
+ /**
+ * @return name of current host
+ */
+ public static String hostname() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ LOG.error("Unable to resolve my host name " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }

  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
----------------------------------------------------------------------
diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
index 415caf4..36bae6c 100644
--- a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -58,13 +58,14 @@
          <name>hive.exec.dynamic.partition.mode</name>
          <value>nonstrict</value>
      </property>
+
      <property>
          <name>hive.compactor.initiator.on</name>
- <value>false</value>
+ <value>true</value>
      </property>
      <property>
          <name>hive.compactor.worker.threads</name>
- <value>2</value>
+ <value>5</value>
      </property>
      <property>
          <name>hive.timedout.txn.reaper.start</name>
@@ -77,9 +78,24 @@
      -->
      <property>
          <name>hive.timedout.txn.reaper.interval</name>
- <value>30s</value>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.history.reaper.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.cleaner.run.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.check.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.delta.num.threshold</name>
+ <value>2</value>
      </property>
-
      <!--end ACID related properties-->
  <!--
      <property>

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 73255d2..bea1473 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -18,6 +18,7 @@
  package org.apache.hadoop.hive.metastore.txn;

  import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;

  import java.sql.PreparedStatement;
  import java.sql.ResultSet;
@@ -39,6 +40,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
    public boolean tooManyAborts = false;
    /**
     * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
+ * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition.
+ * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and
+ * {@link ValidCompactorTxnList#highWatermark}
     */
    public long highestTxnId;
    byte[] metaInfo;
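
A note on the ResultSet.getLong() caveat in the javadoc above: per the JDBC contract, getLong() returns 0 for SQL NULL, so 0 doubles as "not set". A minimal sketch of how a reader can tell the two apart, reusing the CC_HIGHEST_TXN_ID column name from the COMPLETED_COMPACTIONS DDL (the helper itself is hypothetical):

import java.sql.ResultSet;
import java.sql.SQLException;

final class HighestTxnIdSketch {
  // getLong() yields 0 for SQL NULL; wasNull() is the only way to tell a
  // stored 0 apart from a value that was never set (e.g. a pre-upgrade row).
  static long readHighestTxnId(ResultSet rs) throws SQLException {
    long id = rs.getLong("CC_HIGHEST_TXN_ID"); // 0 if the column is NULL
    return rs.wasNull() ? 0 : id;
  }
}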

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 15c01da..67e661f 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -160,6 +160,8 @@ class CompactionTxnHandler extends TxnHandler {
      try {
        Connection dbConn = null;
        Statement stmt = null;
+ //need a separate stmt for executeUpdate() otherwise it will close the ResultSet (HIVE-12725)
+ Statement updStmt = null;
        ResultSet rs = null;
        try {
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -173,6 +175,7 @@ class CompactionTxnHandler extends TxnHandler {
            dbConn.rollback();
            return null;
          }
+ updStmt = dbConn.createStatement();
          do {
            CompactionInfo info = new CompactionInfo();
            info.id = rs.getLong(1);
@@ -186,7 +189,7 @@ class CompactionTxnHandler extends TxnHandler {
              "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
              " AND cq_state='" + INITIATED_STATE + "'";
            LOG.debug("Going to execute update <" + s + ">");
- int updCount = stmt.executeUpdate(s);
+ int updCount = updStmt.executeUpdate(s);
            if(updCount == 1) {
              dbConn.commit();
              return info;
@@ -210,6 +213,7 @@ class CompactionTxnHandler extends TxnHandler {
          throw new MetaException("Unable to connect to transaction database " +
            StringUtils.stringifyException(e));
        } finally {
+ closeStmt(updStmt);
          close(rs, stmt, dbConn);
        }
      } catch (RetryException e) {
@@ -626,6 +630,7 @@ class CompactionTxnHandler extends TxnHandler {

    /**
     * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+ * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids.
     */
    public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
      Connection dbConn = null;
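
The "separate stmt" fix above (HIVE-12725) guards against a JDBC contract detail: executing any new command on a Statement closes that Statement's currently open ResultSet. A minimal sketch of the resulting pattern, reusing the COMPACTION_QUEUE names from this hunk (error handling and retry logic omitted):

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class TwoStatementsSketch {
  static void markWorking(Connection dbConn) throws SQLException {
    Statement readStmt = dbConn.createStatement();
    Statement updStmt = dbConn.createStatement(); // separate stmt keeps rs open
    ResultSet rs = readStmt.executeQuery("select cq_id from COMPACTION_QUEUE");
    while (rs.next()) {
      // issuing this update through readStmt would close rs and the loop
      // would fail on the next rs.next(); updStmt is safe
      updStmt.executeUpdate(
          "update COMPACTION_QUEUE set cq_state = 'w' where cq_id = " + rs.getLong(1));
    }
    rs.close();
    updStmt.close();
    readStmt.close();
  }
}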

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 8172242..df480ea 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -140,6 +140,13 @@ public final class TxnDbUtil {
          " CC_HIGHEST_TXN_ID bigint," +
          " CC_META_INFO varchar(2048) for bit data," +
          " CC_HADOOP_JOB_ID varchar(32))");
+
+ stmt.execute("CREATE TABLE AUX_TABLE (" +
+ " MT_KEY1 varchar(128) NOT NULL," +
+ " MT_KEY2 bigint NOT NULL," +
+ " MT_COMMENT varchar(255)," +
+ " PRIMARY KEY(MT_KEY1, MT_KEY2)" +
+ ")");

        conn.commit();
      } catch (SQLException e) {
@@ -185,6 +192,7 @@ public final class TxnDbUtil {
        dropTable(stmt, "COMPACTION_QUEUE");
        dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
        dropTable(stmt, "COMPLETED_COMPACTIONS");
+ dropTable(stmt, "AUX_TABLE");
        conn.commit();
      } finally {
        closeResources(conn, stmt, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index afeb168..21faff4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -23,6 +23,8 @@ import com.jolbox.bonecp.BoneCPDataSource;
  import org.apache.commons.dbcp.ConnectionFactory;
  import org.apache.commons.dbcp.DriverManagerConnectionFactory;
  import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hive.common.ServerUtils;
  import org.apache.hadoop.hive.common.classification.InterfaceAudience;
  import org.apache.hadoop.hive.common.classification.InterfaceStability;
  import org.apache.hadoop.hive.metastore.Warehouse;
@@ -43,6 +45,8 @@ import javax.sql.DataSource;
  import java.io.IOException;
  import java.sql.*;
  import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantLock;

@@ -85,7 +89,7 @@ import java.util.concurrent.locks.ReentrantLock;
   */
  @InterfaceAudience.Private
  @InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore {
+abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {

    static final protected char INITIATED_STATE = 'i';
    static final protected char WORKING_STATE = 'w';
@@ -136,6 +140,12 @@ abstract class TxnHandler implements TxnStore {
     * Derby specific concurrency control
     */
    private static final ReentrantLock derbyLock = new ReentrantLock(true);
+ /**
+ * must be static since even in UT there may be > 1 instance of TxnHandler
+ * (e.g. via Compactor services)
+ */
+ private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
+ private static final String hostname = ServerUtils.hostname();

    // Private methods should never catch SQLException and then throw MetaException. The public
    // methods depend on SQLException coming back so they can detect and handle deadlocks. Private
@@ -541,7 +551,7 @@ abstract class TxnHandler implements TxnStore {
     * @throws MetaException
     */
    private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
- String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? "AND TXN_STATE=" + quoteChar(txnState) : "");
+ String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
      ResultSet rs = stmt.executeQuery(addForUpdateClause(query));
      if(rs.next()) {
        return rs;
@@ -1399,14 +1409,14 @@ abstract class TxnHandler implements TxnStore {
      }
    }

- void rollbackDBConn(Connection dbConn) {
+ static void rollbackDBConn(Connection dbConn) {
      try {
        if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
      } catch (SQLException e) {
        LOG.warn("Failed to rollback db connection " + getMessage(e));
      }
    }
- protected void closeDbConn(Connection dbConn) {
+ protected static void closeDbConn(Connection dbConn) {
      try {
        if (dbConn != null && !dbConn.isClosed()) {
          dbConn.close();
@@ -1420,7 +1430,7 @@ abstract class TxnHandler implements TxnStore {
     * Close statement instance.
     * @param stmt statement instance.
     */
- protected void closeStmt(Statement stmt) {
+ protected static void closeStmt(Statement stmt) {
      try {
        if (stmt != null && !stmt.isClosed()) stmt.close();
      } catch (SQLException e) {
@@ -1432,7 +1442,7 @@ abstract class TxnHandler implements TxnStore {
     * Close the ResultSet.
     * @param rs may be {@code null}
     */
- void close(ResultSet rs) {
+ static void close(ResultSet rs) {
      try {
        if (rs != null && !rs.isClosed()) {
          rs.close();
@@ -1446,7 +1456,7 @@ abstract class TxnHandler implements TxnStore {
    /**
     * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
     */
- void close(ResultSet rs, Statement stmt, Connection dbConn) {
+ static void close(ResultSet rs, Statement stmt, Connection dbConn) {
      close(rs);
      closeStmt(stmt);
      closeDbConn(dbConn);
@@ -2597,6 +2607,40 @@ abstract class TxnHandler implements TxnStore {
      }
      return false;
    }
+ private boolean isDuplicateKeyError(SQLException ex) {
+ switch (dbProduct) {
+ case DERBY:
+ if("23505".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case MYSQL:
+ if(ex.getErrorCode() == 1022 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case SQLSERVER:
+ //2627 is unique constraint violation incl PK, 2601 - unique key
+ if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case ORACLE:
+ if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case POSTGRES:
+ //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html
+ if("23505".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex));
+ }
+ return false;
+ }
    private static String getMessage(SQLException ex) {
      return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
    }
@@ -2671,4 +2715,115 @@ abstract class TxnHandler implements TxnStore {
        derbyLock.unlock();
      }
    }
+ @Override
+ public MutexAPI getMutexAPI() {
+ return this;
+ }
+
+ @Override
+ public LockHandle acquireLock(String key) throws MetaException {
+ /**
+ * The implementation here is a bit kludgey but done so that code exercised by unit tests
+ * (which run against Derby which has no support for select for update) is as similar to
+ * production code as possible.
+ * In particular, with Derby we always run in a single process with a single metastore and
+ * the absence of For Update is handled via a Semaphore. The latter would, strictly speaking,
+ * make the SQL statements below unnecessary (for Derby), but then they would not be tested.
+ */
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ String sqlStmt = addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0");
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("About to execute SQL: " + sqlStmt);
+ }
+ rs = stmt.executeQuery(sqlStmt);
+ if (!rs.next()) {
+ close(rs);
+ try {
+ stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1,MT_KEY2) values(" + quoteString(key) + ", 0)");
+ dbConn.commit();
+ } catch (SQLException ex) {
+ if (!isDuplicateKeyError(ex)) {
+ throw new RuntimeException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex), ex);
+ }
+ }
+ rs = stmt.executeQuery(sqlStmt);
+ if (!rs.next()) {
+ throw new IllegalStateException("Unable to lock " + quoteString(key) + ". Expected row in AUX_TABLE is missing.");
+ }
+ }
+ Semaphore derbySemaphore = null;
+ if(dbProduct == DatabaseProduct.DERBY) {
+ derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
+ derbySemaphore = derbyKey2Lock.get(key);
+ derbySemaphore.acquire();
+ }
+ LOG.info(quoteString(key) + " locked by " + quoteString(TxnHandler.hostname));
+ //OK, so now we have a lock
+ return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
+ } catch (SQLException ex) {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ checkRetryable(dbConn, ex, "acquireLock(" + key + ")");
+ throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex));
+ }
+ catch(InterruptedException ex) {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + ex.getMessage() + StringUtils.stringifyException(ex));
+ }
+ finally {
+ unlockInternal();
+ }
+ }
+ catch(RetryException ex) {
+ return acquireLock(key);
+ }
+ }
+ public void acquireLock(String key, LockHandle handle) {
+ //the idea is that this will use LockHandle.dbConn
+ throw new NotImplementedException();
+ }
+ private static final class LockHandleImpl implements LockHandle {
+ private final Connection dbConn;
+ private final Statement stmt;
+ private final ResultSet rs;
+ private final Semaphore derbySemaphore;
+ private final List<String> keys = new ArrayList<>();
+ LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key, Semaphore derbySemaphore) {
+ this.dbConn = conn;
+ this.stmt = stmt;
+ this.rs = rs;
+ this.derbySemaphore = derbySemaphore;
+ if(derbySemaphore != null) {
+ //otherwise it may later release a permit acquired by someone else
+ assert derbySemaphore.availablePermits() == 0 : "Expected locked Semaphore";
+ }
+ keys.add(key);
+ }
+ void addKey(String key) {
+ //keys.add(key);
+ //would need a list of (stmt,rs) pairs - 1 for each key
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void releaseLocks() {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ if(derbySemaphore != null) {
+ derbySemaphore.release();
+ }
+ for(String key : keys) {
+ LOG.info(quoteString(key) + " unlocked by " + quoteString(TxnHandler.hostname));
+ }
+ }
+ }
  }
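
The acquireLock() implementation above boils down to an "ensure the row exists, then lock it" idiom: insert the key's row if missing (a racing inserter losing with a duplicate-key error is expected and harmless), then take a row lock with select ... for update. A simplified sketch against the AUX_TABLE schema added in TxnDbUtil; the real code builds the FOR UPDATE clause via addForUpdateClause(), layers in the Derby semaphore (Derby has no select for update), retry logic, and dbProduct-specific duplicate-key detection:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class RowLockSketch {
  // The caller must keep rs/stmt/dbConn open to hold the lock, and roll back
  // and close them to release it - which is what LockHandleImpl.releaseLocks() does.
  static ResultSet lockKey(Connection dbConn, String key) throws SQLException {
    Statement stmt = dbConn.createStatement();
    String select = "select MT_COMMENT from AUX_TABLE where MT_KEY1='" + key
        + "' and MT_KEY2=0 for update";
    ResultSet rs = stmt.executeQuery(select);
    if (!rs.next()) {
      rs.close();
      try {
        stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1, MT_KEY2) values('" + key + "', 0)");
        dbConn.commit();
      } catch (SQLException ex) {
        // a concurrent caller created the row first; in the real code only a
        // duplicate-key error (isDuplicateKeyError) is swallowed here
      }
      rs = stmt.executeQuery(select); // blocks until the row lock is free
      if (!rs.next()) {
        throw new IllegalStateException("row for " + key + " should exist by now");
      }
    }
    return rs;
  }
}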

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6d738b5..927e9bc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -47,6 +47,7 @@ import java.util.Set;
  @InterfaceStability.Evolving
  public interface TxnStore {

+ public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
    // Compactor states (Should really be enum)
    static final public String INITIATED_RESPONSE = "initiated";
    static final public String WORKING_RESPONSE = "working";
@@ -329,10 +330,40 @@ public interface TxnStore {
     */
    public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;

-
    @VisibleForTesting
    public int numLocksInLockTable() throws SQLException, MetaException;

    @VisibleForTesting
    long setTimeout(long milliseconds);
+
+ public MutexAPI getMutexAPI();
+
+ /**
+ * This is primarily designed to provide coarse grained mutex support to operations running
+ * inside the Metastore (of which there could be several instances). The initial goal is to
+ * ensure that various sub-processes of the Compactor don't step on each other.
+ *
+ * In the RDBMS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly.
+ */
+ public static interface MutexAPI {
+ /**
+ * The {@code key} is the name of the lock. Will acquire an exclusive lock or block. It returns
+ * a handle which must be used to release the lock. Each invocation returns a new handle.
+ */
+ public LockHandle acquireLock(String key) throws MetaException;
+
+ /**
+ * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This
+ * will associate the lock on {@code key} with the same handle. All locks associated with
+ * the same handle will be released together.
+ * @param handle not NULL
+ */
+ public void acquireLock(String key, LockHandle handle) throws MetaException;
+ public static interface LockHandle {
+ /**
+ * Releases all locks associated with this handle.
+ */
+ public void releaseLocks();
+ }
+ }
  }
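
A minimal sketch of the intended caller pattern for MutexAPI, assuming an already-configured TxnStore: take the key's lock before the critical section and release it in a finally block, exactly as the Initiator, Cleaner and housekeeping services do in the hunks below:

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.txn.TxnStore;

final class MutexUsageSketch {
  static void runExclusively(TxnStore txnHandler) throws MetaException {
    TxnStore.MutexAPI.LockHandle handle = null;
    try {
      handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
      // ... work that must not run in two metastores at once ...
    } finally {
      if (handle != null) {
        handle.releaseLocks();
      }
    }
  }
}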

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 57b88cc..cc9e583 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -75,8 +75,8 @@ public class TxnUtils {
      int i = 0;
      for (TxnInfo txn : txns.getOpen_txns()) {
        if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
- exceptions[i++] = txn.getId();
- }
+ exceptions[i++] = txn.getId();//todo: only add Aborted
+ }//remove all exceptions < minOpenTxn
      highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
      return new ValidCompactorTxnList(exceptions, -1, highWater);
    }
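
A worked example of the high-water-mark rule above, with invented txn ids; the point is that the compactor must never consider anything at or above the lowest open transaction:

final class HighWaterSketch {
  public static void main(String[] args) {
    long highWater = 10;          // highest allocated txn id (invented)
    long[] openTxnIds = {7, 9};   // txns still open (invented)
    long minOpenTxn = Long.MAX_VALUE;
    for (long id : openTxnIds) {
      minOpenTxn = Math.min(minOpenTxn, id);
    }
    // same rule as createValidCompactTxnList(): cap just below the lowest open txn
    highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
    System.out.println(highWater); // prints 6: txns <= 6 are all resolved
  }
}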

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
index 648fd49..30bdfa7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
  import java.util.Arrays;

  /**
- * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
   * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
   * is committed or aborted. Additionally it will return none if there are any open transactions
   * below the max transaction given, since we don't want to compact above open transactions. For

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index f0d23ba..26a660a 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -50,11 +50,13 @@ import org.apache.hadoop.hive.metastore.api.TxnInfo;
  import org.apache.hadoop.hive.metastore.api.TxnOpenException;
  import org.apache.hadoop.hive.metastore.api.TxnState;
  import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.util.StringUtils;
  import org.apache.logging.log4j.Level;
  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.core.LoggerContext;
  import org.apache.logging.log4j.core.config.Configuration;
  import org.junit.After;
+import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Ignore;
  import org.junit.Test;
@@ -68,6 +70,7 @@ import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;

  import static junit.framework.Assert.assertEquals;
  import static junit.framework.Assert.assertFalse;
@@ -1252,6 +1255,97 @@ public class TestTxnHandler {
      }
    }

+ /**
+ * This cannot be run against Derby (thus in UT) but it can run against MySQL.
+ * 1. add to metastore/pom.xml
+ * <dependency>
+ * <groupId>mysql</groupId>
+ * <artifactId>mysql-connector-java</artifactId>
+ * <version>5.1.30</version>
+ * </dependency>
+ * 2. Hack in the c'tor of this class
+ * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
+ * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
+ * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
+ * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
+ * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
+ *
+ */
+ @Ignore("multiple threads wedge Derby")
+ @Test
+ public void testMutexAPI() throws Exception {
+ final TxnStore.MutexAPI api = txnHandler.getMutexAPI();
+ final AtomicInteger stepTracker = new AtomicInteger(0);
+ /**
+ * counter = 0;
+ * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock
+ * Thread2 counter=2, lock (and block), inc counter, should be 4
+ */
+ Thread t1 = new Thread("MutexTest1") {
+ public void run() {
+ try {
+ stepTracker.incrementAndGet();//now 1
+ TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+ Thread.sleep(4000);
+ //stepTracker should now be 2 which indicates t2 has started
+ Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get());
+ stepTracker.incrementAndGet();//now 3
+ handle.releaseLocks();
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ }
+ };
+ t1.setDaemon(true);
+ ErrorHandle ueh1 = new ErrorHandle();
+ t1.setUncaughtExceptionHandler(ueh1);
+ Thread t2 = new Thread("MutexTest2") {
+ public void run() {
+ try {
+ stepTracker.incrementAndGet();//now 2
+ //this should block until t1 unlocks
+ TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+ stepTracker.incrementAndGet();//now 4
+ Assert.assertEquals(4, stepTracker.get());
+ handle.releaseLocks();
+ stepTracker.incrementAndGet();//now 5
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ }
+ };
+ t2.setDaemon(true);
+ ErrorHandle ueh2 = new ErrorHandle();
+ t2.setUncaughtExceptionHandler(ueh2);
+ t1.start();
+ try {
+ Thread.sleep(1000);
+ }
+ catch(InterruptedException ex) {
+ LOG.info("Sleep was interrupted");
+ }
+ t2.start();
+ t1.join(6000);//so that test doesn't block
+ t2.join(6000);
+
+ if(ueh1.error != null) {
+ Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false);
+ }
+ if (ueh2.error != null) {
+ Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false);
+ }
+ Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
+ }
+ private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
+ Throwable error = null;
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
+ error = e;
+ }
+ }
    private void updateTxns(Connection conn) throws SQLException {
      Statement stmt = conn.createStatement();
      stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index 59c8fe4..5d9e7be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -61,7 +61,9 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {

      @Override
      public void run() {
+ TxnStore.MutexAPI.LockHandle handle = null;
        try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name());
          long startTime = System.currentTimeMillis();
          txnHandler.purgeCompactionHistory();
          int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
        catch(Throwable t) {
          LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
        }
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
      }
    }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index 882562b..13b10de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -61,7 +61,9 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
      }
      @Override
      public void run() {
+ TxnStore.MutexAPI.LockHandle handle = null;
        try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
          long startTime = System.currentTimeMillis();
          txnHandler.performTimeOuts();
          int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
        catch(Throwable t) {
          LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
        }
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
      }
    }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index fbf5481..9ffeaec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
   */
  package org.apache.hadoop.hive.ql.txn.compactor;

+import org.apache.hadoop.hive.metastore.txn.TxnStore;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.apache.hadoop.fs.FileStatus;
@@ -72,11 +73,13 @@ public class Cleaner extends CompactorThread {
        // and if so remembers that and then sets it to true at the end. We have to check here
        // first to make sure we go through a complete iteration of the loop before resetting it.
        boolean setLooped = !looped.get();
- long startedAt = System.currentTimeMillis();
+ TxnStore.MutexAPI.LockHandle handle = null;
+ long startedAt = -1;
        // Make sure nothing escapes this run method and kills the metastore at large,
        // so wrap it in a big catch Throwable statement.
        try {
-
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+ startedAt = System.currentTimeMillis();
          // First look for all the compactions that are waiting to be cleaned. If we have not
          // seen an entry before, look for all the locks held on that table or partition and
          // record them. We will then only clean the partition once all of those locks have been
@@ -86,6 +89,31 @@ public class Cleaner extends CompactorThread {
          // done the compaction will read the more up to date version of the data (either in a
          // newer delta or in a newer base).
          List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+ {
+ /**
+ * Since there may be more than 1 instance of Cleaner running we may have state info
+ * for items which were cleaned by other instances. Here we remove them.
+ *
+ * In the long run if we add end_time to compaction_queue, then we can check that
+ * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case
+ * we know the lock owner is reading files created by this compaction or later.
+ * The advantage is that we don't have to store the locks.
+ */
+ Set<Long> currentToCleanSet = new HashSet<>();
+ for (CompactionInfo ci : toClean) {
+ currentToCleanSet.add(ci.id);
+ }
+ Set<Long> cleanPerformedByOthers = new HashSet<>();
+ for (long id : compactId2CompactInfoMap.keySet()) {
+ if (!currentToCleanSet.contains(id)) {
+ cleanPerformedByOthers.add(id);
+ }
+ }
+ for (long id : cleanPerformedByOthers) {
+ compactId2CompactInfoMap.remove(id);
+ compactId2LockMap.remove(id);
+ }
+ }
          if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
            ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());

@@ -119,6 +147,7 @@ public class Cleaner extends CompactorThread {
                  // Remember to remove this when we're out of the loop,
                  // we can't do it in the loop or we'll get a concurrent modification exception.
                  compactionsCleaned.add(queueEntry.getKey());
+ //Future thought: this may be expensive so consider having a thread pool run in parallel
                  clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
                } else {
                  // Remove the locks we didn't see so we don't look for them again next time
@@ -140,6 +169,11 @@ public class Cleaner extends CompactorThread {
          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
              StringUtils.stringifyException(t));
        }
+ finally {
+ if (handle != null) {
+ handle.releaseLocks();
+ }
+ }
        if (setLooped) {
          looped.set(true);
        }
@@ -206,10 +240,24 @@ public class Cleaner extends CompactorThread {
        StorageDescriptor sd = resolveStorageDescriptor(t, p);
        final String location = sd.getLocation();

- // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
- // transactions. This assures that all deltas are treated as valid and all we return are
- // obsolete files.
- final ValidTxnList txnList = new ValidReadTxnList();
+ /**
+ * Each Compaction only compacts as far as the highest txn id such that all txns below it
+ * are resolved (i.e. not opened). This is what "highestTxnId" tracks. This is only tracked
+ * since Hive 1.3.0/2.0 - thus may be 0. See ValidCompactorTxnList and uses for more info.
+ *
+ * We only want to clean up to the highestTxnId - otherwise we risk deleting deltas from
+ * under an active reader.
+ *
+ * Suppose we have deltas D2 and D3 for table T, i.e. the last compaction created D3 so now there is a
+ * clean request for D2.
+ * Cleaner checks existing locks and finds none.
+ * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
+ * completes which creates D4.
+ * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
+ * unless ValidTxnList is "capped" at highestTxnId.
+ */
+ final ValidTxnList txnList = ci.highestTxnId > 0 ?
+ new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();

        if (runJobAsSelf(ci.runAs)) {
          removeFiles(location, txnList);
@@ -249,7 +297,7 @@ public class Cleaner extends CompactorThread {
      FileSystem fs = filesToDelete.get(0).getFileSystem(conf);

      for (Path dead : filesToDelete) {
- LOG.debug("Doing to delete path " + dead.toString());
+ LOG.debug("Going to delete path " + dead.toString());
        fs.delete(dead, true);
      }
    }
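
To make the D2/D3/D4 scenario in the comment above concrete, here is a hedged sketch of what the capped list changes (txn ids invented): with highestTxnId = 3, a delta written by txn 4 is not "valid", so AcidUtils.getAcidState() will not declare a delta produced by a newer compaction obsolete out from under an active reader:

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

final class CappedTxnListSketch {
  public static void main(String[] args) {
    long highestTxnId = 3; // the compaction that produced D3 (invented)
    ValidTxnList capped = highestTxnId > 0
        ? new ValidReadTxnList(new long[0], highestTxnId) // no exceptions, HWM = 3
        : new ValidReadTxnList();                         // legacy rows: everything valid
    System.out.println(capped.isTxnValid(2)); // true  - at or below the cap
    System.out.println(capped.isTxnValid(4)); // false - above the cap, D4 stays protected
  }
}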

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 3e22548..916d9dc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -74,11 +74,15 @@ public class Initiator extends CompactorThread {
        // much easier. The stop value is only for testing anyway and not used when called from
        // HiveMetaStore.
        do {
- long startedAt = System.currentTimeMillis();
+ long startedAt = -1;
+ TxnStore.MutexAPI.LockHandle handle = null;

          // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
          // don't doom the entire thread.
- try {//todo: add method to only get current i.e. skip history - more efficient
+ try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+ startedAt = System.currentTimeMillis();
+ //todo: add method to only get current i.e. skip history - more efficient
            ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
            ValidTxnList txns =
                TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
@@ -114,6 +118,8 @@ public class Initiator extends CompactorThread {
                // Check if we already have initiated or are working on a compaction for this partition
                // or table. If so, skip it. If we are just waiting on cleaning we can still check,
                // as it may be time to compact again even though we haven't cleaned.
+ //todo: this is not robust. You can easily run Alter Table to start a compaction between
+ //the time currentCompactions is generated and now
                if (lookForCurrentCompactions(currentCompactions, ci)) {
                  LOG.debug("Found currently initiated or working compaction for " +
                      ci.getFullPartitionName() + " so we will not initiate another compaction");
@@ -134,7 +140,9 @@ public class Initiator extends CompactorThread {
                }
                StorageDescriptor sd = resolveStorageDescriptor(t, p);
                String runAs = findUserToRunAs(sd.getLocation(), t);
-
+ /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
+ * Long term we should consider having a thread pool here and running checkForCompaction calls
+ * in parallel*/
                CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
                if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
              } catch (Throwable t) {
@@ -154,6 +162,11 @@ public class Initiator extends CompactorThread {
            LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
                StringUtils.stringifyException(t));
          }
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }

          long elapsedTime = System.currentTimeMillis() - startedAt;
          if (elapsedTime >= checkInterval || stop.get()) continue;

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 9b00435..0786c21 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -229,7 +229,6 @@ public class TestTxnCommands2 {
      }
      Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true);
    }
- @Ignore("alter table")
    @Test
    public void testAlterTable() throws Exception {
      int[][] tableData = {{1,2}};
@@ -604,7 +603,13 @@ public class TestTxnCommands2 {
    private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
      int lastCount = houseKeeperService.getIsAliveCounter();
      houseKeeperService.start(conf);
+ int maxIter = 10;
+ int iterCount = 0;
      while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+ if(iterCount++ >= maxIter) {
+ //prevent test hangs
+ throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits");
+ }
        try {
          Thread.sleep(100);//make sure it has run at least once
        }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f65fabc/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index a3bf9d3..3a6e76e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -189,7 +189,13 @@ public class TestDbTxnManager {
    private void runReaper() throws Exception {
      int lastCount = houseKeeperService.getIsAliveCounter();
      houseKeeperService.start(conf);
+ int maxIter = 10;
+ int iterCount = 0;
      while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+ if(iterCount++ >= maxIter) {
+ //prevent test hangs
+ throw new IllegalStateException("Reaper didn't run after " + iterCount + " waits");
+ }
        try {
          Thread.sleep(100);//make sure it has run at least once
        }


  • Ekoifman at Mar 24, 2016 at 11:28 pm
    Repository: hive
    Updated Branches:
       refs/heads/branch-1 db2efe42a -> 178708231


    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    index 5545574..cac4623 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    @@ -20,15 +20,31 @@ package org.apache.hadoop.hive.ql.txn.compactor;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.apache.hadoop.conf.Configuration;
    -import org.apache.hadoop.fs.*;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.common.ValidTxnList;
      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
      import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    -import org.apache.hadoop.hive.metastore.MetaStoreThread;
    -import org.apache.hadoop.hive.metastore.api.*;
    -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
    +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
    +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
    +import org.apache.hadoop.hive.metastore.api.FieldSchema;
    +import org.apache.hadoop.hive.metastore.api.MetaException;
    +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
    +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
    +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
    +import org.apache.hadoop.hive.metastore.api.Order;
    +import org.apache.hadoop.hive.metastore.api.Partition;
    +import org.apache.hadoop.hive.metastore.api.SerDeInfo;
    +import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    +import org.apache.hadoop.hive.metastore.api.Table;
    +import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
      import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.ql.io.AcidInputFormat;
      import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
      import org.apache.hadoop.hive.ql.io.AcidUtils;
    @@ -39,7 +55,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.io.Writable;
      import org.apache.hadoop.io.WritableComparable;
    -import org.apache.hadoop.mapred.*;
    +import org.apache.hadoop.mapred.InputSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapred.RecordReader;
    +import org.apache.hadoop.mapred.RecordWriter;
    +import org.apache.hadoop.mapred.Reporter;
      import org.apache.hadoop.util.Progressable;
      import org.apache.thrift.TException;

    @@ -62,7 +82,7 @@ public abstract class CompactorTest {
        static final private String CLASS_NAME = CompactorTest.class.getName();
        static final private Log LOG = LogFactory.getLog(CLASS_NAME);

    - protected CompactionTxnHandler txnHandler;
    + protected TxnStore txnHandler;
        protected IMetaStoreClient ms;
        protected long sleepTime = 1000;
        protected HiveConf conf;
    @@ -75,7 +95,7 @@ public abstract class CompactorTest {
          TxnDbUtil.setConfValues(conf);
          TxnDbUtil.cleanDb();
          ms = new HiveMetaStoreClient(conf);
    - txnHandler = new CompactionTxnHandler(conf);
    + txnHandler = TxnUtils.getTxnStore(conf);
          tmpdir = new File(System.getProperty("java.io.tmpdir") +
              System.getProperty("file.separator") + "compactor_test_tables");
          tmpdir.mkdir();

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    index 913c8bc..17634f0 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
    @@ -17,7 +17,6 @@
       */
      package org.apache.hadoop.hive.ql.txn.compactor;

    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.junit.Assert;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
    @@ -25,6 +24,7 @@ import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.metastore.api.*;
      import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
      import org.junit.Test;

      import java.util.ArrayList;
    @@ -73,7 +73,7 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(1, rsp.getCompactsSize());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, null);
    @@ -105,7 +105,7 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(1, rsp.getCompactsSize());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -135,7 +135,7 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(1, rsp.getCompactsSize());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, null);
    @@ -174,7 +174,7 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(1, rsp.getCompactsSize());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -329,7 +329,7 @@ public class TestCleaner extends CompactorTest {
          rsp = txnHandler.showCompact(new ShowCompactRequest());
          compacts = rsp.getCompacts();
          Assert.assertEquals(1, compacts.size());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -403,7 +403,7 @@ public class TestCleaner extends CompactorTest {
          rsp = txnHandler.showCompact(new ShowCompactRequest());
          compacts = rsp.getCompacts();
          Assert.assertEquals(1, compacts.size());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -429,7 +429,7 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(1, rsp.getCompactsSize());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));

          // Check that the files are removed
          List<Path> paths = getDirectories(conf, t, p);
    @@ -460,7 +460,7 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(1, rsp.getCompactsSize());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }

        @Test
    @@ -488,7 +488,7 @@ public class TestCleaner extends CompactorTest {
          // Check there are no compactions requests left.
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          Assert.assertEquals(1, rsp.getCompactsSize());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }
        @Override
        boolean useHive130DeltaDirName() {

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    index 07f477d..f84bd7e 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    @@ -17,13 +17,12 @@
       */
      package org.apache.hadoop.hive.ql.txn.compactor;

    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.junit.Assert;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.metastore.api.*;
    -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
      import org.junit.Before;
      import org.junit.Test;

    @@ -205,12 +204,12 @@ public class TestInitiator extends CompactorTest {
          LockResponse res = txnHandler.lock(req);
          txnHandler.abortTxn(new AbortTxnRequest(txnid));

    - for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
    + for (int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
            txnid = openTxn();
            txnHandler.abortTxn(new AbortTxnRequest(txnid));
          }
          GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
    - Assert.assertEquals(TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
    + Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());

          startInitiator();


    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    index 8862402..381eeb3 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
    @@ -22,13 +22,18 @@ import org.apache.commons.logging.LogFactory;
      import org.apache.hadoop.fs.*;
      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.metastore.api.*;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
      import org.apache.hadoop.hive.ql.io.AcidUtils;
      import org.junit.Assert;
      import org.junit.Before;
      import org.junit.Test;

    -import java.io.*;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataInput;
    +import java.io.DataInputStream;
    +import java.io.DataOutput;
    +import java.io.DataOutputStream;
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.BitSet;
    @@ -935,7 +940,7 @@ public class TestWorker extends CompactorTest {
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(1, compacts.size());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
        }

        @Test
    @@ -960,6 +965,6 @@ public class TestWorker extends CompactorTest {
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(1, compacts.size());
    - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
    + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
        }
      }
  • Ekoifman at Mar 24, 2016 at 11:28 pm
    HIVE-13344 - port HIVE-12902 to 1.x line (Eugene Koifman, reviewed by Wei Zheng)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c8295051
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c8295051
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c8295051

    Branch: refs/heads/branch-1
    Commit: c8295051cc26577dcc1eb17709d4ffc0f9784c5b
    Parents: db2efe4
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Thu Mar 24 16:21:07 2016 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Thu Mar 24 16:21:07 2016 -0700

    ----------------------------------------------------------------------
      .../org/apache/hadoop/hive/conf/HiveConf.java | 6 +-
      .../hive/ql/txn/compactor/TestCompactor.java | 25 +-
      .../hive/metastore/AcidEventListener.java | 38 +-
      .../hadoop/hive/metastore/HiveMetaStore.java | 14 +-
      .../hive/metastore/HiveMetaStoreClient.java | 6 +-
      .../metastore/txn/CompactionTxnHandler.java | 47 +--
      .../hadoop/hive/metastore/txn/TxnHandler.java | 127 +------
      .../hadoop/hive/metastore/txn/TxnStore.java | 364 +++++++++++++++++++
      .../hadoop/hive/metastore/txn/TxnUtils.java | 209 +++++++++++
      .../metastore/txn/TestCompactionTxnHandler.java | 5 +-
      .../hive/metastore/txn/TestTxnHandler.java | 183 +++++-----
      .../metastore/txn/TestTxnHandlerNegative.java | 2 +-
      .../ql/txn/AcidCompactionHistoryService.java | 16 +-
      .../hive/ql/txn/AcidHouseKeeperService.java | 13 +-
      .../hive/ql/txn/compactor/CompactorThread.java | 7 +-
      .../hadoop/hive/ql/txn/compactor/Initiator.java | 8 +-
      .../hadoop/hive/ql/txn/compactor/Worker.java | 6 +-
      .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +-
      .../hive/ql/lockmgr/TestDbTxnManager.java | 12 +-
      .../hive/ql/txn/compactor/CompactorTest.java | 34 +-
      .../hive/ql/txn/compactor/TestCleaner.java | 20 +-
      .../hive/ql/txn/compactor/TestInitiator.java | 7 +-
      .../hive/ql/txn/compactor/TestWorker.java | 13 +-
      23 files changed, 847 insertions(+), 337 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    ----------------------------------------------------------------------
    diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    index b78bea2..f84c940 100644
    --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    @@ -39,6 +39,7 @@ import java.util.regex.Pattern;

      import javax.security.auth.login.LoginException;

    +import com.google.common.base.Joiner;
      import org.apache.commons.lang.StringUtils;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
    @@ -56,7 +57,6 @@ import org.apache.hadoop.security.UserGroupInformation;
      import org.apache.hadoop.util.Shell;
      import org.apache.hive.common.HiveCompat;

    -import com.google.common.base.Joiner;

      /**
       * Hive Configuration.
    @@ -607,6 +607,10 @@ public class HiveConf extends Configuration {
          METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
              "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" +
              "This class is used to store and retrieval of raw metadata objects such as table, database"),
    + METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl",
    + "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler",
    + "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " +
    + "class is used to store and retrieve transactions and locks"),
          METASTORE_CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver",
              "Driver class name for a JDBC metastore"),
          METASTORE_MANAGER_FACTORY_CLASS("javax.jdo.PersistenceManagerFactoryClass",

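    The new hive.metastore.txn.store.impl property above makes the transaction store pluggable. As a
    minimal sketch (not part of the diff) of how the property is consumed, using TxnUtils.getTxnStore
    which is added later in this same commit; the value shown is just the default from above:

      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.metastore.txn.TxnStore;
      import org.apache.hadoop.hive.metastore.txn.TxnUtils;

      public class TxnStoreConfigExample {
        public static void main(String[] args) throws Exception {
          HiveConf conf = new HiveConf();
          // Select the TxnStore implementation; any class implementing
          // org.apache.hadoop.hive.metastore.txn.TxnStore can be named here.
          conf.setVar(HiveConf.ConfVars.METASTORE_TXN_STORE_IMPL,
              "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler");
          TxnStore store = TxnUtils.getTxnStore(conf); // instantiates the class and calls setConf()
        }
      }
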
    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    ----------------------------------------------------------------------
    diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    index 9c0f374..37bbab8 100644
    --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    @@ -8,7 +8,6 @@ import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.cli.CliSessionState;
      import org.apache.hadoop.hive.common.ValidTxnList;
      import org.apache.hadoop.hive.conf.HiveConf;
    -import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
      import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
      import org.apache.hadoop.hive.metastore.IMetaStoreClient;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    @@ -20,10 +19,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
      import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
      import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
      import org.apache.hadoop.hive.metastore.api.Table;
    -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
      import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
      import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.ql.CommandNeedRetryException;
      import org.apache.hadoop.hive.ql.Driver;
      import org.apache.hadoop.hive.ql.io.AcidInputFormat;
    @@ -187,7 +186,7 @@ public class TestCompactor {
          initiator.init(stop, new AtomicBoolean());
          initiator.run();

    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(4, compacts.size());
    @@ -290,7 +289,7 @@ public class TestCompactor {
          initiator.init(stop, new AtomicBoolean());
          initiator.run();

    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(4, compacts.size());
    @@ -363,7 +362,7 @@ public class TestCompactor {
          execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
            tblName + " after load:");

    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
          CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
          LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
          Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
    @@ -498,7 +497,7 @@ public class TestCompactor {
          initiator.init(stop, new AtomicBoolean());
          initiator.run();

    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(2, compacts.size());
    @@ -538,7 +537,7 @@ public class TestCompactor {
          initiator.init(stop, new AtomicBoolean());
          initiator.run();

    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(2, compacts.size());
    @@ -580,7 +579,7 @@ public class TestCompactor {
          initiator.init(stop, new AtomicBoolean());
          initiator.run();

    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          List<ShowCompactResponseElement> compacts = rsp.getCompacts();
          Assert.assertEquals(1, compacts.size());
    @@ -620,7 +619,7 @@ public class TestCompactor {
            writeBatch(connection, writer, true);

            // Now, compact
    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
            txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
            Worker t = new Worker();
            t.setThreadId((int) t.getId());
    @@ -682,7 +681,7 @@ public class TestCompactor {
            writeBatch(connection, writer, true);

            // Now, compact
    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
            txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
            Worker t = new Worker();
            t.setThreadId((int) t.getId());
    @@ -738,7 +737,7 @@ public class TestCompactor {
            txnBatch.abort();

            // Now, compact
    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
            txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
            Worker t = new Worker();
            t.setThreadId((int) t.getId());
    @@ -804,7 +803,7 @@ public class TestCompactor {


            // Now, compact
    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(conf);
            txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
            Worker t = new Worker();
            t.setThreadId((int) t.getId());

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
    index 767bc54..b241e9e 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
    @@ -25,7 +25,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
      import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
      import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
      import org.apache.hadoop.hive.metastore.events.DropTableEvent;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;


      /**
    @@ -33,7 +34,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnHandler;
       */
      public class AcidEventListener extends MetaStoreEventListener {

    - private TxnHandler txnHandler;
    + private TxnStore txnHandler;
        private HiveConf hiveConf;

        public AcidEventListener(Configuration configuration) {
    @@ -46,24 +47,47 @@ public class AcidEventListener extends MetaStoreEventListener {
          // We can loop thru all the tables to check if they are ACID first and then perform cleanup,
          // but it's more efficient to unconditionally perform cleanup for the database, especially
          // when there are a lot of tables
    - txnHandler = new TxnHandler(hiveConf);
    + txnHandler = getTxnHandler();
          txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
        }

        @Override
        public void onDropTable(DropTableEvent tableEvent) throws MetaException {
    - if (TxnHandler.isAcidTable(tableEvent.getTable())) {
    - txnHandler = new TxnHandler(hiveConf);
    + if (TxnUtils.isAcidTable(tableEvent.getTable())) {
    + txnHandler = getTxnHandler();
            txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
          }
        }

        @Override
        public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
    - if (TxnHandler.isAcidTable(partitionEvent.getTable())) {
    - txnHandler = new TxnHandler(hiveConf);
    + if (TxnUtils.isAcidTable(partitionEvent.getTable())) {
    + txnHandler = getTxnHandler();
            txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
                partitionEvent.getPartitionIterator());
          }
        }
    + private TxnStore getTxnHandler() {
    + boolean hackOn = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) ||
    + HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
    + String origTxnMgr = null;
    + boolean origConcurrency = false;
    +
    + // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
    + // which may change the values of the two entries below, we need to avoid polluting the original values

    + if (hackOn) {
    + origTxnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
    + origConcurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
    + }
    +
    + txnHandler = TxnUtils.getTxnStore(hiveConf);
    +
    + // Set them back
    + if (hackOn) {
    + hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
    + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
    + }
    +
    + return txnHandler;
    + }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    index fba545d..bf65532 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList;
      import com.google.common.collect.ImmutableListMultimap;
      import com.google.common.collect.Lists;
      import com.google.common.collect.Multimaps;
    -
      import org.apache.commons.cli.OptionBuilder;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
    @@ -176,7 +175,8 @@ import org.apache.hadoop.hive.metastore.model.MRoleMap;
      import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
      import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
      import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.serde2.Deserializer;
      import org.apache.hadoop.hive.serde2.SerDeException;
      import org.apache.hadoop.hive.shims.HadoopShims;
    @@ -308,9 +308,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
                }
              };

    - private final ThreadLocal<TxnHandler> threadLocalTxn = new ThreadLocal<TxnHandler>() {
    + private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<TxnStore>() {
            @Override
    - protected synchronized TxnHandler initialValue() {
    + protected TxnStore initialValue() {
              return null;
            }
          };
    @@ -584,10 +584,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
            return ms;
          }

    - private TxnHandler getTxnHandler() {
    - TxnHandler txn = threadLocalTxn.get();
    + private TxnStore getTxnHandler() {
    + TxnStore txn = threadLocalTxn.get();
            if (txn == null) {
    - txn = new TxnHandler(hiveConf);
    + txn = TxnUtils.getTxnStore(hiveConf);
              threadLocalTxn.set(txn);
            }
            return txn;

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    index 393ef3b..50bf43c 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    @@ -134,7 +134,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
      import org.apache.hadoop.hive.metastore.api.UnknownTableException;
      import org.apache.hadoop.hive.metastore.api.UnlockRequest;
      import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.shims.ShimLoader;
      import org.apache.hadoop.hive.shims.Utils;
      import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
    @@ -1846,12 +1846,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient {

        @Override
        public ValidTxnList getValidTxns() throws TException {
    - return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0);
    + return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0);
        }

        @Override
        public ValidTxnList getValidTxns(long currentTxn) throws TException {
    - return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn);
    + return TxnUtils.createValidReadTxnList(client.get_open_txns(), currentTxn);
        }

        @Override

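    For context, the ValidTxnList returned by getValidTxns() above treats every still-open or aborted txn
    below the high-water mark as an exception. A small illustrative sketch with made-up txn ids;
    ValidReadTxnList and its (long[], long) constructor appear in the TxnUtils diff below, while
    isTxnValid is assumed from the ValidTxnList interface:

      import org.apache.hadoop.hive.common.ValidReadTxnList;
      import org.apache.hadoop.hive.common.ValidTxnList;

      public class ValidTxnListExample {
        public static void main(String[] args) {
          // High-water mark 10, with txns 7 and 9 still open or aborted (illustrative values).
          ValidTxnList txns = new ValidReadTxnList(new long[]{7, 9}, 10);
          System.out.println(txns.isTxnValid(5));  // true: committed, below the high-water mark
          System.out.println(txns.isTxnValid(7));  // false: in the exceptions list
          System.out.println(txns.isTxnValid(11)); // false: above the high-water mark
        }
      }
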
    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    index 28e06ed..f7c738a 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    @@ -20,27 +20,33 @@ package org.apache.hadoop.hive.metastore.txn;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;

    -import org.apache.hadoop.hive.common.ValidTxnList;
      import org.apache.hadoop.hive.conf.HiveConf;
    -import org.apache.hadoop.hive.metastore.api.*;
    +import org.apache.hadoop.hive.metastore.api.CompactionType;
    +import org.apache.hadoop.hive.metastore.api.MetaException;
      import org.apache.hadoop.util.StringUtils;

    -import java.sql.*;
    -import java.util.*;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;

      /**
       * Extends the transaction handler with methods needed only by the compactor threads. These
       * methods are not available through the thrift interface.
       */
    -public class CompactionTxnHandler extends TxnHandler {
    +class CompactionTxnHandler extends TxnHandler {
        static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
        static final private Log LOG = LogFactory.getLog(CLASS_NAME);

        // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
        // See TxnHandler for notes on how to deal with deadlocks. Follow those notes.

    - public CompactionTxnHandler(HiveConf conf) {
    - super(conf);
    + public CompactionTxnHandler() {
        }

        /**
    @@ -385,7 +391,7 @@ public class CompactionTxnHandler extends TxnHandler {
                }

                // Populate the complete query with provided prefix and suffix
    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);

                for (String query : queries) {
                  LOG.debug("Going to execute update <" + query + ">");
    @@ -450,7 +456,7 @@ public class CompactionTxnHandler extends TxnHandler {
              prefix.append("delete from TXNS where ");
              suffix.append("");

    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);

              for (String query : queries) {
                LOG.debug("Going to execute update <" + query + ">");
    @@ -620,27 +626,6 @@ public class CompactionTxnHandler extends TxnHandler {
        }

        /**
    - * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
    - * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
    - * compact the files, and thus treats only open transactions as invalid. Additionally any
    - * txnId > highestOpenTxnId is also invalid. This is avoid creating something like
    - * delta_17_120 where txnId 80, for example, is still open.
    - * @param txns txn list from the metastore
    - * @return a valid txn list.
    - */
    - public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
    - long highWater = txns.getTxn_high_water_mark();
    - long minOpenTxn = Long.MAX_VALUE;
    - long[] exceptions = new long[txns.getOpen_txnsSize()];
    - int i = 0;
    - for (TxnInfo txn : txns.getOpen_txns()) {
    - if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
    - exceptions[i++] = txn.getId();
    - }
    - highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
    - return new ValidCompactorTxnList(exceptions, -1, highWater);
    - }
    - /**
         * Record the highest txn id that the {@code ci} compaction job will pay attention to.
         */
        public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
    @@ -746,7 +731,7 @@ public class CompactionTxnHandler extends TxnHandler {
              prefix.append("delete from COMPLETED_COMPACTIONS where ");
              suffix.append("");

    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);

              for (String query : queries) {
                LOG.debug("Going to execute update <" + query + ">");

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    index 0ddc078..9789371 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    @@ -87,14 +87,7 @@ import java.util.concurrent.locks.ReentrantLock;
       */
      @InterfaceAudience.Private
      @InterfaceStability.Evolving
    -public class TxnHandler {
    - // Compactor states (Should really be enum)
    - static final public String INITIATED_RESPONSE = "initiated";
    - static final public String WORKING_RESPONSE = "working";
    - static final public String CLEANING_RESPONSE = "ready for cleaning";
    - static final public String FAILED_RESPONSE = "failed";
    - static final public String SUCCEEDED_RESPONSE = "succeeded";
    - static final public String ATTEMPTED_RESPONSE = "attempted";
    +abstract class TxnHandler implements TxnStore {

        static final protected char INITIATED_STATE = 'i';
        static final protected char WORKING_STATE = 'w';
    @@ -131,7 +124,7 @@ public class TxnHandler {
         * Number of consecutive deadlocks we have seen
         */
        private int deadlockCnt;
    - private final long deadlockRetryInterval;
    + private long deadlockRetryInterval;
        protected HiveConf conf;
        protected DatabaseProduct dbProduct;

    @@ -139,8 +132,8 @@ public class TxnHandler {
        private long timeout;

        private String identifierQuoteString; // quotes to use for quoting tables, where necessary
    - private final long retryInterval;
    - private final int retryLimit;
    + private long retryInterval;
    + private int retryLimit;
        private int retryNum;
        /**
         * Derby specific concurrency control
    @@ -157,7 +150,10 @@ public class TxnHandler {
        // in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction,
        // and then they should catch RetryException and call themselves recursively. See commitTxn for an example.

    - public TxnHandler(HiveConf conf) {
    + public TxnHandler() {
    + }
    +
    + public void setConf(HiveConf conf) {
          this.conf = conf;

          checkQFileTestHack();
    @@ -183,7 +179,6 @@ public class TxnHandler {
              TimeUnit.MILLISECONDS);
          retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
          deadlockRetryInterval = retryInterval / 10;
    -
        }

        public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
    @@ -1211,6 +1206,7 @@ public class TxnHandler {
         * Clean up corresponding records in metastore tables, specifically:
         * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
         */
    + @Override
        public void cleanupRecords(HiveObjectType type, Database db, Table table,
                                   Iterator<Partition> partitionIterator) throws MetaException {
          try {
    @@ -1386,106 +1382,11 @@ public class TxnHandler {
          String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
          return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
        }
    -
    - /**
    - * Build a query (or queries if one query is too big) with specified "prefix" and "suffix",
    - * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6)
    - * For NOT IN case, NOT IN list is broken into multiple AND clauses.
    - * @param queries array of complete query strings
    - * @param prefix part of the query that comes before IN list
    - * @param suffix part of the query that comes after IN list
    - * @param inList the list containing IN list values
    - * @param inColumn column name of IN list operator
    - * @param addParens add a pair of parenthesis outside the IN lists
    - * e.g. ( id in (1,2,3) OR id in (4,5,6) )
    - * @param notIn clause to be broken up is NOT IN
    - */
    - public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
    - StringBuilder suffix, List<Long> inList,
    - String inColumn, boolean addParens, boolean notIn) {
    - int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
    - int numWholeBatches = inList.size() / batchSize;
    - StringBuilder buf = new StringBuilder();
    - buf.append(prefix);
    - if (addParens) {
    - buf.append("(");
    - }
    - buf.append(inColumn);
    - if (notIn) {
    - buf.append(" not in (");
    - } else {
    - buf.append(" in (");
    - }
    -
    - for (int i = 0; i <= numWholeBatches; i++) {
    - if (needNewQuery(conf, buf)) {
    - // Wrap up current query string
    - if (addParens) {
    - buf.append(")");
    - }
    - buf.append(suffix);
    - queries.add(buf.toString());
    -
    - // Prepare a new query string
    - buf.setLength(0);
    - }
    -
    - if (i > 0) {
    - if (notIn) {
    - if (buf.length() == 0) {
    - buf.append(prefix);
    - if (addParens) {
    - buf.append("(");
    - }
    - } else {
    - buf.append(" and ");
    - }
    - buf.append(inColumn);
    - buf.append(" not in (");
    - } else {
    - if (buf.length() == 0) {
    - buf.append(prefix);
    - if (addParens) {
    - buf.append("(");
    - }
    - } else {
    - buf.append(" or ");
    - }
    - buf.append(inColumn);
    - buf.append(" in (");
    - }
    - }
    -
    - if (i * batchSize == inList.size()) {
    - // At this point we just realized we don't need another query
    - return;
    - }
    - for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) {
    - buf.append(inList.get(j)).append(",");
    - }
    - buf.setCharAt(buf.length() - 1, ')');
    - }
    -
    - if (addParens) {
    - buf.append(")");
    - }
    - buf.append(suffix);
    - queries.add(buf.toString());
    - }
    -
    - /** Estimate if the size of a string will exceed certain limit */
    - private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
    - int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
    - // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
    - long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
    - return sizeInBytes / 1024 > queryMemoryLimit;
    - }
    -
        /**
         * For testing only, do not use.
         */
        @VisibleForTesting
    - int numLocksInLockTable() throws SQLException, MetaException {
    + public int numLocksInLockTable() throws SQLException, MetaException {
          Connection dbConn = null;
          Statement stmt = null;
          ResultSet rs = null;
    @@ -1508,7 +1409,7 @@ public class TxnHandler {
        /**
         * For testing only, do not use.
         */
    - long setTimeout(long milliseconds) {
    + public long setTimeout(long milliseconds) {
          long previous_timeout = timeout;
          timeout = milliseconds;
          return previous_timeout;
    @@ -1975,7 +1876,7 @@ public class TxnHandler {
              suffix.append("");
            }

    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);

            for (String query : queries) {
              LOG.debug("Going to execute update <" + query + ">");
    @@ -1998,7 +1899,7 @@ public class TxnHandler {
            prefix.append("delete from HIVE_LOCKS where ");
            suffix.append("");

    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);

            for (String query : queries) {
              LOG.debug("Going to execute update <" + query + ">");
    @@ -2435,7 +2336,7 @@ public class TxnHandler {
            prefix.append(" and hl_txnid = 0 and ");
            suffix.append("");

    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);

            int deletedLocks = 0;
            for (String query : queries) {

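    The "notes on how to deal with deadlocks" that CompactionTxnHandler references boil down to the retry
    idiom described in the TxnHandler hunk above: roll back, call checkRetryable(), catch RetryException,
    recurse. A hedged sketch only; getDbConn, rollbackDBConn, closeDbConn, checkRetryable and
    RetryException are TxnHandler internals whose exact signatures are assumed here:

      // Sketch of the retry idiom; helper names are assumptions, not part of this diff.
      public void commitTxn(CommitTxnRequest rqst) throws MetaException {
        try {
          Connection dbConn = null;
          try {
            dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
            // ... JDBC work for the commit goes here ...
            dbConn.commit();
          } catch (SQLException e) {
            rollbackDBConn(dbConn);                  // roll back the db transaction FIRST,
            checkRetryable(dbConn, e, "commitTxn");  // then decide whether the error is retryable
            throw new MetaException("Unable to commit: " + e.getMessage());
          } finally {
            closeDbConn(dbConn);
          }
        } catch (RetryException e) {
          commitTxn(rqst);  // checkRetryable threw RetryException, so call ourselves recursively
        }
      }
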
    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
    new file mode 100644
    index 0000000..6fc6ed9
    --- /dev/null
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
    @@ -0,0 +1,364 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.metastore.txn;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import org.apache.hadoop.hive.common.classification.InterfaceAudience;
    +import org.apache.hadoop.hive.common.classification.InterfaceStability;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
    +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
    +import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
    +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
    +import org.apache.hadoop.hive.metastore.api.CompactionRequest;
    +import org.apache.hadoop.hive.metastore.api.Database;
    +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
    +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
    +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
    +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
    +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
    +import org.apache.hadoop.hive.metastore.api.HiveObjectType;
    +import org.apache.hadoop.hive.metastore.api.LockRequest;
    +import org.apache.hadoop.hive.metastore.api.LockResponse;
    +import org.apache.hadoop.hive.metastore.api.MetaException;
    +import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
    +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
    +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
    +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
    +import org.apache.hadoop.hive.metastore.api.Partition;
    +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
    +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
    +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
    +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
    +import org.apache.hadoop.hive.metastore.api.Table;
    +import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
    +import org.apache.hadoop.hive.metastore.api.TxnOpenException;
    +import org.apache.hadoop.hive.metastore.api.UnlockRequest;
    +
    +import java.sql.SQLException;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * A handler to answer transaction related calls that come into the metastore
    + * server.
    + *
    + * Note on log messages: Please include txnid:X and lockid info using
    + * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)}
    + * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
    + * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
    + * so keeping the format consistent makes grep'ing the logs much easier.
    + *
    + * Note on HIVE_LOCKS.hl_last_heartbeat.
    + * For locks that are part of a transaction, we set this to 0 (we would rather set it to NULL, but
    + * currently the DB schema has this NOT NULL) and only update/read the heartbeat from the
    + * corresponding transaction in TXNS.
    + */
    +@InterfaceAudience.Private
    +@InterfaceStability.Evolving
    +public interface TxnStore {
    +
    + // Compactor states (Should really be enum)
    + static final public String INITIATED_RESPONSE = "initiated";
    + static final public String WORKING_RESPONSE = "working";
    + static final public String CLEANING_RESPONSE = "ready for cleaning";
    + static final public String FAILED_RESPONSE = "failed";
    + static final public String SUCCEEDED_RESPONSE = "succeeded";
    + static final public String ATTEMPTED_RESPONSE = "attempted";
    +
    + public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
    +
    + public void setConf(HiveConf conf);
    +
    + /**
    + * Get information about open transactions. This gives extensive information about the
    + * transactions rather than just the list of transactions. This should be used when the need
    + * is to see information about the transactions (e.g. show transactions).
    + * @return information about open transactions
    + * @throws MetaException
    + */
    + public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
    +
    + /**
    + * Get list of valid transactions. This gives just the list of transactions that are open.
    + * @return list of open transactions, as well as a high water mark.
    + * @throws MetaException
    + */
    + public GetOpenTxnsResponse getOpenTxns() throws MetaException;
    +
    + /**
    + * Open a set of transactions
    + * @param rqst request to open transactions
    + * @return information on opened transactions
    + * @throws MetaException
    + */
    + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
    +
    + /**
    + * Abort (rollback) a transaction.
    + * @param rqst info on transaction to abort
    + * @throws NoSuchTxnException
    + * @throws MetaException
    + */
    + public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException;
    +
    + /**
    + * Commit a transaction
    + * @param rqst info on transaction to commit
    + * @throws NoSuchTxnException
    + * @throws TxnAbortedException
    + * @throws MetaException
    + */
    + public void commitTxn(CommitTxnRequest rqst)
    + throws NoSuchTxnException, TxnAbortedException, MetaException;
    +
    + /**
    + * Obtain a lock.
    + * @param rqst information on the lock to obtain. If the requester is part of a transaction
    + * the txn information must be included in the lock request.
    + * @return info on the lock, including whether it was obtained.
    + * @throws NoSuchTxnException
    + * @throws TxnAbortedException
    + * @throws MetaException
    + */
    + public LockResponse lock(LockRequest rqst)
    + throws NoSuchTxnException, TxnAbortedException, MetaException;
    +
    + /**
    + * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait
    + * state.
    + * @param rqst info on the lock to check
    + * @return info on the state of the lock
    + * @throws NoSuchTxnException
    + * @throws NoSuchLockException
    + * @throws TxnAbortedException
    + * @throws MetaException
    + */
    + public LockResponse checkLock(CheckLockRequest rqst)
    + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
    +
    + /**
    + * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case
    + * the txn should be committed or aborted instead. (Note someday this will change since
    + * multi-statement transactions will allow unlocking in the transaction.)
    + * @param rqst lock to unlock
    + * @throws NoSuchLockException
    + * @throws TxnOpenException
    + * @throws MetaException
    + */
    + public void unlock(UnlockRequest rqst)
    + throws NoSuchLockException, TxnOpenException, MetaException;
    +
    + /**
    + * Get information on current locks.
    + * @param rqst lock information to retrieve
    + * @return lock information.
    + * @throws MetaException
    + */
    + public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
    +
    + /**
    + * Send a heartbeat for a lock or a transaction
    + * @param ids lock and/or txn id to heartbeat
    + * @throws NoSuchTxnException
    + * @throws NoSuchLockException
    + * @throws TxnAbortedException
    + * @throws MetaException
    + */
    + public void heartbeat(HeartbeatRequest ids)
    + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
    +
    + /**
    + * Heartbeat a group of transactions together
    + * @param rqst set of transactions to heartbeat
    + * @return info on txns that were heartbeated
    + * @throws MetaException
    + */
    + public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
    + throws MetaException;
    +
    + /**
    + * Submit a compaction request into the queue. This is called when a user manually requests a
    + * compaction.
    + * @param rqst information on what to compact
    + * @return id of the compaction that has been started
    + * @throws MetaException
    + */
    + public long compact(CompactionRequest rqst) throws MetaException;
    +
    + /**
    + * Show list of current compactions
    + * @param rqst info on which compactions to show
    + * @return compaction information
    + * @throws MetaException
    + */
    + public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException;
    +
    + /**
    + * Add information on a set of dynamic partitions that participated in a transaction.
    + * @param rqst dynamic partition info.
    + * @throws NoSuchTxnException
    + * @throws TxnAbortedException
    + * @throws MetaException
    + */
    + public void addDynamicPartitions(AddDynamicPartitions rqst)
    + throws NoSuchTxnException, TxnAbortedException, MetaException;
    +
    + /**
    + * Clean up corresponding records in metastore tables
    + * @param type Hive object type
    + * @param db database object
    + * @param table table object
    + * @param partitionIterator partition iterator
    + * @throws MetaException
    + */
    + public void cleanupRecords(HiveObjectType type, Database db, Table table,
    + Iterator<Partition> partitionIterator) throws MetaException;
    + /**
    + * Timeout transactions and/or locks. This should only be called by the compactor.
    + */
    + public void performTimeOuts();
    +
    + /**
    + * This will look through the completed_txn_components table and look for partitions or tables
    + * that may be ready for compaction. Also, look through txns and txn_components tables for
    + * aborted transactions that we should add to the list.
    + * @param maxAborted Maximum number of aborted transactions to allow before marking this as a
    + * potential compaction.
    + * @return list of CompactionInfo structs. These will not have id, type,
    + * or runAs set since these are only potential compactions not actual ones.
    + */
    + public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
    +
    + /**
    + * Sets the user to run as. This is for the case
    + * where the request was generated by the user and so the worker must set this value later.
    + * @param cq_id id of this entry in the queue
    + * @param user user to run the jobs as
    + */
    + public void setRunAs(long cq_id, String user) throws MetaException;
    +
    + /**
    + * This will grab the next compaction request off of
    + * the queue, and assign it to the worker.
    + * @param workerId id of the worker calling this, will be recorded in the db
    + * @return an info element for this compaction request, or null if there is no work to do now.
    + */
    + public CompactionInfo findNextToCompact(String workerId) throws MetaException;
    +
    + /**
    + * This will mark an entry in the queue as compacted
    + * and put it in the ready to clean state.
    + * @param info info on the compaction entry to mark as compacted.
    + */
    + public void markCompacted(CompactionInfo info) throws MetaException;
    +
    + /**
    + * Find entries in the queue that are ready to
    + * be cleaned.
    + * @return information on the entries in the queue.
    + */
    + public List<CompactionInfo> findReadyToClean() throws MetaException;
    +
    + /**
    + * This will remove an entry from the queue after
    + * it has been compacted.
    + *
    + * @param info info on the compaction entry to remove
    + */
    + public void markCleaned(CompactionInfo info) throws MetaException;
    +
    + /**
    + * Mark a compaction entry as failed. This will move it to the compaction history queue with a
    + * failed status. It will NOT clean up aborted transactions in the table/partition associated
    + * with this compaction.
    + * @param info information on the compaction that failed.
    + * @throws MetaException
    + */
    + public void markFailed(CompactionInfo info) throws MetaException;
    +
    + /**
    + * Clean up aborted transactions from txns that have no components in txn_components. The reason such
    + * txns exist can be that no work was done in this txn (e.g. Streaming opened a TransactionBatch and
    + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
    + */
    + public void cleanEmptyAbortedTxns() throws MetaException;
    +
    + /**
    + * This will take all entries assigned to workers
    + * on a host and return them to the INITIATED state. The initiator should use this at start up to
    + * clean entries from any workers that were in the middle of compacting when the metastore
    + * shut down. It does not reset entries from worker threads on other hosts as those may still
    + * be working.
    + * @param hostname Name of this host. It is assumed this prefixes the thread's worker id,
    + * so that a LIKE 'hostname%' pattern will match the worker id.
    + */
    + public void revokeFromLocalWorkers(String hostname) throws MetaException;
    +
    + /**
    + * This call will return all compaction queue
    + * entries that are assigned to a worker but past the timeout back to the INITIATED state.
    + * This should be called by the initiator on start up and occasionally when running to clean up
    + * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called
    + * first.
    + * @param timeout number of milliseconds since start time that should elapse before a worker is
    + * declared dead.
    + */
    + public void revokeTimedoutWorkers(long timeout) throws MetaException;
    +
    + /**
    + * Queries metastore DB directly to find columns in the table which have statistics information.
    + * If {@code ci} includes partition info then per partition stats info is examined, otherwise
    + * table level stats are examined.
    + * @throws MetaException
    + */
    + public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
    +
    + /**
    + * Record the highest txn id that the {@code ci} compaction job will pay attention to.
    + */
    + public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException;
    +
    + /**
    + * For any given compactable entity (partition, table if not partitioned) the history of compactions
    + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
    + * history such that a configurable number of each type of state is present. Any other entries
    + * can be purged. This scheme has the advantage of always retaining the last failure/success even if
    + * it's not recent.
    + * @throws MetaException
    + */
    + public void purgeCompactionHistory() throws MetaException;
    +
    + /**
    + * Determine if there are enough consecutive failures compacting a table or partition that no
    + * new automatic compactions should be scheduled. User initiated compactions do not do this
    + * check.
    + * @param ci Table or partition to check.
    + * @return true if it is ok to compact, false if there have been too many failures.
    + * @throws MetaException
    + */
    + public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
    +
    +
    + @VisibleForTesting
    + public int numLocksInLockTable() throws SQLException, MetaException;
    +
    + @VisibleForTesting
    + long setTimeout(long milliseconds);
    +}

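    With TxnStore in place, callers obtain the store through the factory instead of constructing
    TxnHandler directly, and the compactor state constants move with it. A minimal usage sketch
    mirroring the TestCompactor changes above (the "default"/"t" names are illustrative):

      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.metastore.api.CompactionRequest;
      import org.apache.hadoop.hive.metastore.api.CompactionType;
      import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
      import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
      import org.apache.hadoop.hive.metastore.txn.TxnStore;
      import org.apache.hadoop.hive.metastore.txn.TxnUtils;

      public class TxnStoreUsageExample {
        public static void main(String[] args) throws Exception {
          HiveConf conf = new HiveConf();
          TxnStore txnHandler = TxnUtils.getTxnStore(conf); // replaces new CompactionTxnHandler(conf)
          // Queue a minor compaction and inspect its state.
          txnHandler.compact(new CompactionRequest("default", "t", CompactionType.MINOR));
          ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
          String state = rsp.getCompacts().get(0).getState();
          System.out.println(TxnStore.INITIATED_RESPONSE.equals(state)); // constants now live on TxnStore
        }
      }
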
    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
    new file mode 100644
    index 0000000..f60e34b
    --- /dev/null
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
    @@ -0,0 +1,209 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.metastore.txn;
    +
    +import org.apache.hadoop.hive.common.ValidReadTxnList;
    +import org.apache.hadoop.hive.common.ValidTxnList;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.metastore.MetaStoreUtils;
    +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
    +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
    +import org.apache.hadoop.hive.metastore.api.Table;
    +import org.apache.hadoop.hive.metastore.api.TxnInfo;
    +import org.apache.hadoop.hive.metastore.api.TxnState;
    +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +public class TxnUtils {
    + private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
    +
    + /**
    + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
    + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
    + * read the files, and thus treats both open and aborted transactions as invalid.
    + * @param txns txn list from the metastore
    + * @param currentTxn Current transaction that the user has open. If this is greater than 0 it
    + * will be removed from the exceptions list so that the user sees his own
    + * transaction as valid.
    + * @return a valid txn list.
    + */
    + public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
    + long highWater = txns.getTxn_high_water_mark();
    + Set<Long> open = txns.getOpen_txns();
    + long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
    + int i = 0;
    + for(long txn: open) {
    + if (currentTxn > 0 && currentTxn == txn) continue;
    + exceptions[i++] = txn;
    + }
    + return new ValidReadTxnList(exceptions, highWater);
    + }
    +
    + /**
    + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
    + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
    + * compact the files, and thus treats only open transactions as invalid. Additionally any
    + * txnId > highestOpenTxnId is also invalid. This is to avoid creating something like
    + * delta_17_120 where txnId 80, for example, is still open.
    + * @param txns txn list from the metastore
    + * @return a valid txn list.
    + */
    + public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
    + long highWater = txns.getTxn_high_water_mark();
    + long minOpenTxn = Long.MAX_VALUE;
    + long[] exceptions = new long[txns.getOpen_txnsSize()];
    + int i = 0;
    + for (TxnInfo txn : txns.getOpen_txns()) {
    + if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
    + exceptions[i++] = txn.getId();
    + }
    + highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
    + return new ValidCompactorTxnList(exceptions, -1, highWater);
    + }
    +
    + /**
    + * Get an instance of the TxnStore implementation configured for this metastore
    + * @param conf configuration
    + * @return txn store
    + */
    + public static TxnStore getTxnStore(HiveConf conf) {
    + String className = conf.getVar(HiveConf.ConfVars.METASTORE_TXN_STORE_IMPL);
    + try {
    + TxnStore handler = ((Class<? extends TxnHandler>) MetaStoreUtils.getClass(
    + className)).newInstance();
    + handler.setConf(conf);
    + return handler;
    + } catch (Exception e) {
    + LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
    + throw new RuntimeException(e);
    + }
    + }
    +
    + /** Checks if a table is a valid ACID table.
    + * Note, users are responsible for using the correct TxnManager. We do not look at
    + * SessionState.get().getTxnMgr().supportsAcid() here
    + * @param table table
    + * @return true if table is a legit ACID table, false otherwise
    + */
    + public static boolean isAcidTable(Table table) {
    + if (table == null) {
    + return false;
    + }
    + Map<String, String> parameters = table.getParameters();
    + String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
    + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
    + }
    + /**
+ * Build a query (or queries if one query is too big) with the specified "prefix" and "suffix",
+ * splitting the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6).
+ * For the NOT IN case, the NOT IN list is broken into multiple AND clauses.
    + * @param queries array of complete query strings
    + * @param prefix part of the query that comes before IN list
    + * @param suffix part of the query that comes after IN list
    + * @param inList the list containing IN list values
    + * @param inColumn column name of IN list operator
+ * @param addParens add a pair of parentheses around the IN lists,
    + * e.g. ( id in (1,2,3) OR id in (4,5,6) )
+ * @param notIn true if the clause to be broken up is a NOT IN clause
    + */
    + public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
    + StringBuilder suffix, List<Long> inList,
    + String inColumn, boolean addParens, boolean notIn) {
    + int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
    + int numWholeBatches = inList.size() / batchSize;
    + StringBuilder buf = new StringBuilder();
    + buf.append(prefix);
    + if (addParens) {
    + buf.append("(");
    + }
    + buf.append(inColumn);
    + if (notIn) {
    + buf.append(" not in (");
    + } else {
    + buf.append(" in (");
    + }
    +
    + for (int i = 0; i <= numWholeBatches; i++) {
    + if (needNewQuery(conf, buf)) {
    + // Wrap up current query string
    + if (addParens) {
    + buf.append(")");
    + }
    + buf.append(suffix);
    + queries.add(buf.toString());
    +
    + // Prepare a new query string
    + buf.setLength(0);
    + }
    +
    + if (i > 0) {
    + if (notIn) {
    + if (buf.length() == 0) {
    + buf.append(prefix);
    + if (addParens) {
    + buf.append("(");
    + }
    + } else {
    + buf.append(" and ");
    + }
    + buf.append(inColumn);
    + buf.append(" not in (");
    + } else {
    + if (buf.length() == 0) {
    + buf.append(prefix);
    + if (addParens) {
    + buf.append("(");
    + }
    + } else {
    + buf.append(" or ");
    + }
    + buf.append(inColumn);
    + buf.append(" in (");
    + }
    + }
    +
    + if (i * batchSize == inList.size()) {
    + // At this point we just realized we don't need another query
    + return;
    + }
    + for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) {
    + buf.append(inList.get(j)).append(",");
    + }
    + buf.setCharAt(buf.length() - 1, ')');
    + }
    +
    + if (addParens) {
    + buf.append(")");
    + }
    + buf.append(suffix);
    + queries.add(buf.toString());
    + }
    +
+ /** Estimate whether the size of a string will exceed a certain limit */
    + private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
    + int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
    + // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
    + long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
    + return sizeInBytes / 1024 > queryMemoryLimit;
    + }
    +}
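The batching behavior of buildQueryWithINClause() is easiest to see from a caller's side. Below is a minimal sketch, not part of the patch: the table and column names (TXN_COMPONENTS, TC_TXNID) and the id range are illustrative, and it assumes a HiveConf carrying the two METASTORE_DIRECT_SQL_* settings the method reads:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class InClauseBatchingSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        List<String> queries = new ArrayList<>();
        StringBuilder prefix = new StringBuilder("delete from TXN_COMPONENTS where ");
        StringBuilder suffix = new StringBuilder("");
        List<Long> txnIds = new ArrayList<>();
        for (long i = 1; i <= 2500; i++) {
          txnIds.add(i);
        }
        // With a batch size of, say, 1000 this emits OR-joined batches such as
        // (TC_TXNID in (1,...,1000) or TC_TXNID in (1001,...,2000) or ...).
        // Whenever needNewQuery() estimates the accumulated string would exceed
        // METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH (a KB limit), the current buffer
        // is closed off and a fresh query string is started, so 'queries' may
        // end up holding more than one complete statement.
        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds,
            "TC_TXNID", true, false);
        for (String q : queries) {
          System.out.println(q);  // each entry is a complete, executable statement
        }
      }
    }

The size estimate in needNewQuery() is the usual Java String heuristic (about 2 bytes per char plus roughly 45 bytes of object overhead, rounded to 8-byte alignment); for a 10,000-character buffer that works out to 8 * ((20000 + 45) / 8), i.e. about 20 KB.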

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    index 051da60..bdeacb9 100644
    --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    @@ -42,8 +42,7 @@ import static junit.framework.Assert.*;
      public class TestCompactionTxnHandler {

        private HiveConf conf = new HiveConf();
    - private CompactionTxnHandler txnHandler;
    - static final private Log LOG = LogFactory.getLog(TestCompactionTxnHandler.class);
    + private TxnStore txnHandler;

        public TestCompactionTxnHandler() throws Exception {
          TxnDbUtil.setConfValues(conf);
    @@ -424,7 +423,7 @@ public class TestCompactionTxnHandler {
        @Before
        public void setUp() throws Exception {
          TxnDbUtil.prepDb();
    - txnHandler = new CompactionTxnHandler(conf);
    + txnHandler = TxnUtils.getTxnStore(conf);
        }

        @After

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    index 930af7c..b8cab71 100644
    --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    @@ -34,7 +34,11 @@ import java.util.List;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicBoolean;

    -import static junit.framework.Assert.*;
    +import static junit.framework.Assert.assertEquals;
    +import static junit.framework.Assert.assertFalse;
    +import static junit.framework.Assert.assertNull;
    +import static junit.framework.Assert.assertTrue;
    +import static junit.framework.Assert.fail;

      /**
       * Tests for TxnHandler.
    @@ -44,7 +48,7 @@ public class TestTxnHandler {
        static final private Log LOG = LogFactory.getLog(CLASS_NAME);

        private HiveConf conf = new HiveConf();
    - private TxnHandler txnHandler;
    + private TxnStore txnHandler;

        public TestTxnHandler() throws Exception {
          TxnDbUtil.setConfValues(conf);
    @@ -1111,99 +1115,102 @@ public class TestTxnHandler {
        @Ignore
        public void deadlockDetected() throws Exception {
          LOG.debug("Starting deadlock test");
    - Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    - Statement stmt = conn.createStatement();
    - long now = txnHandler.getDbTime(conn);
    - stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
    - "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
    - "'scooby.com')");
    - stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
    - "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
    - "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
    - txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
    - "'scooby.com')");
    - conn.commit();
    - txnHandler.closeDbConn(conn);
    -
    - final AtomicBoolean sawDeadlock = new AtomicBoolean();
    -
    - final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    - final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    - try {
    + if (txnHandler instanceof TxnHandler) {
    + final TxnHandler tHndlr = (TxnHandler)txnHandler;
    + Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + Statement stmt = conn.createStatement();
    + long now = tHndlr.getDbTime(conn);
    + stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
    + "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
    + "'scooby.com')");
    + stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
    + "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
    + "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
    + tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
    + "'scooby.com')");
    + conn.commit();
    + tHndlr.closeDbConn(conn);
    +
    + final AtomicBoolean sawDeadlock = new AtomicBoolean();
    +
    + final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
    + try {

    - for (int i = 0; i < 5; i++) {
    - Thread t1 = new Thread() {
    - @Override
    - public void run() {
    - try {
    + for (int i = 0; i < 5; i++) {
    + Thread t1 = new Thread() {
    + @Override
    + public void run() {
                    try {
    - updateTxns(conn1);
    - updateLocks(conn1);
    - Thread.sleep(1000);
    - conn1.commit();
    - LOG.debug("no exception, no deadlock");
    - } catch (SQLException e) {
                      try {
    - txnHandler.checkRetryable(conn1, e, "thread t1");
    - LOG.debug("Got an exception, but not a deadlock, SQLState is " +
    - e.getSQLState() + " class of exception is " + e.getClass().getName() +
    - " msg is <" + e.getMessage() + ">");
    - } catch (TxnHandler.RetryException de) {
    - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
    - "exception is " + e.getClass().getName() + " msg is <" + e
    - .getMessage() + ">");
    - sawDeadlock.set(true);
    + updateTxns(conn1);
    + updateLocks(conn1);
    + Thread.sleep(1000);
    + conn1.commit();
    + LOG.debug("no exception, no deadlock");
    + } catch (SQLException e) {
    + try {
    + tHndlr.checkRetryable(conn1, e, "thread t1");
    + LOG.debug("Got an exception, but not a deadlock, SQLState is " +
    + e.getSQLState() + " class of exception is " + e.getClass().getName() +
    + " msg is <" + e.getMessage() + ">");
    + } catch (TxnHandler.RetryException de) {
    + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
    + "exception is " + e.getClass().getName() + " msg is <" + e
    + .getMessage() + ">");
    + sawDeadlock.set(true);
    + }
                      }
    + conn1.rollback();
    + } catch (Exception e) {
    + throw new RuntimeException(e);
                    }
    - conn1.rollback();
    - } catch (Exception e) {
    - throw new RuntimeException(e);
                  }
    - }
    - };
    + };

    - Thread t2 = new Thread() {
    - @Override
    - public void run() {
    - try {
    + Thread t2 = new Thread() {
    + @Override
    + public void run() {
                    try {
    - updateLocks(conn2);
    - updateTxns(conn2);
    - Thread.sleep(1000);
    - conn2.commit();
    - LOG.debug("no exception, no deadlock");
    - } catch (SQLException e) {
                      try {
    - txnHandler.checkRetryable(conn2, e, "thread t2");
    - LOG.debug("Got an exception, but not a deadlock, SQLState is " +
    - e.getSQLState() + " class of exception is " + e.getClass().getName() +
    - " msg is <" + e.getMessage() + ">");
    - } catch (TxnHandler.RetryException de) {
    - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
    - "exception is " + e.getClass().getName() + " msg is <" + e
    - .getMessage() + ">");
    - sawDeadlock.set(true);
    + updateLocks(conn2);
    + updateTxns(conn2);
    + Thread.sleep(1000);
    + conn2.commit();
    + LOG.debug("no exception, no deadlock");
    + } catch (SQLException e) {
    + try {
    + tHndlr.checkRetryable(conn2, e, "thread t2");
    + LOG.debug("Got an exception, but not a deadlock, SQLState is " +
    + e.getSQLState() + " class of exception is " + e.getClass().getName() +
    + " msg is <" + e.getMessage() + ">");
    + } catch (TxnHandler.RetryException de) {
    + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
    + "exception is " + e.getClass().getName() + " msg is <" + e
    + .getMessage() + ">");
    + sawDeadlock.set(true);
    + }
                      }
    + conn2.rollback();
    + } catch (Exception e) {
    + throw new RuntimeException(e);
                    }
    - conn2.rollback();
    - } catch (Exception e) {
    - throw new RuntimeException(e);
                  }
    - }
    - };
    -
    - t1.start();
    - t2.start();
    - t1.join();
    - t2.join();
    - if (sawDeadlock.get()) break;
    + };
    +
    + t1.start();
    + t2.start();
    + t1.join();
    + t2.join();
    + if (sawDeadlock.get()) break;
    + }
    + assertTrue(sawDeadlock.get());
    + } finally {
    + conn1.rollback();
    + tHndlr.closeDbConn(conn1);
    + conn2.rollback();
    + tHndlr.closeDbConn(conn2);
            }
    - assertTrue(sawDeadlock.get());
    - } finally {
    - conn1.rollback();
    - txnHandler.closeDbConn(conn1);
    - conn2.rollback();
    - txnHandler.closeDbConn(conn2);
          }
        }

    @@ -1236,7 +1243,7 @@ public class TestTxnHandler {
          for (long i = 1; i <= 200; i++) {
            inList.add(i);
          }
    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
          Assert.assertEquals(1, queries.size());
          runAgainstDerby(queries);

    @@ -1244,7 +1251,7 @@ public class TestTxnHandler {
          // The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
          queries.clear();
          inList.add((long)201);
    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
          Assert.assertEquals(2, queries.size());
          runAgainstDerby(queries);

    @@ -1255,13 +1262,13 @@ public class TestTxnHandler {
          for (long i = 202; i <= 4321; i++) {
            inList.add(i);
          }
    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
          Assert.assertEquals(3, queries.size());
          runAgainstDerby(queries);

          // Case 4 - NOT IN list
          queries.clear();
    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
          Assert.assertEquals(3, queries.size());
          runAgainstDerby(queries);

    @@ -1269,7 +1276,7 @@ public class TestTxnHandler {
          queries.clear();
          suffix.setLength(0);
          suffix.append("");
    - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
    + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
          Assert.assertEquals(3, queries.size());
          runAgainstDerby(queries);
        }
    @@ -1297,7 +1304,7 @@ public class TestTxnHandler {
        @Before
        public void setUp() throws Exception {
          TxnDbUtil.prepDb();
    - txnHandler = new TxnHandler(conf);
    + txnHandler = TxnUtils.getTxnStore(conf);
        }

        @After

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
    index abe1e37..6c27515 100644
    --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
    +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
    @@ -37,7 +37,7 @@ public class TestTxnHandlerNegative {
          conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah");
          RuntimeException e = null;
          try {
    - TxnHandler txnHandler1 = new TxnHandler(conf);
    + TxnUtils.getTxnStore(conf);
          }
          catch(RuntimeException ex) {
            LOG.info("Expected error: " + ex.getMessage(), ex);

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    index a91ca5c..59c8fe4 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    @@ -18,20 +18,12 @@
      package org.apache.hadoop.hive.ql.txn;

      import org.apache.hadoop.hive.conf.HiveConf;
    -import org.apache.hadoop.hive.metastore.HouseKeeperService;
    -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    -import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
    -import org.apache.hadoop.hive.ql.metadata.Hive;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.ScheduledFuture;
    -import java.util.concurrent.ThreadFactory;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicInteger;

    @@ -60,10 +52,10 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
        }

        private static final class ObsoleteEntryReaper implements Runnable {
    - private final CompactionTxnHandler txnHandler;
    + private final TxnStore txnHandler;
          private final AtomicInteger isAliveCounter;
          private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
    - txnHandler = new CompactionTxnHandler(hiveConf);
    + txnHandler = TxnUtils.getTxnStore(hiveConf);
            this.isAliveCounter = isAliveCounter;
          }


    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    index 38151fb..de74a7b 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    @@ -20,15 +20,10 @@ package org.apache.hadoop.hive.ql.txn;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.apache.hadoop.hive.conf.HiveConf;
    -import org.apache.hadoop.hive.metastore.HouseKeeperService;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    -import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;

    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.ThreadFactory;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicInteger;

    @@ -58,10 +53,10 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
        }

        private static final class TimedoutTxnReaper implements Runnable {
    - private final TxnHandler txnHandler;
    + private final TxnStore txnHandler;
          private final AtomicInteger isAliveCounter;
          private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
    - txnHandler = new TxnHandler(hiveConf);
    + txnHandler = TxnUtils.getTxnStore(hiveConf);
            this.isAliveCounter = isAliveCounter;
          }
          @Override

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
    index c956f58..ae8865c 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
    @@ -31,7 +31,8 @@ import org.apache.hadoop.hive.metastore.api.Partition;
      import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
      import org.apache.hadoop.hive.metastore.api.Table;
      import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.security.AccessControlException;
      import org.apache.hadoop.security.UserGroupInformation;

    @@ -50,7 +51,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
        static final private Log LOG = LogFactory.getLog(CLASS_NAME);

        protected HiveConf conf;
    - protected CompactionTxnHandler txnHandler;
    + protected TxnStore txnHandler;
        protected RawStore rs;
        protected int threadId;
        protected AtomicBoolean stop;
    @@ -75,7 +76,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
          setDaemon(true); // this means the process will exit without waiting for this thread

          // Get our own instance of the transaction handler
    - txnHandler = new CompactionTxnHandler(conf);
    + txnHandler = TxnUtils.getTxnStore(conf);

          // Get our own connection to the database so we can get table and partition information.
          rs = RawStoreProxy.getProxy(conf, conf,

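The net effect of this hunk is that a compactor thread no longer hard-codes CompactionTxnHandler; the concrete store is chosen by configuration. A minimal sketch of the same wiring outside the thread, assuming METASTORE_TXN_STORE_IMPL names a TxnHandler subclass as in TxnUtils.getTxnStore():

    // Resolve the pluggable transaction store the way CompactorThread now does.
    // Everything after this point talks only to the TxnStore interface, so a
    // test can substitute a different implementation purely via configuration.
    HiveConf conf = new HiveConf();
    TxnStore txnHandler = TxnUtils.getTxnStore(conf);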
    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    index c023c27..1898a4d 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    @@ -37,8 +37,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
      import org.apache.hadoop.hive.metastore.api.Table;
      import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
      import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.ql.io.AcidUtils;
      import org.apache.hadoop.security.UserGroupInformation;
      import org.apache.hadoop.util.StringUtils;
    @@ -81,7 +81,7 @@ public class Initiator extends CompactorThread {
              try {//todo: add method to only get current i.e. skip history - more efficient
                ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
                ValidTxnList txns =
    - CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
    + TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
                Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
                LOG.debug("Found " + potentials.size() + " potential compactions, " +
                    "checking to see if we should compact any of them");
    @@ -184,7 +184,7 @@ public class Initiator extends CompactorThread {
                                                  CompactionInfo ci) {
          if (compactions.getCompacts() != null) {
            for (ShowCompactResponseElement e : compactions.getCompacts()) {
    - if ((e.getState().equals(TxnHandler.WORKING_RESPONSE) || e.getState().equals(TxnHandler.INITIATED_RESPONSE)) &&
    + if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
                  e.getDbname().equals(ci.dbname) &&
                  e.getTablename().equals(ci.tableName) &&
                  (e.getPartitionname() == null && ci.partName == null ||

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    index 59a765b..516b92e 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    @@ -27,13 +27,15 @@ import org.apache.hadoop.hive.metastore.api.Partition;
      import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
      import org.apache.hadoop.hive.metastore.api.Table;
      import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.ql.CommandNeedRetryException;
      import org.apache.hadoop.hive.ql.Driver;
      import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
      import org.apache.hadoop.hive.ql.session.SessionState;
      import org.apache.hadoop.security.UserGroupInformation;
      import org.apache.hadoop.util.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;

      import java.io.IOException;
      import java.net.InetAddress;
    @@ -135,7 +137,7 @@ public class Worker extends CompactorThread {

              final boolean isMajor = ci.isMajorCompaction();
              final ValidTxnList txns =
    - CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
    + TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
              LOG.debug("ValidCompactTxnList: " + txns.writeToString());
              txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
              final StringBuilder jobName = new StringBuilder(name);

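Initiator and Worker now share the same idiom against the new TxnUtils helper; roughly, as a sketch (assuming txnHandler is the resolved TxnStore and ci the CompactionInfo in hand):

    // Build the compaction-oriented txn list and record its high watermark.
    ValidTxnList txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
    // The watermark is capped at minOpenTxn - 1, so the job never folds a
    // still-open transaction into the base/delta it is about to write.
    txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());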
    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    index 44b77e7..6f8dc35 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    @@ -32,10 +32,10 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
      import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
      import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
      import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
    -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
      import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
      import org.apache.hadoop.hive.ql.io.HiveInputFormat;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
    +import org.apache.hadoop.hive.metastore.txn.TxnUtils;
      import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
      import org.apache.hadoop.hive.ql.session.SessionState;
      import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
    @@ -486,7 +486,7 @@ public class TestTxnCommands2 {
          hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);

          int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
          AtomicBoolean stop = new AtomicBoolean(true);
          //create failed compactions
          for(int i = 0; i < numFailedCompactions; i++) {
    @@ -556,27 +556,27 @@ public class TestTxnCommands2 {
          private int working;
          private int total;
        }
    - private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
    + private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
          ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
          CompactionsByState compactionsByState = new CompactionsByState();
          compactionsByState.total = resp.getCompactsSize();
          for(ShowCompactResponseElement compact : resp.getCompacts()) {
    - if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
    + if(TxnStore.FAILED_RESPONSE.equals(compact.getState())) {
              compactionsByState.failed++;
            }
    - else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
    + else if(TxnStore.CLEANING_RESPONSE.equals(compact.getState())) {
              compactionsByState.readyToClean++;
            }
    - else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
    + else if(TxnStore.INITIATED_RESPONSE.equals(compact.getState())) {
              compactionsByState.initiated++;
            }
    - else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
    + else if(TxnStore.SUCCEEDED_RESPONSE.equals(compact.getState())) {
              compactionsByState.succeeded++;
            }
    - else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
    + else if(TxnStore.WORKING_RESPONSE.equals(compact.getState())) {
              compactionsByState.working++;
            }
    - else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
    + else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
              compactionsByState.attempted++;
            }
          }
    @@ -632,7 +632,7 @@ public class TestTxnCommands2 {
          runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");

          //run Worker to execute compaction
    - CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
    + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
          txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
          Worker t = new Worker();
          t.setThreadId((int) t.getId());

    http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    index a4f7e5b..99705b4 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
      import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
      import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
      import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
    -import org.apache.hadoop.hive.metastore.txn.TxnHandler;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
      import org.apache.hadoop.hive.ql.Context;
      import org.apache.hadoop.hive.ql.ErrorMsg;
      import org.apache.hadoop.hive.ql.QueryPlan;
    @@ -42,7 +42,11 @@ import org.junit.Assert;
      import org.junit.Before;
      import org.junit.Test;

    -import java.util.*;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
      import java.util.concurrent.TimeUnit;

      /**
    @@ -243,10 +247,10 @@ public class TestDbTxnManager {
          }
          expireLocks(txnMgr, 5);
          //create a lot of locks
    - for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
    + for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
            ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
          }
    - expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
    + expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
        }
        private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
          DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
  • Ekoifman at Mar 24, 2016 at 11:28 pm
    HIVE-11388 - Allow ACID Compactor components to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17870823
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17870823
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17870823

    Branch: refs/heads/branch-1
    Commit: 178708231e09bb2c08aa05cf9979efd6d3cd542c
    Parents: c829505
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Thu Mar 24 16:22:21 2016 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Thu Mar 24 16:22:21 2016 -0700

    ----------------------------------------------------------------------
      .../apache/hadoop/hive/common/ServerUtils.java | 14 ++
      .../deployers/config/hive/hive-site.mysql.xml | 24 ++-
      .../hive/metastore/txn/CompactionInfo.java | 4 +
      .../metastore/txn/CompactionTxnHandler.java | 7 +-
      .../hadoop/hive/metastore/txn/TxnDbUtil.java | 20 ++-
      .../hadoop/hive/metastore/txn/TxnHandler.java | 169 ++++++++++++++++++-
      .../hadoop/hive/metastore/txn/TxnStore.java | 33 +++-
      .../hadoop/hive/metastore/txn/TxnUtils.java | 4 +-
      .../metastore/txn/ValidCompactorTxnList.java | 2 +-
      .../hive/metastore/txn/TestTxnHandler.java | 93 ++++++++++
      .../ql/txn/AcidCompactionHistoryService.java | 7 +
      .../hive/ql/txn/AcidHouseKeeperService.java | 7 +
      .../hadoop/hive/ql/txn/compactor/Cleaner.java | 62 ++++++-
      .../hadoop/hive/ql/txn/compactor/Initiator.java | 19 ++-
      .../apache/hadoop/hive/ql/TestTxnCommands2.java | 7 +-
      .../hive/ql/lockmgr/TestDbTxnManager.java | 6 +
      16 files changed, 445 insertions(+), 33 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
    ----------------------------------------------------------------------
    diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
    index a284f18..4141770 100644
    --- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
    +++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
    @@ -24,6 +24,9 @@ import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.conf.HiveConf;

    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +
      /**
       * ServerUtils (specific to HiveServer version 1)
       */
    @@ -47,4 +50,15 @@ public class ServerUtils {
          }
        }

    + /**
    + * @return name of current host
    + */
    + public static String hostname() {
    + try {
    + return InetAddress.getLocalHost().getHostName();
    + } catch (UnknownHostException e) {
    + LOG.error("Unable to resolve my host name " + e.getMessage());
    + throw new RuntimeException(e);
    + }
    + }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
    ----------------------------------------------------------------------
    diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
    index b6f1ab7..387da6c 100644
    --- a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
    +++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
    @@ -62,13 +62,14 @@
              <name>hive.exec.dynamic.partition.mode</name>
              <value>nonstrict</value>
          </property>
    +
          <property>
              <name>hive.compactor.initiator.on</name>
    - <value>false</value>
    + <value>true</value>
          </property>
          <property>
              <name>hive.compactor.worker.threads</name>
    - <value>2</value>
    + <value>5</value>
          </property>
          <property>
              <name>hive.timedout.txn.reaper.start</name>
    @@ -81,9 +82,24 @@
          -->
          <property>
              <name>hive.timedout.txn.reaper.interval</name>
    - <value>30s</value>
    + <value>1s</value>
    + </property>
    + <property>
    + <name>hive.compactor.history.reaper.interval</name>
    + <value>1s</value>
    + </property>
    + <property>
    + <name>hive.compactor.cleaner.run.interval</name>
    + <value>1s</value>
    + </property>
    + <property>
    + <name>hive.compactor.check.interval</name>
    + <value>1s</value>
    + </property>
    + <property>
    + <name>hive.compactor.delta.num.threshold</name>
    + <value>2</value>
          </property>
    -
          <!--end ACID related properties-->
      <!--
          <property>

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    index 73255d2..bea1473 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    @@ -18,6 +18,7 @@
      package org.apache.hadoop.hive.metastore.txn;

      import org.apache.hadoop.hive.metastore.api.CompactionType;
    +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;

      import java.sql.PreparedStatement;
      import java.sql.ResultSet;
    @@ -39,6 +40,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
        public boolean tooManyAborts = false;
        /**
         * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
    + * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition.
    + * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and
    + * {@link ValidCompactorTxnList#highWatermark}
         */
        public long highestTxnId;
        byte[] metaInfo;

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    index f7c738a..cdff357 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    @@ -161,6 +161,8 @@ class CompactionTxnHandler extends TxnHandler {
          try {
            Connection dbConn = null;
            Statement stmt = null;
    + //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
    + Statement updStmt = null;
            ResultSet rs = null;
            try {
              dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    @@ -174,6 +176,7 @@ class CompactionTxnHandler extends TxnHandler {
                dbConn.rollback();
                return null;
              }
    + updStmt = dbConn.createStatement();
              do {
                CompactionInfo info = new CompactionInfo();
                info.id = rs.getLong(1);
    @@ -187,7 +190,7 @@ class CompactionTxnHandler extends TxnHandler {
                  "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
                  " AND cq_state='" + INITIATED_STATE + "'";
                LOG.debug("Going to execute update <" + s + ">");
    - int updCount = stmt.executeUpdate(s);
    + int updCount = updStmt.executeUpdate(s);
                if(updCount == 1) {
                  dbConn.commit();
                  return info;
    @@ -211,6 +214,7 @@ class CompactionTxnHandler extends TxnHandler {
              throw new MetaException("Unable to connect to transaction database " +
                StringUtils.stringifyException(e));
            } finally {
    + closeStmt(updStmt);
              close(rs, stmt, dbConn);
            }
          } catch (RetryException e) {
    @@ -627,6 +631,7 @@ class CompactionTxnHandler extends TxnHandler {

        /**
         * Record the highest txn id that the {@code ci} compaction job will pay attention to.
    + * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids.
         */
        public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
          Connection dbConn = null;

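The separate updStmt works around a JDBC rule worth spelling out: any execution method on a Statement implicitly closes that Statement's currently open ResultSet. A minimal illustration of the failure mode and the fix (the SQL is illustrative, modeled on the queue columns above; dbConn is an open java.sql.Connection):

    Statement stmt = dbConn.createStatement();
    ResultSet rs = stmt.executeQuery("select CQ_ID from COMPACTION_QUEUE");
    while (rs.next()) {
      long id = rs.getLong(1);
      // WRONG: re-using 'stmt' here would implicitly close 'rs', so the next
      // rs.next() would fail -- this is the HIVE-12725 bug:
      //   stmt.executeUpdate("update COMPACTION_QUEUE set CQ_STATE = 'w' where CQ_ID = " + id);
      // RIGHT: run the update on its own Statement so 'rs' stays open.
      try (Statement updStmt = dbConn.createStatement()) {
        updStmt.executeUpdate("update COMPACTION_QUEUE set CQ_STATE = 'w' where CQ_ID = " + id);
      }
    }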
    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    index 42415ac..56c9ed8 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    @@ -68,7 +68,7 @@ public final class TxnDbUtil {
          Connection conn = null;
          Statement stmt = null;
          try {
    - conn = getConnection();
    + conn = getConnection(true);
            stmt = conn.createStatement();
            stmt.execute("CREATE TABLE TXNS (" +
                " TXN_ID bigint PRIMARY KEY," +
    @@ -140,8 +140,13 @@ public final class TxnDbUtil {
              " CC_HIGHEST_TXN_ID bigint," +
              " CC_META_INFO varchar(2048) for bit data," +
              " CC_HADOOP_JOB_ID varchar(32))");
    -
    - conn.commit();
    +
    + stmt.execute("CREATE TABLE AUX_TABLE (" +
    + " MT_KEY1 varchar(128) NOT NULL," +
    + " MT_KEY2 bigint NOT NULL," +
    + " MT_COMMENT varchar(255)," +
    + " PRIMARY KEY(MT_KEY1, MT_KEY2)" +
    + ")");
          } catch (SQLException e) {
            try {
              conn.rollback();
    @@ -166,7 +171,7 @@ public final class TxnDbUtil {
          Connection conn = null;
          Statement stmt = null;
          try {
    - conn = getConnection();
    + conn = getConnection(true);
            stmt = conn.createStatement();

            // We want to try these, whether they succeed or fail.
    @@ -185,7 +190,7 @@ public final class TxnDbUtil {
            dropTable(stmt, "COMPACTION_QUEUE");
            dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
            dropTable(stmt, "COMPLETED_COMPACTIONS");
    - conn.commit();
    + dropTable(stmt, "AUX_TABLE");
          } finally {
            closeResources(conn, stmt, null);
          }
    @@ -249,6 +254,9 @@ public final class TxnDbUtil {
        }

        static Connection getConnection() throws Exception {
    + return getConnection(false);
    + }
    + static Connection getConnection(boolean isAutoCommit) throws Exception {
          HiveConf conf = new HiveConf();
          String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
          Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
    @@ -260,7 +268,7 @@ public final class TxnDbUtil {
          prop.setProperty("user", user);
          prop.setProperty("password", passwd);
          Connection conn = driver.connect(driverUrl, prop);
    - conn.setAutoCommit(false);
    + conn.setAutoCommit(isAutoCommit);
          return conn;
        }


    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    index 9789371..a3b0751 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    @@ -23,6 +23,8 @@ import com.jolbox.bonecp.BoneCPDataSource;
      import org.apache.commons.dbcp.ConnectionFactory;
      import org.apache.commons.dbcp.DriverManagerConnectionFactory;
      import org.apache.commons.dbcp.PoolableConnectionFactory;
    +import org.apache.commons.lang.NotImplementedException;
    +import org.apache.hadoop.hive.common.ServerUtils;
      import org.apache.hadoop.hive.common.classification.InterfaceAudience;
      import org.apache.hadoop.hive.common.classification.InterfaceStability;
      import org.apache.commons.logging.Log;
    @@ -45,6 +47,8 @@ import javax.sql.DataSource;
      import java.io.IOException;
      import java.sql.*;
      import java.util.*;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.Semaphore;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.locks.ReentrantLock;

    @@ -87,7 +91,7 @@ import java.util.concurrent.locks.ReentrantLock;
       */
      @InterfaceAudience.Private
      @InterfaceStability.Evolving
    -abstract class TxnHandler implements TxnStore {
    +abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {

        static final protected char INITIATED_STATE = 'i';
        static final protected char WORKING_STATE = 'w';
    @@ -139,6 +143,12 @@ abstract class TxnHandler implements TxnStore {
         * Derby specific concurrency control
         */
        private static final ReentrantLock derbyLock = new ReentrantLock(true);
    + /**
+ * must be static since even in unit tests there may be > 1 instance of TxnHandler
    + * (e.g. via Compactor services)
    + */
    + private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
    + private static final String hostname = ServerUtils.hostname();

        // Private methods should never catch SQLException and then throw MetaException. The public
        // methods depend on SQLException coming back so they can detect and handle deadlocks. Private
    @@ -563,7 +573,7 @@ abstract class TxnHandler implements TxnStore {
         * @throws MetaException
         */
        private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
    - String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? "AND TXN_STATE=" + quoteChar(txnState) : "");
    + String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
          ResultSet rs = stmt.executeQuery(addForUpdateClause(query));
          if(rs.next()) {
            return rs;
    @@ -1437,14 +1447,14 @@ abstract class TxnHandler implements TxnStore {
          }
        }

    - void rollbackDBConn(Connection dbConn) {
    + static void rollbackDBConn(Connection dbConn) {
          try {
            if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
          } catch (SQLException e) {
            LOG.warn("Failed to rollback db connection " + getMessage(e));
          }
        }
    - protected void closeDbConn(Connection dbConn) {
    + protected static void closeDbConn(Connection dbConn) {
          try {
            if (dbConn != null && !dbConn.isClosed()) {
              dbConn.close();
    @@ -1458,7 +1468,7 @@ abstract class TxnHandler implements TxnStore {
         * Close statement instance.
         * @param stmt statement instance.
         */
    - protected void closeStmt(Statement stmt) {
    + protected static void closeStmt(Statement stmt) {
          try {
            if (stmt != null && !stmt.isClosed()) stmt.close();
          } catch (SQLException e) {
    @@ -1470,7 +1480,7 @@ abstract class TxnHandler implements TxnStore {
         * Close the ResultSet.
         * @param rs may be {@code null}
         */
    - void close(ResultSet rs) {
    + static void close(ResultSet rs) {
          try {
            if (rs != null && !rs.isClosed()) {
              rs.close();
    @@ -1484,7 +1494,7 @@ abstract class TxnHandler implements TxnStore {
        /**
         * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
         */
    - void close(ResultSet rs, Statement stmt, Connection dbConn) {
    + static void close(ResultSet rs, Statement stmt, Connection dbConn) {
          close(rs);
          closeStmt(stmt);
          closeDbConn(dbConn);
    @@ -2635,6 +2645,40 @@ abstract class TxnHandler implements TxnStore {
          }
          return false;
        }
    + private boolean isDuplicateKeyError(SQLException ex) {
    + switch (dbProduct) {
    + case DERBY:
    + if("23505".equals(ex.getSQLState())) {
    + return true;
    + }
    + break;
    + case MYSQL:
    + if(ex.getErrorCode() == 1022 && "23000".equals(ex.getSQLState())) {
    + return true;
    + }
    + break;
    + case SQLSERVER:
+ //2627 is unique constraint violation incl PK, 2601 - unique key
    + if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
    + return true;
    + }
    + break;
    + case ORACLE:
    + if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) {
    + return true;
    + }
    + break;
    + case POSTGRES:
    + //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html
    + if("23505".equals(ex.getSQLState())) {
    + return true;
    + }
    + break;
    + default:
    + throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex));
    + }
    + return false;
    + }
        private static String getMessage(SQLException ex) {
          return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
        }
    @@ -2709,4 +2753,115 @@ abstract class TxnHandler implements TxnStore {
            derbyLock.unlock();
          }
        }
    + @Override
    + public MutexAPI getMutexAPI() {
    + return this;
    + }
    +
    + @Override
    + public LockHandle acquireLock(String key) throws MetaException {
    + /**
    + * The implementation here is a bit kludgey but done so that code exercised by unit tests
    + * (which run against Derby which has no support for select for update) is as similar to
    + * production code as possible.
    + * In particular, with Derby we always run in a single process with a single metastore and
+ * the absence of For Update is handled via a Semaphore. The latter would, strictly speaking,
+ * make the SQL statements below unnecessary (for Derby), but then they would not be tested.
    + */
    + Connection dbConn = null;
    + Statement stmt = null;
    + ResultSet rs = null;
    + try {
    + try {
    + String sqlStmt = addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0");
    + lockInternal();
    + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
    + stmt = dbConn.createStatement();
    + if(LOG.isDebugEnabled()) {
    + LOG.debug("About to execute SQL: " + sqlStmt);
    + }
    + rs = stmt.executeQuery(sqlStmt);
    + if (!rs.next()) {
    + close(rs);
    + try {
    + stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1,MT_KEY2) values(" + quoteString(key) + ", 0)");
    + dbConn.commit();
    + } catch (SQLException ex) {
    + if (!isDuplicateKeyError(ex)) {
    + throw new RuntimeException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex), ex);
    + }
    + }
    + rs = stmt.executeQuery(sqlStmt);
    + if (!rs.next()) {
    + throw new IllegalStateException("Unable to lock " + quoteString(key) + ". Expected row in AUX_TABLE is missing.");
    + }
    + }
    + Semaphore derbySemaphore = null;
    + if(dbProduct == DatabaseProduct.DERBY) {
    + derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
    + derbySemaphore = derbyKey2Lock.get(key);
    + derbySemaphore.acquire();
    + }
    + LOG.info(quoteString(key) + " locked by " + quoteString(TxnHandler.hostname));
    + //OK, so now we have a lock
    + return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
    + } catch (SQLException ex) {
    + rollbackDBConn(dbConn);
    + close(rs, stmt, dbConn);
    + checkRetryable(dbConn, ex, "acquireLock(" + key + ")");
    + throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex));
    + }
    + catch(InterruptedException ex) {
    + rollbackDBConn(dbConn);
    + close(rs, stmt, dbConn);
    + throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + ex.getMessage() + StringUtils.stringifyException(ex));
    + }
    + finally {
    + unlockInternal();
    + }
    + }
    + catch(RetryException ex) {
    + acquireLock(key);
    + }
    + throw new MetaException("This can't happen because checkRetryable() has a retry limit");
    + }
    + public void acquireLock(String key, LockHandle handle) {
    + //the idea is that this will use LockHandle.dbConn
    + throw new NotImplementedException();
    + }
    + private static final class LockHandleImpl implements LockHandle {
    + private final Connection dbConn;
    + private final Statement stmt;
    + private final ResultSet rs;
    + private final Semaphore derbySemaphore;
    + private final List<String> keys = new ArrayList<>();
    + LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key, Semaphore derbySemaphore) {
    + this.dbConn = conn;
    + this.stmt = stmt;
    + this.rs = rs;
    + this.derbySemaphore = derbySemaphore;
    + if(derbySemaphore != null) {
+ //otherwise it may later release a permit acquired by someone else
    + assert derbySemaphore.availablePermits() == 0 : "Expected locked Semaphore";
    + }
    + keys.add(key);
    + }
    + void addKey(String key) {
    + //keys.add(key);
    + //would need a list of (stmt,rs) pairs - 1 for each key
    + throw new NotImplementedException();
    + }
    +
    + @Override
    + public void releaseLocks() {
    + rollbackDBConn(dbConn);
    + close(rs, stmt, dbConn);
    + if(derbySemaphore != null) {
    + derbySemaphore.release();
    + }
    + for(String key : keys) {
    + LOG.info(quoteString(key) + " unlocked by " + quoteString(TxnHandler.hostname));
    + }
    + }
    + }
      }
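Stripped of the retry logic and the Derby special case, acquireLock() is an insert-if-missing-then-select-for-update idiom. A condensed sketch (key, stmt and dbConn assumed in scope; checkRetryable() and the Derby semaphore omitted):

    String sql = "select MT_COMMENT from AUX_TABLE where MT_KEY1 = '" + key
        + "' and MT_KEY2 = 0 for update";
    ResultSet rs = stmt.executeQuery(sql);
    if (!rs.next()) {
      try {
        // The first caller for this key creates the row; a concurrent creator
        // loses with a duplicate-key error, which is expected and swallowed.
        stmt.executeUpdate("insert into AUX_TABLE (MT_KEY1, MT_KEY2) values ('" + key + "', 0)");
        dbConn.commit();
      } catch (SQLException ex) {
        if (!isDuplicateKeyError(ex)) {
          throw ex;
        }
      }
      rs = stmt.executeQuery(sql);  // the row now exists; this blocks on its row lock
    }
    // Keeping the transaction open *is* holding the lock: releaseLocks() simply
    // rolls the connection back, which drops the row lock.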

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
    index 6fc6ed9..3aac11b 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
    @@ -74,6 +74,7 @@ import java.util.Set;
      @InterfaceStability.Evolving
      public interface TxnStore {

    + public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
        // Compactor states (Should really be enum)
        static final public String INITIATED_RESPONSE = "initiated";
        static final public String WORKING_RESPONSE = "working";
    @@ -355,10 +356,40 @@ public interface TxnStore {
         */
        public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;

    -
        @VisibleForTesting
        public int numLocksInLockTable() throws SQLException, MetaException;

        @VisibleForTesting
        long setTimeout(long milliseconds);
    +
    + public MutexAPI getMutexAPI();
    +
    + /**
    + * This is primarily designed to provide coarse-grained mutex support to operations running
    + * inside the Metastore (of which there could be several instances). The initial goal is to
    + * ensure that various sub-processes of the Compactor don't step on each other.
    + *
    + * In the RDBMS world each {@code LockHandle} uses a java.sql.Connection, so use it sparingly.
    + */
    + public static interface MutexAPI {
    + /**
    + * The {@code key} is the name of the lock. This will acquire an exclusive lock or block. It
    + * returns a handle which must be used to release the lock. Each invocation returns a new handle.
    + */
    + public LockHandle acquireLock(String key) throws MetaException;
    +
    + /**
    + * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This
    + * will associate the lock on {@code key} with the same handle. All locks associated with
    + * the same handle will be released together.
    + * @param handle not NULL
    + */
    + public void acquireLock(String key, LockHandle handle) throws MetaException;
    + public static interface LockHandle {
    + /**
    + * Releases all locks associated with this handle.
    + */
    + public void releaseLocks();
    + }
    + }
      }
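Every acquireLock(String) is expected to be paired with LockHandle.releaseLocks() in a finally block; the service and compactor hunks below all follow that shape. A hedged usage sketch (txnHandler and the body of the critical section are placeholders):

  TxnStore.MutexAPI.LockHandle handle = null;
  try {
    handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
    // critical section: only one metastore instance runs this at a time
  } finally {
    if (handle != null) {
      handle.releaseLocks();  // releases every lock associated with this handle
    }
  }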

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
    index f60e34b..4c14eef 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
    @@ -75,8 +75,8 @@ public class TxnUtils {
          int i = 0;
          for (TxnInfo txn : txns.getOpen_txns()) {
            if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
    - exceptions[i++] = txn.getId();
    - }
    + exceptions[i++] = txn.getId();//todo: only add Aborted
    + }//remove all exceptions < minOpenTxn
          highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
          return new ValidCompactorTxnList(exceptions, -1, highWater);
        }
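The hunk above pins the compactor's high water mark just below the smallest open transaction, so nothing above an open txn ever gets compacted. A worked illustration with invented values (not code from the patch):

  // open txns {7, 9}, aborted txn {5}; getOpenTxnsInfo() reported highWater = 100
  long highWater = 100;
  long minOpenTxn = 7;  // smallest txn seen in TxnState.OPEN during the loop
  highWater = (minOpenTxn == Long.MAX_VALUE) ? highWater : minOpenTxn - 1;  // -> 6
  // the resulting ValidCompactorTxnList treats nothing above txn 6 as compactable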

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
    index 648fd49..30bdfa7 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
    @@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
      import java.util.Arrays;

      /**
    - * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
    + * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
       * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
       * is committed or aborted. Additionally it will return none if there are any open transactions
       * below the max transaction given, since we don't want to compact above open transactions. For

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    index b8cab71..6033c15 100644
    --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.metastore.api.*;
      import org.apache.log4j.Level;
      import org.apache.log4j.LogManager;
      import org.junit.*;
    +import org.apache.hadoop.util.StringUtils;

      import java.sql.Connection;
      import java.sql.ResultSet;
    @@ -33,6 +34,7 @@ import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;

      import static junit.framework.Assert.assertEquals;
      import static junit.framework.Assert.assertFalse;
    @@ -1214,6 +1216,97 @@ public class TestTxnHandler {
          }
        }

    + /**
    + * This cannot be run against Derby (and thus not as a UT), but it can be run against MySQL.
    + * 1. add to metastore/pom.xml
    + * <dependency>
    + * <groupId>mysql</groupId>
    + * <artifactId>mysql-connector-java</artifactId>
    + * <version>5.1.30</version>
    + * </dependency>
    + * 2. Hack in the c'tor of this class
    + * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
    + * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
    + * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
    + * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
    + * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
    + *
    + */
    + @Ignore("multiple threads wedge Derby")
    + @Test
    + public void testMutexAPI() throws Exception {
    + final TxnStore.MutexAPI api = txnHandler.getMutexAPI();
    + final AtomicInteger stepTracker = new AtomicInteger(0);
    + /**
    + * counter = 0;
    + * Thread1 counter=1, lock, wait 4s, check counter (should be 2), counter=3, unlock
    + * Thread2 counter=2, lock (and block), inc counter, should be 4
    + */
    + Thread t1 = new Thread("MutexTest1") {
    + public void run() {
    + try {
    + stepTracker.incrementAndGet();//now 1
    + TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
    + Thread.sleep(4000);
    + //stepTracker should now be 2 which indicates t2 has started
    + Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get());
    + stepTracker.incrementAndGet();//now 3
    + handle.releaseLocks();
    + }
    + catch(Exception ex) {
    + throw new RuntimeException(ex.getMessage(), ex);
    + }
    + }
    + };
    + t1.setDaemon(true);
    + ErrorHandle ueh1 = new ErrorHandle();
    + t1.setUncaughtExceptionHandler(ueh1);
    + Thread t2 = new Thread("MutexTest2") {
    + public void run() {
    + try {
    + stepTracker.incrementAndGet();//now 2
    + //this should block until t1 unlocks
    + TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
    + stepTracker.incrementAndGet();//now 4
    + Assert.assertEquals(4, stepTracker.get());
    + handle.releaseLocks();
    + stepTracker.incrementAndGet();//now 5
    + }
    + catch(Exception ex) {
    + throw new RuntimeException(ex.getMessage(), ex);
    + }
    + }
    + };
    + t2.setDaemon(true);
    + ErrorHandle ueh2 = new ErrorHandle();
    + t2.setUncaughtExceptionHandler(ueh2);
    + t1.start();
    + try {
    + Thread.sleep(1000);
    + }
    + catch(InterruptedException ex) {
    + LOG.info("Sleep was interrupted");
    + }
    + t2.start();
    + t1.join(6000);//so that test doesn't block
    + t2.join(6000);
    +
    + if(ueh1.error != null) {
    + Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false);
    + }
    + if (ueh2.error != null) {
    + Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false);
    + }
    + Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
    + }
    + private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
    + Throwable error = null;
    + @Override
    + public void uncaughtException(Thread t, Throwable e) {
    + LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
    + error = e;
    + }
    + }
        private void updateTxns(Connection conn) throws SQLException {
          Statement stmt = conn.createStatement();
          stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    index 59c8fe4..5d9e7be 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
    @@ -61,7 +61,9 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {

          @Override
          public void run() {
    + TxnStore.MutexAPI.LockHandle handle = null;
            try {
    + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name());
              long startTime = System.currentTimeMillis();
              txnHandler.purgeCompactionHistory();
              int count = isAliveCounter.incrementAndGet();
    @@ -70,6 +72,11 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
            catch(Throwable t) {
              LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
            }
    + finally {
    + if(handle != null) {
    + handle.releaseLocks();
    + }
    + }
          }
        }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    index de74a7b..f39df17 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
    @@ -61,7 +61,9 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
          }
          @Override
          public void run() {
    + TxnStore.MutexAPI.LockHandle handle = null;
            try {
    + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
              long startTime = System.currentTimeMillis();
              txnHandler.performTimeOuts();
              int count = isAliveCounter.incrementAndGet();
    @@ -70,6 +72,11 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
            catch(Throwable t) {
              LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t);
            }
    + finally {
    + if(handle != null) {
    + handle.releaseLocks();
    + }
    + }
          }
        }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    index 33580fd..1e6e8a1 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;

      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hive.metastore.txn.TxnStore;
      import org.apache.hadoop.fs.FileStatus;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
    @@ -72,11 +73,13 @@ public class Cleaner extends CompactorThread {
            // and if so remembers that and then sets it to true at the end. We have to check here
            // first to make sure we go through a complete iteration of the loop before resetting it.
            boolean setLooped = !looped.get();
    - long startedAt = System.currentTimeMillis();
    + TxnStore.MutexAPI.LockHandle handle = null;
    + long startedAt = -1;
            // Make sure nothing escapes this run method and kills the metastore at large,
            // so wrap it in a big catch Throwable statement.
            try {
    -
    + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
    + startedAt = System.currentTimeMillis();
              // First look for all the compactions that are waiting to be cleaned. If we have not
              // seen an entry before, look for all the locks held on that table or partition and
              // record them. We will then only clean the partition once all of those locks have been
    @@ -86,6 +89,31 @@ public class Cleaner extends CompactorThread {
              // done the compaction will read the more up to date version of the data (either in a
              // newer delta or in a newer base).
              List<CompactionInfo> toClean = txnHandler.findReadyToClean();
    + {
    + /**
    + * Since there may be more than 1 instance of Cleaner running we may have state info
    + * for items which were cleaned by other instances. Here we remove them.
    + *
    + * In the long run if we add end_time to compaction_queue, then we can check that
    + * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case
    + * we know the lock owner is reading files created by this compaction or later.
    + * The advantage is that we don't have to store the locks.
    + */
    + Set<Long> currentToCleanSet = new HashSet<>();
    + for (CompactionInfo ci : toClean) {
    + currentToCleanSet.add(ci.id);
    + }
    + Set<Long> cleanPerformedByOthers = new HashSet<>();
    + for (long id : compactId2CompactInfoMap.keySet()) {
    + if (!currentToCleanSet.contains(id)) {
    + cleanPerformedByOthers.add(id);
    + }
    + }
    + for (long id : cleanPerformedByOthers) {
    + compactId2CompactInfoMap.remove(id);
    + compactId2LockMap.remove(id);
    + }
    + }
              if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
                ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());

    @@ -119,6 +147,7 @@ public class Cleaner extends CompactorThread {
                      // Remember to remove this when we're out of the loop,
                      // we can't do it in the loop or we'll get a concurrent modification exception.
                      compactionsCleaned.add(queueEntry.getKey());
    + //Future thought: this may be expensive, so consider having a thread pool run the cleans in parallel
                      clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
                    } else {
                      // Remove the locks we didn't see so we don't look for them again next time
    @@ -140,6 +169,11 @@ public class Cleaner extends CompactorThread {
              LOG.error("Caught an exception in the main loop of compactor cleaner, " +
                  StringUtils.stringifyException(t));
            }
    + finally {
    + if (handle != null) {
    + handle.releaseLocks();
    + }
    + }
            if (setLooped) {
              looped.set(true);
            }
    @@ -206,10 +240,24 @@ public class Cleaner extends CompactorThread {
            StorageDescriptor sd = resolveStorageDescriptor(t, p);
            final String location = sd.getLocation();

    - // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
    - // transactions. This assures that all deltas are treated as valid and all we return are
    - // obsolete files.
    - final ValidTxnList txnList = new ValidReadTxnList();
    + /**
    + * Each compaction only compacts as far as the highest txn id such that all txns below it
    + * are resolved (i.e. not open). This is what "highestTxnId" tracks. It is only tracked
    + * since Hive 1.3.0/2.0 and thus may be 0. See ValidCompactorTxnList and its uses for more info.
    + *
    + * We only want to clean up to the highestTxnId - otherwise we risk deleting deltas from
    + * under an active reader.
    + *
    + * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a
    + * clean request for D2.
    + * Cleaner checks existing locks and finds none.
    + * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
    + * completes which creates D4.
    + * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
    + * unless ValidTxnList is "capped" at highestTxnId.
    + */
    + final ValidTxnList txnList = ci.highestTxnId > 0 ?
    + new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();

            if (runJobAsSelf(ci.runAs)) {
              removeFiles(location, txnList);
    @@ -249,7 +297,7 @@ public class Cleaner extends CompactorThread {
          FileSystem fs = filesToDelete.get(0).getFileSystem(conf);

          for (Path dead : filesToDelete) {
    - LOG.debug("Doing to delete path " + dead.toString());
    + LOG.debug("Going to delete path " + dead.toString());
            fs.delete(dead, true);
          }
        }
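The pruning block added at the top of the Cleaner loop can also be expressed against the maps' live keySet() views, since removing from a Map's key set removes the mapping itself. A hedged equivalent, where the field names mirror the patch and the method name is illustrative:

  private void pruneStaleState(List<CompactionInfo> toClean) {
    Set<Long> currentToCleanSet = new HashSet<>();
    for (CompactionInfo ci : toClean) {
      currentToCleanSet.add(ci.id);
    }
    // retainAll drops ids already cleaned by other instances in place,
    // avoiding the explicit cleanPerformedByOthers scratch set
    compactId2CompactInfoMap.keySet().retainAll(currentToCleanSet);
    compactId2LockMap.keySet().retainAll(currentToCleanSet);
  }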

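The capped ValidTxnList is what defuses the D2/D3/D4 race spelled out in the comment: with the high water mark pinned at highestTxnId, deltas written above it are never reported obsolete. A hedged illustration with invented txn ids:

  // the patch constructs: new ValidReadTxnList(new long[0], ci.highestTxnId)
  ValidTxnList capped = new ValidReadTxnList(new long[0], 42L);
  capped.isTxnValid(40L);  // true: resolved history below the cap, safe to clean
  capped.isTxnValid(43L);  // false: above the cap, so a newer delta like D4 survives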
    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    index 1898a4d..0e4ba06 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    @@ -74,11 +74,15 @@ public class Initiator extends CompactorThread {
            // much easier. The stop value is only for testing anyway and not used when called from
            // HiveMetaStore.
            do {
    - long startedAt = System.currentTimeMillis();
    + long startedAt = -1;
    + TxnStore.MutexAPI.LockHandle handle = null;

              // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
              // don't doom the entire thread.
    - try {//todo: add method to only get current i.e. skip history - more efficient
    + try {
    + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
    + startedAt = System.currentTimeMillis();
    + //todo: add method to only get current i.e. skip history - more efficient
                ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
                ValidTxnList txns =
                    TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
    @@ -114,6 +118,8 @@ public class Initiator extends CompactorThread {
                    // Check if we already have initiated or are working on a compaction for this partition
                    // or table. If so, skip it. If we are just waiting on cleaning we can still check,
                    // as it may be time to compact again even though we haven't cleaned.
    + //todo: this is not robust. You can easily run Alter Table to start a compaction between
    + //the time currentCompactions is generated and now
                    if (lookForCurrentCompactions(currentCompactions, ci)) {
                      LOG.debug("Found currently initiated or working compaction for " +
                          ci.getFullPartitionName() + " so we will not initiate another compaction");
    @@ -134,7 +140,9 @@ public class Initiator extends CompactorThread {
                    }
                    StorageDescriptor sd = resolveStorageDescriptor(t, p);
                    String runAs = findUserToRunAs(sd.getLocation(), t);
    -
    + /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
    + * Long term we should consider having a thread pool here and running checkForCompaction
    + * calls in parallel*/
                    CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
                    if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
                  } catch (Throwable t) {
    @@ -154,6 +162,11 @@ public class Initiator extends CompactorThread {
                LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
                    StringUtils.stringifyException(t));
              }
    + finally {
    + if(handle != null) {
    + handle.releaseLocks();
    + }
    + }

              long elapsedTime = System.currentTimeMillis() - startedAt;
              if (elapsedTime >= checkInterval || stop.get()) continue;

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    index 6f8dc35..8394ec6 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    @@ -229,7 +229,6 @@ public class TestTxnCommands2 {
          }
          Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true);
        }
    - @Ignore("alter table")
        @Test
        public void testAlterTable() throws Exception {
          int[][] tableData = {{1,2}};
    @@ -604,7 +603,13 @@ public class TestTxnCommands2 {
        private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
          int lastCount = houseKeeperService.getIsAliveCounter();
          houseKeeperService.start(conf);
    + int maxIter = 10;
    + int iterCount = 0;
          while(houseKeeperService.getIsAliveCounter() <= lastCount) {
    + if(iterCount++ >= maxIter) {
    + //prevent test hangs
    + throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits");
    + }
            try {
              Thread.sleep(100);//make sure it has run at least once
            }
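This hunk and the TestDbTxnManager hunk below adopt the same bounded-wait idiom so a HouseKeeper that never runs fails the test instead of hanging it. The shared shape, as a hedged helper sketch (awaitCounterIncrement is illustrative, not in the patch):

  static void awaitCounterIncrement(HouseKeeperService svc, int lastCount)
      throws InterruptedException {
    int maxIter = 10, iterCount = 0;
    while (svc.getIsAliveCounter() <= lastCount) {
      if (iterCount++ >= maxIter) {
        throw new IllegalStateException("Service didn't run after " + iterCount + " waits");
      }
      Thread.sleep(100);  // the service bumps its isAlive counter once per pass
    }
  }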

    http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    index 99705b4..b355dbe 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    @@ -194,7 +194,13 @@ public class TestDbTxnManager {
        private void runReaper() throws Exception {
          int lastCount = houseKeeperService.getIsAliveCounter();
          houseKeeperService.start(conf);
    + int maxIter = 10;
    + int iterCount = 0;
          while(houseKeeperService.getIsAliveCounter() <= lastCount) {
    + if(iterCount++ >= maxIter) {
    + //prevent test hangs
    + throw new IllegalStateException("Reaper didn't run after " + iterCount + " waits");
    + }
            try {
              Thread.sleep(100);//make sure it has run at least once
            }
