FAQ
Author: xuefu
Date: Thu Dec 18 18:58:11 2014
New Revision: 1646511

URL: http://svn.apache.org/r1646511
Log:
HIVE-9094: TimeoutException when trying get executor count from RSC [Spark Branch] (Chengxiang via Xuefu)

Modified:
     hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Dec 18 18:58:11 2014
@@ -1964,7 +1964,12 @@ public class HiveConf extends Configurat
      TEZ_EXEC_INPLACE_PROGRESS(
          "hive.tez.exec.inplace.progress",
          true,
- "Updates tez job execution progress in-place in the terminal.")
+ "Updates tez job execution progress in-place in the terminal."),
+ SPARK_CLIENT_FUTURE_TIMEOUT(
+ "hive.spark.client.future.timeout",
+ "60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "remote spark client JobHandle future timeout value in seconds.")
      ;

      public final String varname;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Thu Dec 18 18:58:11 2014
@@ -28,7 +28,7 @@ import java.util.Properties;
  import org.apache.commons.compress.utils.CharsetNames;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.spark.SparkConf;
  import org.apache.spark.SparkException;

@@ -39,22 +39,22 @@ public class HiveSparkClientFactory {
    private static final String SPARK_DEFAULT_MASTER = "local";
    private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";

- public static HiveSparkClient createHiveSparkClient(Configuration configuration)
+ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
      throws IOException, SparkException {

- Map<String, String> conf = initiateSparkConf(configuration);
+ Map<String, String> sparkConf = initiateSparkConf(hiveconf);
      // Submit spark job through local spark context while spark master is local mode, otherwise submit
      // spark job through remote spark context.
- String master = conf.get("spark.master");
+ String master = sparkConf.get("spark.master");
      if (master.equals("local") || master.startsWith("local[")) {
        // With local spark context, all user sessions share the same spark context.
- return LocalHiveSparkClient.getInstance(generateSparkConf(conf));
+ return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf));
      } else {
- return new RemoteHiveSparkClient(conf);
+ return new RemoteHiveSparkClient(hiveconf, sparkConf);
      }
    }

- public static Map<String, String> initiateSparkConf(Configuration hiveConf) {
+ public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
      Map<String, String> sparkConf = new HashMap<String, String>();

      // set default spark configurations.

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Thu Dec 18 18:58:11 2014
@@ -71,12 +71,13 @@ public class RemoteHiveSparkClient imple

    private transient SparkClient remoteClient;
    private transient SparkConf sparkConf;
+ private transient HiveConf hiveConf;

    private transient List<String> localJars = new ArrayList<String>();
-
    private transient List<String> localFiles = new ArrayList<String>();

- RemoteHiveSparkClient(Map<String, String> conf) throws IOException, SparkException {
+ RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
+ this.hiveConf = hiveConf;
      sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
      remoteClient = SparkClientFactory.createClient(conf);
    }
@@ -88,8 +89,9 @@ public class RemoteHiveSparkClient imple

    @Override
    public int getExecutorCount() throws Exception {
+ long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
      Future<Integer> handler = remoteClient.getExecutorCount();
- return handler.get(5, TimeUnit.SECONDS).intValue();
+ return handler.get(timeout, TimeUnit.SECONDS).intValue();
    }

    @Override
@@ -108,9 +110,11 @@ public class RemoteHiveSparkClient imple
      byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
      byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);

+ long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+
      JobHandle<Serializable> jobHandle = remoteClient.submit(
          new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes));
- return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle));
+ return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
    }

    private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1646511&r1=1646510&r2=1646511&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Thu Dec 18 18:58:11 2014
@@ -56,10 +56,12 @@ public class RemoteSparkJobStatus implem
    private final long startTime;
    private final SparkClient sparkClient;
    private final JobHandle<Serializable> jobHandle;
+ private final transient long sparkClientTimeoutInSeconds;

- public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle) {
+ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle, long timeoutInSeconds) {
      this.sparkClient = sparkClient;
      this.jobHandle = jobHandle;
+ this.sparkClientTimeoutInSeconds = timeoutInSeconds;
      startTime = System.nanoTime();
    }

@@ -146,7 +148,7 @@ public class RemoteSparkJobStatus implem
      JobHandle<SparkJobInfo> getJobInfo = sparkClient.submit(
          new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
      try {
- return getJobInfo.get();
+ return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
      } catch (Throwable t) {
        LOG.warn("Error getting job info", t);
        return null;
@@ -156,7 +158,7 @@ public class RemoteSparkJobStatus implem
    private SparkStageInfo getSparkStageInfo(int stageId) {
      JobHandle<SparkStageInfo> getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId));
      try {
- return getStageInfo.get();
+ return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
      } catch (Throwable t) {
        LOG.warn("Error getting stage info", t);
        return null;

Search Discussions

Related Discussions

Discussion Navigation
view: thread | post
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Dec 18, '14 at 6:58p
active: Dec 18, '14 at 6:58p
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Xuefu: 1 post

People

Translate

site design / logo © 2021 Grokbase