Grokbase Groups Hive commits May 2016
FAQ
Repository: hive
Updated Branches:
   refs/heads/branch-1 6c6583274 -> c0b532fce


http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 0770298..584cd45 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -18,6 +18,7 @@
  package org.apache.hadoop.hive.ql.lockmgr;

  import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
  import org.apache.hadoop.hive.metastore.txn.TxnStore;
  import org.apache.hadoop.hive.metastore.txn.TxnUtils;
  import org.apache.hadoop.hive.ql.TestTxnCommands2;
@@ -854,8 +855,10 @@ public class TestDbTxnManager2 {
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
      //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
      Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
- "default", "tab2", Collections.EMPTY_LIST));
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+ "default", "tab2", Collections.EMPTY_LIST);
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr2.commitTxn();
      //Short Running updated nothing, so we expect 0 rows in WRITE_SET
      Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
@@ -869,8 +872,10 @@ public class TestDbTxnManager2 {
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));//since TAB2 is empty
      //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
      Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
- "default", "tab2", Collections.singletonList("p=two")));//simulate partition update
+ adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+ "default", "tab2", Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);//simulate partition update
      txnMgr2.commitTxn();
      Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
        1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
@@ -882,8 +887,10 @@ public class TestDbTxnManager2 {
      checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match
      txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
      //so generate empty Dyn Part call
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(),
- "default", "tab2", Collections.EMPTY_LIST));
+ adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(),
+ "default", "tab2", Collections.EMPTY_LIST);
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr.commitTxn();

      locks = getLocks(txnMgr);
@@ -984,16 +991,20 @@ public class TestDbTxnManager2 {
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(1));

      //this simulates the completion of txnid:2
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
- Collections.singletonList("p=two")));
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
+ Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr2.commitTxn();//txnid:2

      locks = getLocks(txnMgr2);
      Assert.assertEquals("Unexpected lock count", 1, locks.size());
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(0));
      //completion of txnid:3
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
- Collections.singletonList("p=one")));
+ adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
+ Collections.singletonList("p=one"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr.commitTxn();//txnid:3
      //now both txns concurrently updated TAB2 but different partitions.

@@ -1031,8 +1042,10 @@ public class TestDbTxnManager2 {
      checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks.get(3));

      //this simulates the completion of txnid:5
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=one")));
+ adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr2.commitTxn();//txnid:5

      ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
@@ -1041,8 +1054,10 @@ public class TestDbTxnManager2 {
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
      //completion of txnid:6
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=two")));
+ adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr.commitTxn();//txnid:6

      Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1082,8 +1097,10 @@ public class TestDbTxnManager2 {
      checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));

      //this simulates the completion of txnid:2
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=one")));
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr2.commitTxn();//txnid:2

      ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
@@ -1091,8 +1108,10 @@ public class TestDbTxnManager2 {
      Assert.assertEquals("Unexpected lock count", 1, locks.size());
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
      //completion of txnid:3
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=two")));
+ adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr.commitTxn();//txnid:3

      Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1131,8 +1150,10 @@ public class TestDbTxnManager2 {
      checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));

      //this simulates the completion of txnid:2
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=one")));
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr2.commitTxn();//txnid:2

      ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
@@ -1140,14 +1161,22 @@ public class TestDbTxnManager2 {
      Assert.assertEquals("Unexpected lock count", 1, locks.size());
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
      //completion of txnid:3
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=two")));
+ adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.DELETE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr.commitTxn();//txnid:3

+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1' and ctc_partition='p=one'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=3 and ctc_table='tab1' and ctc_partition='p=two'"));
      Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
        1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
      Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
      Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
        4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
    }
@@ -1180,8 +1209,10 @@ public class TestDbTxnManager2 {
      checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));

      //this simulates the completion of txnid:2
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=two")));
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr2.commitTxn();//txnid:2

      ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
@@ -1189,8 +1220,10 @@ public class TestDbTxnManager2 {
      Assert.assertEquals("Unexpected lock count", 1, locks.size());
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
      //completion of txnid:3
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=two")));
+ adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.DELETE);
+ txnHandler.addDynamicPartitions(adp);
      LockException exception = null;
      try {
        txnMgr.commitTxn();//txnid:3
@@ -1210,11 +1243,7 @@ public class TestDbTxnManager2 {
    }
    /**
     * Concurrent delte/detele of same partition - should pass
- * This test doesn't work yet, because we don't yet pass in operation type
- *
- * todo: Concurrent insert/update of same partition - should pass
     */
- @Ignore("HIVE-13622")
    @Test
    public void testWriteSetTracking11() throws Exception {
      CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
@@ -1232,46 +1261,86 @@ public class TestDbTxnManager2 {

      //now start concurrent txn
      txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
      checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
      ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
      locks = getLocks(txnMgr);
- Assert.assertEquals("Unexpected lock count", 3, locks.size());
- checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
- checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
- checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+ Assert.assertEquals("Unexpected lock count", 5, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks.get(0));
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(2));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(3));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(4));

      //this simulates the completion of txnid:2
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=two")));
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.DELETE);
+ txnHandler.addDynamicPartitions(adp);
      txnMgr2.commitTxn();//txnid:2

- ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(4).getLockid());//retest WAITING locks (both have same ext id)
      locks = getLocks(txnMgr);
- Assert.assertEquals("Unexpected lock count", 1, locks.size());
- checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks.get(0));
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(2));
      //completion of txnid:3
- txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
- Collections.singletonList("p=two")));
- LockException exception = null;
- try {
- txnMgr.commitTxn();//txnid:3
- }
- catch(LockException e) {
- exception = e;
- }
- Assert.assertNotEquals("Expected exception", null, exception);
- Assert.assertEquals("Exception msg doesn't match",
- "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
- exception.getCause().getMessage());
+ adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two"));
+ adp.setOperationType(DataOperationType.DELETE);
+ txnHandler.addDynamicPartitions(adp);
+ txnMgr.commitTxn();//txnid:3

- //todo: this currently fails since we don't yet set operation type properly
      Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2"));
      Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3"));
      Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
        4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
    }
+ @Test
+ public void testCompletedTxnComponents() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists tab_not_acid2 (a int, b int)");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab_not_acid2 values(1,1),(2,2)"));
+ //writing both acid and non-acid resources in the same txn
+ checkCmdOnDriver(driver.run("from tab_not_acid2 insert into tab1 partition(p='two')(a,b) select a,b insert into tab_not_acid2(a,b) select a,b "));//txnid:1
+ Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS"));
+ //only expect transactional components to be in COMPLETED_TXN_COMPONENTS
+ Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'"));
+ }
+ @Test
+ public void testMultiInsert() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists tab_not_acid (a int, b int, p string)");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab_not_acid values(1,1,'one'),(2,2,'two')"));
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txinid:1
+ //writing both acid and non-acid resources in the same txn
+ //tab1 write is a dynamic partition insert
+ checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:2
+ Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS"));
+ //only expect transactional components to be in COMPLETED_TXN_COMPONENTS
+ Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2"));
+ Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1'"));
+ }
+ //todo: Concurrent insert/update of same partition - should pass

    private List<ShowLocksResponseElement> getLocksWithFilterOptions(HiveTxnManager txnMgr,
        String dbName, String tblName, Map<String, String> partSpec) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 8d75ab3..b797b55 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -208,6 +208,7 @@ public class TestCleaner extends CompactorTest {

      LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
      comp.setTablename("bblt");
+ comp.setOperationType(DataOperationType.SELECT);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -246,6 +247,7 @@ public class TestCleaner extends CompactorTest {
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("bblp");
      comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.DELETE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -288,6 +290,7 @@ public class TestCleaner extends CompactorTest {

      LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
      comp.setTablename("bblt");
+ comp.setOperationType(DataOperationType.INSERT);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -311,6 +314,7 @@ public class TestCleaner extends CompactorTest {
      // clean request
      LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
      comp2.setTablename("bblt");
+ comp.setOperationType(DataOperationType.SELECT);
      List<LockComponent> components2 = new ArrayList<LockComponent>(1);
      components2.add(comp2);
      LockRequest req2 = new LockRequest(components, "me", "localhost");
@@ -360,6 +364,7 @@ public class TestCleaner extends CompactorTest {
      LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default");
      comp.setTablename("bblt");
      comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.INSERT);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -385,6 +390,7 @@ public class TestCleaner extends CompactorTest {
      LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default");
      comp2.setTablename("bblt");
      comp2.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.SELECT);
      List<LockComponent> components2 = new ArrayList<LockComponent>(1);
      components2.add(comp2);
      LockRequest req2 = new LockRequest(components, "me", "localhost");

http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index f84bd7e..bbd2bf8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -110,6 +110,7 @@ public class TestInitiator extends CompactorTest {
        long txnid = openTxn();
        LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
        comp.setTablename("mcottma");
+ comp.setOperationType(DataOperationType.UPDATE);
        List<LockComponent> components = new ArrayList<LockComponent>(1);
        components.add(comp);
        LockRequest req = new LockRequest(components, "me", "localhost");
@@ -140,6 +141,7 @@ public class TestInitiator extends CompactorTest {
        LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
        comp.setTablename("mcoptma");
        comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.DELETE);
        List<LockComponent> components = new ArrayList<LockComponent>(1);
        components.add(comp);
        LockRequest req = new LockRequest(components, "me", "localhost");
@@ -173,6 +175,7 @@ public class TestInitiator extends CompactorTest {
        LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
        comp.setTablename("ncomdpa");
        comp.setPartitionname("ds=day-" + i);
+ comp.setOperationType(DataOperationType.UPDATE);
        List<LockComponent> components = new ArrayList<LockComponent>(1);
        components.add(comp);
        LockRequest req = new LockRequest(components, "me", "localhost");
@@ -197,6 +200,7 @@ public class TestInitiator extends CompactorTest {
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
      comp.setTablename("ceat");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -229,6 +233,7 @@ public class TestInitiator extends CompactorTest {
        long txnid = openTxn();
        LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
        comp.setTablename("ncwncs");
+ comp.setOperationType(DataOperationType.UPDATE);
        List<LockComponent> components = new ArrayList<LockComponent>(1);
        components.add(comp);
        LockRequest req = new LockRequest(components, "me", "localhost");
@@ -254,6 +259,7 @@ public class TestInitiator extends CompactorTest {
      for (int i = 0; i < 11; i++) {
        long txnid = openTxn();
        LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+ comp.setOperationType(DataOperationType.DELETE);
        comp.setTablename("ncwncs");
        List<LockComponent> components = new ArrayList<LockComponent>(1);
        components.add(comp);
@@ -279,6 +285,7 @@ public class TestInitiator extends CompactorTest {
        long txnid = openTxn();
        LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
        comp.setTablename("ncwcas");
+ comp.setOperationType(DataOperationType.UPDATE);
        List<LockComponent> components = new ArrayList<LockComponent>(1);
        components.add(comp);
        LockRequest req = new LockRequest(components, "me", "localhost");
@@ -319,6 +326,7 @@ public class TestInitiator extends CompactorTest {
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
      comp.setTablename("cthdp");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -351,6 +359,7 @@ public class TestInitiator extends CompactorTest {
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("cphdp");
      comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -382,6 +391,7 @@ public class TestInitiator extends CompactorTest {
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
      comp.setTablename("nctdpnhe");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -417,6 +427,7 @@ public class TestInitiator extends CompactorTest {
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
      comp.setTablename("cttmd");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -458,6 +469,7 @@ public class TestInitiator extends CompactorTest {
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("cptmd");
      comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -489,6 +501,7 @@ public class TestInitiator extends CompactorTest {
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
      comp.setTablename("nctned");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -524,6 +537,7 @@ public class TestInitiator extends CompactorTest {
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
      comp.setTablename("cmomwbv");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -564,6 +578,7 @@ public class TestInitiator extends CompactorTest {
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("ednb");
      comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.DELETE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -597,6 +612,7 @@ public class TestInitiator extends CompactorTest {
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("ttospgocr");
      comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -608,6 +624,7 @@ public class TestInitiator extends CompactorTest {
      comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("ttospgocr");
      comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.UPDATE);
      components = new ArrayList<LockComponent>(1);
      components.add(comp);
      req = new LockRequest(components, "me", "localhost");
@@ -640,6 +657,7 @@ public class TestInitiator extends CompactorTest {
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
      comp.setTablename("nctdp");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -667,6 +685,7 @@ public class TestInitiator extends CompactorTest {
      long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("dt");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
@@ -698,6 +717,7 @@ public class TestInitiator extends CompactorTest {
      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("dp");
      comp.setPartitionname("ds=today");
+ comp.setOperationType(DataOperationType.UPDATE);
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");

http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
----------------------------------------------------------------------
diff --git a/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote b/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
index af2d93d..cbdbd56 100755
--- a/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
+++ b/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
@@ -145,6 +145,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
    print(' GetOpenTxnsInfoResponse get_open_txns_info()')
    print(' OpenTxnsResponse open_txns(OpenTxnRequest rqst)')
    print(' void abort_txn(AbortTxnRequest rqst)')
+ print(' void abort_txns(AbortTxnsRequest rqst)')
    print(' void commit_txn(CommitTxnRequest rqst)')
    print(' LockResponse lock(LockRequest rqst)')
    print(' LockResponse check_lock(CheckLockRequest rqst)')
@@ -953,6 +954,12 @@ elif cmd == 'abort_txn':
      sys.exit(1)
    pp.pprint(client.abort_txn(eval(args[0]),))

+elif cmd == 'abort_txns':
+ if len(args) != 1:
+ print('abort_txns requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.abort_txns(eval(args[0]),))
+
  elif cmd == 'commit_txn':
    if len(args) != 1:
      print('commit_txn requires 1 args')

Search Discussions

Discussion Posts

Previous

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 11 of 11 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedMay 19, '16 at 7:29p
activeMay 19, '16 at 8:00p
posts11
users1
websitehive.apache.org

1 user in discussion

Ekoifman: 11 posts

People

Translate

site design / logo © 2021 Grokbase