FAQ
Author: xuefu
Date: Fri Mar 6 22:43:26 2015
New Revision: 1664747

URL: http://svn.apache.org/r1664747
Log:
HIVE-9882: Add jar/file doesn't work with yarn-cluster mode [Spark Branch] (Rui via Xuefu)

Modified:
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
     hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
     hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
     hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
     hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
     hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1664747&r1=1664746&r2=1664747&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Fri Mar 6 22:43:26 2015
@@ -224,7 +224,8 @@ public class RemoteHiveSparkClient imple
        // may need to load classes from this jar in other threads.
        List<String> addedJars = jc.getAddedJars();
        if (addedJars != null && !addedJars.isEmpty()) {
- SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+ SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]),
+ localJobConf, jc.getLocalTmpDir());
          localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
        }


Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1664747&r1=1664746&r2=1664747&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Fri Mar 6 22:43:26 2015
@@ -17,6 +17,7 @@

  package org.apache.hive.spark.client;

+import java.io.File;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
@@ -58,4 +59,9 @@ public interface JobContext {
     */
    List<String> getAddedJars();

+ /**
+ * Returns a local tmp dir specific to the context
+ */
+ File getLocalTmpDir();
+
  }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1664747&r1=1664746&r2=1664747&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Fri Mar 6 22:43:26 2015
@@ -17,6 +17,7 @@

  package org.apache.hive.spark.client;

+import java.io.File;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
@@ -34,12 +35,14 @@ class JobContextImpl implements JobConte
    private final ThreadLocal<MonitorCallback> monitorCb;
    private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
    private final List<String> addedJars;
+ private final File localTmpDir;

- public JobContextImpl(JavaSparkContext sc) {
+ public JobContextImpl(JavaSparkContext sc, File localTmpDir) {
      this.sc = sc;
      this.monitorCb = new ThreadLocal<MonitorCallback>();
      monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
      addedJars = new CopyOnWriteArrayList<String>();
+ this.localTmpDir = localTmpDir;
    }


@@ -65,6 +68,11 @@ class JobContextImpl implements JobConte
      return addedJars;
    }

+ @Override
+ public File getLocalTmpDir() {
+ return localTmpDir;
+ }
+
    void setMonitorCb(MonitorCallback cb) {
      monitorCb.set(cb);
    }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1664747&r1=1664746&r2=1664747&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Fri Mar 6 22:43:26 2015
@@ -18,9 +18,12 @@
  package org.apache.hive.spark.client;

  import com.google.common.base.Throwables;
+import com.google.common.io.Files;
  import io.netty.channel.ChannelHandlerContext;
  import io.netty.channel.nio.NioEventLoopGroup;

+import java.io.File;
+import java.io.IOException;
  import java.io.Serializable;
  import java.util.Iterator;
  import java.util.List;
@@ -34,6 +37,7 @@ import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.atomic.AtomicInteger;

+import org.apache.commons.io.FileUtils;
  import org.apache.hadoop.hive.common.classification.InterfaceAudience;
  import org.apache.hive.spark.client.metrics.Metrics;
  import org.apache.hive.spark.client.rpc.Rpc;
@@ -85,6 +89,8 @@ public class RemoteDriver {
    private final NioEventLoopGroup egroup;
    private final Rpc clientRpc;
    private final DriverProtocol protocol;
+ // a local temp dir specific to this driver
+ private final File localTmpDir;

    // Used to queue up requests while the SparkContext is being created.
    private final List<JobWrapper<?>> jobQueue = Lists.newLinkedList();
@@ -98,6 +104,7 @@ public class RemoteDriver {
      this.activeJobs = Maps.newConcurrentMap();
      this.jcLock = new Object();
      this.shutdownLock = new Object();
+ localTmpDir = Files.createTempDir();

      SparkConf conf = new SparkConf();
      String serverAddress = null;
@@ -162,7 +169,7 @@ public class RemoteDriver {
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.sc().addSparkListener(new ClientListener());
        synchronized (jcLock) {
- jc = new JobContextImpl(sc);
+ jc = new JobContextImpl(sc, localTmpDir);
          jcLock.notifyAll();
        }
      } catch (Exception e) {
@@ -188,6 +195,11 @@ public class RemoteDriver {
        }
      }
      executor.shutdownNow();
+ try {
+ FileUtils.deleteDirectory(localTmpDir);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete local tmp dir: " + localTmpDir, e);
+ }
    }

    private void submit(JobWrapper<?> job) {

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1664747&r1=1664746&r2=1664747&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Fri Mar 6 22:43:26 2015
@@ -250,6 +250,7 @@ class SparkClientImpl implements SparkCl
        if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
          throw new IOException("Cannot change permissions of job properties file.");
        }
+ properties.deleteOnExit();

        Properties allProps = new Properties();
        // first load the defaults from spark-defaults.conf if available

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java?rev=1664747&r1=1664746&r2=1664747&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java Fri Mar 6 22:43:26 2015
@@ -28,6 +28,9 @@ import java.util.List;
  import org.apache.commons.lang.StringUtils;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;

  public class SparkClientUtilities {
    protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
@@ -37,20 +40,22 @@ public class SparkClientUtilities {
     *
     * @param newPaths Array of classpath elements
     */
- public static void addToClassPath(String[] newPaths) throws Exception {
+ public static void addToClassPath(String[] newPaths, Configuration conf, File localTmpDir)
+ throws Exception {
      ClassLoader cloader = Thread.currentThread().getContextClassLoader();
      URLClassLoader loader = (URLClassLoader) cloader;
      List<URL> curPath = Lists.newArrayList(loader.getURLs());

      for (String newPath : newPaths) {
- URL newUrl = urlFromPathString(newPath);
+ URL newUrl = urlFromPathString(newPath, conf, localTmpDir);
        if (newUrl != null && !curPath.contains(newUrl)) {
          curPath.add(newUrl);
          LOG.info("Added jar[" + newUrl + "] to classpath.");
        }
      }

- URLClassLoader newLoader = new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
+ URLClassLoader newLoader =
+ new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
      Thread.currentThread().setContextClassLoader(newLoader);
    }

@@ -60,16 +65,24 @@ public class SparkClientUtilities {
     * @param path path string
     * @return
     */
- private static URL urlFromPathString(String path) {
+ private static URL urlFromPathString(String path, Configuration conf, File localTmpDir) {
      URL url = null;
      try {
        if (StringUtils.indexOf(path, "file:/") == 0) {
          url = new URL(path);
+ } else if (StringUtils.indexOf(path, "hdfs:/") == 0) {
+ Path remoteFile = new Path(path);
+ Path localFile =
+ new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName());
+ LOG.info("Copying " + remoteFile + " to " + localFile);
+ FileSystem fs = remoteFile.getFileSystem(conf);
+ fs.copyToLocalFile(remoteFile, localFile);
+ return urlFromPathString(localFile.toString(), conf, localTmpDir);
        } else {
          url = new File(path).toURL();
        }
      } catch (Exception err) {
- LOG.error("Bad URL " + path + ", ignoring path");
+ LOG.error("Bad URL " + path + ", ignoring path", err);
      }
      return url;
    }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedMar 6, '15 at 10:43p
activeMar 6, '15 at 10:43p
posts1
users1
websitehive.apache.org

1 user in discussion

Xuefu: 1 post

People

Translate

site design / logo © 2021 Grokbase