FAQ
Repository: hive
Updated Branches:
   refs/heads/master dd2bdfc67 -> 0e54991d8


HIVE-11581: HiveServer2 should store connection params in ZK when using dynamic service discovery for simpler client connection string (Vaibhav Gumashta reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e54991d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e54991d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e54991d

Branch: refs/heads/master
Commit: 0e54991d897c9acc26b015b6df82b44c0c90c6fb
Parents: dd2bdfc
Author: Vaibhav Gumashta <vgumashta@apache.org>
Authored: Mon Aug 24 17:14:27 2015 -0700
Committer: Vaibhav Gumashta <vgumashta@apache.org>
Committed: Mon Aug 24 17:15:22 2015 -0700

----------------------------------------------------------------------
  .../org/apache/hadoop/hive/conf/HiveConf.java | 72 ++++++------
  .../org/apache/hive/jdbc/HiveConnection.java | 4 +-
  jdbc/src/java/org/apache/hive/jdbc/Utils.java | 117 +++++++------------
  .../hive/jdbc/ZooKeeperHiveClientHelper.java | 104 ++++++++++++++---
  .../apache/hive/service/server/HiveServer2.java | 74 +++++++++++-
  5 files changed, 239 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index da171b1..8706a2d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1683,22 +1683,6 @@ public class HiveConf extends Configuration {
          "to construct a list exception handlers to handle exceptions thrown\n" +
          "by record readers"),

- // operation log configuration
- HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true,
- "When true, HS2 will save operation logs and make them available for clients"),
- HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION("hive.server2.logging.operation.log.location",
- "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator +
- "operation_logs",
- "Top level directory where operation logs are stored if logging functionality is enabled"),
- HIVE_SERVER2_LOGGING_OPERATION_LEVEL("hive.server2.logging.operation.level", "EXECUTION",
- new StringSet("NONE", "EXECUTION", "PERFORMANCE", "VERBOSE"),
- "HS2 operation logging mode available to clients to be set at session level.\n" +
- "For this to work, hive.server2.logging.operation.enabled should be set to true.\n" +
- " NONE: Ignore any logging\n" +
- " EXECUTION: Log completion of tasks\n" +
- " PERFORMANCE: Execution + Performance logs \n" +
- " VERBOSE: All logs" ),
- HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
      // logging configuration
      HIVE_LOG4J_FILE("hive.log4j.file", "",
          "Hive log4j configuration file.\n" +
@@ -1790,6 +1774,7 @@ public class HiveConf extends Configuration {
          "hive.zookeeper.quorum in their connection string."),
      HIVE_SERVER2_ZOOKEEPER_NAMESPACE("hive.server2.zookeeper.namespace", "hiveserver2",
          "The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."),
+
      // HiveServer2 global init file location
      HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
          "Either the location of a HS2 global init file or a directory containing a .hiverc file. If the \n" +
@@ -1801,6 +1786,39 @@ public class HiveConf extends Configuration {
      HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" +
          "enable parallel compilation between sessions on HiveServer2. The default is false."),

+ // Tez session settings
+ HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "",
+ "A list of comma separated values corresponding to YARN queues of the same name.\n" +
+ "When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" +
+ "for multiple Tez sessions to run in parallel on the cluster."),
+ HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE("hive.server2.tez.sessions.per.default.queue", 1,
+ "A positive integer that determines the number of Tez sessions that should be\n" +
+ "launched on each of the queues specified by \"hive.server2.tez.default.queues\".\n" +
+ "Determines the parallelism on each queue."),
+ HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS("hive.server2.tez.initialize.default.sessions", false,
+ "This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" +
+ "turning on Tez for HiveServer2. The user could potentially want to run queries\n" +
+ "over Tez without the pool of sessions."),
+
+ // Operation log configuration
+ HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true,
+ "When true, HS2 will save operation logs and make them available for clients"),
+ HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION("hive.server2.logging.operation.log.location",
+ "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator +
+ "operation_logs",
+ "Top level directory where operation logs are stored if logging functionality is enabled"),
+ HIVE_SERVER2_LOGGING_OPERATION_LEVEL("hive.server2.logging.operation.level", "EXECUTION",
+ new StringSet("NONE", "EXECUTION", "PERFORMANCE", "VERBOSE"),
+ "HS2 operation logging mode available to clients to be set at session level.\n" +
+ "For this to work, hive.server2.logging.operation.enabled should be set to true.\n" +
+ " NONE: Ignore any logging\n" +
+ " EXECUTION: Log completion of tasks\n" +
+ " PERFORMANCE: Execution + Performance logs \n" +
+ " VERBOSE: All logs" ),
+
+ // Enable metric collection for HiveServer2
+ HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
+
      // http (over thrift) transport settings
      HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
          "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'http'."),
@@ -1816,7 +1834,7 @@ public class HiveConf extends Configuration {
          "Keepalive time for an idle http worker thread. When the number of workers exceeds min workers, " +
          "excessive threads are killed after this time interval."),

- // Cookie based authentication
+ // Cookie based authentication when using HTTP Transport
      HIVE_SERVER2_THRIFT_HTTP_COOKIE_AUTH_ENABLED("hive.server2.thrift.http.cookie.auth.enabled", true,
          "When true, HiveServer2 in HTTP transport mode, will use cookie based authentication mechanism."),
      HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE("hive.server2.thrift.http.cookie.max.age", "86400s",
@@ -1963,6 +1981,8 @@ public class HiveConf extends Configuration {
          " HIVE : Exposes Hive's native table types like MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW\n" +
          " CLASSIC : More generic types like TABLE and VIEW"),
      HIVE_SERVER2_SESSION_HOOK("hive.server2.session.hook", "", ""),
+
+ // SSL settings
      HIVE_SERVER2_USE_SSL("hive.server2.use.SSL", false,
          "Set this to true for using SSL encryption in HiveServer2."),
      HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", "",
@@ -1983,9 +2003,6 @@ public class HiveConf extends Configuration {
           "Comma separated list of udfs names. These udfs will not be allowed in queries." +
           " The udf black list takes precedence over udf white list"),

- HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
- "Comma separated list of non-SQL Hive commands users are authorized to execute"),
-
      HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h",
          new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false),
          "The check interval for session/operation timeout, which can be disabled by setting to zero or negative value."),
@@ -2002,6 +2019,8 @@ public class HiveConf extends Configuration {
          " This setting takes effect only if session idle timeout (hive.server2.idle.session.timeout) and checking\n" +
          "(hive.server2.session.check.interval) are enabled."),

+ HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
+ "Comma separated list of non-SQL Hive commands users are authorized to execute"),
      HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
          "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
          "Comma separated list of configuration options which are immutable at runtime"),
@@ -2127,19 +2146,6 @@ public class HiveConf extends Configuration {
      HIVECOUNTERGROUP("hive.counters.group.name", "HIVE",
          "The name of counter group for internal Hive variables (CREATED_FILE, FATAL_ERROR, etc.)"),

- HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "",
- "A list of comma separated values corresponding to YARN queues of the same name.\n" +
- "When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" +
- "for multiple Tez sessions to run in parallel on the cluster."),
- HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE("hive.server2.tez.sessions.per.default.queue", 1,
- "A positive integer that determines the number of Tez sessions that should be\n" +
- "launched on each of the queues specified by \"hive.server2.tez.default.queues\".\n" +
- "Determines the parallelism on each queue."),
- HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS("hive.server2.tez.initialize.default.sessions", false,
- "This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" +
- "turning on Tez for HiveServer2. The user could potentially want to run queries\n" +
- "over Tez without the pool of sessions."),
-
      HIVE_QUOTEDID_SUPPORT("hive.support.quoted.identifiers", "column",
          new StringSet("none", "column"),
          "Whether to use quoted identifier. 'none' or 'column' can be used. \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index a9dac03..ba971fd 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -211,13 +211,13 @@ public class HiveConnection implements java.sql.Connection {
          break;
        } catch (TTransportException e) {
          LOG.info("Could not open client transport with JDBC Uri: " + jdbcUriString);
- // We'll retry till we exhaust all HiveServer2 uris from ZooKeeper
+ // We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper
          if ((sessConfMap.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE) != null)
              && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(sessConfMap
                  .get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE)))) {
            try {
              // Update jdbcUriString, host & port variables in connParams
- // Throw an exception if all HiveServer2 uris have been exhausted,
+ // Throw an exception if all HiveServer2 nodes have been exhausted,
              // or if we're unable to connect to ZooKeeper.
              Utils.updateConnParamsFromZooKeeper(connParams);
            } catch (ZooKeeperHiveClientException ze) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/Utils.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 0e4693b..d8368a4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -19,7 +19,6 @@
  package org.apache.hive.jdbc;

  import java.net.URI;
-import java.net.URISyntaxException;
  import java.sql.SQLException;
  import java.util.ArrayList;
  import java.util.Arrays;
@@ -37,22 +36,22 @@ import org.apache.hive.service.cli.thrift.TStatusCode;
  import org.apache.http.client.CookieStore;
  import org.apache.http.cookie.Cookie;

-public class Utils {
- public static final Log LOG = LogFactory.getLog(Utils.class.getName());
+class Utils {
+ static final Log LOG = LogFactory.getLog(Utils.class.getName());
    /**
      * The required prefix for the connection URL.
      */
- public static final String URL_PREFIX = "jdbc:hive2://";
+ static final String URL_PREFIX = "jdbc:hive2://";

    /**
      * If host is provided, without a port.
      */
- public static final String DEFAULT_PORT = "10000";
+ static final String DEFAULT_PORT = "10000";

    /**
     * Hive's default database name
     */
- public static final String DEFAULT_DATABASE = "default";
+ static final String DEFAULT_DATABASE = "default";

    private static final String URI_JDBC_PREFIX = "jdbc:";

@@ -63,7 +62,7 @@ public class Utils {
    static final String HIVE_SERVER2_RETRY_TRUE = "true";
    static final String HIVE_SERVER2_RETRY_FALSE = "false";

- public static class JdbcConnectionParams {
+ static class JdbcConnectionParams {
      // Note on client side parameter naming convention:
      // Prefer using a shorter camelCase param name instead of using the same name as the
      // corresponding
@@ -129,7 +128,7 @@ public class Utils {
      static final String SSL_TRUST_STORE_TYPE = "JKS";

      private String host = null;
- private int port;
+ private int port = 0;
      private String jdbcUriString;
      private String dbName = DEFAULT_DATABASE;
      private Map<String,String> hiveConfs = new LinkedHashMap<String,String>();
@@ -238,17 +237,17 @@ public class Utils {
    }

    // Verify success or success_with_info status, else throw SQLException
- public static void verifySuccessWithInfo(TStatus status) throws SQLException {
+ static void verifySuccessWithInfo(TStatus status) throws SQLException {
      verifySuccess(status, true);
    }

    // Verify success status, else throw SQLException
- public static void verifySuccess(TStatus status) throws SQLException {
+ static void verifySuccess(TStatus status) throws SQLException {
      verifySuccess(status, false);
    }

    // Verify success and optionally with_info status, else throw SQLException
- public static void verifySuccess(TStatus status, boolean withInfo) throws SQLException {
+ static void verifySuccess(TStatus status, boolean withInfo) throws SQLException {
      if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS ||
          (withInfo && status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS)) {
        return;
@@ -279,7 +278,7 @@ public class Utils {
     * @return
     * @throws SQLException
     */
- public static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException,
+ static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException,
        SQLException, ZooKeeperHiveClientException {
      JdbcConnectionParams connParams = new JdbcConnectionParams();

@@ -383,7 +382,6 @@ public class Utils {
      newUsage = usageUrlBase + JdbcConnectionParams.HTTP_PATH + "=<http_path_value>";
      handleParamDeprecation(connParams.getHiveConfs(), connParams.getSessionVars(),
          JdbcConnectionParams.HTTP_PATH_DEPRECATED, JdbcConnectionParams.HTTP_PATH, newUsage);
-
      // Extract host, port
      if (connParams.isEmbeddedMode()) {
        // In case of embedded mode we were supplied with an empty authority.
@@ -391,23 +389,15 @@ public class Utils {
        connParams.setHost(jdbcURI.getHost());
        connParams.setPort(jdbcURI.getPort());
      } else {
- // Else substitute the dummy authority with a resolved one.
- // In case of dynamic service discovery using ZooKeeper, it picks a server uri from ZooKeeper
- String resolvedAuthorityString = resolveAuthority(connParams);
- LOG.info("Resolved authority: " + resolvedAuthorityString);
- uri = uri.replace(dummyAuthorityString, resolvedAuthorityString);
+ // Configure host, port and params from ZooKeeper if used,
+ // and substitute the dummy authority with a resolved one
+ configureConnParams(connParams);
+ // We check for invalid host, port while configuring connParams with configureConnParams()
+ String authorityStr = connParams.getHost() + ":" + connParams.getPort();
+ LOG.info("Resolved authority: " + authorityStr);
+ uri = uri.replace(dummyAuthorityString, authorityStr);
        connParams.setJdbcUriString(uri);
- // Create a Java URI from the resolved URI for extracting the host/port
- URI resolvedAuthorityURI = null;
- try {
- resolvedAuthorityURI = new URI(null, resolvedAuthorityString, null, null, null);
- } catch (URISyntaxException e) {
- throw new JdbcUriParseException("Bad URL format: ", e);
- }
- connParams.setHost(resolvedAuthorityURI.getHost());
- connParams.setPort(resolvedAuthorityURI.getPort());
      }
-
      return connParams;
    }

@@ -471,22 +461,17 @@ public class Utils {
      return authorities;
    }

- /**
- * Get a string representing a specific host:port
- * @param connParams
- * @return
- * @throws JdbcUriParseException
- * @throws ZooKeeperHiveClientException
- */
- private static String resolveAuthority(JdbcConnectionParams connParams)
+ private static void configureConnParams(JdbcConnectionParams connParams)
        throws JdbcUriParseException, ZooKeeperHiveClientException {
      String serviceDiscoveryMode =
          connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
      if ((serviceDiscoveryMode != null)
          && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER
              .equalsIgnoreCase(serviceDiscoveryMode))) {
- // Resolve using ZooKeeper
- return resolveAuthorityUsingZooKeeper(connParams);
+ // Set ZooKeeper ensemble in connParams for later use
+ connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
+ // Configure using ZooKeeper
+ ZooKeeperHiveClientHelper.configureConnParams(connParams);
      } else {
        String authority = connParams.getAuthorityList()[0];
        URI jdbcURI = URI.create(URI_HIVE_PREFIX + "//" + authority);
@@ -494,32 +479,28 @@ public class Utils {
        // to separate the 'path' portion of URI can result in this.
        // The missing "/" common typo while using secure mode, eg of such url -
        // jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM
- if ((jdbcURI.getAuthority() != null) && (jdbcURI.getHost() == null)) {
- throw new JdbcUriParseException("Bad URL format. Hostname not found "
- + " in authority part of the url: " + jdbcURI.getAuthority()
- + ". Are you missing a '/' after the hostname ?");
+ if (jdbcURI.getAuthority() != null) {
+ String host = jdbcURI.getHost();
+ int port = jdbcURI.getPort();
+ if (host == null) {
+ throw new JdbcUriParseException("Bad URL format. Hostname not found "
+ + " in authority part of the url: " + jdbcURI.getAuthority()
+ + ". Are you missing a '/' after the hostname ?");
+ }
+ // Set the port to default value; we do support jdbc url like:
+ // jdbc:hive2://localhost/db
+ if (port <= 0) {
+ port = Integer.parseInt(Utils.DEFAULT_PORT);
+ }
+ connParams.setHost(jdbcURI.getHost());
+ connParams.setPort(jdbcURI.getPort());
        }
- // Return the 1st element of the array
- return jdbcURI.getAuthority();
      }
    }

    /**
- * Read a specific host:port from ZooKeeper
- * @param connParams
- * @return
- * @throws ZooKeeperHiveClientException
- */
- private static String resolveAuthorityUsingZooKeeper(JdbcConnectionParams connParams)
- throws ZooKeeperHiveClientException {
- // Set ZooKeeper ensemble in connParams for later use
- connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
- return ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
- }
-
- /**
     * Read the next server coordinates (host:port combo) from ZooKeeper. Ignore the znodes already
- * explored. Also update the host, port, jdbcUriString fields of connParams.
+ * explored. Also update the host, port, jdbcUriString and other configs published by the server.
     *
     * @param connParams
     * @throws ZooKeeperHiveClientException
@@ -528,25 +509,13 @@ public class Utils {
        throws ZooKeeperHiveClientException {
      // Add current host to the rejected list
      connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath());
- // Get another HiveServer2 uri from ZooKeeper
- String serverUriString = ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
- // Parse serverUri to a java URI and extract host, port
- URI serverUri = null;
- try {
- // Note URL_PREFIX is not a valid scheme format, therefore leaving it null in the constructor
- // to construct a valid URI
- serverUri = new URI(null, serverUriString, null, null, null);
- } catch (URISyntaxException e) {
- throw new ZooKeeperHiveClientException(e);
- }
      String oldServerHost = connParams.getHost();
      int oldServerPort = connParams.getPort();
- String newServerHost = serverUri.getHost();
- int newServerPort = serverUri.getPort();
- connParams.setHost(newServerHost);
- connParams.setPort(newServerPort);
+ // Update connection params (including host, port) from ZooKeeper
+ ZooKeeperHiveClientHelper.configureConnParams(connParams);
      connParams.setJdbcUriString(connParams.getJdbcUriString().replace(
- oldServerHost + ":" + oldServerPort, newServerHost + ":" + newServerPort));
+ oldServerHost + ":" + oldServerPort, connParams.getHost() + ":" + connParams.getPort()));
+ LOG.info("Selected HiveServer2 instance with uri: " + connParams.getJdbcUriString());
    }

    private static String joinStringArray(String[] stringArray, String seperator) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
index e24b3dc..eeb3cf9 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
@@ -19,9 +19,10 @@
  package org.apache.hive.jdbc;

  import java.nio.charset.Charset;
-import java.sql.SQLException;
  import java.util.List;
  import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
@@ -31,26 +32,19 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
  import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
  import org.apache.zookeeper.Watcher;

-public class ZooKeeperHiveClientHelper {
- public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
-
+class ZooKeeperHiveClientHelper {
+ static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
+ // Pattern for key1=value1;key2=value2
+ private static final Pattern kvPattern = Pattern.compile("([^=;]*)=([^;]*)[;]?");
    /**
     * A no-op watcher class
     */
- public static class DummyWatcher implements Watcher {
+ static class DummyWatcher implements Watcher {
      public void process(org.apache.zookeeper.WatchedEvent event) {
      }
    }

- /**
- * Resolve to a host:port by connecting to ZooKeeper and picking a host randomly.
- *
- * @param uri
- * @param connParams
- * @return
- * @throws SQLException
- */
- static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams)
+ static void configureConnParams(JdbcConnectionParams connParams)
        throws ZooKeeperHiveClientException {
      String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
      String zooKeeperNamespace =
@@ -73,17 +67,17 @@ public class ZooKeeperHiveClientHelper {
          throw new ZooKeeperHiveClientException(
              "Tried all existing HiveServer2 uris from ZooKeeper.");
        }
- // Now pick a host randomly
+ // Now pick a server node randomly
        serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
        connParams.setCurrentHostZnodePath(serverNode);
- String serverUri =
+ // Read config string from the znode for this server node
+ String serverConfStr =
            new String(
                zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode),
                Charset.forName("UTF-8"));
- LOG.info("Selected HiveServer2 instance with uri: " + serverUri);
- return serverUri;
+ applyConfs(serverConfStr, connParams);
      } catch (Exception e) {
- throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e);
+ throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e);
      } finally {
        // Close the client connection with ZooKeeper
        if (zooKeeperClient != null) {
@@ -91,4 +85,76 @@ public class ZooKeeperHiveClientHelper {
        }
      }
    }
+
+ /**
+ * Apply configs published by the server. Configs specified from client's JDBC URI override
+ * configs published by the server.
+ *
+ * @param serverConfStr
+ * @param connParams
+ * @throws Exception
+ */
+ private static void applyConfs(String serverConfStr, JdbcConnectionParams connParams)
+ throws Exception {
+ Matcher matcher = kvPattern.matcher(serverConfStr);
+ while (matcher.find()) {
+ // Have to use this if-else since switch-case on String is supported Java 7 onwards
+ if ((matcher.group(1) != null)) {
+ if ((matcher.group(2) == null)) {
+ throw new Exception("Null config value for: " + matcher.group(1)
+ + " published by the server.");
+ }
+ // Set host
+ if ((matcher.group(1).equals("hive.server2.thrift.bind.host"))
+ && (connParams.getHost() == null)) {
+ connParams.setHost(matcher.group(2));
+ }
+ // Set transportMode
+ if ((matcher.group(1).equals("hive.server2.transport.mode"))
+ && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.TRANSPORT_MODE))) {
+ connParams.getSessionVars().put(JdbcConnectionParams.TRANSPORT_MODE, matcher.group(2));
+ }
+ // Set port
+ if ((matcher.group(1).equals("hive.server2.thrift.port")) && !(connParams.getPort() > 0)) {
+ connParams.setPort(Integer.parseInt(matcher.group(2)));
+ }
+ if ((matcher.group(1).equals("hive.server2.thrift.http.port"))
+ && !(connParams.getPort() > 0)) {
+ connParams.setPort(Integer.parseInt(matcher.group(2)));
+ }
+ // Set sasl qop
+ if ((matcher.group(1).equals("hive.server2.thrift.sasl.qop"))
+ && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_QOP))) {
+ connParams.getSessionVars().put(JdbcConnectionParams.AUTH_QOP, matcher.group(2));
+ }
+ // Set http path
+ if ((matcher.group(1).equals("hive.server2.thrift.http.path"))
+ && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.HTTP_PATH))) {
+ connParams.getSessionVars().put(JdbcConnectionParams.HTTP_PATH, "/" + matcher.group(2));
+ }
+ // Set SSL
+ if ((matcher.group(1) != null) && (matcher.group(1).equals("hive.server2.use.SSL"))
+ && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.USE_SSL))) {
+ connParams.getSessionVars().put(JdbcConnectionParams.USE_SSL, matcher.group(2));
+ }
+ // Set authentication configs
+ // Note that in JDBC driver, we have 3 auth modes: NOSASL, Kerberos and password based
+ // The use of "JdbcConnectionParams.AUTH_TYPE=JdbcConnectionParams.AUTH_SIMPLE" picks NOSASL
+ // The presence of "JdbcConnectionParams.AUTH_PRINCIPAL=<principal>" picks Kerberos
+ // Otherwise password based (which includes NONE, PAM, LDAP, CUSTOM)
+ if ((matcher.group(1).equals("hive.server2.authentication"))
+ && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_TYPE))) {
+ if (matcher.group(2).equalsIgnoreCase("NOSASL")) {
+ connParams.getSessionVars().put(JdbcConnectionParams.AUTH_TYPE,
+ JdbcConnectionParams.AUTH_SIMPLE);
+ }
+ }
+ // Set server's kerberos principal
+ if ((matcher.group(1).equals("hive.server2.authentication.kerberos.principal"))
+ && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_PRINCIPAL))) {
+ connParams.getSessionVars().put(JdbcConnectionParams.AUTH_PRINCIPAL, matcher.group(2));
+ }
+ }
+ }
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 4a4be97..d7ba964 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -21,7 +21,9 @@ package org.apache.hive.service.server;
  import java.io.IOException;
  import java.nio.charset.Charset;
  import java.util.ArrayList;
+import java.util.HashMap;
  import java.util.List;
+import java.util.Map;
  import java.util.Properties;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.TimeUnit;
@@ -69,6 +71,8 @@ import org.apache.zookeeper.ZooDefs.Ids;
  import org.apache.zookeeper.ZooDefs.Perms;
  import org.apache.zookeeper.data.ACL;

+import com.google.common.base.Joiner;
+
  /**
   * HiveServer2.
   *
@@ -100,7 +104,12 @@ public class HiveServer2 extends CompositeService {
      }
      addService(thriftCLIService);
      super.init(hiveConf);
-
+ // Set host name in hiveConf
+ try {
+ hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getServerHost());
+ } catch (Throwable t) {
+ throw new Error("Unable to intitialize HiveServer2", t);
+ }
      // Add a shutdown hook for catching SIGTERM & SIGINT
      final HiveServer2 hiveServer2 = this;
      Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -122,6 +131,14 @@ public class HiveServer2 extends CompositeService {
      return false;
    }

+ public static boolean isKerberosAuthMode(HiveConf hiveConf) {
+ String authMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION);
+ if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) {
+ return true;
+ }
+ return false;
+ }
+
    /**
     * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
     */
@@ -158,9 +175,12 @@ public class HiveServer2 extends CompositeService {
    private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
      String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
      String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
- String instanceURI = getServerInstanceURI(hiveConf);
- byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
+ String instanceURI = getServerInstanceURI();
      setUpZooKeeperAuth(hiveConf);
+ // HiveServer2 configs that this instance will publish to ZooKeeper,
+ // so that the clients can read these and configure themselves properly.
+ Map<String, String> confsToPublish = new HashMap<String, String>();
+ addConfsToPublish(hiveConf, confsToPublish);
      int sessionTimeout =
          (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
              TimeUnit.MILLISECONDS);
@@ -193,6 +213,10 @@ public class HiveServer2 extends CompositeService {
            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
                + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
                + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
+ String znodeData = "";
+ // Publish configs for this instance as the data on the node
+ znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
+ byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
        znode =
            new PersistentEphemeralNode(zooKeeperClient,
                PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
@@ -220,6 +244,41 @@ public class HiveServer2 extends CompositeService {
    }

    /**
+ * Add conf keys, values that HiveServer2 will publish to ZooKeeper.
+ * @param hiveConf
+ */
+ private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish) {
+ // Hostname
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST));
+ // Transport mode
+ confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE));
+ // Transport specific confs
+ if (isHTTPTransportMode(hiveConf)) {
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT));
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
+ } else {
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_PORT));
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP));
+ }
+ // Auth specific confs
+ confsToPublish.put(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION));
+ if (isKerberosAuthMode(hiveConf)) {
+ confsToPublish.put(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
+ }
+ // SSL conf
+ confsToPublish.put(ConfVars.HIVE_SERVER2_USE_SSL.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_USE_SSL));
+ }
+
+ /**
     * For a kerberized cluster, we dynamically set up the client's JAAS conf.
     *
     * @param hiveConf
@@ -289,7 +348,7 @@ public class HiveServer2 extends CompositeService {
      this.registeredWithZooKeeper = registeredWithZooKeeper;
    }

- private String getServerInstanceURI(HiveConf hiveConf) throws Exception {
+ private String getServerInstanceURI() throws Exception {
      if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
        throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
      }
@@ -297,6 +356,13 @@ public class HiveServer2 extends CompositeService {
          + thriftCLIService.getPortNumber();
    }

+ private String getServerHost() throws Exception {
+ if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
+ throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
+ }
+ return thriftCLIService.getServerIPAddress().getHostName();
+ }
+
    @Override
    public synchronized void start() {
      super.start();

Search Discussions

Discussion Posts

Previous

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 2 of 2 | next ›
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Aug 25, '15 at 12:14a
active: Aug 25, '15 at 12:15a
posts: 2
users: 1
website: hive.apache.org

1 user in discussion

Vgumashta: 2 posts

People

Translate

site design / logo © 2021 Grokbase