Grokbase Groups Hive commits May 2015
FAQ
Repository: hive
Updated Branches:
   refs/heads/master 4bffffb3e -> 82e797728


HIVE-10835: Concurrency issues in JDBC driver (Chaoyu Tang reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82e79772
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82e79772
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82e79772

Branch: refs/heads/master
Commit: 82e797728c78f5fbeeb66a7d94b21296b37ebb40
Parents: 4bffffb
Author: Vaibhav Gumashta <vgumashta@apache.org>
Authored: Sat May 30 13:38:34 2015 -0700
Committer: Vaibhav Gumashta <vgumashta@apache.org>
Committed: Sat May 30 13:38:34 2015 -0700

----------------------------------------------------------------------
  .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 123 +++++++++++++++++++
  .../org/apache/hive/jdbc/HiveConnection.java | 45 ++++++-
  .../apache/hive/jdbc/HiveQueryResultSet.java | 25 +---
  .../org/apache/hive/jdbc/HiveStatement.java | 24 +---
  4 files changed, 171 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 7210480..306e3fe 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -29,9 +29,21 @@ import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.SQLException;
  import java.sql.Statement;
+import java.util.ArrayList;
  import java.util.HashMap;
+import java.util.List;
  import java.util.Map;
  import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
@@ -110,6 +122,117 @@ public class TestJdbcWithMiniHS2 {
      stmt.close();
    }

+ @Test
+ public void testConcurrentStatements() throws Exception {
+ String tableName = "testConcurrentStatements";
+ Statement stmt = hs2Conn.createStatement();
+
+ // create table
+ stmt.execute("DROP TABLE IF EXISTS " + tableName);
+ stmt.execute("CREATE TABLE " + tableName
+ + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
+
+ // load data
+ stmt.execute("load data local inpath '"
+ + dataFilePath.toString() + "' into table " + tableName);
+
+ ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
+ assertTrue(res.next());
+ res.close();
+ stmt.close();
+
+ // Start concurrent testing
+ int POOL_SIZE = 100;
+ int TASK_COUNT = 300;
+
+ SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<Runnable>();
+ ExecutorService workers = new ThreadPoolExecutor(1, POOL_SIZE, 20, TimeUnit.SECONDS, executorQueue);
+ List<Future<Boolean>> list = new ArrayList<Future<Boolean>>();
+ int i = 0;
+ while(i < TASK_COUNT) {
+ try {
+ Future<Boolean> future = workers.submit(new JDBCTask(hs2Conn, i, tableName));
+ list.add(future);
+ i++;
+ } catch (RejectedExecutionException ree) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ for (Future<Boolean> future : list) {
+ try {
+ Boolean result = future.get(30, TimeUnit.SECONDS);
+ assertTrue(result);
+ } catch (ExecutionException ee) {
+ fail("Concurrent Statement failed: " + ee.getCause());
+ } catch (TimeoutException te) {
+ System.out.println("Task was timeout after 30 second: " + te);
+ } catch (CancellationException ce) {
+ System.out.println("Task was interrupted: " + ce);
+ } catch (InterruptedException ie) {
+ System.out.println("Thread was interrupted: " + ie);
+ }
+ }
+ workers.shutdown();
+ }
+
+ static class JDBCTask implements Callable<Boolean> {
+ private String showsql = "show tables";
+ private String querysql;
+ private int seq = 0;
+ Connection con = null;
+ Statement stmt = null;
+ ResultSet res = null;
+
+ JDBCTask(Connection con, int seq, String tblName) {
+ this.con = con;
+ this.seq = seq;
+ querysql = "SELECT count(value) FROM " + tblName;
+ }
+
+ public Boolean call() throws SQLException {
+ int mod = seq%10;
+ try {
+ if (mod < 2) {
+ String name = con.getMetaData().getDatabaseProductName();
+ } else if (mod < 5) {
+ stmt = con.createStatement();
+ res = stmt.executeQuery(querysql);
+ while (res.next()) {
+ res.getInt(1);
+ }
+ } else if (mod < 7) {
+ res = con.getMetaData().getSchemas();
+ if (res.next()) {
+ res.getString(1);
+ }
+ } else {
+ stmt = con.createStatement();
+ res = stmt.executeQuery(showsql);
+ if (res.next()) {
+ res.getString(1);
+ }
+ }
+ return new Boolean(true);
+ } finally {
+ try {
+ if (res != null) {
+ res.close();
+ res = null;
+ }
+ if (stmt != null) {
+ stmt.close();
+ stmt = null;
+ }
+ } catch (SQLException sqle1) {
+ }
+ }
+ }
+ }

    /** This test is to connect to any database without using the command "Use <<DB>>"
     * 1)connect to default database.

http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index 306a1cd..277f6d4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -20,6 +20,10 @@ package org.apache.hive.jdbc;

  import java.io.FileInputStream;
  import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
  import java.security.KeyStore;
  import java.security.SecureRandom;
  import java.sql.Array;
@@ -176,7 +180,6 @@ public class HiveConnection implements java.sql.Connection {
        // set up the client
        client = new TCLIService.Client(new TBinaryProtocol(transport));
      }
-
      // add supported protocols
      supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
      supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
@@ -189,6 +192,9 @@ public class HiveConnection implements java.sql.Connection {

      // open client session
      openSession();
+
+ // Wrap the client with a thread-safe proxy to serialize the RPC calls
+ client = newSynchronizedClient(client);
    }

    private void openTransport() throws SQLException {
@@ -1357,4 +1363,41 @@ public class HiveConnection implements java.sql.Connection {
    public TProtocolVersion getProtocol() {
      return protocol;
    }
+
+ public static TCLIService.Iface newSynchronizedClient(
+ TCLIService.Iface client) {
+ return (TCLIService.Iface) Proxy.newProxyInstance(
+ HiveConnection.class.getClassLoader(),
+ new Class [] { TCLIService.Iface.class },
+ new SynchronizedHandler(client));
+ }
+
+ private static class SynchronizedHandler implements InvocationHandler {
+ private final TCLIService.Iface client;
+
+ SynchronizedHandler(TCLIService.Iface client) {
+ this.client = client;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object [] args)
+ throws Throwable {
+ try {
+ synchronized (client) {
+ return method.invoke(client, args);
+ }
+ } catch (InvocationTargetException e) {
+ // all IFace APIs throw TException
+ if (e.getTargetException() instanceof TException) {
+ throw (TException)e.getTargetException();
+ } else {
+ // should not happen
+ throw new TException("Error in calling method " + method.getName(),
+ e.getTargetException());
+ }
+ } catch (Exception e) {
+ throw new TException("Error in calling method " + method.getName(), e);
+ }
+ }
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
index e93795a..f6860f0 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
@@ -78,8 +78,6 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
    private boolean fetchFirst = false;

    private final TProtocolVersion protocol;
- private ReentrantLock transportLock;
-

    public static class Builder {

@@ -191,7 +189,6 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
      this.stmtHandle = builder.stmtHandle;
      this.sessHandle = builder.sessHandle;
      this.fetchSize = builder.fetchSize;
- this.transportLock = builder.transportLock;
      columnNames = new ArrayList<String>();
      normalizedColumnNames = new ArrayList<String>();
      columnTypes = new ArrayList<String>();
@@ -252,16 +249,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
        TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(stmtHandle);
        // TODO need session handle
        TGetResultSetMetadataResp metadataResp;
- if (transportLock == null) {
- metadataResp = client.GetResultSetMetadata(metadataReq);
- } else {
- transportLock.lock();
- try {
- metadataResp = client.GetResultSetMetadata(metadataReq);
- } finally {
- transportLock.unlock();
- }
- }
+ metadataResp = client.GetResultSetMetadata(metadataReq);
        Utils.verifySuccess(metadataResp.getStatus());

        StringBuilder namesSb = new StringBuilder();
@@ -372,16 +360,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
          TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
              orientation, fetchSize);
          TFetchResultsResp fetchResp;
- if (transportLock == null) {
- fetchResp = client.FetchResults(fetchReq);
- } else {
- transportLock.lock();
- try {
- fetchResp = client.FetchResults(fetchReq);
- } finally {
- transportLock.unlock();
- }
- }
+ fetchResp = client.FetchResults(fetchReq);
          Utils.verifySuccessWithInfo(fetchResp.getStatus());

          TRowSet results = fetchResp.getResults();

http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 6b3d05c..170fc53 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -108,9 +108,6 @@ public class HiveStatement implements java.sql.Statement {
     */
    private boolean isExecuteStatementFailed = false;

- // A fair reentrant lock
- private ReentrantLock transportLock = new ReentrantLock(true);
-
    public HiveStatement(HiveConnection connection, TCLIService.Iface client,
        TSessionHandle sessHandle) {
      this(connection, client, sessHandle, false);
@@ -148,7 +145,6 @@ public class HiveStatement implements java.sql.Statement {
        return;
      }

- transportLock.lock();
      try {
        if (stmtHandle != null) {
          TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle);
@@ -159,8 +155,6 @@ public class HiveStatement implements java.sql.Statement {
        throw e;
      } catch (Exception e) {
        throw new SQLException(e.toString(), "08S01", e);
- } finally {
- transportLock.unlock();
      }
      isCancelled = true;
    }
@@ -188,7 +182,6 @@ public class HiveStatement implements java.sql.Statement {
    }

    void closeClientOperation() throws SQLException {
- transportLock.lock();
      try {
        if (stmtHandle != null) {
          TCloseOperationReq closeReq = new TCloseOperationReq(stmtHandle);
@@ -199,8 +192,6 @@ public class HiveStatement implements java.sql.Statement {
        throw e;
      } catch (Exception e) {
        throw new SQLException(e.toString(), "08S01", e);
- } finally {
- transportLock.unlock();
      }
      isQueryClosed = true;
      isExecuteStatementFailed = false;
@@ -251,7 +242,6 @@ public class HiveStatement implements java.sql.Statement {
      execReq.setRunAsync(true);
      execReq.setConfOverlay(sessConf);

- transportLock.lock();
      try {
        TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
        Utils.verifySuccessWithInfo(execResp.getStatus());
@@ -263,8 +253,6 @@ public class HiveStatement implements java.sql.Statement {
      } catch (Exception ex) {
        isExecuteStatementFailed = true;
        throw new SQLException(ex.toString(), "08S01", ex);
- } finally {
- transportLock.unlock();
      }

      TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
@@ -278,12 +266,7 @@ public class HiveStatement implements java.sql.Statement {
           * For an async SQLOperation, GetOperationStatus will use the long polling approach
           * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
           */
- transportLock.lock();
- try {
- statusResp = client.GetOperationStatus(statusReq);
- } finally {
- transportLock.unlock();
- }
+ statusResp = client.GetOperationStatus(statusReq);
          Utils.verifySuccessWithInfo(statusResp.getStatus());
          if (statusResp.isSetOperationState()) {
            switch (statusResp.getOperationState()) {
@@ -322,7 +305,7 @@ public class HiveStatement implements java.sql.Statement {
      }
      resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
          .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
- .setScrollable(isScrollableResultset).setTransportLock(transportLock)
+ .setScrollable(isScrollableResultset)
          .build();
      return true;
    }
@@ -813,7 +796,6 @@ public class HiveStatement implements java.sql.Statement {

      List<String> logs = new ArrayList<String>();
      TFetchResultsResp tFetchResultsResp = null;
- transportLock.lock();
      try {
        if (stmtHandle != null) {
          TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle,
@@ -837,8 +819,6 @@ public class HiveStatement implements java.sql.Statement {
        throw e;
      } catch (Exception e) {
        throw new SQLException("Error when getting query log: " + e, e);
- } finally {
- transportLock.unlock();
      }

      RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 1 | next ›
Discussion Overview
groupcommits @
categorieshive, hadoop
postedMay 30, '15 at 8:39p
activeMay 30, '15 at 8:39p
posts1
users1
websitehive.apache.org

1 user in discussion

Vgumashta: 1 post

People

Translate

site design / logo © 2021 Grokbase