Author: sershe
Date: Wed Apr 1 18:00:38 2015
New Revision: 1670742

URL: http://svn.apache.org/r1670742
Log:
HIVE-10123 : Hybrid grace Hash join : Use estimate key count from stats to initialize BytesBytesMultiHashMap (Mostafa Mokhtar, reviewed by Sergey Shelukhin)
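
The idea of the patch: instead of creating each partition's
BytesBytesMultiHashMap at the generic HIVEHASHTABLETHRESHOLD size and letting
it double repeatedly while rows are loaded, use the key-count estimate from
table statistics to allocate the map near its final size up front. A minimal
sketch of that sizing logic (illustrative only, not code from this commit;
estimatedKeyCount and loadFactor are assumed inputs):

    // Illustrative sketch: derive a hash table capacity from a stats-based
    // key estimate so the table rarely needs to expand while loading.
    static int initialCapacity(long estimatedKeyCount, float loadFactor) {
      // Slots needed to stay under the load factor.
      long needed = (long) (estimatedKeyCount / loadFactor) + 1;
      // BytesBytesMultiHashMap requires a power-of-two capacity
      // (see validateCapacity in the diff below), so round up.
      long cap = Long.highestOneBit(needed);
      if (cap < needed) {
        cap <<= 1;
      }
      return (int) Math.min(cap, 1L << 30); // keep within array-size limits
    }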

Modified:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1670742&r1=1670741&r2=1670742&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Wed Apr 1 18:00:38 2015
@@ -498,26 +498,26 @@ public class MapJoinOperator extends Abs
    private void reloadHashTable(HashPartition partition,
                                 HybridHashTableContainer hybridHtContainer)
        throws IOException, ClassNotFoundException, HiveException, SerDeException {
-    // Deserialize the on-disk hash table
-    // We're sure this part is smaller than memory limit
-    BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk();
-    int rowCount = restoredHashMap.getNumValues();
-    LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
-    LOG.info("Hybrid Grace Hash Join: Number of rows restored from hashmap: " + rowCount);

      // Merge the sidefile into the newly created hash table
      // This is where the spilling may happen again
      KeyValueContainer kvContainer = partition.getSidefileKVContainer();
-    rowCount += kvContainer.size();
+    int rowCount = kvContainer.size();
      LOG.info("Hybrid Grace Hash Join: Number of rows restored from KeyValueContainer: " +
          kvContainer.size());

+    // Deserialize the on-disk hash table
+    // We're sure this part is smaller than memory limit
+    BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(rowCount);
+    rowCount += restoredHashMap.getNumValues();
+    LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
+    LOG.info("Hybrid Grace Hash Join: Number of rows in hashmap: " + rowCount);
+
      // If based on the new key count, keyCount is smaller than a threshold,
      // then just load the entire restored hashmap into memory.
      // The size of deserialized partition shouldn't exceed half of memory limit
      if (rowCount * hybridHtContainer.getTableRowSize() >= hybridHtContainer.getMemoryThreshold() / 2) {
-      throw new RuntimeException("Hybrid Grace Hash Join: Hash table cannot be reloaded since it" +
-          " will be greater than memory limit. Recursive spilling is currently not supported");
+      LOG.info("Hybrid Grace Hash Join: Hash table reload can fail since it will be greater than memory limit. Recursive spilling is currently not supported");
      }

      KeyValueHelper writeHelper = hybridHtContainer.getWriteHelper();
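
The reordering above is deliberate: the sidefile row count is available
cheaply from the KeyValueContainer, so it is now read first and handed to
getHashMapFromDisk(int) as a capacity hint, letting the restored hashmap
expand once before the sidefile rows are merged in. In outline (identifiers
from the diff; a sketch of the flow, not the full method):

    // Sketch of the reordered reload flow; surrounding operator state assumed.
    int rowCount = kvContainer.size();               // sidefile rows, known up front
    BytesBytesMultiHashMap restoredHashMap =
        partition.getHashMapFromDisk(rowCount);      // row count doubles as capacity hint
    rowCount += restoredHashMap.getNumValues();      // total rows once the merge completes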

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1670742&r1=1670741&r2=1670742&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Wed Apr 1 18:00:38 2015
@@ -146,7 +146,7 @@ public final class BytesBytesMultiHashMa
    private long[] refs;
    private int startingHashBitCount, hashBitCount;

-  private int metricPutConflict = 0, metricExpands = 0, metricExpandsUs = 0;
+  private int metricPutConflict = 0, metricGetConflict = 0, metricExpands = 0, metricExpandsMs = 0;

    /** We have 39 bits to store list pointer from the first record; this is size limit */
    final static long MAX_WB_SIZE = ((long)1) << 38;
@@ -341,6 +341,17 @@ public final class BytesBytesMultiHashMa
      this.keysAssigned = 0;
    }

+  public void expandAndRehashToTarget(int estimateNewRowCount) {
+    int oldRefsCount = refs.length;
+    int newRefsCount = oldRefsCount + estimateNewRowCount;
+    if (resizeThreshold <= newRefsCount) {
+      newRefsCount =
+          (Long.bitCount(newRefsCount) == 1) ? estimateNewRowCount : nextHighestPowerOfTwo(newRefsCount);
+      expandAndRehashImpl(newRefsCount);
+      LOG.info("Expand and rehash to " + newRefsCount + " from " + oldRefsCount);
+    }
+  }
+
    private static void validateCapacity(long capacity) {
      if (Long.bitCount(capacity) != 1) {
        throw new AssertionError("Capacity must be a power of two");
@@ -405,6 +416,7 @@ public final class BytesBytesMultiHashMa
        if (isSameKey(key, length, ref, hashCode)) {
          return ref;
        }
+      ++metricGetConflict;
        probeSlot += (++i);
        if (i > largestNumberOfSteps) {
          // We know we never went that far when we were inserting.
@@ -501,9 +513,13 @@ public final class BytesBytesMultiHashMa
    }

    private void expandAndRehash() {
-    long expandTime = System.nanoTime();
-    final long[] oldRefs = refs;
      long capacity = refs.length << 1;
+    expandAndRehashImpl(capacity);
+  }
+
+  private void expandAndRehashImpl(long capacity) {
+    long expandTime = System.currentTimeMillis();
+    final long[] oldRefs = refs;
      validateCapacity(capacity);
      long[] newRefs = new long[(int)capacity];

@@ -533,9 +549,8 @@ public final class BytesBytesMultiHashMa
      this.largestNumberOfSteps = maxSteps;
      this.hashBitCount = newHashBitCount;
      this.resizeThreshold = (int)(capacity * loadFactor);
-    metricExpandsUs += (System.nanoTime() - expandTime);
+    metricExpandsMs += (System.currentTimeMillis() - expandTime);
      ++metricExpands;
-
    }

    /**
@@ -753,7 +768,8 @@ public final class BytesBytesMultiHashMa
    public void debugDumpMetrics() {
      LOG.info("Map metrics: keys allocated " + this.refs.length +", keys assigned " + keysAssigned
          + ", write conflict " + metricPutConflict + ", write max dist " + largestNumberOfSteps
- + ", expanded " + metricExpands + " times in " + metricExpandsUs + "us");
+ + ", read conflict " + metricGetConflict
+ + ", expanded " + metricExpands + " times in " + metricExpandsMs + "ms");
    }

    private void debugDumpKeyProbe(long keyOffset, int keyLength, int hashCode, int finalSlot) {
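
expandAndRehashToTarget() gives callers a way to grow the slot array once, to
at least refs.length + estimateNewRowCount, instead of paying for a chain of
doubling rehashes during a bulk load; validateCapacity() still requires the
final size to be a power of two. The nextHighestPowerOfTwo() helper is
referenced but not shown in this diff; a standard bit-twiddling version would
look like the following (an assumption for illustration, the actual Hive
helper may differ):

    // Assumed implementation of the helper referenced above; not part of
    // this diff. Returns the smallest power of two >= v.
    private static int nextHighestPowerOfTwo(int v) {
      return (v <= 1) ? 1 : Integer.highestOneBit(v - 1) << 1;
    }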

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java?rev=1670742&r1=1670741&r2=1670742&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java Wed Apr 1 18:00:38 2015
@@ -123,15 +123,20 @@ public class HybridHashTableContainer im
      /* Restore the hashmap from disk by deserializing it.
       * Currently Kryo is used for this purpose.
       */
-    public BytesBytesMultiHashMap getHashMapFromDisk()
+    public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity)
          throws IOException, ClassNotFoundException {
        if (hashMapSpilledOnCreation) {
-        return new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, -1);
+        return new BytesBytesMultiHashMap(Math.max(threshold, initialCapacity), loadFactor, wbSize, -1);
        } else {
          InputStream inputStream = Files.newInputStream(hashMapLocalPath);
          com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
          Kryo kryo = Utilities.runtimeSerializationKryo.get();
          BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
+
+        if (initialCapacity > 0) {
+          restoredHashMap.expandAndRehashToTarget(initialCapacity);
+        }
+
          input.close();
          inputStream.close();
          Files.delete(hashMapLocalPath);
@@ -163,7 +168,8 @@ public class HybridHashTableContainer im

    public HybridHashTableContainer(Configuration hconf, long keyCount, long memUsage, long tableSize)
        throws SerDeException {
-    this(HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
+    this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
+         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
           HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
           HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
           HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
@@ -171,22 +177,27 @@ public class HybridHashTableContainer im
           tableSize, keyCount, memUsage);
    }

-  private HybridHashTableContainer(int threshold, float loadFactor, int wbSize,
+  private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor, int wbSize,
                                     long noConditionalTaskThreshold, int memCheckFreq, long tableSize,
                                     long keyCount, long memUsage) throws SerDeException {
+
+    int newKeyCount = HashMapWrapper.calculateTableSize(
+        keyCountAdj, threshold, loadFactor, keyCount);
+
      memoryThreshold = noConditionalTaskThreshold;
-    tableRowSize = tableSize / keyCount;
+    tableRowSize = tableSize / newKeyCount;
      memoryCheckFrequency = memCheckFreq;

      int numPartitions = calcNumPartitions(tableSize, wbSize); // estimate # of partitions to create
      hashPartitions = new HashPartition[numPartitions];
      int numPartitionsSpilledOnCreation = 0;
      long memoryAllocated = 0;
+    int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
      for (int i = 0; i < numPartitions; i++) {
        if (i == 0) { // We unconditionally create a hashmap for the first hash partition
-        hashPartitions[i] = new HashPartition(threshold, loadFactor, wbSize, memUsage, true);
+        hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, wbSize, memUsage, true);
        } else {
-        hashPartitions[i] = new HashPartition(threshold, loadFactor, wbSize, memUsage,
+        hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, wbSize, memUsage,
                                                memoryAllocated + wbSize < memoryThreshold);
        }
        if (isHashMapSpilledOnCreation(i)) {
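
Putting the constructor changes together: the raw stats key count is first
adjusted by HashMapWrapper.calculateTableSize(keyCountAdj, threshold,
loadFactor, keyCount), and the result is split evenly across the hash
partitions, with a per-partition share of the configured threshold as a
floor. A worked example with illustrative numbers (not taken from the patch):

    int newKeyCount = 1_048_576;   // adjusted stats estimate (assumed)
    int numPartitions = 16;        // from calcNumPartitions (assumed)
    int threshold = 100_000;       // HIVEHASHTABLETHRESHOLD (assumed)
    int initialCapacity = Math.max(newKeyCount / numPartitions,  // 65_536
                                   threshold / numPartitions);   // 6_250
    // Each partition's hashmap starts around 65k entries instead of the
    // generic threshold, avoiding several doubling rehashes while loading.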
