Grokbase Groups Hive commits May 2016
FAQ
Repository: hive
Updated Branches:
   refs/heads/master 868e5e141 -> b70efa447


HIVE-13213 make DbLockManger work for non-acid resources (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b70efa44
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b70efa44
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b70efa44

Branch: refs/heads/master
Commit: b70efa447d9ae5883315d88e84ad1262d371213d
Parents: 47bf055
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Tue May 3 13:38:42 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Tue May 3 13:53:02 2016 -0700

----------------------------------------------------------------------
  .../hadoop/hive/metastore/txn/TxnHandler.java | 5 ++
  .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 12 +++
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 ++++++
  .../hive/ql/lockmgr/TestDbTxnManager2.java | 81 ++++++++++++++++++++
  4 files changed, 120 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b70efa44/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index c32b0b0..c0fa97a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -87,6 +87,11 @@ import java.util.regex.Pattern;
   * If we ever decide to run remote Derby server, according to
   * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be
   * seriazlied, so that would also work though has not been tested.
+ *
+ * General design note:
+ * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is
+ * still valid and active. In the code this is usually achieved at the same time the txn record
+ * is locked for some operation.
   */
  @InterfaceAudience.Private
  @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/hive/blob/b70efa44/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index e8ebe55..3aec8eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr;

  import com.google.common.annotations.VisibleForTesting;
  import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hive.common.util.ShutdownHookManager;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@ -213,6 +214,17 @@ public class DbTxnManager extends HiveTxnManagerImpl {
            break;

          case INSERT:
+ t = output.getTable();
+ if(t == null) {
+ throw new IllegalStateException("No table info for " + output);
+ }
+ if(AcidUtils.isAcidTable(t)) {
+ compBuilder.setShared();
+ }
+ else {
+ compBuilder.setExclusive();
+ }
+ break;
          case DDL_SHARED:
            compBuilder.setShared();
            break;

http://git-wip-us.apache.org/repos/asf/hive/blob/b70efa44/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 04c1d17..1030987 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -433,6 +433,28 @@ public class TestTxnCommands2 {
    }

    /**
+ * Test update that hits multiple partitions (i.e. requries dynamic partition insert to process)
+ * @throws Exception
+ */
+ @Test
+ public void updateDeletePartitioned() throws Exception {
+ int[][] tableData = {{1,2},{3,4},{5,6}};
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData));
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+ txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR));
+ runWorker(hiveConf);
+ runCleaner(hiveConf);
+ runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = b + 1 where a = 3");
+ txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.toString(), CompactionType.MAJOR));
+ runWorker(hiveConf);
+ runCleaner(hiveConf);
+ List<String> rs = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
+ int[][] expectedData = {{1,1,2},{1,3,5},{1,5,6},{2,1,2},{2,3,5},{2,5,6}};
+ Assert.assertEquals("Update " + Table.ACIDTBLPART + " didn't match:", stringifyValues(expectedData), rs);
+ }
+
+ /**
     * https://issues.apache.org/jira/browse/HIVE-10151
     */
    @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/b70efa44/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 6e2cf30..e94af55 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -536,6 +536,87 @@ public class TestDbTxnManager2 {
      Assert.assertEquals(0, count);
    }

+ /**
+ * collection of queries where we ensure that we get the locks that are expected
+ * @throws Exception
+ */
+ @Test
+ public void checkExpectedLocks() throws Exception {
+ CommandProcessorResponse cpr = null;
+ cpr = driver.run("create table acidPart(a int, b int) partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table nonAcidPart(a int, b int) partitioned by (p string) stored as orc");
+ checkCmdOnDriver(cpr);
+
+ cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)");
+ checkCmdOnDriver(cpr);
+ LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__1", null, locks.get(0));
+ checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks.get(1));
+ List<HiveLock> relLocks = new ArrayList<HiveLock>(2);
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+ txnMgr.getLockManager().releaseLocks(relLocks);
+
+ cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)");
+ checkCmdOnDriver(cpr);
+ lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__2", null, locks.get(0));
+ checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", "p=1", locks.get(1));
+ relLocks = new ArrayList<HiveLock>(2);
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+ txnMgr.getLockManager().releaseLocks(relLocks);
+
+ cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)");
+ checkCmdOnDriver(cpr);
+ lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__3", null, locks.get(0));
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks.get(1));
+ relLocks = new ArrayList<HiveLock>(2);
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+ txnMgr.getLockManager().releaseLocks(relLocks);
+
+ cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)");
+ checkCmdOnDriver(cpr);
+ lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__4", null, locks.get(0));
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks.get(1));
+ relLocks = new ArrayList<HiveLock>(2);
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+ txnMgr.getLockManager().releaseLocks(relLocks);
+
+ cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1");
+ checkCmdOnDriver(cpr);
+ lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks.get(0));
+ relLocks = new ArrayList<HiveLock>(2);
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ txnMgr.getLockManager().releaseLocks(relLocks);
+
+ cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1");
+ checkCmdOnDriver(cpr);
+ lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks.get(0));//https://issues.apache.org/jira/browse/HIVE-13212
+ relLocks = new ArrayList<HiveLock>(2);
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ txnMgr.getLockManager().releaseLocks(relLocks);
+ }
+
    private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
      Assert.assertEquals(l.toString(),l.getType(), type);
      Assert.assertEquals(l.toString(),l.getState(), state);

Search Discussions

  • Ekoifman at May 3, 2016 at 8:53 pm
    HIVE-11848 - tables in subqueries don't get locked (Eugene Koifman, reviewed by Wei Zheng)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/47bf055c
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/47bf055c
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/47bf055c

    Branch: refs/heads/master
    Commit: 47bf055c02990272753105b917b487c5bbfe9208
    Parents: 868e5e1
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Tue May 3 13:33:42 2016 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Tue May 3 13:53:02 2016 -0700

    ----------------------------------------------------------------------
      .../ql/parse/UpdateDeleteSemanticAnalyzer.java | 16 +++++++++-
      .../hive/ql/lockmgr/TestDbTxnManager2.java | 33 ++++++++++++++++++++
      2 files changed, 48 insertions(+), 1 deletion(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/47bf055c/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    index b8771d2..33fbffe 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    @@ -329,7 +329,9 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
          // Walk through all our inputs and set them to note that this read is part of an update or a
          // delete.
          for (ReadEntity input : inputs) {
    - input.setUpdateOrDelete(true);
    + if(isWritten(input)) {
    + input.setUpdateOrDelete(true);
    + }
          }

          if (inputIsPartitioned(inputs)) {
    @@ -377,6 +379,18 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
          }
        }

    + /**
    + * Check that {@code readEntity} is also being written
    + */
    + private boolean isWritten(Entity readEntity) {
    + for(Entity writeEntity : outputs) {
    + //make sure to compare them as Entity, i.e. that it's the same table or partition, etc
    + if(writeEntity.toString().equalsIgnoreCase(readEntity.toString())) {
    + return true;
    + }
    + }
    + return false;
    + }
        private String operation() {
          if (updating()) return "update";
          else if (deleting()) return "delete";

    http://git-wip-us.apache.org/repos/asf/hive/blob/47bf055c/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    index 836b507..6e2cf30 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    @@ -71,6 +71,39 @@ public class TestDbTxnManager2 {
          TxnDbUtil.prepDb();
        }
        @Test
    + public void testLocksInSubquery() throws Exception {
    + checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)"));
    + checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
    + checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
    +
    + checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)"));
    + txnMgr.openTxn("one");
    + txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
    + List<ShowLocksResponseElement> locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
    + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks.get(1));
    + txnMgr.rollbackTxn();
    +
    + checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)"));
    + txnMgr.openTxn("one");
    + txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
    + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks.get(1));
    + txnMgr.rollbackTxn();
    +
    + checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)"));
    + txnMgr.openTxn("three");
    + txnMgr.acquireLocks(driver.getPlan(), ctx, "three");
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 3, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "S", null, locks.get(1));
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "R", null, locks.get(2));
    + }
    + @Test
        public void createTable() throws Exception {
          CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)");
          checkCmdOnDriver(cpr);
  • Ekoifman at May 3, 2016 at 10:24 pm
    HIVE-13213 make DbLockManger work for non-acid resources (Eugene Koifman, reviewed by Alan Gates)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bfc24963
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bfc24963
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bfc24963

    Branch: refs/heads/branch-1
    Commit: bfc249632378c1b9c12c059c817b2c6227c7e0e7
    Parents: 0780218
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Tue May 3 14:52:50 2016 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Tue May 3 14:52:50 2016 -0700

    ----------------------------------------------------------------------
      .../hadoop/hive/metastore/txn/TxnHandler.java | 5 ++
      .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 12 +++
      .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 ++++++
      .../hive/ql/lockmgr/TestDbTxnManager2.java | 81 ++++++++++++++++++++
      4 files changed, 120 insertions(+)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    index ffd450a..f7ef88e 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    @@ -89,6 +89,11 @@ import java.util.regex.Pattern;
       * If we ever decide to run remote Derby server, according to
       * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be
       * seriazlied, so that would also work though has not been tested.
    + *
    + * General design note:
    + * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is
    + * still valid and active. In the code this is usually achieved at the same time the txn record
    + * is locked for some operation.
       */
      @InterfaceAudience.Private
      @InterfaceStability.Evolving

    http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    index a9867ef..28ee8a8 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.ql.io.AcidUtils;
      import org.apache.hive.common.util.ShutdownHookManager;
      import org.apache.hadoop.hive.common.JavaUtils;
      import org.apache.hadoop.hive.common.ValidTxnList;
    @@ -213,6 +214,17 @@ public class DbTxnManager extends HiveTxnManagerImpl {
                break;

              case INSERT:
    + t = output.getTable();
    + if(t == null) {
    + throw new IllegalStateException("No table info for " + output);
    + }
    + if(AcidUtils.isAcidTable(t)) {
    + compBuilder.setShared();
    + }
    + else {
    + compBuilder.setExclusive();
    + }
    + break;
              case DDL_SHARED:
                compBuilder.setShared();
                break;

    http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    index 1bddecb..a901074 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    @@ -433,6 +433,28 @@ public class TestTxnCommands2 {
        }

        /**
    + * Test update that hits multiple partitions (i.e. requries dynamic partition insert to process)
    + * @throws Exception
    + */
    + @Test
    + public void updateDeletePartitioned() throws Exception {
    + int[][] tableData = {{1,2},{3,4},{5,6}};
    + runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
    + runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData));
    + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
    + txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR));
    + runWorker(hiveConf);
    + runCleaner(hiveConf);
    + runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = b + 1 where a = 3");
    + txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.toString(), CompactionType.MAJOR));
    + runWorker(hiveConf);
    + runCleaner(hiveConf);
    + List<String> rs = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
    + int[][] expectedData = {{1,1,2},{1,3,5},{1,5,6},{2,1,2},{2,3,5},{2,5,6}};
    + Assert.assertEquals("Update " + Table.ACIDTBLPART + " didn't match:", stringifyValues(expectedData), rs);
    + }
    +
    + /**
         * https://issues.apache.org/jira/browse/HIVE-10151
         */
        @Test

    http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    index 42c7064..0e2bfc0 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    @@ -524,6 +524,87 @@ public class TestDbTxnManager2 {
          Assert.assertEquals(0, count);
        }

    + /**
    + * collection of queries where we ensure that we get the locks that are expected
    + * @throws Exception
    + */
    + @Test
    + public void checkExpectedLocks() throws Exception {
    + CommandProcessorResponse cpr = null;
    + cpr = driver.run("create table acidPart(a int, b int) partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    + checkCmdOnDriver(cpr);
    + cpr = driver.run("create table nonAcidPart(a int, b int) partitioned by (p string) stored as orc");
    + checkCmdOnDriver(cpr);
    +
    + cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)");
    + checkCmdOnDriver(cpr);
    + LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
    + List<ShowLocksResponseElement> locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__1", null, locks.get(0));
    + checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks.get(1));
    + List<HiveLock> relLocks = new ArrayList<HiveLock>(2);
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
    + txnMgr.getLockManager().releaseLocks(relLocks);
    +
    + cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)");
    + checkCmdOnDriver(cpr);
    + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__2", null, locks.get(0));
    + checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", "p=1", locks.get(1));
    + relLocks = new ArrayList<HiveLock>(2);
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
    + txnMgr.getLockManager().releaseLocks(relLocks);
    +
    + cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)");
    + checkCmdOnDriver(cpr);
    + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__3", null, locks.get(0));
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks.get(1));
    + relLocks = new ArrayList<HiveLock>(2);
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
    + txnMgr.getLockManager().releaseLocks(relLocks);
    +
    + cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)");
    + checkCmdOnDriver(cpr);
    + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__4", null, locks.get(0));
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks.get(1));
    + relLocks = new ArrayList<HiveLock>(2);
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
    + txnMgr.getLockManager().releaseLocks(relLocks);
    +
    + cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1");
    + checkCmdOnDriver(cpr);
    + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 1, locks.size());
    + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks.get(0));
    + relLocks = new ArrayList<HiveLock>(2);
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
    + txnMgr.getLockManager().releaseLocks(relLocks);
    +
    + cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1");
    + checkCmdOnDriver(cpr);
    + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 1, locks.size());
    + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks.get(0));//https://issues.apache.org/jira/browse/HIVE-13212
    + relLocks = new ArrayList<HiveLock>(2);
    + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
    + txnMgr.getLockManager().releaseLocks(relLocks);
    + }
    +
        private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
          Assert.assertEquals(l.toString(),l.getType(), type);
          Assert.assertEquals(l.toString(),l.getState(), state);
  • Ekoifman at May 3, 2016 at 10:24 pm
    Repository: hive
    Updated Branches:
       refs/heads/branch-1 ab2951237 -> bfc249632


    HIVE-11848 - tables in subqueries don't get locked (Eugene Koifman, reviewed by Wei Zheng)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0780218f
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0780218f
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0780218f

    Branch: refs/heads/branch-1
    Commit: 0780218f2e075e43aa9051313b1e13b034d778ae
    Parents: ab29512
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Tue May 3 14:51:38 2016 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Tue May 3 14:51:38 2016 -0700

    ----------------------------------------------------------------------
      .../ql/parse/UpdateDeleteSemanticAnalyzer.java | 16 +++++++++-
      .../hive/ql/lockmgr/TestDbTxnManager2.java | 33 ++++++++++++++++++++
      2 files changed, 48 insertions(+), 1 deletion(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/0780218f/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    index 4c69534..a80c093 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
    @@ -321,7 +321,9 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
          // Walk through all our inputs and set them to note that this read is part of an update or a
          // delete.
          for (ReadEntity input : inputs) {
    - input.setUpdateOrDelete(true);
    + if(isWritten(input)) {
    + input.setUpdateOrDelete(true);
    + }
          }

          if (inputIsPartitioned(inputs)) {
    @@ -369,6 +371,18 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
          }
        }

    + /**
    + * Check that {@code readEntity} is also being written
    + */
    + private boolean isWritten(Entity readEntity) {
    + for(Entity writeEntity : outputs) {
    + //make sure to compare them as Entity, i.e. that it's the same table or partition, etc
    + if(writeEntity.toString().equalsIgnoreCase(readEntity.toString())) {
    + return true;
    + }
    + }
    + return false;
    + }
        private String operation() {
          if (updating()) return "update";
          else if (deleting()) return "delete";

    http://git-wip-us.apache.org/repos/asf/hive/blob/0780218f/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    index 0a91348..42c7064 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    @@ -72,6 +72,39 @@ public class TestDbTxnManager2 {
          TxnDbUtil.prepDb();
        }
        @Test
    + public void testLocksInSubquery() throws Exception {
    + checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)"));
    + checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
    + checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
    +
    + checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)"));
    + txnMgr.openTxn("one");
    + txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
    + List<ShowLocksResponseElement> locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
    + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks.get(1));
    + txnMgr.rollbackTxn();
    +
    + checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)"));
    + txnMgr.openTxn("one");
    + txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
    + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks.get(1));
    + txnMgr.rollbackTxn();
    +
    + checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)"));
    + txnMgr.openTxn("three");
    + txnMgr.acquireLocks(driver.getPlan(), ctx, "three");
    + locks = getLocks();
    + Assert.assertEquals("Unexpected lock count", 3, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "S", null, locks.get(1));
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "R", null, locks.get(2));
    + }
    + @Test
        public void createTable() throws Exception {
          CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)");
          checkCmdOnDriver(cpr);

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedMay 3, '16 at 8:53p
activeMay 3, '16 at 10:24p
posts4
users1
websitehive.apache.org

1 user in discussion

Ekoifman: 4 posts

People

Translate

site design / logo © 2021 Grokbase