Grokbase Groups Hive commits May 2016
FAQ
Repository: hive
Updated Branches:
   refs/heads/branch-1 8a59b85a6 -> 7dbc53da9


http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index b355dbe..3f5d0b6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.api.FieldSchema;
  import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
  import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
  import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
  import org.apache.hadoop.hive.metastore.txn.TxnStore;
  import org.apache.hadoop.hive.ql.Context;
@@ -508,6 +509,12 @@ public class TestDbTxnManager {
        partCols.add(fs);
        t.setPartCols(partCols);
      }
+ Map<String, String> tblProps = t.getParameters();
+ if(tblProps == null) {
+ tblProps = new HashMap<>();
+ }
+ tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ t.setParamters(tblProps);
      return t;
    }


http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 0e2bfc0..832606b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -17,7 +17,13 @@
   */
  package org.apache.hadoop.hive.ql.lockmgr;

-import junit.framework.Assert;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.TestTxnCommands2;
+import org.apache.hadoop.hive.ql.txn.AcidWriteSetService;
+import org.junit.After;
+import org.junit.Assert;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.api.LockState;
  import org.apache.hadoop.hive.metastore.api.LockType;
@@ -29,23 +35,32 @@ import org.apache.hadoop.hive.ql.Driver;
  import org.apache.hadoop.hive.ql.ErrorMsg;
  import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
  import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.After;
  import org.junit.Before;
  import org.junit.BeforeClass;
+import org.junit.Ignore;
  import org.junit.Test;

  import java.util.ArrayList;
+import java.util.Collections;
  import java.util.List;

  /**
   * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}
   * Tests here are "end-to-end"ish and simulate concurrent queries.
+ *
+ * The general approach is to use an instance of Driver to use Driver.run() to create tables
+ * Use Driver.compile() to generate QueryPlan which can then be passed to HiveTxnManager.acquireLocks().
+ * Same HiveTxnManager is used to openTxn()/commitTxn() etc. This can exercise almost the entire
+ * code path that CLI would but with the advantage that you can create a 2nd HiveTxnManager and then
+ * simulate interleaved transactional/locking operations but all from within a single thread.
+ * The later not only controls concurrency precisely but is the only way to run in UT env with DerbyDB.
   */
  public class TestDbTxnManager2 {
    private static HiveConf conf = new HiveConf(Driver.class);
    private HiveTxnManager txnMgr;
    private Context ctx;
    private Driver driver;
+ TxnStore txnHandler;

    @BeforeClass
    public static void setUpClass() throws Exception {
@@ -61,15 +76,17 @@ public class TestDbTxnManager2 {
      driver.init();
      TxnDbUtil.cleanDb();
      TxnDbUtil.prepDb();
- txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ SessionState ss = SessionState.get();
+ ss.initTxnMgr(conf);
+ txnMgr = ss.getTxnMgr();
      Assert.assertTrue(txnMgr instanceof DbTxnManager);
+ txnHandler = TxnUtils.getTxnStore(conf);
+
    }
    @After
    public void tearDown() throws Exception {
      driver.close();
      if (txnMgr != null) txnMgr.closeTxnManager();
- TxnDbUtil.cleanDb();
- TxnDbUtil.prepDb();
    }
    @Test
    public void testLocksInSubquery() throws Exception {
@@ -193,22 +210,24 @@ public class TestDbTxnManager2 {
      checkCmdOnDriver(cpr);
      cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
      checkCmdOnDriver(cpr);
+ txnMgr.openTxn("Fifer");
      txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
- List<HiveLock> updateLocks = ctx.getHiveLocks();
- cpr = driver.compileAndRespond("drop database if exists temp");
- LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
+ checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp"));
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ //txnMgr2.openTxn("Fiddler");
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
      List<ShowLocksResponseElement> locks = getLocks();
      Assert.assertEquals("Unexpected lock count", 2, locks.size());
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks.get(0));
      checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks.get(1));
- txnMgr.getLockManager().releaseLocks(updateLocks);
- lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+ txnMgr.commitTxn();
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
      locks = getLocks();
      Assert.assertEquals("Unexpected lock count", 1, locks.size());
      checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks.get(0));
      List<HiveLock> xLock = new ArrayList<HiveLock>(0);
      xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(xLock);
+ txnMgr2.getLockManager().releaseLocks(xLock);
    }
    @Test
    public void updateSelectUpdate() throws Exception {
@@ -216,29 +235,27 @@ public class TestDbTxnManager2 {
      checkCmdOnDriver(cpr);
      cpr = driver.compileAndRespond("delete from T8 where b = 89");
      checkCmdOnDriver(cpr);
+ txnMgr.openTxn("Fifer");
      txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
- List<HiveLock> deleteLocks = ctx.getHiveLocks();
      cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
      checkCmdOnDriver(cpr);
- txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler");
- cpr = driver.compileAndRespond("update T8 set a = 1 where b = 1");
- checkCmdOnDriver(cpr);
- LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Fiddler");
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler");
+ checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1"));
+ ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
      List<ShowLocksResponseElement> locks = getLocks();
      Assert.assertEquals("Unexpected lock count", 3, locks.size());
      checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
      checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks.get(2));
- txnMgr.getLockManager().releaseLocks(deleteLocks);
- lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());
+ txnMgr.rollbackTxn();
+ ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(2).getLockid());
      locks = getLocks();
      Assert.assertEquals("Unexpected lock count", 2, locks.size());
      checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
- List<HiveLock> relLocks = new ArrayList<HiveLock>(2);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
+ txnMgr2.commitTxn();
      cpr = driver.run("drop table if exists T6");
      locks = getLocks();
      Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
@@ -605,12 +622,12 @@ public class TestDbTxnManager2 {
      txnMgr.getLockManager().releaseLocks(relLocks);
    }

- private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
- Assert.assertEquals(l.toString(),l.getType(), type);
- Assert.assertEquals(l.toString(),l.getState(), state);
- Assert.assertEquals(l.toString(), normalizeCase(l.getDbname()), normalizeCase(db));
- Assert.assertEquals(l.toString(), normalizeCase(l.getTablename()), normalizeCase(table));
- Assert.assertEquals(l.toString(), normalizeCase(l.getPartname()), normalizeCase(partition));
+ private void checkLock(LockType expectedType, LockState expectedState, String expectedDb, String expectedTable, String expectedPartition, ShowLocksResponseElement actual) {
+ Assert.assertEquals(actual.toString(), expectedType, actual.getType());
+ Assert.assertEquals(actual.toString(), expectedState,actual.getState());
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedDb), normalizeCase(actual.getDbname()));
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedTable), normalizeCase(actual.getTablename()));
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedPartition), normalizeCase(actual.getPartname()));
    }
    private void checkCmdOnDriver(CommandProcessorResponse cpr) {
      Assert.assertTrue(cpr.toString(), cpr.getResponseCode() == 0);
@@ -625,4 +642,541 @@ public class TestDbTxnManager2 {
      ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
      return rsp.getLocks();
    }
+
+ /**
+ * txns update same resource but do not overlap in time - no conflict
+ */
+ @Test
+ public void testWriteSetTracking1() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART"));
+ txnMgr.openTxn("Nicholas");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr.commitTxn();
+ txnMgr2.openTxn("Alexandra");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+ txnMgr2.commitTxn();
+ }
+ /**
+ * txns overlap in time but do not update same resource - no conflict
+ */
+ @Test
+ public void testWriteSetTracking2() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr.openTxn("Peter");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter");
+ txnMgr2.openTxn("Catherine");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ //note that "update" uses dynamic partitioning thus lock is on the table not partition
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ txnMgr.commitTxn();
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine");
+ txnMgr2.commitTxn();
+ }
+
+ /**
+ * txns overlap and update the same resource - can't commit 2nd txn
+ */
+ @Test
+ public void testWriteSetTracking3() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ txnMgr.openTxn("Known");
+ txnMgr2.openTxn("Unknown");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
+ locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1));
+ txnMgr.commitTxn();
+ LockException expectedException = null;
+ try {
+ txnMgr2.commitTxn();
+ }
+ catch (LockException e) {
+ expectedException = e;
+ }
+ Assert.assertTrue("Didn't get exception", expectedException != null);
+ Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
+ Assert.assertEquals("Exception msg didn't match",
+ "Aborting [txnid:2,2] due to a write conflict on default/tab_part committed by [txnid:1,2]",
+ expectedException.getCause().getMessage());
+ }
+ /**
+ * txns overlap, update same resource, simulate multi-stmt txn case
+ * Also tests that we kill txn when it tries to acquire lock if we already know it will not be committed
+ */
+ @Test
+ public void testWriteSetTracking4() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ txnMgr.openTxn("Long Running");
+ checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ //for some reason this just locks the table; if I alter table to add this partition, then
+ //we end up locking both table and partition with share_read. (Plan has 2 ReadEntities)...?
+ //same for other locks below
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Short Running");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running");
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
+ //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+ "default", "tab2", Collections.EMPTY_LIST));
+ txnMgr2.commitTxn();
+ //Short Running updated nothing, so we expect 0 rows in WRITE_SET
+ Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+
+ txnMgr2.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3");
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));//since TAB2 is empty
+ //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+ "default", "tab2", Collections.singletonList("p=two")));//simulate partition update
+ txnMgr2.commitTxn();
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+
+ AcidWriteSetService houseKeeper = new AcidWriteSetService();
+ TestTxnCommands2.runHouseKeeperService(houseKeeper, conf);
+ //since T3 overlaps with Long Running (still open) GC does nothing
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
+ //so generate empty Dyn Part call
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(),
+ "default", "tab2", Collections.EMPTY_LIST));
+ txnMgr.commitTxn();
+
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 0, locks.size());
+ TestTxnCommands2.runHouseKeeperService(houseKeeper, conf);
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+ /**
+ * overlapping txns updating the same resource but 1st one rolls back; 2nd commits
+ * @throws Exception
+ */
+ @Test
+ public void testWriteSetTracking5() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ txnMgr.openTxn("Known");
+ txnMgr2.openTxn("Unknown");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
+ locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1));
+ txnMgr.rollbackTxn();
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnMgr2.commitTxn();//since conflicting txn rolled back, commit succeeds
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+ /**
+ * check that read query concurrent with txn works ok
+ */
+ @Test
+ public void testWriteSetTracking6() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " +
+ "by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Works");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Horton");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton");
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
+ txnMgr2.commitTxn();//no conflict
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), conf);
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+
+ /**
+ * 2 concurrent txns update different partitions of the same table and succeed
+ * @throws Exception
+ */
+ @Test
+ public void testWriteSetTracking7() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab2 (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab2 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ //test with predicates such that partition pruning works
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(1));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
+ Collections.singletonList("p=one")));
+ txnMgr.commitTxn();//txnid:3
+ //now both txns concurrently updated TAB2 but different partitions.
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u'"));
+ //2 from txnid:1, 1 from txnid:2, 1 from txnid:3
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab2' and ctc_partition is not null"));
+
+ //================
+ //test with predicates such that partition pruning doesn't kick in
+ cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4
+ txnMgr2.openTxn("T5");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5");
+ locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T6");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 4, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks.get(3));
+
+ //this simulates the completion of txnid:5
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:5
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ //completion of txnid:6
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:6
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ //2 from insert + 1 for each update stmt
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent updates with partition pruning predicate and w/o one
+ */
+ @Test
+ public void testWriteSetTracking8() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:3
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent update/delete of different partitions - should pass
+ */
+ @Test
+ public void testWriteSetTracking9() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:3
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent update/delete of same partition - should fail to commit
+ */
+ @Test
+ public void testWriteSetTracking10() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ LockException exception = null;
+ try {
+ txnMgr.commitTxn();//txnid:3
+ }
+ catch(LockException e) {
+ exception = e;
+ }
+ Assert.assertNotEquals("Expected exception", null, exception);
+ Assert.assertEquals("Exception msg doesn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+ exception.getCause().getMessage());
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 3, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent delte/detele of same partition - should pass
+ * This test doesn't work yet, because we don't yet pass in operation type
+ *
+ * todo: Concurrent insert/update of same partition - should pass
+ */
+ @Ignore("HIVE-13622")
+ @Test
+ public void testWriteSetTracking11() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ LockException exception = null;
+ try {
+ txnMgr.commitTxn();//txnid:3
+ }
+ catch(LockException e) {
+ exception = e;
+ }
+ Assert.assertNotEquals("Expected exception", null, exception);
+ Assert.assertEquals("Exception msg doesn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+ exception.getCause().getMessage());
+
+ //todo: this currently fails since we don't yet set operation type properly
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 17634f0..8d75ab3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -249,6 +249,8 @@ public class TestCleaner extends CompactorTest {
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
+ OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania"));
+ req.setTxnid(resp.getTxn_ids().get(0));
      LockResponse res = txnHandler.lock(req);

      startCleaner();

Search Discussions

Discussion Posts

Previous

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 5 of 5 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedMay 5, '16 at 8:37p
activeMay 5, '16 at 10:23p
posts5
users1
websitehive.apache.org

1 user in discussion

Ekoifman: 5 posts

People

Translate

site design / logo © 2021 Grokbase