Repository: hive
Updated Branches:
   refs/heads/master 77b7fc3df -> 07fcb098b


HIVE-10456: Grace Hash Join should not load spilled partitions on abort (Prasanth Jayachandran reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/07fcb098
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/07fcb098
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/07fcb098

Branch: refs/heads/master
Commit: 07fcb098b63003cf74718351269c79870100b8de
Parents: 77b7fc3
Author: Prasanth Jayachandran <j.prasanth.j@gmail.com>
Authored: Sat May 2 17:40:01 2015 -0700
Committer: Prasanth Jayachandran <j.prasanth.j@gmail.com>
Committed: Sat May 2 17:40:01 2015 -0700

----------------------------------------------------------------------
  .../hadoop/hive/ql/exec/MapJoinOperator.java | 136 +++++++++++--------
  .../apache/hadoop/hive/ql/exec/ObjectCache.java | 7 +
  .../hadoop/hive/ql/exec/mr/ObjectCache.java | 5 +
  .../persistence/HybridHashTableContainer.java | 35 ++++-
  .../hive/ql/exec/tez/HashTableLoader.java | 5 -
  .../hadoop/hive/ql/exec/tez/ObjectCache.java | 6 +
  .../mapjoin/VectorMapJoinRowBytesContainer.java | 2 +-
  7 files changed, 131 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
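
In essence, the patch changes the grace hash join lifecycle in two places: a failed hash-table load now clears any partially loaded (and possibly spilled) containers before rethrowing, and closeOp() reloads and reprocesses spilled partitions only when the operator is not aborting, clearing all containers either way. A minimal standalone sketch of that control flow, using hypothetical simplified types rather than the actual Hive classes:

// Standalone sketch of the abort-handling pattern (hypothetical types).
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class GraceHashJoinCloseSketch {
  interface Container {
    boolean hasSpill();
    void clear();
  }

  private final List<Container> tables = new ArrayList<>();

  // Mirrors the new try/catch around loader.load(): a failed load may leave
  // partially spilled partitions behind, so clear everything before rethrowing.
  void load(Callable<Void> loader) throws Exception {
    try {
      loader.call();
    } catch (Exception e) {
      clearAllContainers();
      throw e;
    }
  }

  // Mirrors the new closeOp(): spilled partitions are reprocessed only on a
  // clean close; on abort (and after a clean close) containers are cleared.
  void close(boolean abort) {
    boolean spilled = false;
    for (Container c : tables) {
      spilled = spilled || (c != null && c.hasSpill());
    }
    if (spilled) {
      if (!abort) {
        reprocessSpilledPartitions();
      }
      clearAllContainers(); // spilled tables are never shared across tasks
    }
  }

  private void reprocessSpilledPartitions() {
    // placeholder for the on-disk partition reload/join loop
  }

  private void clearAllContainers() {
    for (Container c : tables) {
      if (c != null) {
        c.clear();
      }
    }
  }
}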


http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index f2b800a..1cfc411 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -284,7 +284,17 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(mapContext, mrContext, hconf, this);
-    loader.load(mapJoinTables, mapJoinTableSerdes);
+    try {
+      loader.load(mapJoinTables, mapJoinTableSerdes);
+    } catch (HiveException e) {
+      if (isLogInfoEnabled) {
+        LOG.info("Exception loading hash tables. Clearing partially loaded hash table containers.");
+      }
+
+      // there could be some spilled partitions which needs to be cleaned up
+      clearAllTableContainers();
+      throw e;
+    }
 
     hashTblInitedOnce = true;

@@ -433,7 +443,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
   @Override
   public void closeOp(boolean abort) throws HiveException {
     boolean spilled = false;
-    for (MapJoinTableContainer container: mapJoinTables) {
+    for (MapJoinTableContainer container : mapJoinTables) {
       if (container != null) {
         spilled = spilled || container.hasSpill();
         container.dumpMetrics();

@@ -442,79 +452,93 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
 
     // For Hybrid Grace Hash Join, we need to see if there is any spilled data to be processed next
     if (spilled) {
-      if (hashMapRowGetters == null) {
-        hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
-      }
-      int numPartitions = 0;
-      // Find out number of partitions for each small table (should be same across tables)
-      for (byte pos = 0; pos < mapJoinTables.length; pos++) {
-        if (pos != conf.getPosBigTable()) {
-          firstSmallTable = (HybridHashTableContainer)mapJoinTables[pos];
-          numPartitions = firstSmallTable.getHashPartitions().length;
-          break;
+      if (!abort) {
+        if (hashMapRowGetters == null) {
+          hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
         }
-      }
-      assert numPartitions != 0 : "Number of partitions must be greater than 0!";
-
-      if (firstSmallTable.hasSpill()) {
-        spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length];
-        hybridMapJoinLeftover = true;
-
-        // Clear all in-memory partitions first
+        int numPartitions = 0;
+        // Find out number of partitions for each small table (should be same across tables)
         for (byte pos = 0; pos < mapJoinTables.length; pos++) {
-          MapJoinTableContainer tableContainer = mapJoinTables[pos];
-          if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) {
-            HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
-            hybridHtContainer.dumpStats();
-
-            HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
-            // Clear all in memory partitions first
-            for (int i = 0; i < hashPartitions.length; i++) {
-              if (!hashPartitions[i].isHashMapOnDisk()) {
-                hybridHtContainer.setTotalInMemRowCount(
-                    hybridHtContainer.getTotalInMemRowCount() -
-                    hashPartitions[i].getHashMapFromMemory().getNumValues());
-                hashPartitions[i].getHashMapFromMemory().clear();
+          if (pos != conf.getPosBigTable()) {
+            firstSmallTable = (HybridHashTableContainer) mapJoinTables[pos];
+            numPartitions = firstSmallTable.getHashPartitions().length;
+            break;
+          }
+        }
+        assert numPartitions != 0 : "Number of partitions must be greater than 0!";
+
+        if (firstSmallTable.hasSpill()) {
+          spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length];
+          hybridMapJoinLeftover = true;
+
+          // Clear all in-memory partitions first
+          for (byte pos = 0; pos < mapJoinTables.length; pos++) {
+            MapJoinTableContainer tableContainer = mapJoinTables[pos];
+            if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) {
+              HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
+              hybridHtContainer.dumpStats();
+
+              HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
+              // Clear all in memory partitions first
+              for (int i = 0; i < hashPartitions.length; i++) {
+                if (!hashPartitions[i].isHashMapOnDisk()) {
+                  hybridHtContainer.setTotalInMemRowCount(
+                      hybridHtContainer.getTotalInMemRowCount() -
+                      hashPartitions[i].getHashMapFromMemory().getNumValues());
+                  hashPartitions[i].getHashMapFromMemory().clear();
+                }
               }
+              assert hybridHtContainer.getTotalInMemRowCount() == 0;
             }
-            assert hybridHtContainer.getTotalInMemRowCount() == 0;
           }
-        }
 
-        // Reprocess the spilled data
-        for (int i = 0; i < numPartitions; i++) {
-          HashPartition[] hashPartitions = firstSmallTable.getHashPartitions();
-          if (hashPartitions[i].isHashMapOnDisk()) {
-            try {
-              continueProcess(i); // Re-process spilled data
-            } catch (IOException e) {
-              e.printStackTrace();
-            } catch (SerDeException e) {
-              e.printStackTrace();
-            } catch (ClassNotFoundException e) {
-              e.printStackTrace();
-            }
-            for (byte pos = 0; pos < order.length; pos++) {
-              if (pos != conf.getPosBigTable())
-                spilledMapJoinTables[pos] = null;
+          // Reprocess the spilled data
+          for (int i = 0; i < numPartitions; i++) {
+            HashPartition[] hashPartitions = firstSmallTable.getHashPartitions();
+            if (hashPartitions[i].isHashMapOnDisk()) {
+              try {
+                continueProcess(i); // Re-process spilled data
+              } catch (Exception e) {
+                throw new HiveException(e);
+              }
+              for (byte pos = 0; pos < order.length; pos++) {
+                if (pos != conf.getPosBigTable())
+                  spilledMapJoinTables[pos] = null;
+              }
             }
           }
         }
       }
+
+      if (isLogInfoEnabled) {
+        LOG.info("spilled: " + spilled + " abort: " + abort + ". Clearing spilled partitions.");
+      }
+
+      // spilled tables are loaded always (no sharing), so clear it
+      clearAllTableContainers();
+      cache.remove(cacheKey);
     }
 
+    // in mapreduce case, we need to always clear up as mapreduce doesn't have object registry.
     if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
-        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
-        && mapJoinTables != null) {
+        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())) {
+      if (isLogInfoEnabled) {
+        LOG.info("MR: Clearing all map join table containers.");
+      }
+      clearAllTableContainers();
+    }
+
+    super.closeOp(abort);
+  }
+
+  private void clearAllTableContainers() {
+    if (mapJoinTables != null) {
       for (MapJoinTableContainer tableContainer : mapJoinTables) {
         if (tableContainer != null) {
           tableContainer.clear();
         }
      }
    }
-    cache.release(cacheKey);
-    this.loader = null;
-    super.closeOp(abort);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
index f0df2d3..440e0a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
@@ -53,4 +53,11 @@ public interface ObjectCache {
    * @return the last cached object with the key, null if none.
    */
   public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException;
+
+  /**
+   * Removes the specified key from the object cache.
+   *
+   * @param key - key to be removed
+   */
+  public void remove(String key);
 }
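
For context, a trivial map-backed implementation of the extended contract could look like the following sketch (a hypothetical SimpleObjectCache for illustration only; the real no-op MR and registry-backed Tez implementations appear below):

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical map-backed cache illustrating the retrieve/remove contract.
public class SimpleObjectCache {
  private final ConcurrentHashMap<String, Object> registry = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  public <T> T retrieve(String key, Callable<T> fn) throws Exception {
    Object value = registry.get(key);
    if (value == null) {
      value = fn.call();
      registry.put(key, value);
    }
    return (T) value;
  }

  // New in this patch's interface: callers (e.g. MapJoinOperator.closeOp)
  // can evict a key whose cached value must not be reused.
  public void remove(String key) {
    registry.remove(key);
  }
}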

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
index a6f698d..bf4ae8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
@@ -91,4 +91,9 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
       }
     };
   }
+
+  @Override
+  public void remove(String key) {
+    // nothing to do
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 3f6d61e..412226e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -183,6 +183,36 @@ public class HybridHashTableContainer
     public boolean isHashMapOnDisk() {
       return hashMapOnDisk;
     }
+
+    public void clear() {
+      if (hashMap != null) {
+        hashMap.clear();
+        hashMap = null;
+      }
+
+      if (hashMapLocalPath != null) {
+        try {
+          Files.delete(hashMapLocalPath);
+        } catch (Throwable ignored) {
+        }
+        hashMapLocalPath = null;
+      }
+
+      if (sidefileKVContainer != null) {
+        sidefileKVContainer.clear();
+        sidefileKVContainer = null;
+      }
+
+      if (matchfileObjContainer != null) {
+        matchfileObjContainer.clear();
+        matchfileObjContainer = null;
+      }
+
+      if (matchfileRowBytesContainer != null) {
+        matchfileRowBytesContainer.clear();
+        matchfileRowBytesContainer = null;
+      }
+    }
   }
 
   public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable,

@@ -546,12 +576,11 @@ public class HybridHashTableContainer
     return toSpillPartitionId;
   }
 
-  /* Clean up in memory hashtables */
   @Override
   public void clear() {
     for (HashPartition hp : hashPartitions) {
-      if (hp.hashMap != null) {
-        hp.hashMap.clear();
+      if (hp != null) {
+        hp.clear();
       }
     }
     memoryUsed = 0;
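
The spill-file deletion in HashPartition.clear() above is deliberately best-effort: cleanup must not mask the error path that triggered it. The same idiom as a standalone sketch (a hypothetical helper, using Files.deleteIfExists rather than the patch's Files.delete):

import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not in the patch: best-effort spill-file removal.
final class SpillFiles {
  private SpillFiles() {
  }

  static void deleteQuietly(Path path) {
    if (path == null) {
      return;
    }
    try {
      // deleteIfExists also tolerates a file that was never materialized
      Files.deleteIfExists(path);
    } catch (Throwable ignored) {
      // swallow: failing to delete a temp spill file must never mask
      // the original error being propagated
    }
  }
}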

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 6a81f11..536b92c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

@@ -181,10 +180,6 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
         }
         tableContainer.seal();
         mapJoinTables[pos] = tableContainer;
-      } catch (IOException e) {
-        throw new HiveException(e);
-      } catch (SerDeException e) {
-        throw new HiveException(e);
       } catch (Exception e) {
         throw new HiveException(e);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
index c0bcb21..64295d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -93,4 +93,10 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
       }
     });
   }
+
+  @Override
+  public void remove(String key) {
+    LOG.info("Removing key: " + key);
+    registry.delete(key);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
index c8359d3..1c91be6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
@@ -290,7 +290,7 @@ public class VectorMapJoinRowBytesContainer {
     return currentLength;
   }
 
-  public void clear() throws IOException {
+  public void clear() {
     if (fileInputStream != null) {
       try {
         fileInputStream.close();


  • Prasanthj at May 3, 2015 at 12:46 am
    Repository: hive
    Updated Branches:
       refs/heads/branch-1.2 a0d84d837 -> 29f588e3a


    HIVE-10456: Grace Hash Join should not load spilled partitions on abort (Prasanth Jayachandran reviewed by Gunther Hagleitner)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/29f588e3
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/29f588e3
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/29f588e3

    Branch: refs/heads/branch-1.2
    Commit: 29f588e3a1e6e73cd3c44a76a4c003043e093de6
    Parents: a0d84d8
    Author: Prasanth Jayachandran <j.prasanth.j@gmail.com>
    Authored: Sat May 2 17:46:07 2015 -0700
    Committer: Prasanth Jayachandran <j.prasanth.j@gmail.com>
    Committed: Sat May 2 17:46:07 2015 -0700

    ----------------------------------------------------------------------
      .../hadoop/hive/ql/exec/MapJoinOperator.java | 136 +++++++++++--------
      .../apache/hadoop/hive/ql/exec/ObjectCache.java | 7 +
      .../hadoop/hive/ql/exec/mr/ObjectCache.java | 5 +
      .../persistence/HybridHashTableContainer.java | 35 ++++-
      .../hive/ql/exec/tez/HashTableLoader.java | 5 -
      .../hadoop/hive/ql/exec/tez/ObjectCache.java | 6 +
      .../mapjoin/VectorMapJoinRowBytesContainer.java | 2 +-
      7 files changed, 131 insertions(+), 65 deletions(-)
    ----------------------------------------------------------------------


