FAQ
Author: szehon
Date: Tue Nov 18 06:46:42 2014
New Revision: 1640277

URL: http://svn.apache.org/r1640277
Log:
HIVE-8833 : Unify spark client API and implement remote spark client.[Spark Branch] (Chengxiang Li via Szehon)

Added:
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
Removed:
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
Modified:
     hive/branches/spark/ql/pom.xml
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
     hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
     hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java

Modified: hive/branches/spark/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/pom.xml?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/pom.xml (original)
+++ hive/branches/spark/ql/pom.xml Tue Nov 18 06:46:42 2014
@@ -61,6 +61,11 @@
        <artifactId>hive-shims</artifactId>
        <version>${project.version}</version>
      </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>spark-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
      <!-- inter-project -->
      <dependency>
        <groupId>com.esotericsoftware.kryo</groupId>

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java?rev=1640277&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java Tue Nov 18 06:46:42 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+public interface HiveSparkClient extends Serializable, Closeable {
+ /**
+ * HiveSparkClient should generate Spark RDD graph by given sparkWork and driverContext,
+ * and submit RDD graph to Spark cluster.
+ * @param driverContext
+ * @param sparkWork
+ * @return SparkJobRef could be used to track spark job progress and metrics.
+ * @throws Exception
+ */
+ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1640277&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Tue Nov 18 06:46:42 2014
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+public class HiveSparkClientFactory {
+ protected static transient final Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
+
+ private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
+ private static final String SPARK_DEFAULT_MASTER = "local";
+ private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
+
+ public static HiveSparkClient createHiveSparkClient(Configuration configuration)
+ throws IOException, SparkException {
+
+ Map<String, String> conf = initiateSparkConf(configuration);
+ // Submit spark job through local spark context while spark master is local mode, otherwise submit
+ // spark job through remote spark context.
+ String master = conf.get("spark.master");
+ if (master.equals("local") || master.startsWith("local[")) {
+ // With local spark context, all user sessions share the same spark context.
+ return LocalHiveSparkClient.getInstance(generateSparkConf(conf));
+ } else {
+ return new RemoteHiveSparkClient(conf);
+ }
+ }
+
+ private static Map<String, String> initiateSparkConf(Configuration hiveConf) {
+ Map<String, String> sparkConf = new HashMap<String, String>();
+
+ // set default spark configurations.
+ sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
+ sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
+ sparkConf.put("spark.serializer",
+ "org.apache.spark.serializer.KryoSerializer");
+ sparkConf.put("spark.default.parallelism", "1");
+
+ // load properties from spark-defaults.conf.
+ InputStream inputStream = null;
+ try {
+ inputStream = HiveSparkClientFactory.class.getClassLoader()
+ .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+ if (inputStream != null) {
+ LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
+ Properties properties = new Properties();
+ properties.load(new InputStreamReader(inputStream, CharsetNames.UTF_8));
+ for (String propertyName : properties.stringPropertyNames()) {
+ if (propertyName.startsWith("spark")) {
+ String value = properties.getProperty(propertyName);
+ sparkConf.put(propertyName, properties.getProperty(propertyName));
+ LOG.info(String.format(
+ "load spark configuration from %s (%s -> %s).",
+ SPARK_DEFAULT_CONF_FILE, propertyName, value));
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.info("Failed to open spark configuration file:"
+ + SPARK_DEFAULT_CONF_FILE, e);
+ } finally {
+ if (inputStream != null) {
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ LOG.debug("Failed to close inputstream.", e);
+ }
+ }
+ }
+
+ // load properties from hive configurations.
+ for (Map.Entry<String, String> entry : hiveConf) {
+ String propertyName = entry.getKey();
+ if (propertyName.startsWith("spark")) {
+ String value = entry.getValue();
+ sparkConf.put(propertyName, value);
+ LOG.info(String.format(
+ "load spark configuration from hive configuration (%s -> %s).",
+ propertyName, value));
+ }
+ }
+
+ return sparkConf;
+ }
+
+ private static SparkConf generateSparkConf(Map<String, String> conf) {
+ SparkConf sparkConf = new SparkConf(false);
+ for (Map.Entry<String, String> entry : conf.entrySet()) {
+ sparkConf.set(entry.getKey(), entry.getValue());
+ }
+ return sparkConf;
+ }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1640277&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Tue Nov 18 06:46:42 2014
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * LocalSparkClient submit Spark job in local driver, it's responsible for build spark client
+ * environment and execute spark work.
+ */
+public class LocalHiveSparkClient implements HiveSparkClient {
+ private static final long serialVersionUID = 1L;
+
+ private static final String MR_JAR_PROPERTY = "tmpjars";
+ protected static transient final Log LOG = LogFactory
+ .getLog(LocalHiveSparkClient.class);
+
+ private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+
+ private static LocalHiveSparkClient client;
+
+ public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf) {
+ if (client == null) {
+ client = new LocalHiveSparkClient(sparkConf);
+ }
+ return client;
+ }
+
+ /**
+ * Get Spark shuffle memory per task, and total number of cores. This
+ * information can be used to estimate how many reducers a task can have.
+ *
+ * @return a tuple, the first element is the shuffle memory per task in bytes,
+ * the second element is the number of total cores usable by the client
+ */
+ public Tuple2<Long, Integer> getMemoryAndCores() {
+ SparkContext sparkContext = sc.sc();
+ SparkConf sparkConf = sparkContext.conf();
+ int cores = sparkConf.getInt("spark.executor.cores", 1);
+ double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+ // sc.executorMemory() is in MB, need to convert to bytes
+ long memoryPerTask =
+ (long) (sparkContext.executorMemory() * memoryFraction * 1024 * 1024 / cores);
+ int executors = sparkContext.getExecutorMemoryStatus().size();
+ int totalCores = executors * cores;
+ LOG.info("Spark cluster current has executors: " + executors
+ + ", cores per executor: " + cores + ", memory per executor: "
+ + sparkContext.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction);
+ return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTask),
+ Integer.valueOf(totalCores));
+ }
+
+ private JavaSparkContext sc;
+
+ private List<String> localJars = new ArrayList<String>();
+
+ private List<String> localFiles = new ArrayList<String>();
+
+ private JobMetricsListener jobMetricsListener;
+
+ private LocalHiveSparkClient(SparkConf sparkConf) {
+ sc = new JavaSparkContext(sparkConf);
+ jobMetricsListener = new JobMetricsListener();
+ sc.sc().listenerBus().addListener(jobMetricsListener);
+ }
+
+ @Override
+ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
+ Context ctx = driverContext.getCtx();
+ HiveConf hiveConf = (HiveConf) ctx.getConf();
+ refreshLocalResources(sparkWork, hiveConf);
+ JobConf jobConf = new JobConf(hiveConf);
+
+ // Create temporary scratch dir
+ Path emptyScratchDir;
+ emptyScratchDir = ctx.getMRTmpPath();
+ FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+ fs.mkdirs(emptyScratchDir);
+
+ SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
+ Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
+ if (prefixes != null) {
+ for (String group : prefixes.keySet()) {
+ for (String counterName : prefixes.get(group)) {
+ sparkCounters.createCounter(group, counterName);
+ }
+ }
+ }
+ SparkReporter sparkReporter = new SparkReporter(sparkCounters);
+
+ // Generate Spark plan
+ SparkPlanGenerator gen =
+ new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
+ SparkPlan plan = gen.generate(sparkWork);
+
+ // Execute generated plan.
+ JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+ // We use Spark RDD async action to submit job as it's the only way to get jobId now.
+ JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+ // As we always use foreach action to submit RDD graph, it would only trigger on job.
+ int jobId = future.jobIds().get(0);
+ SimpleSparkJobStatus sparkJobStatus =
+ new SimpleSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future);
+ return new SparkJobRef(Integer.toString(jobId), sparkJobStatus);
+ }
+
+ /**
+ * At this point single SparkContext is used by more than one thread, so make this
+ * method synchronized.
+ *
+ * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
+ * issue we have to live with until multiple SparkContexts are supported in a single JVM.
+ */
+ private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+ // add hive-exec jar
+ addJars((new JobConf(this.getClass())).getJar());
+
+ // add aux jars
+ addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
+
+ // add added jars
+ String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+ addJars(addedJars);
+
+ // add plugin module jars on demand
+ // jobConf will hold all the configuration for hadoop, tez, and hive
+ JobConf jobConf = new JobConf(conf);
+ jobConf.set(MR_JAR_PROPERTY, "");
+ for (BaseWork work : sparkWork.getAllWork()) {
+ work.configureJobConf(jobConf);
+ }
+ addJars(conf.get(MR_JAR_PROPERTY));
+
+ // add added files
+ String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+ addResources(addedFiles);
+
+ // add added archives
+ String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+ addResources(addedArchives);
+ }
+
+ private void addResources(String addedFiles) {
+ for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
+ if (!localFiles.contains(addedFile)) {
+ localFiles.add(addedFile);
+ sc.addFile(addedFile);
+ }
+ }
+ }
+
+ private void addJars(String addedJars) {
+ for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
+ if (!localJars.contains(addedJar)) {
+ localJars.add(addedJar);
+ sc.addJar(addedJar);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ sc.stop();
+ client = null;
+ }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1640277&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Tue Nov 18 06:46:42 2014
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.spark.client.Job;
+import org.apache.hive.spark.client.JobContext;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
+ * wrap a spark job request and send to an remote SparkContext.
+ */
+public class RemoteHiveSparkClient implements HiveSparkClient {
+ private static final long serialVersionUID = 1L;
+
+ private static final String MR_JAR_PROPERTY = "tmpjars";
+ protected static transient final Log LOG = LogFactory
+ .getLog(RemoteHiveSparkClient.class);
+
+ private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+
+ private transient SparkClient remoteClient;
+
+ private transient List<String> localJars = new ArrayList<String>();
+
+ private transient List<String> localFiles = new ArrayList<String>();
+
+ RemoteHiveSparkClient(Map<String, String> sparkConf) throws IOException, SparkException {
+ SparkClientFactory.initialize(sparkConf);
+ remoteClient = SparkClientFactory.createClient(sparkConf);
+ }
+
+ @Override
+ public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
+ final Context ctx = driverContext.getCtx();
+ final HiveConf hiveConf = (HiveConf) ctx.getConf();
+ refreshLocalResources(sparkWork, hiveConf);
+ final JobConf jobConf = new JobConf(hiveConf);
+
+ // Create temporary scratch dir
+ final Path emptyScratchDir = ctx.getMRTmpPath();
+ FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+ fs.mkdirs(emptyScratchDir);
+
+ final byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
+ final byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
+ final byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
+
+ JobHandle<Serializable> jobHandle = remoteClient.submit(new Job<Serializable>() {
+ @Override
+ public Serializable call(JobContext jc) throws Exception {
+ JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+ Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
+ SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
+
+ SparkCounters sparkCounters = new SparkCounters(jc.sc(), localJobConf);
+ Map<String, List<String>> prefixes = localSparkWork.getRequiredCounterPrefix();
+ if (prefixes != null) {
+ for (String group : prefixes.keySet()) {
+ for (String counterName : prefixes.get(group)) {
+ sparkCounters.createCounter(group, counterName);
+ }
+ }
+ }
+ SparkReporter sparkReporter = new SparkReporter(sparkCounters);
+
+ // Generate Spark plan
+ SparkPlanGenerator gen =
+ new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
+ SparkPlan plan = gen.generate(localSparkWork);
+
+ // Execute generated plan.
+ JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+ // We use Spark RDD async action to submit job as it's the only way to get jobId now.
+ JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+ jc.monitor(future);
+ return null;
+ }
+ });
+ jobHandle.get();
+ return new SparkJobRef(jobHandle.getClientJobId());
+ }
+
+ private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+ // add hive-exec jar
+ addJars((new JobConf(this.getClass())).getJar());
+
+ // add aux jars
+ addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
+
+ // add added jars
+ String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+ addJars(addedJars);
+
+ // add plugin module jars on demand
+ // jobConf will hold all the configuration for hadoop, tez, and hive
+ JobConf jobConf = new JobConf(conf);
+ jobConf.set(MR_JAR_PROPERTY, "");
+ for (BaseWork work : sparkWork.getAllWork()) {
+ work.configureJobConf(jobConf);
+ }
+ addJars(conf.get(MR_JAR_PROPERTY));
+
+ // add added files
+ String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+ addResources(addedFiles);
+
+ // add added archives
+ String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+ addResources(addedArchives);
+ }
+
+ private void addResources(String addedFiles) {
+ for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
+ if (!localFiles.contains(addedFile)) {
+ localFiles.add(addedFile);
+ try {
+ remoteClient.addFile(SparkUtilities.getURL(addedFile));
+ } catch (MalformedURLException e) {
+ LOG.warn("Failed to add file:" + addedFile);
+ }
+ }
+ }
+ }
+
+ private void addJars(String addedJars) {
+ for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
+ if (!localJars.contains(addedJar)) {
+ localJars.add(addedJar);
+ try {
+ remoteClient.addJar(SparkUtilities.getURL(addedJar));
+ } catch (MalformedURLException e) {
+ LOG.warn("Failed to add jar:" + addedJar);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ remoteClient.stop();
+ }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Nov 18 06:46:42 2014
@@ -110,15 +110,17 @@ public class SparkTask extends Task<Spar

        SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
        SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
- sparkCounters = sparkJobStatus.getCounter();
- SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
- monitor.startMonitor();
- SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
- if (LOG.isInfoEnabled() && sparkStatistics != null) {
- LOG.info(String.format("=====Spark Job[%d] statistics=====", jobRef.getJobId()));
- logSparkStatistic(sparkStatistics);
+ if (sparkJobStatus != null) {
+ sparkCounters = sparkJobStatus.getCounter();
+ SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
+ monitor.startMonitor();
+ SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
+ if (LOG.isInfoEnabled() && sparkStatistics != null) {
+ LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
+ logSparkStatistic(sparkStatistics);
+ }
+ sparkJobStatus.cleanup();
        }
- sparkJobStatus.cleanup();
        rc = 0;
      } catch (Exception e) {
        LOG.error("Failed to execute spark task.", e);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Tue Nov 18 06:46:42 2014
@@ -20,6 +20,12 @@ package org.apache.hadoop.hive.ql.exec.s
  import org.apache.hadoop.hive.ql.io.HiveKey;
  import org.apache.hadoop.io.BytesWritable;

+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
  /**
   * Contains utilities methods used as part of Spark tasks
   */
@@ -41,4 +47,25 @@ public class SparkUtilities {
      copy.set(bw);
      return copy;
    }
+
+ public static URL getURL(String path) throws MalformedURLException {
+ if (path == null) {
+ return null;
+ }
+
+ URL url = null;
+ try {
+ URI uri = new URI(path);
+ if (uri.getScheme() != null) {
+ url = uri.toURL();
+ } else {
+ // if no file schema in path, we assume it's file on local fs.
+ url = new File(path).toURI().toURL();
+ }
+ } catch (URISyntaxException e) {
+ // do nothing here, just return null if input path is not a valid URI.
+ }
+
+ return url;
+ }
  }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java Tue Nov 18 06:46:42 2014
@@ -18,12 +18,17 @@
  package org.apache.hadoop.hive.ql.exec.spark.session;

  import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
  import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
  import org.apache.hadoop.hive.ql.plan.SparkWork;

+import java.io.IOException;
  import java.util.UUID;

  /**
@@ -31,10 +36,12 @@ import java.util.UUID;
   * SparkClient which is shared by all SparkSession instances.
   */
  public class SparkSessionImpl implements SparkSession {
+ private static final Log LOG = LogFactory.getLog(SparkSession.class);
+
    private HiveConf conf;
    private boolean isOpen;
    private final String sessionId;
- private SparkClient sparkClient;
+ private HiveSparkClient hiveSparkClient;

    public SparkSessionImpl() {
      sessionId = makeSessionId();
@@ -49,8 +56,9 @@ public class SparkSessionImpl implements
    @Override
    public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
      Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
- sparkClient = SparkClient.getInstance(driverContext.getCtx().getConf());
- return sparkClient.execute(driverContext, sparkWork);
+ Configuration hiveConf = driverContext.getCtx().getConf();
+ hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(hiveConf);
+ return hiveSparkClient.execute(driverContext, sparkWork);
    }

    @Override
@@ -71,10 +79,14 @@ public class SparkSessionImpl implements
    @Override
    public void close() {
      isOpen = false;
- if (sparkClient != null) {
- sparkClient.close();
+ if (hiveSparkClient != null) {
+ try {
+ hiveSparkClient.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close spark session (" + sessionId + ").", e);
+ }
      }
- sparkClient = null;
+ hiveSparkClient = null;
    }

    public static String makeSessionId() {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java Tue Nov 18 06:46:42 2014
@@ -19,26 +19,26 @@ package org.apache.hadoop.hive.ql.exec.s

  public class SparkJobRef {

- private int jobId;
+ private String jobId;

    private SparkJobStatus sparkJobStatus;

    public SparkJobRef() {}

- public SparkJobRef(int jobId) {
+ public SparkJobRef(String jobId) {
      this.jobId = jobId;
    }

- public SparkJobRef(int jobId, SparkJobStatus sparkJobStatus) {
+ public SparkJobRef(String jobId, SparkJobStatus sparkJobStatus) {
      this.jobId = jobId;
      this.sparkJobStatus = sparkJobStatus;
    }

- public int getJobId() {
+ public String getJobId() {
      return jobId;
    }

- public void setJobId(int jobId) {
+ public void setJobId(String jobId) {
      this.jobId = jobId;
    }


Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Tue Nov 18 06:46:42 2014
@@ -18,6 +18,7 @@

  package org.apache.hadoop.hive.ql.optimizer.spark;

+import java.io.IOException;
  import java.util.Stack;

  import org.apache.commons.logging.Log;
@@ -26,7 +27,9 @@ import org.apache.hadoop.hive.conf.HiveC
  import org.apache.hadoop.hive.ql.exec.Operator;
  import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
  import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
  import org.apache.hadoop.hive.ql.lib.Node;
  import org.apache.hadoop.hive.ql.lib.NodeProcessor;
  import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.s
  import org.apache.hadoop.hive.ql.plan.OperatorDesc;
  import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;

+import org.apache.spark.SparkException;
  import scala.Tuple2;

  /**
@@ -69,39 +73,54 @@ public class SetSparkReducerParallelism

      context.getVisitedReduceSinks().add(sink);

+
      if (desc.getNumReducers() <= 0) {
        if (constantReducers > 0) {
          LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
          desc.setNumReducers(constantReducers);
        } else {
- long numberOfBytes = 0;
+ try {
+ // TODO try to make this still work after integration with remote spark context, so that we
+ // don't break test, we should implement automatic calculate reduce number for remote spark
+ // client and refactor code later, track it with HIVE-8855.
+ HiveSparkClient sparkClient = HiveSparkClientFactory.createHiveSparkClient(context.getConf());
+ if (sparkClient instanceof LocalHiveSparkClient) {
+ LocalHiveSparkClient localHiveSparkClient = (LocalHiveSparkClient)sparkClient;
+ long numberOfBytes = 0;
+
+ // we need to add up all the estimates from the siblings of this reduce sink
+ for (Operator<? extends OperatorDesc> sibling:
+ sink.getChildOperators().get(0).getParentOperators()) {
+ if (sibling.getStatistics() != null) {
+ numberOfBytes += sibling.getStatistics().getDataSize();
+ } else {
+ LOG.warn("No stats available from: " + sibling);
+ }
+ }
+
+ if (sparkMemoryAndCores == null) {
+ sparkMemoryAndCores = localHiveSparkClient.getMemoryAndCores();
+ }
+
+ // Divide it by 2 so that we can have more reducers
+ long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
+ int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+ maxReducers, false);
+
+ // If there are more cores, use the number of cores
+ int cores = sparkMemoryAndCores._2.intValue();
+ if (numReducers < cores) {
+ numReducers = cores;
+ }
+ LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
+ desc.setNumReducers(numReducers);

- // we need to add up all the estimates from the siblings of this reduce sink
- for (Operator<? extends OperatorDesc> sibling:
- sink.getChildOperators().get(0).getParentOperators()) {
- if (sibling.getStatistics() != null) {
- numberOfBytes += sibling.getStatistics().getDataSize();
            } else {
- LOG.warn("No stats available from: " + sibling);
+ sparkClient.close();
            }
+ } catch (Exception e) {
+ LOG.warn("Failed to create spark client.", e);
          }
-
- if (sparkMemoryAndCores == null) {
- sparkMemoryAndCores = SparkClient.getMemoryAndCores(context.getConf());
- }
-
- // Divide it by 2 so that we can have more reducers
- long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
- int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
- maxReducers, false);
-
- // If there are more cores, use the number of cores
- int cores = sparkMemoryAndCores._2.intValue();
- if (numReducers < cores) {
- numReducers = cores;
- }
- LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
- desc.setNumReducers(numReducers);
        }
      } else {
        LOG.info("Number of reducers determined to be: " + desc.getNumReducers());

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Tue Nov 18 06:46:42 2014
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.common.cla
   * Defines the API for the Spark remote client.
   */
  @InterfaceAudience.Private
-public interface SparkClient {
+public interface SparkClient extends Serializable {

    /**
     * Submits a job for asynchronous execution.

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1640277&r1=1640276&r2=1640277&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Nov 18 06:46:42 2014
@@ -190,7 +190,7 @@ class SparkClientImpl implements SparkCl
          LOG.info("No spark.home provided, calling SparkSubmit directly.");
          argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());

- if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client")) {
+ if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client") || master.startsWith("spark")) {
            String mem = conf.get("spark.driver.memory");
            if (mem != null) {
              argv.add("-Xms" + mem);

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedNov 18, '14 at 6:47a
activeNov 18, '14 at 6:47a
posts1
users1
websitehive.apache.org

1 user in discussion

Szehon: 1 post

People

Translate

site design / logo © 2022 Grokbase