FAQ
Repository: hive
Updated Branches:
   refs/heads/master 6d936b533 -> 0918ff959


HIVE-12252 Streaming API HiveEndPoint can be created w/o partitionVals for partitioned table (Wei Zheng via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0918ff95
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0918ff95
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0918ff95

Branch: refs/heads/master
Commit: 0918ff959e6b0fd67a6b8b478290436af9532a31
Parents: 6d936b5
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Thu Nov 5 10:07:30 2015 -0800
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Thu Nov 5 10:07:30 2015 -0800

----------------------------------------------------------------------
  .../hcatalog/streaming/ConnectionError.java | 4 ++
  .../hive/hcatalog/streaming/HiveEndPoint.java | 51 +++++++++++++++-----
  .../hive/hcatalog/streaming/TestStreaming.java | 35 +++++++++++---
  3 files changed, 71 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
index 1aeef76..ffa51c9 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
@@ -20,6 +20,10 @@ package org.apache.hive.hcatalog.streaming;

  public class ConnectionError extends StreamingException {

+ public ConnectionError(String msg) {
+ super(msg);
+ }
+
    public ConnectionError(String msg, Exception innerEx) {
      super(msg, innerEx);
    }

http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 306c93d..2f2d44a 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -279,23 +279,48 @@ public class HiveEndPoint {
        }
      }

- private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable {
- // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+ /**
+ * Checks the validity of endpoint
+ * @param endPoint the HiveEndPoint to be checked
+ * @param msClient the metastore client
+ * @throws InvalidTable
+ */
+ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
+ throws InvalidTable, ConnectionError {
+ Table t;
        try {
- Table t = msClient.getTable(endPoint.database, endPoint.table);
- Map<String, String> params = t.getParameters();
- if(params != null) {
- String transactionalProp = params.get("transactional");
- if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) {
- return;
- }
- }
- LOG.error("'transactional' property is not set on Table " + endPoint);
- throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table");
+ t = msClient.getTable(endPoint.database, endPoint.table);
        } catch (Exception e) {
- LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+ LOG.warn("Unable to check the endPoint: " + endPoint, e);
          throw new InvalidTable(endPoint.database, endPoint.table, e);
        }
+
+ // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+ Map<String, String> params = t.getParameters();
+ if (params != null) {
+ String transactionalProp = params.get("transactional");
+ if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) {
+ LOG.error("'transactional' property is not set on Table " + endPoint);
+ throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" +
+ " is not set on Table"); }
+ }
+
+ // 2 - check if partitionvals are legitimate
+ if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty()
+ && endPoint.partitionVals.isEmpty()) {
+ // Invalid if table is partitioned, but endPoint's partitionVals is empty
+ String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " +
+ "partitioned table";
+ LOG.error(errMsg);
+ throw new ConnectionError(errMsg);
+ }
+ if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty())
+ && !endPoint.partitionVals.isEmpty()) {
+ // Invalid if table is not partitioned, but endPoint's partitionVals is not empty
+ String errMsg = "HiveEndPoint" + endPoint + " specifies partitions for unpartitioned table";
+ LOG.error(errMsg);
+ throw new ConnectionError(errMsg);
+ }
      }

      /**

http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index d9a7eae..58cfbaa 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -204,7 +204,7 @@ public class TestStreaming {

      dropDB(msClient, dbName2);
      String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
- partLoc2 = createDbAndTable(driver, dbName2, tblName2, partitionVals, colNames, colTypes, bucketCols, partNames, loc2, 2);
+ partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2);

      String loc3 = dbFolder.newFolder("testing5.db").toString();
      createStoreSales("testing5", loc3);
@@ -477,15 +477,38 @@ public class TestStreaming {

    @Test
    public void testEndpointConnection() throws Exception {
- // 1) Basic
- HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
- , partitionVals);
+ // For partitioned table, partitionVals are specified
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
      StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
      connection.close();

- // 2) Leave partition unspecified
- endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+ // For unpartitioned table, partitionVals are not specified
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
      endPt.newConnection(false, null).close(); // should not throw
+
+ // For partitioned table, partitionVals are not specified
+ try {
+ endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+ connection = endPt.newConnection(true);
+ Assert.assertTrue("ConnectionError was not thrown", false);
+ connection.close();
+ } catch (ConnectionError e) {
+ // expecting this exception
+ String errMsg = "doesn't specify any partitions for partitioned table";
+ Assert.assertTrue(e.toString().endsWith(errMsg));
+ }
+
+ // For unpartitioned table, partition values are specified
+ try {
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
+ connection = endPt.newConnection(false);
+ Assert.assertTrue("ConnectionError was not thrown", false);
+ connection.close();
+ } catch (ConnectionError e) {
+ // expecting this exception
+ String errMsg = "specifies partitions for unpartitioned table";
+ Assert.assertTrue(e.toString().endsWith(errMsg));
+ }
    }

    @Test

Search Discussions

Discussion Posts

Follow ups

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 2 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedNov 5, '15 at 6:07p
activeNov 5, '15 at 6:17p
posts2
users1
websitehive.apache.org

1 user in discussion

Ekoifman: 2 posts

People

Translate

site design / logo © 2021 Grokbase