FAQ
Author: brock
Date: Sat Feb 14 18:42:20 2015
New Revision: 1659842

URL: http://svn.apache.org/r1659842
Log:
HIVE-9425 - Add jar/file doesn't work with yarn-cluster mode [Spark Branch] (Rui via Brock)

Modified:
     hive/branches/branch-1.1/ (props changed)
     hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
     hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
     hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
     hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
     hive/branches/branch-1.1/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java

Propchange: hive/branches/branch-1.1/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 14 18:42:20 2015
@@ -1,6 +1,6 @@
  /hive/branches/branch-0.11:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
  /hive/branches/cbo:1605012-1627125
-/hive/branches/spark:1608589-1654414,1654553,1654869,1654873,1655427,1655468,1655926-1655927,1656573,1658877
+/hive/branches/spark:1608589-1654414,1654553,1654869,1654873,1655427,1655468,1655926-1655927,1656573,1658877,1659840
  /hive/branches/tez:1494760-1622766
  /hive/branches/vectorization:1466908-1527856
  /hive/trunk:1655202,1655210,1655213,1655436,1655460,1655894-1655895,1656061,1656114,1656234,1656587,1656780,1657742,1657990,1658221,1658471,1658766,1659027,1659106,1659432,1659434

Modified: hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1659842&r1=1659841&r2=1659842&view=diff
==============================================================================
--- hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Sat Feb 14 18:42:20 2015
@@ -22,8 +22,8 @@ import com.google.common.base.Strings;

  import java.io.IOException;
  import java.io.Serializable;
-import java.net.MalformedURLException;
-import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
@@ -77,8 +77,8 @@ public class RemoteHiveSparkClient imple
    private transient SparkConf sparkConf;
    private transient HiveConf hiveConf;

- private transient List<URL> localJars = new ArrayList<URL>();
- private transient List<URL> localFiles = new ArrayList<URL>();
+ private transient List<URI> localJars = new ArrayList<URI>();
+ private transient List<URI> localFiles = new ArrayList<URI>();

    private final transient long sparkClientTimtout;

@@ -128,7 +128,7 @@ public class RemoteHiveSparkClient imple
      return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
    }

- private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+ private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
      // add hive-exec jar
      addJars((new JobConf(this.getClass())).getJar());

@@ -160,30 +160,32 @@ public class RemoteHiveSparkClient imple
      addResources(addedArchives);
    }

- private void addResources(String addedFiles) {
+ private void addResources(String addedFiles) throws IOException {
      for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
        try {
- URL fileUrl = SparkUtilities.getURL(addedFile);
- if (fileUrl != null && !localFiles.contains(fileUrl)) {
- localFiles.add(fileUrl);
- remoteClient.addFile(fileUrl);
+ URI fileUri = SparkUtilities.getURI(addedFile);
+ if (fileUri != null && !localFiles.contains(fileUri)) {
+ fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf);
+ localFiles.add(fileUri);
+ remoteClient.addFile(fileUri);
          }
- } catch (MalformedURLException e) {
- LOG.warn("Failed to add file:" + addedFile);
+ } catch (URISyntaxException e) {
+ LOG.warn("Failed to add file:" + addedFile, e);
        }
      }
    }

- private void addJars(String addedJars) {
+ private void addJars(String addedJars) throws IOException {
      for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
        try {
- URL jarUrl = SparkUtilities.getURL(addedJar);
- if (jarUrl != null && !localJars.contains(jarUrl)) {
- localJars.add(jarUrl);
- remoteClient.addJar(jarUrl);
+ URI jarUri = SparkUtilities.getURI(addedJar);
+ if (jarUri != null && !localJars.contains(jarUri)) {
+ jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf);
+ localJars.add(jarUri);
+ remoteClient.addJar(jarUri);
          }
- } catch (MalformedURLException e) {
- LOG.warn("Failed to add jar:" + addedJar);
+ } catch (URISyntaxException e) {
+ LOG.warn("Failed to add jar:" + addedJar, e);
        }
      }
    }

Modified: hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1659842&r1=1659841&r2=1659842&view=diff
==============================================================================
--- hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Sat Feb 14 18:42:20 2015
@@ -18,11 +18,15 @@
  package org.apache.hadoop.hive.ql.exec.spark;

  import java.io.File;
+import java.io.IOException;
  import java.net.MalformedURLException;
  import java.net.URI;
  import java.net.URISyntaxException;
  import java.net.URL;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
  import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -50,25 +54,50 @@ public class SparkUtilities {
      return copy;
    }

- public static URL getURL(String path) throws MalformedURLException {
+ public static URI getURI(String path) throws URISyntaxException {
      if (path == null) {
        return null;
      }

- URL url = null;
- try {
        URI uri = new URI(path);
- if (uri.getScheme() != null) {
- url = uri.toURL();
- } else {
+ if (uri.getScheme() == null) {
          // if no file schema in path, we assume it's file on local fs.
- url = new File(path).toURI().toURL();
+ uri = new File(path).toURI();
        }
- } catch (URISyntaxException e) {
- // do nothing here, just return null if input path is not a valid URI.
+
+ return uri;
+ }
+
+ /**
+ * Copies local file to HDFS in yarn-cluster mode.
+ *
+ * @param source
+ * @param conf
+ * @return
+ * @throws IOException
+ */
+ public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
+ URI result = source;
+ if (conf.get("spark.master").equals("yarn-cluster")) {
+ if (!source.getScheme().equals("hdfs")) {
+ Path tmpDir = SessionState.getHDFSSessionPath(conf);
+ FileSystem fileSystem = FileSystem.get(conf);
+ fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir);
+ String filePath = tmpDir + File.separator + getFileName(source);
+ Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath();
+ result = fullPath.toUri();
+ }
+ }
+ return result;
+ }
+
+ private static String getFileName(URI uri) {
+ if (uri == null) {
+ return null;
      }

- return url;
+ String[] splits = uri.getPath().split(File.separator);
+ return splits[splits.length-1];
    }

    public static SparkSession getSparkSession(HiveConf conf,

Modified: hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1659842&r1=1659841&r2=1659842&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Sat Feb 14 18:42:20 2015
@@ -18,7 +18,7 @@
  package org.apache.hive.spark.client;

  import java.io.Serializable;
-import java.net.URL;
+import java.net.URI;
  import java.util.concurrent.Future;

  import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -68,10 +68,10 @@ public interface SparkClient extends Ser
     * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
     * on that node (and not on the client machine).
     *
- * @param url The location of the jar file.
+ * @param uri The location of the jar file.
     * @return A future that can be used to monitor the operation.
     */
- Future<?> addJar(URL url);
+ Future<?> addJar(URI uri);

    /**
     * Adds a file to the running remote context.
@@ -80,10 +80,10 @@ public interface SparkClient extends Ser
     * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
     * on that node (and not on the client machine).
     *
- * @param url The location of the file.
+ * @param uri The location of the file.
     * @return A future that can be used to monitor the operation.
     */
- Future<?> addFile(URL url);
+ Future<?> addFile(URI uri);

    /**
     * Get the count of executors.

Modified: hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1659842&r1=1659841&r2=1659842&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/branch-1.1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Sat Feb 14 18:42:20 2015
@@ -17,6 +17,14 @@

  package org.apache.hive.spark.client;

+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
  import io.netty.channel.ChannelHandlerContext;
  import io.netty.util.concurrent.GenericFutureListener;
  import io.netty.util.concurrent.Promise;
@@ -30,14 +38,12 @@ import java.io.InputStreamReader;
  import java.io.OutputStreamWriter;
  import java.io.Serializable;
  import java.io.Writer;
-import java.net.URL;
+import java.net.URI;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;
  import java.util.UUID;
  import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
  import java.util.concurrent.atomic.AtomicInteger;

  import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,14 +55,6 @@ import org.apache.spark.SparkException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
  class SparkClientImpl implements SparkClient {
    private static final long serialVersionUID = 1L;

@@ -154,13 +152,13 @@ class SparkClientImpl implements SparkCl
    }

    @Override
- public Future<?> addJar(URL url) {
- return run(new AddJarJob(url.toString()));
+ public Future<?> addJar(URI uri) {
+ return run(new AddJarJob(uri.toString()));
    }

    @Override
- public Future<?> addFile(URL url) {
- return run(new AddFileJob(url.toString()));
+ public Future<?> addFile(URI uri) {
+ return run(new AddFileJob(uri.toString()));
    }

    @Override

Modified: hive/branches/branch-1.1/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1659842&r1=1659841&r2=1659842&view=diff
==============================================================================
--- hive/branches/branch-1.1/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/branches/branch-1.1/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Sat Feb 14 18:42:20 2015
@@ -22,7 +22,7 @@ import java.io.FileInputStream;
  import java.io.FileOutputStream;
  import java.io.InputStream;
  import java.io.Serializable;
-import java.net.URL;
+import java.net.URI;
  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Map;
@@ -204,7 +204,7 @@ public class TestSparkClient {
            jarFile.closeEntry();
            jarFile.close();

- client.addJar(new URL("file:" + jar.getAbsolutePath()))
+ client.addJar(new URI("file:" + jar.getAbsolutePath()))
              .get(TIMEOUT, TimeUnit.SECONDS);

            // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
@@ -220,7 +220,7 @@ public class TestSparkClient {
            fileStream.write("test file".getBytes("UTF-8"));
            fileStream.close();

- client.addJar(new URL("file:" + file.getAbsolutePath()))
+ client.addJar(new URI("file:" + file.getAbsolutePath()))
              .get(TIMEOUT, TimeUnit.SECONDS);

            // The same applies to files added with "addFile". They're only guaranteed to be available

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 1 | next ›
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Feb 14, '15 at 6:42p
active: Feb 14, '15 at 6:42p
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Brock: 1 post

People

Translate

site design / logo © 2021 Grokbase