HIVE-13395 Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7dbc53da
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7dbc53da
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7dbc53da

Branch: refs/heads/branch-1
Commit: 7dbc53da98fb343fc589ff887a8dcc8893a786da
Parents: 8a59b85
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Thu May 5 15:23:03 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Thu May 5 15:23:03 2016 -0700

----------------------------------------------------------------------
  .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
  .../hive/metastore/TestHiveMetaStoreTxns.java | 2 +-
  .../upgrade/derby/035-HIVE-13395.derby.sql | 11 +
  .../derby/hive-txn-schema-1.3.0.derby.sql | 11 +-
  .../derby/upgrade-1.2.0-to-1.3.0.derby.sql | 2 +
  .../upgrade/mssql/020-HIVE-13395.mssql.sql | 9 +
  .../upgrade/mssql/hive-schema-1.3.0.mssql.sql | 12 +-
  .../mssql/upgrade-1.2.0-to-1.3.0.mssql.sql | 1 +
  .../upgrade/mysql/035-HIVE-13395.mysql.sql | 10 +
  .../mysql/hive-txn-schema-1.3.0.mysql.sql | 9 +
  .../mysql/upgrade-1.2.0-to-1.3.0.mysql.sql | 1 +
  .../upgrade/oracle/035-HIVE-13395.oracle.sql | 10 +
  .../oracle/hive-txn-schema-1.3.0.oracle.sql | 12 +-
  .../oracle/upgrade-1.2.0-to-1.3.0.oracle.sql | 1 +
  .../postgres/034-HIVE-13395.postgres.sql | 10 +
  .../postgres/hive-txn-schema-1.3.0.postgres.sql | 11 +-
  .../upgrade-1.2.0-to-1.3.0.postgres.sql | 1 +
  .../hadoop/hive/metastore/HiveMetaStore.java | 1 +
  .../hadoop/hive/metastore/txn/TxnDbUtil.java | 131 ++--
  .../hadoop/hive/metastore/txn/TxnHandler.java | 466 +++++++++++---
  .../hadoop/hive/metastore/txn/TxnStore.java | 8 +-
  .../hadoop/hive/metastore/txn/TxnUtils.java | 2 +
  .../metastore/txn/TestCompactionTxnHandler.java | 6 +-
  .../hive/metastore/txn/TestTxnHandler.java | 29 +-
  .../org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +-
  .../hadoop/hive/ql/lockmgr/DbLockManager.java | 5 +-
  .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 27 +-
  .../hadoop/hive/ql/txn/AcidWriteSetService.java | 61 ++
  .../txn/compactor/HouseKeeperServiceBase.java | 2 +-
  .../hadoop/hive/ql/txn/compactor/Initiator.java | 2 +-
  .../hadoop/hive/ql/txn/compactor/Worker.java | 2 +-
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 2 +-
  .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 20 +
  .../hive/ql/lockmgr/TestDbTxnManager.java | 7 +
  .../hive/ql/lockmgr/TestDbTxnManager2.java | 610 ++++++++++++++++++-
  .../hive/ql/txn/compactor/TestCleaner.java | 2 +
  36 files changed, 1313 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7c93e44..1086595 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1576,6 +1576,8 @@ public class HiveConf extends Configuration {
        new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
      HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
        new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"),
+ WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s",
+ new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),

      // For HBase storage handler
      HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index 5ad5f35..d5ecf98 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -186,7 +186,7 @@ public class TestHiveMetaStoreTxns {
          .setDbName("mydb")
          .setTableName("mytable")
          .setPartitionName("mypartition")
- .setExclusive()
+ .setSemiShared()
          .build())
        .addLockComponent(new LockComponentBuilder()
          .setDbName("mydb")

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql b/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
new file mode 100644
index 0000000..df33b95
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
@@ -0,0 +1,11 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);
+
+

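For illustration (not part of the upgrade script): WRITE_SET is how this commit detects write-write conflicts. Each committing txn copies its update/delete components into this table, stamped with a commit id drawn from the same NEXT_TXN_ID sequence as txnids, and is checked against rows of already-committed txns (see TxnHandler.commitTxn() below). A minimal sketch with made-up ids, where txn 6 started before txn 5 committed (commit id 7), so the two overlap and touch the same partition:

  INSERT INTO WRITE_SET VALUES ('mydb', 'mytable', 'p=1', 5, 7, 'u');
  INSERT INTO WRITE_SET VALUES ('mydb', 'mytable', 'p=1', 6, 8, 'u');
  -- overlap test used in commitTxn(): cur.ws_txnid <= committed.ws_commit_id
  SELECT committed.ws_txnid, committed.ws_commit_id
  FROM WRITE_SET committed, WRITE_SET cur
  WHERE committed.ws_database = cur.ws_database
    AND committed.ws_table = cur.ws_table
    AND committed.ws_partition = cur.ws_partition
    AND cur.ws_txnid = 6 AND committed.ws_txnid <> 6
    AND cur.ws_txnid <= committed.ws_commit_id
    AND (committed.ws_operation_type = 'u' OR cur.ws_operation_type = 'u');
  -- returns (5, 7): txn 6 conflicts and must abort (first-committer-wins)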
http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
index 13f3340..480c19e 100644
--- a/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
@@ -32,7 +32,8 @@ CREATE TABLE TXN_COMPONENTS (
    TC_TXNID bigint REFERENCES TXNS (TXN_ID),
    TC_DATABASE varchar(128) NOT NULL,
    TC_TABLE varchar(128),
- TC_PARTITION varchar(767)
+ TC_PARTITION varchar(767),
+ TC_OPERATION_TYPE char(1) NOT NULL
  );

  CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -117,3 +118,11 @@ CREATE TABLE AUX_TABLE (
    PRIMARY KEY(MT_KEY1, MT_KEY2)
  );

+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
index 6d4e591..1b9e171 100644
--- a/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
@@ -10,4 +10,6 @@ RUN '029-HIVE-12822.derby.sql';
  RUN '030-HIVE-12823.derby.sql';
  RUN '031-HIVE-12831.derby.sql';
  RUN '032-HIVE-12832.derby.sql';
+RUN '035-HIVE-13395.derby.sql';
+
  UPDATE "APP".VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql b/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
new file mode 100644
index 0000000..281014c
--- /dev/null
+++ b/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
@@ -0,0 +1,9 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE nvarchar(128) NOT NULL,
+ WS_TABLE nvarchar(128) NOT NULL,
+ WS_PARTITION nvarchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1) NULL;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
index 33c5ff6..7e0e24f 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
@@ -964,7 +964,8 @@ CREATE TABLE TXN_COMPONENTS(
   TC_TXNID bigint NULL,
   TC_DATABASE nvarchar(128) NOT NULL,
   TC_TABLE nvarchar(128) NULL,
- TC_PARTITION nvarchar(767) NULL
+ TC_PARTITION nvarchar(767) NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
  );

  ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
@@ -980,6 +981,15 @@ CREATE TABLE AUX_TABLE (
  )
  );

+CREATE TABLE WRITE_SET (
+ WS_DATABASE nvarchar(128) NOT NULL,
+ WS_TABLE nvarchar(128) NOT NULL,
+ WS_PARTITION nvarchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+

  -- -----------------------------------------------------------------
  -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql b/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
index b4de8ce..18da152 100644
--- a/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
@@ -10,6 +10,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0' AS MESSAGE;
  :r 015-HIVE-12823.mssql.sql;
  :r 016-HIVE-12831.mssql.sql;
  :r 017-HIVE-12832.mssql.sql;
+:r 020-HIVE-13395.mssql.sql;

  UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
  SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS MESSAGE;

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql b/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
new file mode 100644
index 0000000..586caef
--- /dev/null
+++ b/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
index ea42757..e852fc9 100644
--- a/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
@@ -34,6 +34,7 @@ CREATE TABLE TXN_COMPONENTS (
    TC_DATABASE varchar(128) NOT NULL,
    TC_TABLE varchar(128),
    TC_PARTITION varchar(767),
+ TC_OPERATION_TYPE char(1) NOT NULL,
    FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
  ) ENGINE=InnoDB DEFAULT CHARSET=latin1;

@@ -120,3 +121,11 @@ CREATE TABLE AUX_TABLE (
    PRIMARY KEY(MT_KEY1, MT_KEY2)
  ) ENGINE=InnoDB DEFAULT CHARSET=latin1;

+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql b/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
index f385549..021b802 100644
--- a/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
@@ -11,6 +11,7 @@ SOURCE 029-HIVE-12822.mysql.sql;
  SOURCE 030-HIVE-12823.mysql.sql;
  SOURCE 031-HIVE-12831.mysql.sql;
  SOURCE 032-HIVE-12832.mysql.sql;
+SOURCE 035-HIVE-13395.mysql.sql;

  UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
  SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS ' ';

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql b/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
new file mode 100644
index 0000000..ad1bbd9
--- /dev/null
+++ b/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar2(128) NOT NULL,
+ WS_TABLE varchar2(128) NOT NULL,
+ WS_PARTITION varchar2(767),
+ WS_TXNID number(19) NOT NULL,
+ WS_COMMIT_ID number(19) NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
index 788741a..199ff4c 100644
--- a/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
@@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS (
    TC_TXNID NUMBER(19) REFERENCES TXNS (TXN_ID),
    TC_DATABASE VARCHAR2(128) NOT NULL,
    TC_TABLE VARCHAR2(128),
- TC_PARTITION VARCHAR2(767) NULL
+ TC_PARTITION VARCHAR2(767) NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
  ) ROWDEPENDENCIES;

  CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -118,3 +119,12 @@ CREATE TABLE AUX_TABLE (
    PRIMARY KEY(MT_KEY1, MT_KEY2)
  );

+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar2(128) NOT NULL,
+ WS_TABLE varchar2(128) NOT NULL,
+ WS_PARTITION varchar2(767),
+ WS_TXNID number(19) NOT NULL,
+ WS_COMMIT_ID number(19) NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql b/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
index 55e272a..ce86e67 100644
--- a/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
@@ -10,6 +10,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0' AS Status from dual;
  @030-HIVE-12823.oracle.sql;
  @031-HIVE-12381.oracle.sql;
  @032-HIVE-12832.oracle.sql;
+@035-HIVE-13395.oracle.sql;

  UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
  SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS Status from dual;

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql b/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
new file mode 100644
index 0000000..4dda283
--- /dev/null
+++ b/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
index b2fc1a8..b606f81 100644
--- a/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
@@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS (
    TC_TXNID bigint REFERENCES TXNS (TXN_ID),
    TC_DATABASE varchar(128) NOT NULL,
    TC_TABLE varchar(128),
- TC_PARTITION varchar(767) DEFAULT NULL
+ TC_PARTITION varchar(767) DEFAULT NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
  );

  CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -118,4 +119,12 @@ CREATE TABLE AUX_TABLE (
    PRIMARY KEY(MT_KEY1, MT_KEY2)
  );

+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);


http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql b/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
index 6b4123b..624dde6 100644
--- a/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
@@ -10,6 +10,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0';
  \i 029-HIVE-12823.postgres.sql;
  \i 030-HIVE-12831.postgres.sql;
  \i 031-HIVE-12832.postgres.sql;
+\i 034-HIVE-13395.postgres.sql;

  UPDATE "VERSION" SET "SCHEMA_VERSION"='1.3.0', "VERSION_COMMENT"='Hive release version 1.3.0' where "VER_ID"=1;
  SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0';

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index bf65532..73422c8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6306,6 +6306,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
      }
      startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
      startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
+ startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService"));
    }
    private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
      //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 2e24678..5805966 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -21,11 +21,13 @@ import java.sql.Connection;
  import java.sql.Driver;
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
  import java.sql.SQLException;
  import java.sql.SQLTransactionRollbackException;
  import java.sql.Statement;
  import java.util.Properties;

+import com.google.common.annotations.VisibleForTesting;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.hive.conf.HiveConf;
@@ -68,7 +70,7 @@ public final class TxnDbUtil {
      Connection conn = null;
      Statement stmt = null;
      try {
- conn = getConnection(true);
+ conn = getConnection();
        stmt = conn.createStatement();
        stmt.execute("CREATE TABLE TXNS (" +
            " TXN_ID bigint PRIMARY KEY," +
@@ -82,7 +84,8 @@ public final class TxnDbUtil {
            " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
            " TC_DATABASE varchar(128) NOT NULL," +
            " TC_TABLE varchar(128)," +
- " TC_PARTITION varchar(767))");
+ " TC_PARTITION varchar(767)," +
+ " TC_OPERATION_TYPE char(1) NOT NULL)");
        stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
            " CTC_TXNID bigint," +
            " CTC_DATABASE varchar(128) NOT NULL," +
@@ -146,16 +149,25 @@ public final class TxnDbUtil {
          " CC_HADOOP_JOB_ID varchar(32))");

        stmt.execute("CREATE TABLE AUX_TABLE (" +
- " MT_KEY1 varchar(128) NOT NULL," +
- " MT_KEY2 bigint NOT NULL," +
- " MT_COMMENT varchar(255)," +
- " PRIMARY KEY(MT_KEY1, MT_KEY2)" +
+ " MT_KEY1 varchar(128) NOT NULL," +
+ " MT_KEY2 bigint NOT NULL," +
+ " MT_COMMENT varchar(255)," +
+ " PRIMARY KEY(MT_KEY1, MT_KEY2)" +
          ")");
+
+ stmt.execute("CREATE TABLE WRITE_SET (" +
+ " WS_DATABASE varchar(128) NOT NULL," +
+ " WS_TABLE varchar(128) NOT NULL," +
+ " WS_PARTITION varchar(767)," +
+ " WS_TXNID bigint NOT NULL," +
+ " WS_COMMIT_ID bigint NOT NULL," +
+ " WS_OPERATION_TYPE char(1) NOT NULL)"
+ );
      } catch (SQLException e) {
        try {
          conn.rollback();
        } catch (SQLException re) {
- System.err.println("Error rolling back: " + re.getMessage());
+ LOG.error("Error rolling back: " + re.getMessage());
        }

        // This might be a deadlock, if so, let's retry
@@ -172,40 +184,60 @@ public final class TxnDbUtil {
    }

    public static void cleanDb() throws Exception {
- Connection conn = null;
- Statement stmt = null;
- try {
- conn = getConnection(true);
- stmt = conn.createStatement();
-
- // We want to try these, whether they succeed or fail.
+ int retryCount = 0;
+ while(++retryCount <= 3) {
+ boolean success = true;
+ Connection conn = null;
+ Statement stmt = null;
        try {
- stmt.execute("DROP INDEX HL_TXNID_INDEX");
- } catch (Exception e) {
- System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage());
- }
+ conn = getConnection();
+ stmt = conn.createStatement();

- dropTable(stmt, "TXN_COMPONENTS");
- dropTable(stmt, "COMPLETED_TXN_COMPONENTS");
- dropTable(stmt, "TXNS");
- dropTable(stmt, "NEXT_TXN_ID");
- dropTable(stmt, "HIVE_LOCKS");
- dropTable(stmt, "NEXT_LOCK_ID");
- dropTable(stmt, "COMPACTION_QUEUE");
- dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
- dropTable(stmt, "COMPLETED_COMPACTIONS");
- dropTable(stmt, "AUX_TABLE");
- } finally {
- closeResources(conn, stmt, null);
+ // We want to try these, whether they succeed or fail.
+ try {
+ stmt.execute("DROP INDEX HL_TXNID_INDEX");
+ } catch (SQLException e) {
+ if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) {
+ //42X65/30000 means index doesn't exist
+ LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() +
+ "State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount);
+ success = false;
+ }
+ }
+
+ success &= dropTable(stmt, "TXN_COMPONENTS", retryCount);
+ success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount);
+ success &= dropTable(stmt, "TXNS", retryCount);
+ success &= dropTable(stmt, "NEXT_TXN_ID", retryCount);
+ success &= dropTable(stmt, "HIVE_LOCKS", retryCount);
+ success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount);
+ success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount);
+ success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount);
+ success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount);
+ success &= dropTable(stmt, "AUX_TABLE", retryCount);
+ success &= dropTable(stmt, "WRITE_SET", retryCount);
+ } finally {
+ closeResources(conn, stmt, null);
+ }
+ if(success) {
+ return;
+ }
      }
    }

- private static void dropTable(Statement stmt, String name) {
+ private static boolean dropTable(Statement stmt, String name, int retryCount) throws SQLException {
      try {
        stmt.execute("DROP TABLE " + name);
- } catch (Exception e) {
- System.err.println("Unable to drop table " + name + ": " + e.getMessage());
+ return true;
+ } catch (SQLException e) {
+ if("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) {
+ //failed because object doesn't exist
+ return true;
+ }
+ LOG.error("Unable to drop table " + name + ": " + e.getMessage() +
+ " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount);
      }
+ return false;
    }

    /**
@@ -256,11 +288,34 @@ public final class TxnDbUtil {
        closeResources(conn, stmt, rs);
      }
    }
+ @VisibleForTesting
+ public static String queryToString(String query) throws Exception {
+ Connection conn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ StringBuilder sb = new StringBuilder();
+ try {
+ conn = getConnection();
+ stmt = conn.createStatement();
+ rs = stmt.executeQuery(query);
+ ResultSetMetaData rsmd = rs.getMetaData();
+ for(int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+ sb.append(rsmd.getColumnName(colPos)).append(" ");
+ }
+ sb.append('\n');
+ while(rs.next()) {
+ for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+ sb.append(rs.getObject(colPos)).append(" ");
+ }
+ sb.append('\n');
+ }
+ } finally {
+ closeResources(conn, stmt, rs);
+ }
+ return sb.toString();
+ }

    static Connection getConnection() throws Exception {
- return getConnection(false);
- }
- static Connection getConnection(boolean isAutoCommit) throws Exception {
      HiveConf conf = new HiveConf();
      String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
      Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
@@ -272,7 +327,7 @@ public final class TxnDbUtil {
      prop.setProperty("user", user);
      prop.setProperty("password", passwd);
      Connection conn = driver.connect(driverUrl, prop);
- conn.setAutoCommit(isAutoCommit);
+ conn.setAutoCommit(true);
      return conn;
    }

@@ -281,7 +336,7 @@ public final class TxnDbUtil {
        try {
          rs.close();
        } catch (SQLException e) {
- System.err.println("Error closing ResultSet: " + e.getMessage());
+ LOG.error("Error closing ResultSet: " + e.getMessage());
        }
      }


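A note on the Derby error handling above: SQLState 42Y55 (object cannot be dropped because it does not exist) and 42X65 (index does not exist), both carried with error code 30000, are what embedded Derby raises when dropping missing objects, which is why cleanDb()/dropTable() treat them as success instead of retrying. A hypothetical session (object names made up):

  DROP TABLE NO_SUCH_TABLE;  -- fails with SQLState 42Y55, error code 30000
  DROP INDEX NO_SUCH_INDEX;  -- fails with SQLState 42X65, error code 30000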
http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index f7ef88e..ec60fa5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -74,7 +74,7 @@ import java.util.regex.Pattern;
   * used to properly sequence operations. Most notably:
   * 1. various sequence IDs are generated with aid of this mutex
   * 2. ensuring that each (Hive) Transaction state is transitioned atomically. Transaction state
- * includes it's actual state (Open, Aborted) as well as it's lock list/component list. Thus all
+ * includes its actual state (Open, Aborted) as well as its lock list/component list. Thus all
   * per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row.
   * This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
   * 3. checkLock() - this is mutexted entirely since we must ensure that while we check if some lock
@@ -129,6 +129,41 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {

    static private DataSource connPool;
    static private boolean doRetryOnConnPool = false;
+
+ private enum OpertaionType {
+ INSERT('i'), UPDATE('u'), DELETE('d');
+ private final char sqlConst;
+ OpertaionType(char sqlConst) {
+ this.sqlConst = sqlConst;
+ }
+ public String toString() {
+ return Character.toString(sqlConst);
+ }
+ public static OpertaionType fromString(char sqlConst) {
+ switch (sqlConst) {
+ case 'i':
+ return INSERT;
+ case 'u':
+ return UPDATE;
+ case 'd':
+ return DELETE;
+ default:
+ throw new IllegalArgumentException(quoteChar(sqlConst));
+ }
+ }
+ //we should instead just pass in OpertaionType from client (HIVE-13622)
+ @Deprecated
+ public static OpertaionType fromLockType(LockType lockType) {
+ switch (lockType) {
+ case SHARED_READ:
+ return INSERT;
+ case SHARED_WRITE:
+ return UPDATE;
+ default:
+ throw new IllegalArgumentException("Unexpected lock type: " + lockType);
+ }
+ }
+ }

    /**
     * Number of consecutive deadlocks we have seen
@@ -476,6 +511,31 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
      }
    }

+ /**
+ * Concurrency/isolation notes:
+ * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
+ * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNS table for specific txnid:X
+ * see more notes below.
+ * In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn
+ * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
+ * so that we can compare commit time of txn T with start time of txn S. This sequence can be thought of
+ * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap.
+ *
+ * Motivating example:
+ * Suppose we have multi-statement transactions T and S, both of which are attempting x = x + 1.
+ * In order to prevent the lost update problem, the non-overlapping txns must lock in the snapshot
+ * that they read appropriately. In particular, if txns do not overlap, then one follows the other
+ * (assuming they write the same entity), and thus the 2nd must see changes of the 1st. We ensure
+ * this by locking in the snapshot after
+ * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()})
+ * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure
+ * that txn T which will be considered a later txn, locks in a snapshot that includes the result
+ * of S's commit (assuming no other txns).
+ * As a counterexample, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
+ * were running in parallel). If T and S both locked in the same snapshot (for example, the commit of
+ * txnid:2, which is possible if commitTxn() and openTxns() are not mutexed),
+ * 'x' would be updated to the same value by both, i.e. a lost update.
+ */
    public void commitTxn(CommitTxnRequest rqst)
      throws NoSuchTxnException, TxnAbortedException, MetaException {
      long txnid = rqst.getTxnid();
@@ -483,40 +543,116 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        Connection dbConn = null;
        Statement stmt = null;
        ResultSet lockHandle = null;
+ ResultSet commitIdRs = null, rs;
        try {
          lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ /**
+ * This S4U will mutex with other commitTxn() and openTxns().
+ * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
+ * Note: it's possible for several txns to have the same commit id. Suppose 3 txns start
+ * at the same time and no new txns start until all 3 commit.
+ * We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
+ */
+ commitIdRs = stmt.executeQuery(addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
+ if(!commitIdRs.next()) {
+ throw new IllegalStateException("No rows found in NEXT_TXN_ID");
+ }
+ long commitId = commitIdRs.getLong(1);
          /**
           * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures that no other
           * operation can change this txn (such as acquiring locks). While lock() and commitTxn()
           * should not normally run concurrently (for the same txn), they could due to bugs in the client
           * which could then corrupt internal transaction manager state. Also competes with abortTxn().
           */
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
-
          lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
          if(lockHandle == null) {
            //this also ensures that txn is still there and in expected state (hasn't been timed out)
            ensureValidTxn(dbConn, txnid, stmt);
            shouldNeverHappen(txnid);
          }
-
+ Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
+ int numCompsWritten = stmt.executeUpdate("insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
+ " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " +
+ "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")");
+ if(numCompsWritten == 0) {
+ /**
+ * current txn didn't update/delete anything (may have inserted), so just proceed with commit
+ *
+ * We only care about commit id for write txns, so for RO (when supported) txns we don't
+ * have to mutex on NEXT_TXN_ID.
+ * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
+ * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
+ * If RO < W, then there is no reads-from relationship.
+ */
+ }
+ else {
+ /**
+ * see if any overlapping txns wrote the same element, i.e. have a conflict.
+ * Since the entire commit operation is mutexed wrt other start/commit ops,
+ * committed.ws_commit_id <= current.ws_commit_id for all txns;
+ * thus if committed.ws_commit_id < current.ws_txnid, the transactions do NOT overlap.
+ * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
+ * [17,20] committed and [21,21] committing now - these do not overlap.
+ * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running)
+ */
+ rs = stmt.executeQuery
+ (addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
+ "committed.ws_table, committed.ws_partition, cur.ws_commit_id " +
+ "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
+ "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
+ //For partitioned table we always track writes at partition level (never at table)
+ //and for non partitioned - always at table level, thus the same table should never
+ //have entries with partition key and w/o
+ "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
+ "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
+ // with txnid, though any decent DB should infer this
+ " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
+ // part of this commitTxn() op
+ " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
+ //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
+ " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) +
+ " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")"));
+ if(rs.next()) {
+ //found a conflict
+ String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
+ StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
+ String partitionName = rs.getString(5);
+ if(partitionName != null) {
+ resource.append('/').append(partitionName);
+ }
+ String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
+ " committed by " + committedTxn;
+ close(rs);
+ //remove WRITE_SET info for current txn since it's about to abort
+ dbConn.rollback(undoWriteSetForCurrentTxn);
+ LOG.info(msg);
+ //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
+ if(abortTxns(dbConn, Collections.singletonList(txnid)) != 1) {
+ throw new IllegalStateException(msg + " FAILED!");
+ }
+ dbConn.commit();
+ close(null, stmt, dbConn);
+ throw new TxnAbortedException(msg);
+ }
+ else {
+ //no conflicting operations, proceed with the rest of commit sequence
+ }
+ }
          // Move the record from txn_components into completed_txn_components so that the compactor
          // knows where to look to compact.
          String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
            "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
          LOG.debug("Going to execute insert <" + s + ">");
          if (stmt.executeUpdate(s) < 1) {
- //this can be reasonable for an empty txn START/COMMIT
+ //this can be reasonable for an empty txn START/COMMIT or read-only txn
            LOG.info("Expected to move at least one record from txn_components to " +
              "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
          }
-
- // Always access TXN_COMPONENTS before HIVE_LOCKS;
          s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
          LOG.debug("Going to execute update <" + s + ">");
          stmt.executeUpdate(s);
- // Always access HIVE_LOCKS before TXNS
          s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
          LOG.debug("Going to execute update <" + s + ">");
          stmt.executeUpdate(s);
@@ -532,6 +668,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          throw new MetaException("Unable to update transaction database "
            + StringUtils.stringifyException(e));
        } finally {
+ close(commitIdRs);
          close(lockHandle, stmt, dbConn);
          unlockInternal();
        }
@@ -539,7 +676,50 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        commitTxn(rqst);
      }
    }
-
+ @Override
+ public void performWriteSetGC() {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+ if(!rs.next()) {
+ throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
+ }
+ long highestAllocatedTxnId = rs.getLong(1);
+ close(rs);
+ rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN));
+ if(!rs.next()) {
+ throw new IllegalStateException("Scalar query returned no rows?!?!!");
+ }
+ long commitHighWaterMark;//all currently open txns (if any) have txnid >= commitHighWaterMark
+ long lowestOpenTxnId = rs.getLong(1);
+ if(rs.wasNull()) {
+ //if here then there are no Open txns and highestAllocatedTxnId must be
+ //resolved (i.e. committed or aborted), either way
+ //there are no open txns with id <= highestAllocatedTxnId
+ //the +1 is there because the "delete ..." below uses < (which is correct for the case when
+ //there is an open txn)
+ //Concurrency: even if new txn starts (or starts + commits) it is still true that
+ //there are no currently open txns that overlap with any committed txn with
+ //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough.
+ commitHighWaterMark = highestAllocatedTxnId + 1;
+ }
+ else {
+ commitHighWaterMark = lowestOpenTxnId;
+ }
+ int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
+ LOG.info("Deleted " + delCnt + " obsolete rows from WRITE_SET");
+ dbConn.commit();
+ } catch (SQLException ex) {
+ LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
+ }
+ finally {
+ close(rs, stmt, dbConn);
+ }
+ }
    /**
   * As much as possible (i.e. in the absence of retries) we want both operations to be done on the same
   * connection (but separate transactions). This avoids some flakiness in BONECP where if you
@@ -567,7 +747,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {

    /**
     * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read
- * and then executeUpdate(). One other alternative would be to actually update the row in TXNX but
+ * and then executeUpdate(). One other alternative would be to actually update the row in TXNS but
     * to the same value as before thus forcing db to acquire write lock for duration of the transaction.
     *
     * There is no real reason to return the ResultSet here other than to make sure the reference to it
@@ -638,6 +818,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          stmt.executeUpdate(s);

          if (txnid > 0) {
+ /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get),
+ * so if we add that to LockRequest we'll know it here.
+ * Should probably add it to LockComponent in case in the future we decide to allow 1 LockRequest
+ * to contain LockComponents for multiple operations.
+ * Deriving it from lock info doesn't distinguish between Update and Delete
+ *
+ * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
+ * FileSinkDesc.table is ql.metadata.Table
+ * Table.tableSpec which is TableSpec, which has specType which is SpecType
+ * So maybe this can work to know that this is part of dynamic partition insert in which case
+ * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here.
+ * In any case, that's an optimization for now; will be required when adding multi-stmt txns
+ */
            // For each component in this lock request,
            // add an entry to the txn_components table
            // This must be done before HIVE_LOCKS is accessed
@@ -646,10 +839,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
              String tblName = lc.getTablename();
              String partName = lc.getPartitionname();
              s = "insert into TXN_COMPONENTS " +
- "(tc_txnid, tc_database, tc_table, tc_partition) " +
+ "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) " +
                "values (" + txnid + ", '" + dbName + "', " +
                (tblName == null ? "null" : "'" + tblName + "'") + ", " +
- (partName == null ? "null" : "'" + partName + "'") + ")";
+ (partName == null ? "null" : "'" + partName + "'")+ "," +
+ quoteString(OpertaionType.fromLockType(lc.getType()).toString()) + ")";
              LOG.debug("Going to execute update <" + s + ">");
              stmt.executeUpdate(s);
            }
@@ -720,9 +914,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          lockInternal();
          if(dbConn.isClosed()) {
            //should only get here if retrying this op
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          }
- dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
          return checkLock(dbConn, extLockId);
        } catch (SQLException e) {
          LOG.debug("Going to rollback");
@@ -778,7 +971,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          //todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and
          //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired
          //extra heartbeat is logically harmless, but ...
- dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
          return checkLock(dbConn, extLockId);
        } catch (SQLException e) {
          LOG.debug("Going to rollback");
@@ -1184,11 +1376,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
      throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
        + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
    }
+
    public void addDynamicPartitions(AddDynamicPartitions rqst)
        throws NoSuchTxnException, TxnAbortedException, MetaException {
      Connection dbConn = null;
      Statement stmt = null;
      ResultSet lockHandle = null;
+ ResultSet rs = null;
      try {
        try {
          lockInternal();
@@ -1200,18 +1394,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
            shouldNeverHappen(rqst.getTxnid());
          }
+ //we should be able to get this from the AddDynamicPartitions object longer term; in fact we'd have to
+ //for multi-stmt txns if the same table is written more than once per txn
+ // MoveTask knows if it's I/U/D
+ // MoveTask calls Hive.loadDynamicPartitions() which calls HiveMetaStoreClient.addDynamicPartitions()
+ // which ends up here so we'd need to add a field to AddDynamicPartitions.
+ String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid()
+ + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+ //do limit 1 on this; currently they will all have the same operation type
+ rs = stmt.executeQuery(addLimitClause(1, findOperationType));
+ if(!rs.next()) {
+ throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid()));
+ }
+ OpertaionType ot = OpertaionType.fromString(rs.getString(1).charAt(0));
+
+ //what if a txn writes the same table > 1 time... let's go with this for now, but really
+ //need to not write this in the first place, i.e. make this delete not needed
+ //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
+ String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
+ quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+ //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
+ //much "wider" than necessary in a lot of cases. Here on the other hand, we know exactly which
+ //partitions have been written to. w/o this WRITE_SET would contain entries for partitions not actually
+ //written to
+ stmt.executeUpdate(deleteSql);
          for (String partName : rqst.getPartitionnames()) {
- StringBuilder buff = new StringBuilder();
- buff.append("insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition) values (");
- buff.append(rqst.getTxnid());
- buff.append(", '");
- buff.append(rqst.getDbname());
- buff.append("', '");
- buff.append(rqst.getTablename());
- buff.append("', '");
- buff.append(partName);
- buff.append("')");
- String s = buff.toString();
+ String s =
+ "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" +
+ rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
+ "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")";
            LOG.debug("Going to execute update <" + s + ">");
            stmt.executeUpdate(s);
          }
@@ -1946,60 +2157,113 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
      return txnId != 0;
    }
    /**
+ * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
+ * hl_lock_ext_id by only checking earlier locks.
+ *
+ * For any given SQL statement all locks required by it are grouped under a single extLockId and are
+ * granted all at once or all locks wait.
+ *
+ * This is expected to run at READ_COMMITTED.
+ *
     * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take
     * all locks for given extLockId or none. Would be more efficient to update state on all locks
- * at once. Semantics are the same since this is all part of the same txn@serializable.
+ * at once. Semantics are the same since this is all part of the same txn.
     *
- * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
- * hl_lock_ext_id by only checking earlier locks.
+ * If there is a concurrent commitTxn/rollbackTxn, those can only remove rows from HIVE_LOCKS.
+ * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid,
+ * checkLock() will in the worst case keep locks in Waiting state a little longer.
     */
- private LockResponse checkLock(Connection dbConn,
- long extLockId)
+ private LockResponse checkLock(Connection dbConn, long extLockId)
      throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
- if(dbConn.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE) {
- //longer term we should instead use AUX_TABLE/S4U to serialize all checkLock() operations
- //that would be less prone to deadlocks
- throw new IllegalStateException("Unexpected Isolation Level: " + dbConn.getTransactionIsolation());
- }
- List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
+ TxnStore.MutexAPI.LockHandle handle = null;
+ Statement stmt = null;
+ ResultSet rs = null;
      LockResponse response = new LockResponse();
- response.setLockid(extLockId);
-
- LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
- Savepoint save = dbConn.setSavepoint();//todo: get rid of this
- StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
- "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
-
- Set<String> strings = new HashSet<String>(locksBeingChecked.size());
- for (LockInfo info : locksBeingChecked) {
- strings.add(info.db);
- }
- boolean first = true;
- for (String s : strings) {
- if (first) first = false;
- else query.append(", ");
- query.append('\'');
- query.append(s);
- query.append('\'');
- }
- query.append(")");
-
- // If any of the table requests are null, then I need to pull all the
- // table locks for this db.
- boolean sawNull = false;
- strings.clear();
- for (LockInfo info : locksBeingChecked) {
- if (info.table == null) {
- sawNull = true;
- break;
- } else {
- strings.add(info.table);
+ /**
+ * todo: Longer term we should pass this from client somehow - this would be an optimization; once
+ * that is in place make sure to build and test "writeSet" below using OperationType not LockType
+ */
+ boolean isPartOfDynamicPartitionInsert = true;
+ try {
+ /**
+ * checkLock() must be mutexed against any other checkLock to make sure 2 conflicting locks
+ * are not granted by parallel checkLock() calls.
+ */
+ handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name());
+ List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
+ response.setLockid(extLockId);
+
+ LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
+ Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+ StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
+ "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
+ "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
+
+ Set<String> strings = new HashSet<String>(locksBeingChecked.size());
+
+ //This is the set of entities that the statement represented by extLockId wants to update
+ List<LockInfo> writeSet = new ArrayList<>();
+
+ for (LockInfo info : locksBeingChecked) {
+ strings.add(info.db);
+ if(!isPartOfDynamicPartitionInsert && info.type == LockType.SHARED_WRITE) {
+ writeSet.add(info);
+ }
        }
- }
- if (!sawNull) {
- query.append(" and (hl_table is null or hl_table in(");
- first = true;
+ if(!writeSet.isEmpty()) {
+ if(writeSet.get(0).txnId == 0) {
+ //Write operations always start a txn
+ throw new IllegalStateException("Found Write lock for " + JavaUtils.lockIdToString(extLockId) + " but no txnid");
+ }
+ stmt = dbConn.createStatement();
+ StringBuilder sb = new StringBuilder(" ws_database, ws_table, ws_partition, " +
+ "ws_txnid, ws_commit_id " +
+ "from WRITE_SET where ws_commit_id >= " + writeSet.get(0).txnId + " and (");//see commitTxn() for more info on this inequality
+ for(LockInfo info : writeSet) {
+ sb.append("(ws_database = ").append(quoteString(info.db)).append(" and ws_table = ")
+ .append(quoteString(info.table)).append(" and ws_partition ")
+ .append(info.partition == null ? "is null" : "= " + quoteString(info.partition)).append(") or ");
+ }
+ sb.setLength(sb.length() - 4);//nuke trailing " or "
+ sb.append(")");
+ //1 row is sufficient to know we have to kill the query
+ rs = stmt.executeQuery(addLimitClause(1, sb.toString()));
+ if(rs.next()) {
+ /**
+ * if here, it means we found an already committed txn which overlaps with the current one and
+ * it updated the same resource the current txn wants to update. By the first-committer-wins
+ * rule, the current txn will not be allowed to commit, so we may as well kill it now; this is just an
+ * optimization to prevent wasting cluster resources to run a query which is known to be DOA.
+ * {@link #commitTxn(CommitTxnRequest)} has the primary responsibility to ensure this.
+ * checkLock() runs at READ_COMMITTED so you could have another (Hive) txn running commitTxn()
+ * in parallel and thus writing to WRITE_SET. commitTxn() logic is properly mutexed to ensure
+ * that we don't "miss" any WW conflicts. We could've mutexed the checkLock() and commitTxn()
+ * as well but this reduces concurrency for very little gain.
+ * Note that update/delete (which runs as dynamic partition insert) acquires a lock on the table,
+ * but WRITE_SET has entries for actual partitions updated. Thus this optimization will "miss"
+ * the WW conflict but it will be caught in commitTxn() where actual partitions written are known.
+ * This is OK since we want 2 concurrent updates that update different sets of partitions to both commit.
+ */
+ String resourceName = rs.getString(1) + '/' + rs.getString(2);
+ String partName = rs.getString(3);
+ if(partName != null) {
+ resourceName += '/' + partName;
+ }
+
+ String msg = "Aborting " + JavaUtils.txnIdToString(writeSet.get(0).txnId) +
+ " since a concurrent committed transaction [" + JavaUtils.txnIdToString(rs.getLong(4)) + "," + rs.getLong(5) +
+ "] has already updated resource '" + resourceName + "'";
+ LOG.info(msg);
+ if(abortTxns(dbConn, Collections.singletonList(writeSet.get(0).txnId)) != 1) {
+ throw new IllegalStateException(msg + " FAILED!");
+ }
+ dbConn.commit();
+ throw new TxnAbortedException(msg);
+ }
+ close(rs, stmt, null);
+ }
+
+ boolean first = true;
        for (String s : strings) {
          if (first) first = false;
          else query.append(", ");
@@ -2007,22 +2271,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          query.append(s);
          query.append('\'');
        }
- query.append("))");
+ query.append(")");

- // If any of the partition requests are null, then I need to pull all
- // partition locks for this table.
- sawNull = false;
+ // If any of the table requests are null, then I need to pull all the
+ // table locks for this db.
+ boolean sawNull = false;
        strings.clear();
        for (LockInfo info : locksBeingChecked) {
- if (info.partition == null) {
+ if (info.table == null) {
            sawNull = true;
            break;
          } else {
- strings.add(info.partition);
+ strings.add(info.table);
          }
        }
        if (!sawNull) {
- query.append(" and (hl_partition is null or hl_partition in(");
+ query.append(" and (hl_table is null or hl_table in(");
          first = true;
          for (String s : strings) {
            if (first) first = false;
@@ -2032,14 +2296,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            query.append('\'');
          }
          query.append("))");
+
+ // If any of the partition requests are null, then I need to pull all
+ // partition locks for this table.
+ sawNull = false;
+ strings.clear();
+ for (LockInfo info : locksBeingChecked) {
+ if (info.partition == null) {
+ sawNull = true;
+ break;
+ } else {
+ strings.add(info.partition);
+ }
+ }
+ if (!sawNull) {
+ query.append(" and (hl_partition is null or hl_partition in(");
+ first = true;
+ for (String s : strings) {
+ if (first) first = false;
+ else query.append(", ");
+ query.append('\'');
+ query.append(s);
+ query.append('\'');
+ }
+ query.append("))");
+ }
        }
- }
- query.append(" and hl_lock_ext_id <= ").append(extLockId);
+ query.append(" and hl_lock_ext_id <= ").append(extLockId);

- LOG.debug("Going to execute query <" + query.toString() + ">");
- Statement stmt = null;
- ResultSet rs = null;
- try {
+ LOG.debug("Going to execute query <" + query.toString() + ">");
        stmt = dbConn.createStatement();
        rs = stmt.executeQuery(query.toString());
        SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
@@ -2155,6 +2440,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        response.setState(LockState.ACQUIRED);
      } finally {
        close(rs, stmt, null);
+ if(handle != null) {
+ handle.releaseLocks();
+ }
      }
      return response;
    }
@@ -2196,7 +2484,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
      String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
        //if lock is part of txn, heartbeat info is in txn record
        "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) +
- ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+ ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + " where hl_lock_ext_id = " +
        extLockId + " and hl_lock_int_id = " + lockInfo.intLockId;
      LOG.debug("Going to execute update <" + s + ">");
      int rc = stmt.executeUpdate(s);
@@ -2276,6 +2564,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        //todo: add LIMIT 1 instead of count - should be more efficient
        s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid;
        ResultSet rs2 = stmt.executeQuery(s);
+ //todo: strictly speaking you can commit an empty txn, thus 2nd conjunct is wrong but only
+ //possible for multi-stmt txns
        boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0;
        LOG.debug("Going to rollback");
        dbConn.rollback();
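
The checkLock() block above is the heart of the fix: before granting a write lock, it probes WRITE_SET for a committed transaction that overlaps the current one and wrote the same resource. A minimal sketch of the predicate the query encodes, as a hypothetical helper (illustrative names, not Hive's API):

    // Mirrors the WRITE_SET query built above: a committed writer conflicts
    // with the current txn iff it committed at or after the current txn began
    // (ws_commit_id >= txnid) and wrote the same (db, table, partition).
    static boolean wwConflict(String wsDb, String wsTable, String wsPart, long wsCommitId,
                              String curDb, String curTable, String curPart, long curTxnId) {
      if (wsCommitId < curTxnId) {
        return false; // committed before we started, so it is part of our snapshot
      }
      return wsDb.equals(curDb) && wsTable.equals(curTable)
          // null partition matches null, like the "ws_partition is null" branch
          && (wsPart == null ? curPart == null : wsPart.equals(curPart));
    }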

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3aac11b..bd274ee 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -74,7 +74,7 @@ import java.util.Set;
  @InterfaceStability.Evolving
  public interface TxnStore {

- public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
+ public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, WriteSetCleaner}
    // Compactor states (Should really be enum)
    static final public String INITIATED_RESPONSE = "initiated";
    static final public String WORKING_RESPONSE = "working";
@@ -347,6 +347,12 @@ public interface TxnStore {
    public void purgeCompactionHistory() throws MetaException;

    /**
+ * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the
+ * transaction metadata once it becomes unnecessary.
+ */
+ public void performWriteSetGC();
+
+ /**
     * Determine if there are enough consecutive failures compacting a table or partition that no
     * new automatic compactions should be scheduled. User initiated compactions do not do this
     * check.
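
performWriteSetGC() is declared here but implemented in TxnHandler; a hedged JDBC sketch of the idea (assumed SQL and 'o' open-state encoding, not the committed implementation): a WRITE_SET row can only conflict with a transaction that was open when it committed, so rows older than every open transaction are safe to purge.

    try (Statement stmt = dbConn.createStatement()) {
      ResultSet rs = stmt.executeQuery(
          "select min(txn_id) from TXNS where txn_state = 'o'"); // 'o' = open (assumed)
      long minOpenTxnId = rs.next() ? rs.getLong(1) : 0;
      if (minOpenTxnId > 0) {
        // committed before the oldest open txn began: can never conflict again
        stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + minOpenTxnId);
      }
    }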

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 4c14eef..5391fb0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -69,6 +69,8 @@ public class TxnUtils {
     * @return a valid txn list.
     */
    public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+ //todo: this could be more efficient: using select min(txn_id) from TXNS where txn_state=" +
+ // quoteChar(TXN_OPEN) to compute HWM...
      long highWater = txns.getTxn_high_water_mark();
      long minOpenTxn = Long.MAX_VALUE;
      long[] exceptions = new long[txns.getOpen_txnsSize()];
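
The todo refers to computing the compaction high watermark; for context, the rest of the method (not shown in this hunk) caps it just below the oldest open transaction so the compactor only merges data whose outcome is settled. A hedged sketch of that step, assuming the thrift accessors TxnInfo.getState() and TxnInfo.getId():

    for (TxnInfo txn : txns.getOpen_txns()) {
      if (txn.getState() == TxnState.OPEN) {
        minOpenTxn = Math.min(minOpenTxn, txn.getId());
      }
    }
    if (minOpenTxn < Long.MAX_VALUE) {
      // compact only below the oldest open txn: everything <= highWater is
      // then either committed or aborted
      highWater = minOpenTxn - 1;
    }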

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index bdeacb9..fc00e6d 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -397,7 +397,7 @@ public class TestCompactionTxnHandler {
      lc.setTablename(tableName);
      LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
      lr.setTxnid(txnId);
- LockResponse lock = txnHandler.lock(new LockRequest(Arrays.asList(lc), "me", "localhost"));
+ LockResponse lock = txnHandler.lock(lr);
      assertEquals(LockState.ACQUIRED, lock.getState());

      txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName,
@@ -413,8 +413,8 @@ public class TestCompactionTxnHandler {
        assertEquals(dbName, ci.dbname);
        assertEquals(tableName, ci.tableName);
        switch (i++) {
- case 0: assertEquals("ds=today", ci.partName); break;
- case 1: assertEquals("ds=yesterday", ci.partName); break;
+ case 0: assertEquals("ds=today", ci.partName); break;
+ case 1: assertEquals("ds=yesterday", ci.partName); break;
        default: throw new RuntimeException("What?");
        }
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 0cacef7..ccaf91c 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -444,6 +444,7 @@ public class TestTxnHandler {
      components.clear();
      components.add(comp);
      req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.ACQUIRED);
    }
@@ -475,6 +476,7 @@ public class TestTxnHandler {
      components.clear();
      components.add(comp);
      req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.WAITING);
    }
@@ -541,6 +543,7 @@ public class TestTxnHandler {
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      LockResponse res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.ACQUIRED);

@@ -563,6 +566,7 @@ public class TestTxnHandler {
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      LockResponse res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.ACQUIRED);

@@ -572,6 +576,7 @@ public class TestTxnHandler {
      components.clear();
      components.add(comp);
      req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.WAITING);

@@ -594,6 +599,7 @@ public class TestTxnHandler {
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      LockResponse res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.ACQUIRED);

@@ -603,6 +609,7 @@ public class TestTxnHandler {
      components.clear();
      components.add(comp);
      req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.WAITING);

@@ -612,6 +619,7 @@ public class TestTxnHandler {
      components.clear();
      components.add(comp);
      req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.WAITING);
    }
@@ -643,6 +651,7 @@ public class TestTxnHandler {
      components.clear();
      components.add(comp);
      req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      res = txnHandler.lock(req);
      assertTrue(res.getState() == LockState.WAITING);
    }
@@ -686,6 +695,8 @@ public class TestTxnHandler {
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
+ long txnId = openTxn();
+ req.setTxnid(txnId);
      LockResponse res = txnHandler.lock(req);
      long lockid1 = res.getLockid();
      assertTrue(res.getState() == LockState.ACQUIRED);
@@ -696,11 +707,12 @@ public class TestTxnHandler {
      components.clear();
      components.add(comp);
      req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
      res = txnHandler.lock(req);
      long lockid2 = res.getLockid();
      assertTrue(res.getState() == LockState.WAITING);

- txnHandler.unlock(new UnlockRequest(lockid1));
+ txnHandler.abortTxn(new AbortTxnRequest(txnId));
      res = txnHandler.checkLock(new CheckLockRequest(lockid2));
      assertTrue(res.getState() == LockState.ACQUIRED);
    }
@@ -1031,16 +1043,14 @@ public class TestTxnHandler {
    @Test
    public void showLocks() throws Exception {
long beginning = System.currentTimeMillis();
- long txnid = openTxn();
      LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
      List<LockComponent> components = new ArrayList<LockComponent>(1);
      components.add(comp);
      LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
      LockResponse res = txnHandler.lock(req);

      // Open txn
- txnid = openTxn();
+ long txnid = openTxn();
      comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb");
      comp.setTablename("mytable");
      components = new ArrayList<LockComponent>(1);
@@ -1051,7 +1061,7 @@ public class TestTxnHandler {

      // Locks not associated with a txn
      components = new ArrayList<LockComponent>(1);
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "yourdb");
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb");
      comp.setTablename("yourtable");
      comp.setPartitionname("yourpartition");
      components.add(comp);
@@ -1065,14 +1075,13 @@ public class TestTxnHandler {
      for (int i = 0; i < saw.length; i++) saw[i] = false;
      for (ShowLocksResponseElement lock : locks) {
        if (lock.getLockid() == 1) {
- assertEquals(1, lock.getTxnid());
+ assertEquals(0, lock.getTxnid());
          assertEquals("mydb", lock.getDbname());
          assertNull(lock.getTablename());
          assertNull(lock.getPartname());
          assertEquals(LockState.ACQUIRED, lock.getState());
          assertEquals(LockType.EXCLUSIVE, lock.getType());
- assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
- lock.getTxnid() != 0);
+ assertTrue(lock.toString(), 0 != lock.getLastheartbeat());
          assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
              + " and " + System.currentTimeMillis(),
              begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
@@ -1080,7 +1089,7 @@ public class TestTxnHandler {
          assertEquals("localhost", lock.getHostname());
          saw[0] = true;
        } else if (lock.getLockid() == 2) {
- assertEquals(2, lock.getTxnid());
+ assertEquals(1, lock.getTxnid());
          assertEquals("mydb", lock.getDbname());
          assertEquals("mytable", lock.getTablename());
          assertNull(lock.getPartname());
@@ -1098,7 +1107,7 @@ public class TestTxnHandler {
          assertEquals("yourtable", lock.getTablename());
          assertEquals("yourpartition", lock.getPartname());
          assertEquals(LockState.ACQUIRED, lock.getState());
- assertEquals(LockType.SHARED_WRITE, lock.getType());
+ assertEquals(LockType.SHARED_READ, lock.getType());
assertTrue(lock.toString(), beginning <= lock.getLastheartbeat() &&
              System.currentTimeMillis() >= lock.getLastheartbeat());
assertTrue(beginning <= lock.getAcquiredat() &&
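
The recurring req.setTxnid(openTxn()) additions encode the tightened rule this patch enforces: a SHARED_WRITE lock must belong to an open transaction, and it is released by ending that transaction rather than by unlock() (hence the abortTxn() swap above). The pattern, assembled from calls already visible in these tests:

    long txnId = openTxn();
    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "mydb");
    comp.setTablename("mytable");
    LockRequest req = new LockRequest(Arrays.asList(comp), "me", "localhost");
    req.setTxnid(txnId);                             // write locks must carry a txn id
    LockResponse res = txnHandler.lock(req);
    assertEquals(LockState.ACQUIRED, res.getState());
    txnHandler.abortTxn(new AbortTxnRequest(txnId)); // ends the txn and frees its locks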

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 160a31d..170dcd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -387,7 +387,7 @@ public enum ErrorMsg {
        "instantiated, check hive.txn.manager"),
    TXN_NO_SUCH_TRANSACTION(10262, "No record of transaction {0} could be found, " +
        "may have timed out", true),
- TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true),
+ TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}. Reason: {1}", true),
    DBTXNMGR_REQUIRES_CONCURRENCY(10264,
        "To use DbTxnManager you must set hive.support.concurrency=true"),
    TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),
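
The new {1} parameter threads the abort reason (for example the write-write conflict message built in checkLock()) through to the client. The placeholders follow java.text.MessageFormat, so the rendered message looks like:

    String msg = java.text.MessageFormat.format(
        "Transaction manager has aborted the transaction {0}. Reason: {1}",
        "txnid:42", "concurrent committed transaction already updated mydb/mytable");
    // -> "Transaction manager has aborted the transaction txnid:42. Reason: ..."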

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 6f7f961..f12024f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -158,8 +158,9 @@ public class DbLockManager implements HiveLockManager{
        LOG.error("Metastore could not find " + JavaUtils.txnIdToString(lock.getTxnid()));
        throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(lock.getTxnid()));
      } catch (TxnAbortedException e) {
- LOG.error("Transaction " + JavaUtils.txnIdToString(lock.getTxnid()) + " already aborted.");
- throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()));
+ LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()), e.getMessage());
+ LOG.error(le.getMessage());
+ throw le;
      } catch (TException e) {
        throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
            e);

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 28ee8a8..904406e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -107,6 +107,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {

    @Override
    public long openTxn(String user) throws LockException {
+ //todo: why don't we lock the snapshot here, instead of having the client make an explicit call
+ //whenever it chooses?
      init();
      if(isTxnOpen()) {
        throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
@@ -132,8 +134,17 @@ public class DbTxnManager extends HiveTxnManagerImpl {

    @Override
    public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
- acquireLocks(plan, ctx, username, true);
- startHeartbeat();
+ try {
+ acquireLocks(plan, ctx, username, true);
+ startHeartbeat();
+ }
+ catch(LockException e) {
+ if(e.getCause() instanceof TxnAbortedException) {
+ txnId = 0;
+ statementId = -1;
+ }
+ throw e;
+ }
    }

    /**
@@ -157,7 +168,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
      // For each source to read, get a shared lock
      for (ReadEntity input : plan.getInputs()) {
        if (!input.needsLock() || input.isUpdateOrDelete()) {
- // We don't want to acquire readlocks during update or delete as we'll be acquiring write
+ // We don't want to acquire read locks during update or delete as we'll be acquiring write
          // locks instead.
          continue;
        }
@@ -319,8 +330,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
        LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
        throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
      } catch (TxnAbortedException e) {
- LOG.error("Transaction " + JavaUtils.txnIdToString(txnId) + " aborted");
- throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
+ LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage());
+ LOG.error(le.getMessage());
+ throw le;
      } catch (TException e) {
        throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
            e);
@@ -388,8 +400,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
          LOG.error("Unable to find transaction " + JavaUtils.txnIdToString(txnId));
          throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
        } catch (TxnAbortedException e) {
- LOG.error("Transaction aborted " + JavaUtils.txnIdToString(txnId));
- throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
+ LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage());
+ LOG.error(le.getMessage());
+ throw le;
        } catch (TException e) {
          throw new LockException(
              ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId)

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
new file mode 100644
index 0000000..9085a6a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Periodically cleans WriteSet tracking information used in Transaction management
+ */
+public class AcidWriteSetService extends HouseKeeperServiceBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AcidWriteSetService.class);
+ @Override
+ protected long getStartDelayMs() {
+ return 0;
+ }
+ @Override
+ protected long getIntervalMs() {
+ return hiveConf.getTimeVar(HiveConf.ConfVars.WRITE_SET_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ @Override
+ protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ return new WriteSetReaper(hiveConf, isAliveCounter);
+ }
+ @Override
+ public String getServiceDescription() {
+ return "Periodically cleans obsolete WriteSet tracking information used in Transaction management";
+ }
+ private static final class WriteSetReaper implements Runnable {
+ private final TxnStore txnHandler;
+ private final AtomicInteger isAliveCounter;
+ private WriteSetReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
+ this.isAliveCounter = isAliveCounter;
+ }
+ @Override
+ public void run() {
+ TxnStore.MutexAPI.LockHandle handle = null;
+ try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.WriteSetCleaner.name());
+ long startTime = System.currentTimeMillis();
+ txnHandler.performWriteSetGC();
+ int count = isAliveCounter.incrementAndGet();
+ LOG.info("cleaner ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count);
+ }
+ catch(Throwable t) {
+ LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+ }
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
+ }
+ }
+}
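
A hedged usage sketch for the new service, using only HouseKeeperService calls that appear elsewhere in this commit (start(HiveConf) and getIsAliveCounter(); the stop() shutdown call is an assumption):

    HouseKeeperService svc = new AcidWriteSetService();
    int before = svc.getIsAliveCounter();
    svc.start(hiveConf);              // schedules WriteSetReaper at WRITE_SET_REAPER_INTERVAL
    while (svc.getIsAliveCounter() == before) {
      Thread.sleep(100L);             // each completed GC pass bumps the counter
    }
    svc.stop();                       // assumed shutdown method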

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
index 947f17c..caab10d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
@@ -81,7 +81,7 @@ public abstract class HouseKeeperServiceBase implements HouseKeeperService {
     */
    protected abstract long getStartDelayMs();
    /**
- * Determines how fequently the service is running its task.
+ * Determines how frequently the service is running its task.
     */
    protected abstract long getIntervalMs();


http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 4aa68c9..e8c393c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -147,7 +147,7 @@ public class Initiator extends CompactorThread {
                if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
              } catch (Throwable t) {
                LOG.error("Caught exception while trying to determine if we should compact " +
- ci + ". Marking clean to avoid repeated failures, " +
+ ci + ". Marking failed to avoid repeated failures, " +
                    "" + StringUtils.stringifyException(t));
                txnHandler.markFailed(ci);
              }

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 0db7f8a..bf8e5cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -184,7 +184,7 @@ public class Worker extends CompactorThread {
            txnHandler.markCompacted(ci);
          } catch (Exception e) {
            LOG.error("Caught exception while trying to compact " + ci +
- ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
+ ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
            txnHandler.markFailed(ci);
          }
        } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index a901074..e30dcbb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -668,7 +668,7 @@ public class TestTxnCommands2 {
      t.run();
    }

- private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
+ public static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
      int lastCount = houseKeeperService.getIsAliveCounter();
      houseKeeperService.start(conf);
      int maxIter = 10;
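
Making runHouseKeeperService public lets other suites drive the new reaper through the same helper, e.g. (hypothetical call site):

    TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), hiveConf);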

http://git-wip-us.apache.org/repos/asf/hive/blob/7dbc53da/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 1b598f7..af70f0c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -64,6 +64,26 @@ public class TestAcidUtils {
      assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
        AcidUtils.createFilename(p, options).toString());
    }
+ @Test
+ public void testCreateFilenameLargeIds() throws Exception {
+ Path p = new Path("/tmp");
+ Configuration conf = new Configuration();
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .setOldStyle(true).bucket(123456789);
+ assertEquals("/tmp/123456789_0",
+ AcidUtils.createFilename(p, options).toString());
+ options.bucket(23)
+ .minimumTransactionId(1234567880)
+ .maximumTransactionId(1234567890)
+ .writingBase(true)
+ .setOldStyle(false);
+ assertEquals("/tmp/base_1234567890/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
+ options.writingBase(false);
+ assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
+ }
+

    @Test
    public void testParsing() throws Exception {

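The added test pins down the zero-padding scheme for large ids; the expected strings are consistent with format strings of the shape below (a sketch, assuming %07d/%04d/%05d pads for txn, statement, and bucket ids):

    String.format("delta_%07d_%07d_%04d/bucket_%05d", 1234567880L, 1234567890L, 0, 23)
    // -> "delta_1234567880_1234567890_0000/bucket_00023": ids wider than the
    //    pad print at full width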