FAQ
Author: hashutosh
Date: Thu Aug 21 20:32:21 2014
New Revision: 1619567

URL: http://svn.apache.org/r1619567
Log:
HIVE-7281 : DbTxnManager acquiring wrong level of lock for dynamic partitioning (Alan Gates via Ashutosh Chauhan)

Modified:
     hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java?rev=1619567&r1=1619566&r2=1619567&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java Thu Aug 21 20:32:21 2014
@@ -49,121 +49,135 @@ public class TxnDbUtil {
      // intended for creating derby databases, and thus will inexorably get
      // out of date with it. I'm open to any suggestions on how to make this
      // read the file in a build friendly way.
- Connection conn = getConnection();
- Statement s = conn.createStatement();
- s.execute("CREATE TABLE TXNS (" +
- " TXN_ID bigint PRIMARY KEY," +
- " TXN_STATE char(1) NOT NULL," +
- " TXN_STARTED bigint NOT NULL," +
- " TXN_LAST_HEARTBEAT bigint NOT NULL," +
- " TXN_USER varchar(128) NOT NULL," +
- " TXN_HOST varchar(128) NOT NULL)");
-
- s.execute("CREATE TABLE TXN_COMPONENTS (" +
- " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
- " TC_DATABASE varchar(128) NOT NULL," +
- " TC_TABLE varchar(128)," +
- " TC_PARTITION varchar(767))");
- s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
- " CTC_TXNID bigint," +
- " CTC_DATABASE varchar(128) NOT NULL," +
- " CTC_TABLE varchar(128)," +
- " CTC_PARTITION varchar(767))");
- s.execute("CREATE TABLE NEXT_TXN_ID (" +
- " NTXN_NEXT bigint NOT NULL)");
- s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
- s.execute("CREATE TABLE HIVE_LOCKS (" +
- " HL_LOCK_EXT_ID bigint NOT NULL," +
- " HL_LOCK_INT_ID bigint NOT NULL," +
- " HL_TXNID bigint," +
- " HL_DB varchar(128) NOT NULL," +
- " HL_TABLE varchar(128)," +
- " HL_PARTITION varchar(767)," +
- " HL_LOCK_STATE char(1) NOT NULL," +
- " HL_LOCK_TYPE char(1) NOT NULL," +
- " HL_LAST_HEARTBEAT bigint NOT NULL," +
- " HL_ACQUIRED_AT bigint," +
- " HL_USER varchar(128) NOT NULL," +
- " HL_HOST varchar(128) NOT NULL," +
- " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
- s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
-
- s.execute("CREATE TABLE NEXT_LOCK_ID (" +
- " NL_NEXT bigint NOT NULL)");
- s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
-
- s.execute("CREATE TABLE COMPACTION_QUEUE (" +
- " CQ_ID bigint PRIMARY KEY," +
- " CQ_DATABASE varchar(128) NOT NULL," +
- " CQ_TABLE varchar(128) NOT NULL," +
- " CQ_PARTITION varchar(767)," +
- " CQ_STATE char(1) NOT NULL," +
- " CQ_TYPE char(1) NOT NULL," +
- " CQ_WORKER_ID varchar(128)," +
- " CQ_START bigint," +
- " CQ_RUN_AS varchar(128))");
-
- s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
- s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
-
- conn.commit();
- conn.close();
+ Connection conn = null;
+ boolean committed = false;
+ try {
+ conn = getConnection();
+ Statement s = conn.createStatement();
+ s.execute("CREATE TABLE TXNS (" +
+ " TXN_ID bigint PRIMARY KEY," +
+ " TXN_STATE char(1) NOT NULL," +
+ " TXN_STARTED bigint NOT NULL," +
+ " TXN_LAST_HEARTBEAT bigint NOT NULL," +
+ " TXN_USER varchar(128) NOT NULL," +
+ " TXN_HOST varchar(128) NOT NULL)");
+
+ s.execute("CREATE TABLE TXN_COMPONENTS (" +
+ " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
+ " TC_DATABASE varchar(128) NOT NULL," +
+ " TC_TABLE varchar(128)," +
+ " TC_PARTITION varchar(767))");
+ s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
+ " CTC_TXNID bigint," +
+ " CTC_DATABASE varchar(128) NOT NULL," +
+ " CTC_TABLE varchar(128)," +
+ " CTC_PARTITION varchar(767))");
+ s.execute("CREATE TABLE NEXT_TXN_ID (" +
+ " NTXN_NEXT bigint NOT NULL)");
+ s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
+ s.execute("CREATE TABLE HIVE_LOCKS (" +
+ " HL_LOCK_EXT_ID bigint NOT NULL," +
+ " HL_LOCK_INT_ID bigint NOT NULL," +
+ " HL_TXNID bigint," +
+ " HL_DB varchar(128) NOT NULL," +
+ " HL_TABLE varchar(128)," +
+ " HL_PARTITION varchar(767)," +
+ " HL_LOCK_STATE char(1) NOT NULL," +
+ " HL_LOCK_TYPE char(1) NOT NULL," +
+ " HL_LAST_HEARTBEAT bigint NOT NULL," +
+ " HL_ACQUIRED_AT bigint," +
+ " HL_USER varchar(128) NOT NULL," +
+ " HL_HOST varchar(128) NOT NULL," +
+ " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+ s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
+
+ s.execute("CREATE TABLE NEXT_LOCK_ID (" +
+ " NL_NEXT bigint NOT NULL)");
+ s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
+
+ s.execute("CREATE TABLE COMPACTION_QUEUE (" +
+ " CQ_ID bigint PRIMARY KEY," +
+ " CQ_DATABASE varchar(128) NOT NULL," +
+ " CQ_TABLE varchar(128) NOT NULL," +
+ " CQ_PARTITION varchar(767)," +
+ " CQ_STATE char(1) NOT NULL," +
+ " CQ_TYPE char(1) NOT NULL," +
+ " CQ_WORKER_ID varchar(128)," +
+ " CQ_START bigint," +
+ " CQ_RUN_AS varchar(128))");
+
+ s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
+ s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+
+ conn.commit();
+ committed = true;
+ } finally {
+ if (!committed) conn.rollback();
+ conn.close();
+ }
    }

    public static void cleanDb() throws Exception {
- Connection conn = getConnection();
- Statement s = conn.createStatement();
- // We want to try these, whether they succeed or fail.
- try {
- s.execute("DROP INDEX HL_TXNID_INDEX");
- } catch (Exception e) {
- System.err.println("Unable to drop index HL_TXNID_INDEX " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE TXN_COMPONENTS");
- } catch (Exception e) {
- System.err.println("Unable to drop table TXN_COMPONENTS " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS");
- } catch (Exception e) {
- System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE TXNS");
- } catch (Exception e) {
- System.err.println("Unable to drop table TXNS " +
- e.getMessage());
- }
+ Connection conn = null;
+ boolean committed = false;
      try {
- s.execute("DROP TABLE NEXT_TXN_ID");
- } catch (Exception e) {
- System.err.println("Unable to drop table NEXT_TXN_ID " +
- e.getMessage());
+ conn = getConnection();
+ Statement s = conn.createStatement();
+ // We want to try these, whether they succeed or fail.
+ try {
+ s.execute("DROP INDEX HL_TXNID_INDEX");
+ } catch (Exception e) {
+ System.err.println("Unable to drop index HL_TXNID_INDEX " +
+ e.getMessage());
+ }
+ try {
+ s.execute("DROP TABLE TXN_COMPONENTS");
+ } catch (Exception e) {
+ System.err.println("Unable to drop table TXN_COMPONENTS " +
+ e.getMessage());
+ }
+ try {
+ s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS");
+ } catch (Exception e) {
+ System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " +
+ e.getMessage());
+ }
+ try {
+ s.execute("DROP TABLE TXNS");
+ } catch (Exception e) {
+ System.err.println("Unable to drop table TXNS " +
+ e.getMessage());
+ }
+ try {
+ s.execute("DROP TABLE NEXT_TXN_ID");
+ } catch (Exception e) {
+ System.err.println("Unable to drop table NEXT_TXN_ID " +
+ e.getMessage());
+ }
+ try {
+ s.execute("DROP TABLE HIVE_LOCKS");
+ } catch (Exception e) {
+ System.err.println("Unable to drop table HIVE_LOCKS " +
+ e.getMessage());
+ }
+ try {
+ s.execute("DROP TABLE NEXT_LOCK_ID");
+ } catch (Exception e) {
+ }
+ try {
+ s.execute("DROP TABLE COMPACTION_QUEUE");
+ } catch (Exception e) {
+ }
+ try {
+ s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID");
+ } catch (Exception e) {
+ }
+ conn.commit();
+ committed = true;
+ } finally {
+ if (!committed) conn.rollback();
+ conn.close();
      }
- try {
- s.execute("DROP TABLE HIVE_LOCKS");
- } catch (Exception e) {
- System.err.println("Unable to drop table HIVE_LOCKS " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE NEXT_LOCK_ID");
- } catch (Exception e) {
- }
- try {
- s.execute("DROP TABLE COMPACTION_QUEUE");
- } catch (Exception e) {
- }
- try {
- s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID");
- } catch (Exception e) {
- }
- conn.commit();
- conn.close();
    }

    /**
@@ -174,25 +188,34 @@ public class TxnDbUtil {
     */
    public static int countLockComponents(long lockId) throws Exception {
      Connection conn = getConnection();
- Statement s = conn.createStatement();
- ResultSet rs = s.executeQuery("select count(*) from hive_locks where " +
- "hl_lock_ext_id = " + lockId);
- if (!rs.next()) return 0;
- int rc = rs.getInt(1);
- conn.rollback();
- conn.close();
- return rc;
+ try {
+ Statement s = conn.createStatement();
+ ResultSet rs = s.executeQuery("select count(*) from hive_locks where hl_lock_ext_id = " +
+ lockId);
+ if (!rs.next()) return 0;
+ int rc = rs.getInt(1);
+ return rc;
+ } finally {
+ conn.rollback();
+ conn.close();
+ }
    }

    public static int findNumCurrentLocks() throws Exception {
- Connection conn = getConnection();
- Statement s = conn.createStatement();
- ResultSet rs = s.executeQuery("select count(*) from hive_locks");
- if (!rs.next()) return 0;
- int rc = rs.getInt(1);
- conn.rollback();
- conn.close();
- return rc;
+ Connection conn = null;
+ try {
+ conn = getConnection();
+ Statement s = conn.createStatement();
+ ResultSet rs = s.executeQuery("select count(*) from hive_locks");
+ if (!rs.next()) return 0;
+ int rc = rs.getInt(1);
+ return rc;
+ } finally {
+ if (conn != null) {
+ conn.rollback();
+ conn.close();
+ }
+ }
    }

    private static Connection getConnection() throws Exception {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1619567&r1=1619566&r2=1619567&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Thu Aug 21 20:32:21 2014
@@ -165,13 +165,13 @@ public class DbTxnManager extends HiveTx
            break;

          case TABLE:
+ case DUMMYPARTITION: // in case of dynamic partitioning lock the table
            t = output.getTable();
            compBuilder.setDbName(t.getDbName());
            compBuilder.setTableName(t.getTableName());
            break;

          case PARTITION:
- case DUMMYPARTITION:
            compBuilder.setPartitionName(output.getPartition().getName());
            t = output.getPartition().getTable();
            compBuilder.setDbName(t.getDbName());
@@ -301,7 +301,10 @@ public class DbTxnManager extends HiveTx
      try {
        if (txnId > 0) rollbackTxn();
        if (lockMgr != null) lockMgr.close();
+ if (client != null) client.close();
      } catch (Exception e) {
+ LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage()
+ + ">, swallowing as there is nothing we can do with it.");
        // Not much we can do about it here.
      }
    }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java?rev=1619567&r1=1619566&r2=1619567&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java Thu Aug 21 20:32:21 2014
@@ -21,12 +21,12 @@ import junit.framework.Assert;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
  import org.apache.hadoop.hive.ql.Context;
  import org.apache.hadoop.hive.ql.QueryPlan;
  import org.apache.hadoop.hive.ql.hooks.ReadEntity;
  import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition;
  import org.apache.hadoop.hive.ql.metadata.Table;
  import org.apache.hadoop.hive.ql.session.SessionState;
  import org.apache.log4j.Level;
@@ -137,6 +137,43 @@ public class TestDbTxnManager {
      Assert.assertEquals(0, locks.size());
    }

+
+ @Test
+ public void testSingleWritePartition() throws Exception {
+ WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
+ QueryPlan qp = new MockQueryPlan(this);
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(qp, ctx, "fred");
+ List<HiveLock> locks = ctx.getHiveLocks();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testWriteDynamicPartition() throws Exception {
+ WriteEntity we = addDynamicPartitionedOutput(newTable(true), WriteEntity.WriteType.INSERT);
+ QueryPlan qp = new MockQueryPlan(this);
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(qp, ctx, "fred");
+ List<HiveLock> locks = ctx.getHiveLocks();
+ Assert.assertEquals(1, locks.size());
+ /*Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
+ */// Make sure we're locking the whole table, since this is dynamic partitioning
+ ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
+ List<ShowLocksResponseElement> elms = rsp.getLocks();
+ Assert.assertEquals(1, elms.size());
+ Assert.assertNotNull(elms.get(0).getTablename());
+ Assert.assertNull(elms.get(0).getPartname());
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
    @Test
    public void testReadWrite() throws Exception {
      Table t = newTable(true);
@@ -252,6 +289,7 @@ public class TestDbTxnManager {

    @After
    public void tearDown() throws Exception {
+ if (txnMgr != null) txnMgr.closeTxnManager();
      TxnDbUtil.cleanDb();
    }

@@ -318,4 +356,12 @@ public class TestDbTxnManager {
      writeEntities.add(we);
      return we;
    }
+
+ private WriteEntity addDynamicPartitionedOutput(Table t, WriteEntity.WriteType writeType)
+ throws Exception {
+ DummyPartition dp = new DummyPartition(t, "no clue what I should call this");
+ WriteEntity we = new WriteEntity(dp, writeType, false);
+ writeEntities.add(we);
+ return we;
+ }
  }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java?rev=1619567&r1=1619566&r2=1619567&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java Thu Aug 21 20:32:21 2014
@@ -65,7 +65,6 @@ public class TestQBCompact {
    private AlterTableSimpleDesc parseAndAnalyze(String query) throws Exception {
      ParseDriver hd = new ParseDriver();
      ASTNode head = (ASTNode)hd.parse(query).getChild(0);
- System.out.println("HERE " + head.dump());
      BaseSemanticAnalyzer a = SemanticAnalyzerFactory.get(conf, head);
      a.analyze(head, new Context(conf));
      List<Task<? extends Serializable>> roots = a.getRootTasks();

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedAug 21, '14 at 8:32p
activeAug 21, '14 at 8:32p
posts1
users1
websitehive.apache.org

1 user in discussion

Hashutosh: 1 post

People

Translate

site design / logo © 2022 Grokbase