FAQ
Author: xuefu
Date: Fri Oct 24 18:24:01 2014
New Revision: 1634116

URL: http://svn.apache.org/r1634116
Log:
HIVE-8457: MapOperator initialization fails when multiple Spark threads is enabled [Spark Branch] (Chao via Xuefu)

Modified:
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Fri Oct 24 18:24:01 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
  import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
  import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
  import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.io.IOContext;
  import org.apache.hadoop.hive.ql.plan.MapWork;
  import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
  import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -88,6 +89,14 @@ public class SparkMapRecordHandler exten
          mo = new MapOperator();
        }
        mo.setConf(mrwork);
+
+ // If the current thread's IOContext is not initialized (because it's reading from a
+ // cached input HadoopRDD), copy from the saved result.
+ IOContext ioContext = IOContext.get();
+ if (ioContext.getInputPath() == null) {
+ IOContext.copy(ioContext, IOContext.getMap().get(SparkUtilities.MAP_IO_CONTEXT));
+ }
+
        // initialize map operator
        mo.setChildren(job);
        l4j.info(mo.dump(0));
@@ -199,6 +208,10 @@ public class SparkMapRecordHandler exten
      } finally {
        MapredContext.close();
        Utilities.clearWorkMap();
+
+ // It's possible that a thread gets reused for different queries, so we need to
+ // reset the input path.
+ IOContext.get().setInputPath(null);
      }
    }


Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Fri Oct 24 18:24:01 2014
@@ -294,8 +294,7 @@ public class SparkPlanGenerator {

      JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
          WritableComparable.class, Writable.class);
- MapInput result = new MapInput(hadoopRDD,
- false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+ MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
      return result;
    }


Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Fri Oct 24 18:24:01 2014
@@ -25,6 +25,9 @@ import org.apache.hadoop.io.BytesWritabl
   */
  public class SparkUtilities {

+ // Used to save and retrieve IOContext for multi-insertion.
+ public static final String MAP_IO_CONTEXT = "MAP_IO_CONTEXT";
+
    public static HiveKey copyHiveKey(HiveKey key) {
      HiveKey copy = new HiveKey();
      copy.setDistKeyLength(key.getDistKeyLength());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Fri Oct 24 18:24:01 2014
@@ -27,9 +27,11 @@ import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.exec.FooterBuffer;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
  import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
  import org.apache.hadoop.hive.ql.plan.PartitionDesc;
  import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -171,6 +173,18 @@ public abstract class HiveContextAwareRe
      ioCxtRef.isBlockPointer = isBlockPointer;
      ioCxtRef.inputPath = inputPath;
      LOG.info("Processing file " + inputPath);
+
+ // In spark, in multi-insert an input HadoopRDD may be shared by multiple
+ // mappers, and if we cache it, only the first thread will have its thread-local
+ // IOContext initialized, while the rest will not.
+ // To solve this issue, we need to save a copy of the initialized IOContext, so that
+ // later it can be used for other threads.
+ if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ IOContext iocontext = new IOContext();
+ IOContext.copy(iocontext, ioCxtRef);
+ IOContext.getMap().put(SparkUtilities.MAP_IO_CONTEXT, iocontext);
+ }
+
      initDone = true;
    }


Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Fri Oct 24 18:24:01 2014
@@ -112,6 +112,27 @@ public class IOContext {
      this.ioExceptions = false;
    }

+ /**
+ * Copy all field values from orig to dest; all existing fields in dest will be overwritten.
+ *
+ * @param dest the IOContext to copy to
+ * @param orig the IOContext to copy from
+ */
+ public static void copy(IOContext dest, IOContext orig) {
+ dest.currentBlockStart = orig.currentBlockStart;
+ dest.nextBlockStart = orig.nextBlockStart;
+ dest.currentRow = orig.currentRow;
+ dest.isBlockPointer = orig.isBlockPointer;
+ dest.ioExceptions = orig.ioExceptions;
+ dest.useSorted = orig.useSorted;
+ dest.isBinarySearching = orig.isBinarySearching;
+ dest.endBinarySearch = orig.endBinarySearch;
+ dest.comparison = orig.comparison;
+ dest.genericUDFClassName = orig.genericUDFClassName;
+ dest.ri = orig.ri;
+ dest.inputPath = orig.inputPath;
+ }
+
    public long getCurrentBlockStart() {
      return currentBlockStart;
    }
@@ -224,4 +245,5 @@ public class IOContext {
      this.comparison = null;
      this.genericUDFClassName = null;
    }
+
  }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedOct 24, '14 at 6:25p
activeOct 24, '14 at 6:25p
posts1
users1
websitehive.apache.org

1 user in discussion

Xuefu: 1 post

People

Translate

site design / logo © 2021 Grokbase