FAQ
HIVE-12003 Hive Streaming API : Add check to ensure table is transactional(Roshan Naik via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec8c793c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec8c793c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec8c793c

Branch: refs/heads/master
Commit: ec8c793c3bfc6edafada2329939553e5cd6cb0f3
Parents: ba83fd7
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Sat Oct 10 10:11:48 2015 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Sat Oct 10 10:11:48 2015 -0700

----------------------------------------------------------------------
  .../hive/hcatalog/streaming/HiveEndPoint.java | 21 ++++++++++
  .../hive/hcatalog/streaming/InvalidTable.java | 8 ++++
  .../hive/hcatalog/streaming/TestStreaming.java | 41 +++++++++++++++++++-
  .../hive/ql/txn/compactor/TestCompactor.java | 13 ++++---
  4 files changed, 76 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 7e99008..5de3f1d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -49,6 +49,7 @@ import java.security.PrivilegedExceptionAction;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
+import java.util.Map;

  /**
   * Information about the hive end point (i.e. table or partition) to write to.
@@ -272,11 +273,31 @@ public class HiveEndPoint {
        }
        this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
        this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+ checkEndPoint(endPoint, msClient);
        if (createPart && !endPoint.partitionVals.isEmpty()) {
          createPartitionIfNotExists(endPoint, msClient, conf);
        }
      }

+ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable {
+ // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+ try {
+ Table t = msClient.getTable(endPoint.database, endPoint.table);
+ Map<String, String> params = t.getParameters();
+ if(params != null) {
+ String transactionalProp = params.get("transactional");
+ if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) {
+ return;
+ }
+ }
+ LOG.error("'transactional' property is not set on Table " + endPoint);
+ throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table");
+ } catch (Exception e) {
+ LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+ throw new InvalidTable(endPoint.database, endPoint.table, e);
+ }
+ }
+
      /**
       * Close connection
       */

http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
index 903c37e..98ef688 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
@@ -27,4 +27,12 @@ public class InvalidTable extends StreamingException {
    public InvalidTable(String db, String table) {
      super(makeMsg(db,table), null);
    }
+
+ public InvalidTable(String db, String table, String msg) {
+ super(msg);
+ }
+
+ public InvalidTable(String db, String table, Exception inner) {
+ super(inner.getMessage(), inner);
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 340ab6c..d9a7eae 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -388,6 +388,44 @@ public class TestStreaming {
    }


+ @Test
+ public void testTableValidation() throws Exception {
+ int bucketCount = 100;
+
+ String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+ String tbl1 = "validation1";
+ String tbl2 = "validation2";
+
+ String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+ String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+
+ runDDL(driver, "create database testBucketing3");
+ runDDL(driver, "use testBucketing3");
+
+ runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc) ;
+
+ runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ;
+
+
+ try {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null);
+ endPt.newConnection(false);
+ Assert.assertTrue("InvalidTable exception was not thrown", false);
+ } catch (InvalidTable e) {
+ // expecting this exception
+ }
+ try {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null);
+ endPt.newConnection(false);
+ Assert.assertTrue("InvalidTable exception was not thrown", false);
+ } catch (InvalidTable e) {
+ // expecting this exception
+ }
+ }
+
+
    private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                  String... records) throws Exception {
      ValidTxnList txns = msClient.getValidTxns();
@@ -1191,7 +1229,8 @@ public class TestStreaming {
              " clustered by ( " + join(bucketCols, ",") + " )" +
              " into " + bucketCount + " buckets " +
              " stored as orc " +
- " location '" + tableLoc + "'";
+ " location '" + tableLoc + "'" +
+ " TBLPROPERTIES ('transactional'='true') ";
      runDDL(driver, crtTbl);
      if(partNames!=null && partNames.length!=0) {
        return addPartition(driver, tableName, partVals, partNames);

http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index abca1ce..e2910dd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -154,11 +154,12 @@ public class TestCompactor {
      executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
        " PARTITIONED BY(bkt INT)" +
        " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
      executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
        " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
        " STORED AS TEXTFILE" +
- " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'", driver);
+ " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'" +
+ " TBLPROPERTIES ('transactional'='true')", driver);

      executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
        "' overwrite into table " + tblNameStg, driver);
@@ -411,7 +412,7 @@ public class TestCompactor {
      executeStatementOnDriver("drop table if exists " + tblName, driver);
      executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
          " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);

      HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
      DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -468,7 +469,7 @@ public class TestCompactor {
      executeStatementOnDriver("drop table if exists " + tblName, driver);
      executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
          " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true') ", driver);

      HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
      DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -516,7 +517,7 @@ public class TestCompactor {
      executeStatementOnDriver("drop table if exists " + tblName, driver);
      executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
          " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);

      HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
      DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -576,7 +577,7 @@ public class TestCompactor {
      executeStatementOnDriver("drop table if exists " + tblName, driver);
      executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
          " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);

      HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
      DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);

Search Discussions

Discussion Posts

Previous

Follow ups

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 2 of 6 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedOct 10, '15 at 5:42p
activeOct 10, '15 at 6:31p
posts6
users1
websitehive.apache.org

1 user in discussion

Ekoifman: 6 posts

People

Translate

site design / logo © 2021 Grokbase