Repository: hive
Updated Branches:
   refs/heads/master d3144256d -> d89a7d1e7


HIVE-11170 : port parts of HIVE-11015 to master for ease of future merging (Sergey Shelukhin, reviewed by Vikram Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d89a7d1e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d89a7d1e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d89a7d1e

Branch: refs/heads/master
Commit: d89a7d1e7fe7fb51aeb514e4357ae149158b2a34
Parents: d314425
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Thu Jul 9 17:50:32 2015 -0700
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Thu Jul 9 17:50:32 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FilterOperator.java     |   3 +-
 .../hive/ql/exec/mr/ExecMapperContext.java      |  10 +-
 .../ql/io/HiveContextAwareRecordReader.java     |   2 +-
 .../org/apache/hadoop/hive/ql/io/IOContext.java |  43 ------
 .../apache/hadoop/hive/ql/io/IOContextMap.java  |  81 +++++++++++
 .../hadoop/hive/ql/exec/TestOperators.java      |   3 +-
 .../ql/io/TestHiveBinarySearchRecordReader.java |   2 +-
 .../hadoop/hive/ql/io/TestIOContextMap.java     | 133 +++++++++++++++++++
 8 files changed, 223 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index 65301c0..ae35766 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.plan.FilterDesc;
  import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -61,7 +62,7 @@ public class FilterOperator extends Operator<FilterDesc> implements
        }

        conditionInspector = null;
-      ioContext = IOContext.get(hconf);
+      ioContext = IOContextMap.get(hconf);
      } catch (Throwable e) {
        throw new HiveException(e);
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
index 13d0650..fc5abfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
@@ -22,8 +22,8 @@ import java.util.Map;
  import org.apache.commons.logging.Log;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
  import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
  import org.apache.hadoop.mapred.JobConf;

@@ -63,11 +63,11 @@ public class ExecMapperContext {

    public ExecMapperContext(JobConf jc) {
      this.jc = jc;
-    ioCxt = IOContext.get(jc);
+    ioCxt = IOContextMap.get(jc);
    }

    public void clear() {
-    IOContext.clear();
+    IOContextMap.clear();
      ioCxt = null;
    }

@@ -151,8 +151,4 @@ public class ExecMapperContext {
    public IOContext getIoCxt() {
      return ioCxt;
    }
-
-  public void setIoCxt(IOContext ioCxt) {
-    this.ioCxt = ioCxt;
-  }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 9b3f8ec..738ca9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -162,7 +162,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
    }

    public IOContext getIOContext() {
-    return IOContext.get(jobConf);
+    return IOContextMap.get(jobConf);
    }

    private void initIOContext(long startPos, boolean isBlockPointer,

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index ebad0a6..019db8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -18,13 +18,7 @@

  package org.apache.hadoop.hive.ql.io;

-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;

  /**
   * IOContext basically contains the position information of the current
@@ -35,43 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
   * nextBlockStart refers the end of current row and beginning of next row.
   */
  public class IOContext {
-
-  /**
-   * Spark uses this thread local
-   */
-  private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
-    @Override
-    protected IOContext initialValue() { return new IOContext(); }
-  };
-
-  private static IOContext get() {
-    return IOContext.threadLocal.get();
-  }
-
-  /**
-   * Tez and MR use this map but are single threaded per JVM thus no synchronization is required.
-   */
-  private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-
-
-  public static IOContext get(Configuration conf) {
-    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      return get();
-    }
-    String inputName = conf.get(Utilities.INPUT_NAME);
-    if (!inputNameIOContextMap.containsKey(inputName)) {
-      IOContext ioContext = new IOContext();
-      inputNameIOContextMap.put(inputName, ioContext);
-    }
-
-    return inputNameIOContextMap.get(inputName);
-  }
-
-  public static void clear() {
-    IOContext.threadLocal.remove();
-    inputNameIOContextMap.clear();
-  }
-
    private long currentBlockStart;
    private long nextBlockStart;
    private long currentRow;

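Note on the removal above: the old get(Configuration) used an unsynchronized check-then-act
("containsKey, then put") on a plain HashMap, which is safe only under the
single-thread-per-JVM assumption spelled out in its comment. The replacement class below
switches to ConcurrentHashMap.putIfAbsent. A minimal illustrative sketch of the difference,
with hypothetical names that are not part of this patch:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GetOrCreateRaceSketch {
  private static final Map<String, Object> unsafeMap = new HashMap<String, Object>();
  private static final ConcurrentHashMap<String, Object> safeMap =
      new ConcurrentHashMap<String, Object>();

  // Old idiom: two racing threads can both observe "absent" and both put,
  // so callers may end up holding different instances for the same key.
  static Object unsafeGetOrCreate(String key) {
    if (!unsafeMap.containsKey(key)) {
      unsafeMap.put(key, new Object());
    }
    return unsafeMap.get(key);
  }

  // New idiom: putIfAbsent makes creation atomic; whichever thread loses
  // the race discards its object and returns the single winning instance.
  static Object safeGetOrCreate(String key) {
    Object existing = safeMap.get(key);
    if (existing != null) {
      return existing;
    }
    Object created = new Object();
    Object winner = safeMap.putIfAbsent(key, created);
    return (winner == null) ? created : winner;
  }
}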
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
new file mode 100644
index 0000000..342c526
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * NOTE: before LLAP branch merge, there's no LLAP code here.
+ * There used to be a global static map of IOContext-s inside IOContext (Hive style!).
+ * Unfortunately, due to variety of factors, this is now a giant fustercluck.
+ * 1) Spark doesn't apparently care about multiple inputs, but has multiple threads, so one
+ * threadlocal IOContext was added for it.
+ * 2) LLAP has lots of tasks in the same process so globals no longer cut it either.
+ * 3) However, Tez runs 2+ threads for one task (e.g. TezTaskEventRouter and TezChild), and these
+ * surprisingly enough need the same context. Tez, in its infinite wisdom, doesn't allow them
+ * to communicate in any way nor provide any shared context.
+ * So we are going to...
+ * 1) Keep the good ol' global map for MR and Tez. Hive style!
+ * 2) Keep the threadlocal for Spark. Hive style!
+ * 3) Create inheritable (TADA!) threadlocal with attemptId, only set in LLAP; that will propagate
+ * to all the little Tez threads, and we will keep a map per attempt. Hive style squared!
+ */
+public class IOContextMap {
+  public static final String DEFAULT_CONTEXT = "";
+  private static final Log LOG = LogFactory.getLog(IOContextMap.class);
+
+  /** Used for Tez and MR */
+  private static final ConcurrentHashMap<String, IOContext> globalMap =
+      new ConcurrentHashMap<String, IOContext>();
+
+  /** Used for Spark */
+  private static final ThreadLocal<IOContext> sparkThreadLocal = new ThreadLocal<IOContext>(){
+    @Override
+    protected IOContext initialValue() { return new IOContext(); }
+  };
+
+  public static IOContext get(Configuration conf) {
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return sparkThreadLocal.get();
+    }
+    String inputName = conf.get(Utilities.INPUT_NAME);
+    if (inputName == null) {
+      inputName = DEFAULT_CONTEXT;
+    }
+    ConcurrentHashMap<String, IOContext> map;
+    map = globalMap;
+
+    IOContext ioContext = map.get(inputName);
+    if (ioContext != null) return ioContext;
+    ioContext = new IOContext();
+    IOContext oldContext = map.putIfAbsent(inputName, ioContext);
+    return (oldContext == null) ? ioContext : oldContext;
+  }
+
+  public static void clear() {
+    sparkThreadLocal.remove();
+    globalMap.clear();
+  }
+}

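The class comment above also sketches a third mode that this patch deliberately leaves out:
an inheritable thread-local attempt ID, set only in LLAP, selecting a per-attempt map that
child Tez threads inherit. A rough sketch of that scheme, assuming hypothetical names
(setThreadAttemptId, attemptMap, mapForCurrentThread) that are not part of this commit:

import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hive.ql.io.IOContext;

public class AttemptScopedMapSketch {
  // Set once by the task runner before it spawns worker threads;
  // InheritableThreadLocal propagates the value to child threads.
  private static final InheritableThreadLocal<String> attemptId =
      new InheritableThreadLocal<String>();

  private static final ConcurrentHashMap<String, ConcurrentHashMap<String, IOContext>>
      attemptMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, IOContext>>();

  public static void setThreadAttemptId(String id) {
    attemptId.set(id);
  }

  // Picks the per-attempt map when an attempt ID is set (the LLAP case),
  // otherwise falls back to the shared global map used for MR and Tez.
  static ConcurrentHashMap<String, IOContext> mapForCurrentThread(
      ConcurrentHashMap<String, IOContext> globalMap) {
    String id = attemptId.get();
    if (id == null) {
      return globalMap;
    }
    ConcurrentHashMap<String, IOContext> map = attemptMap.get(id);
    if (map != null) {
      return map;
    }
    map = new ConcurrentHashMap<String, IOContext>();
    ConcurrentHashMap<String, IOContext> old = attemptMap.putIfAbsent(id, map);
    return (old == null) ? map : old;
  }
}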
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 62057d8..c3a36c0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.Driver;
  import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
  import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
  import org.apache.hadoop.hive.ql.plan.CollectDesc;
  import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -272,7 +273,7 @@ public class TestOperators extends TestCase {
        JobConf hconf = new JobConf(TestOperators.class);
        HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
            "hdfs:///testDir/testFile");
-      IOContext.get(hconf).setInputPath(
+      IOContextMap.get(hconf).setInputPath(
            new Path("hdfs:///testDir/testFile"));

        // initialize pathToAliases

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
index 7a1748c..9dc4f5b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
@@ -116,7 +116,7 @@ public class TestHiveBinarySearchRecordReader extends TestCase {

    private void resetIOContext() {
      conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
-    ioContext = IOContext.get(conf);
+    ioContext = IOContextMap.get(conf);
      ioContext.setUseSorted(false);
      ioContext.setBinarySearching(false);
      ioContext.setEndBinarySearch(false);

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
new file mode 100644
index 0000000..4469353
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestIOContextMap {
+
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testMRTezGlobalMap() throws Exception {
+    // Tests concurrent modification, and that results are the same per input across threads
+    // but different between inputs.
+    final int THREAD_COUNT = 2, ITER_COUNT = 1000;
+    final AtomicInteger countdown = new AtomicInteger(ITER_COUNT);
+    final CountDownLatch phase1End = new CountDownLatch(THREAD_COUNT);
+    final IOContext[] results = new IOContext[ITER_COUNT];
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<Void>(new Callable<Void>() {
+        public Void call() throws Exception {
+          Configuration conf = new Configuration();
+          syncThreadStart(cdlIn, cdlOut);
+          // Phase 1 - create objects.
+          while (true) {
+            int nextIx = countdown.decrementAndGet();
+            if (nextIx < 0) break;
+            conf.set(Utilities.INPUT_NAME, "Input " + nextIx);
+            results[nextIx] = IOContextMap.get(conf);
+            if (nextIx == 0) break;
+          }
+          phase1End.countDown();
+          phase1End.await();
+          // Phase 2 - verify we get the expected objects created by all threads.
+          for (int i = 0; i < ITER_COUNT; ++i) {
+            conf.set(Utilities.INPUT_NAME, "Input " + i);
+            IOContext ctx = IOContextMap.get(conf);
+            assertSame(results[i], ctx);
+          }
+          return null;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i].get();
+    }
+    Set<IOContext> resultSet = Sets.newIdentityHashSet();
+    for (int i = 0; i < results.length; ++i) {
+      assertTrue(resultSet.add(results[i])); // All the objects must be different.
+    }
+  }
+
+  @Test
+  public void testSparkThreadLocal() throws Exception {
+    // Test that input name does not change IOContext returned, and that each thread gets its own.
+    final Configuration conf1 = new Configuration();
+    conf1.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
+    final Configuration conf2 = new Configuration(conf1);
+    conf2.set(Utilities.INPUT_NAME, "Other input");
+    final int THREAD_COUNT = 2;
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    FutureTask<IOContext>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<IOContext>(new Callable<IOContext>() {
+        public IOContext call() throws Exception {
+          syncThreadStart(cdlIn, cdlOut);
+          IOContext c1 = IOContextMap.get(conf1), c2 = IOContextMap.get(conf2);
+          assertSame(c1, c2);
+          return c1;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    Set<IOContext> results = Sets.newIdentityHashSet();
+    for (int i = 0; i < tasks.length; ++i) {
+      assertTrue(results.add(tasks[i].get())); // All the objects must be different.
+    }
+  }
+
+}

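For reference, a hypothetical caller-side view of the resulting behavior; the class name
here is illustrative, but IOContextMap.get, Utilities.INPUT_NAME, and
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE are the APIs exercised by the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.io.IOContextMap;

public class IOContextMapUsageSketch {
  public static void main(String[] args) {
    // MR/Tez: contexts are keyed by input name and shared across threads.
    Configuration mrConf = new Configuration();
    mrConf.set(Utilities.INPUT_NAME, "map_1");
    IOContext perInput = IOContextMap.get(mrConf);

    // Spark: the input name is ignored; each thread gets its own context.
    Configuration sparkConf = new Configuration();
    sparkConf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
    IOContext perThread = IOContextMap.get(sparkConf);

    System.out.println(perInput + " / " + perThread);
  }
}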