Grokbase Groups Hive commits May 2015
FAQ
Repository: hive
Updated Branches:
   refs/heads/llap 2fbe0fae2 -> 8a62fc9c6


HIVE-10482 : LLAP: AsertionError cannot allocate when reading from orc (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8a62fc9c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8a62fc9c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8a62fc9c

Branch: refs/heads/llap
Commit: 8a62fc9c63ee1e30e02e702e365ec754211b4de0
Parents: 2fbe0fa
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Tue May 5 17:37:47 2015 -0700
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Tue May 5 17:37:47 2015 -0700

----------------------------------------------------------------------
  .../hadoop/hive/llap/cache/Allocator.java | 10 ++-
  .../hadoop/hive/llap/cache/BuddyAllocator.java | 95 +++++++++++++-------
  .../hive/llap/cache/LlapOomDebugDump.java | 23 +++++
  .../hive/llap/cache/LowLevelCacheImpl.java | 30 ++++++-
  .../llap/cache/LowLevelCacheMemoryManager.java | 5 ++
  .../hive/llap/cache/LowLevelCachePolicy.java | 3 +-
  .../llap/cache/LowLevelFifoCachePolicy.java | 25 ++++++
  .../llap/cache/LowLevelLrfuCachePolicy.java | 16 ++++
  .../hadoop/hive/llap/cache/MemoryManager.java | 2 +-
  .../hive/llap/io/api/impl/LlapIoImpl.java | 1 +
  .../hive/llap/cache/TestBuddyAllocator.java | 53 ++++++-----
  .../hive/llap/cache/TestLowLevelCacheImpl.java | 10 ++-
  .../hive/llap/cache/TestOrcMetadataCache.java | 14 ++-
  .../org/apache/hadoop/hive/llap/DebugUtils.java | 4 +
  .../hive/ql/io/orc/EncodedReaderImpl.java | 17 ++--
  .../apache/hadoop/hive/ql/io/orc/InStream.java | 22 +----
  16 files changed, 246 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java b/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
index 1bb64ae..4e990ef 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
@@ -19,9 +19,15 @@ package org.apache.hadoop.hive.llap.cache;

  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;

-
  public interface Allocator {
- boolean allocateMultiple(LlapMemoryBuffer[] dest, int size);
+ public static class LlapCacheOutOfMemoryException extends RuntimeException {
+ public LlapCacheOutOfMemoryException(String msg) {
+ super(msg);
+ }
+
+ private static final long serialVersionUID = 268124648177151761L;
+ }
+ void allocateMultiple(LlapMemoryBuffer[] dest, int size) throws LlapCacheOutOfMemoryException;
    void deallocate(LlapMemoryBuffer buffer);
    boolean isDirectAlloc();
    int getMaxAllocation();

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 4cc7682..9f1472f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -89,7 +89,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca

    // TODO: would it make sense to return buffers asynchronously?
    @Override
- public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+ public void allocateMultiple(LlapMemoryBuffer[] dest, int size)
+ throws LlapCacheOutOfMemoryException {
      assert size > 0 : "size is " + size;
      if (size > maxAllocation) {
        throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
@@ -114,29 +115,54 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
        int startIndex = (int)(threadId % arenaCount), index = startIndex;
        do {
          int newIx = arenas[index].allocateFast(index, freeListIx, dest, ix, allocationSize);
- if (newIx == dest.length) return true;
- if (newIx != -1) { // Shouldn't happen.
+ if (newIx == dest.length) return;
+ if (newIx != -1) { // TODO: check if it can still happen; count should take care of this.
            ix = newIx;
          }
+ ix = newIx;
          if ((++index) == arenaCount) {
            index = 0;
          }
        } while (index != startIndex);
      }

- // Then try to split bigger blocks. TODO: again, ideally we would tryLock at least once
- for (int i = 0; i < arenaCount; ++i) {
- int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
- if (newIx == -1) break; // Shouldn't happen.
- if (newIx == dest.length) return true;
- ix = newIx;
- }
- // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
- for (int i = arenaCount; i < arenas.length; ++i) {
- ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
- if (ix == dest.length) return true;
+ // TODO: this is very hacky.
+ // We called reserveMemory so we know that somewhere in there, there's memory waiting for us.
+ // However, we have a class of rare race conditions related to the order of locking/checking of
+ // different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
+ // We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
+ // we look at arena 2 and find no memory. Or, for single arena, 2 threads reserve 256k each,
+ // and a single 1Mb block is available. When the 1st thread locks the 1Mb freelist, the 2nd one
+ // might have already examined the 256k and 512k lists, finding nothing. Blocks placed by (1)
+ // into smaller lists after its split is done will not be found by (2); given that freelist
+ // locks don't overlap, (2) may even run completely between the time (1) takes out the 1Mb
+ // block and the time it returns the remaining 768Kb.
+ // Two solutions to this are some form of cross-thread helping (threads putting "demand"
+ // into some sort of queues that deallocate and split will examine), or having and "actor"
+ // allocator thread (or threads per arena).
+ // The 2nd one is probably much simpler and will allow us to get rid of a lot of sync code.
+ // But for now we will just retry 5 times 0_o
+ for (int attempt = 0; attempt < 5; ++attempt) {
+ // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once
+ for (int i = 0; i < arenaCount; ++i) {
+ int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
+ if (newIx == -1) break; // Shouldn't happen.
+ if (newIx == dest.length) return;
+ ix = newIx;
+ }
+ if (attempt == 0) {
+ // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
+ for (int i = arenaCount; i < arenas.length; ++i) {
+ ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
+ if (ix == dest.length) return;
+ }
+ }
+ LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry " + attempt);
      }
- return false;
+ String msg = "Failed to allocate " + size + "; at " + ix + " out of " + dest.length;
+ LlapIoImpl.LOG.error(msg + "\nALLOCATOR STATE:\n" + debugDump()
+ + "\nPARENT STATE:\n" + memoryManager.debugDumpForOom());
+ throw new LlapCacheOutOfMemoryException(msg);
    }

    @Override
@@ -170,7 +196,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
      for (Arena arena : arenas) {
        arena.debugDump(result);
      }
- result.append("\n");
      return result.toString();
    }

@@ -237,14 +262,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
        // Try to get as consistent view as we can; make copy of the headers.
        byte[] headers = new byte[this.headers.length];
        System.arraycopy(this.headers, 0, headers, 0, headers.length);
- for (int i = 0; i < headers.length; ++i) {
- byte header = headers[i];
- if (header == 0) continue;
- int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
- boolean isFree = (header & 1) == 0;
- result.append("\n block " + i + " at " + offset + ": size "
- + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
- }
        int allocSize = minAllocation;
        for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
          result.append("\n free list for size " + allocSize + ": ");
@@ -260,6 +277,14 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
            freeList.lock.unlock();
          }
        }
+ for (int i = 0; i < headers.length; ++i) {
+ byte header = headers[i];
+ if (header == 0) continue;
+ int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
+ boolean isFree = (header & 1) == 0;
+ result.append("\n block " + i + " at " + offset + ": size "
+ + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
+ }
      }

      private int freeListFromHeader(byte header) {
@@ -297,25 +322,30 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
        int headerStep = 1 << freeListIx;
        int splitListIx = freeListIx + 1;
        while (remaining > 0 && splitListIx < freeLists.length) {
- int splitWays = 1 << (splitListIx - freeListIx);
- int lastSplitBlocksRemaining = -1, lastSplitNextHeader = -1;
+ int splitWaysLog2 = (splitListIx - freeListIx);
+ assert splitWaysLog2 > 0;
+ int splitWays = 1 << splitWaysLog2; // How many ways each block splits into target size.
+ int lastSplitBlocksRemaining = -1; // How many target-sized blocks remain from last split.
+ int lastSplitNextHeader = -1; // The header index for the beginning of the remainder.
          FreeList splitList = freeLists[splitListIx];
          splitList.lock.lock();
          try {
            int headerIx = splitList.listHead;
            while (headerIx >= 0 && remaining > 0) {
              int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
- int toTake = Math.min(splitWays, remaining);
+ int toTake = Math.min(splitWays, remaining); // We split it splitWays and take toTake.
              remaining -= toTake;
- lastSplitBlocksRemaining = splitWays - toTake;
+ lastSplitBlocksRemaining = splitWays - toTake; // Whatever remains.
+ // Take toTake blocks by splitting the block at origOffset.
              for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
                headers[headerIx] = headerData;
+ // TODO: this could be done out of the lock, we only need to take the blocks out.
                ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
              }
- lastSplitNextHeader = headerIx;
- headerIx = data.getInt(origOffset + 4);
+ lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
+ headerIx = data.getInt(origOffset + 4); // Get next item from the free list.
            }
- replaceListHeadUnderLock(splitList, headerIx);
+ replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
          } finally {
            splitList.lock.unlock();
          }
@@ -449,6 +479,9 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
      }
    }

+ private static class Request {
+
+ }
    private static class FreeList {
      ReentrantLock lock = new ReentrantLock(false);
      int listHead = -1; // Index of where the buffer is; in minAllocation units

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java
new file mode 100644
index 0000000..30bf5a9
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapOomDebugDump.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+public interface LlapOomDebugDump {
+ String debugDumpForOom();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 4855d46..249ed56 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;

  import com.google.common.annotations.VisibleForTesting;

-public class LowLevelCacheImpl implements LowLevelCache {
+public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
    private static final int DEFAULT_CLEANUP_INTERVAL = 600;
    private final EvictionAwareAllocator allocator;
    private AtomicInteger newEvictions = new AtomicInteger(0);
@@ -308,6 +308,9 @@ public class LowLevelCacheImpl implements LowLevelCache {
        if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
          cachePolicy.notifyUnlock(buffer);
        } else {
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached");
+ }
          allocator.deallocate(buffer);
        }
      }
@@ -469,4 +472,29 @@ public class LowLevelCacheImpl implements LowLevelCache {
    public Allocator getAllocator() {
      return allocator;
    }
+
+ @Override
+ public String debugDumpForOom() {
+ StringBuilder sb = new StringBuilder("File cache state ");
+ for (Map.Entry<Long, FileCache> e : cache.entrySet()) {
+ if (!e.getValue().incRef()) continue;
+ try {
+ sb.append("\n file " + e.getKey());
+ for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().cache.entrySet()) {
+ if (e2.getValue().incRef() < 0) continue;
+ try {
+ sb.append("\n [").append(e2.getKey()).append(", ")
+ .append(e2.getKey() + e2.getValue().declaredCachedLength)
+ .append(") => ").append(e2.getValue().toString())
+ .append(" alloc ").append(e2.getValue().byteBuffer.position());
+ } finally {
+ e2.getValue().decRef();
+ }
+ }
+ } finally {
+ e.getValue().decRef();
+ }
+ }
+ return sb.toString();
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 405b14f..85f66f8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -99,4 +99,9 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
      } while (!usedMemory.compareAndSet(oldV, oldV - memoryToRelease));
    }

+ @Override
+ public String debugDumpForOom() {
+ return "cache state\n" + evictor.debugDumpForOom();
+ }
+
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
index 85cce31..0b50749 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.hive.llap.cache;

  import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;

-public interface LowLevelCachePolicy {
+public interface LowLevelCachePolicy extends LlapOomDebugDump {
    void cache(LlapCacheableBuffer buffer, Priority priority);
    void notifyLock(LlapCacheableBuffer buffer);
    void notifyUnlock(LlapCacheableBuffer buffer);
    long evictSomeBlocks(long memoryToReserve);
    void setEvictionListener(EvictionListener listener);
+ void setParentDebugDumper(LlapOomDebugDump dumper);
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
index 74e975c..a1ed7ea 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
@@ -35,6 +35,7 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
    private final Lock lock = new ReentrantLock();
    private final LinkedList<LlapCacheableBuffer> buffers;
    private EvictionListener evictionListener;
+ private LlapOomDebugDump parentDebugDump;

    public LowLevelFifoCachePolicy(Configuration conf) {
      if (LlapIoImpl.LOGL.isInfoEnabled()) {
@@ -70,6 +71,11 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
    }

    @Override
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ this.parentDebugDump = dumper;
+ }
+
+ @Override
    public long evictSomeBlocks(long memoryToReserve) {
      long evicted = 0;
      lock.lock();
@@ -88,4 +94,23 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
      }
      return evicted;
    }
+
+ @Override
+ public String debugDumpForOom() {
+ StringBuilder sb = new StringBuilder("FIFO eviction list: ");
+ lock.lock();
+ try {
+ sb.append(buffers.size()).append(" elements): ");
+ Iterator<LlapCacheableBuffer> iter = buffers.iterator();
+ while (iter.hasNext()) {
+ sb.append(iter.next().toStringForCache()).append(",\n");
+ }
+ } finally {
+ lock.unlock();
+ }
+ if (parentDebugDump != null) {
+ sb.append("\n").append(parentDebugDump.debugDumpForOom());
+ }
+ return sb.toString();
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index f7b493d..b43b31d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -64,6 +64,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
    /** Number of elements. */
    private int heapSize = 0;
    private EvictionListener evictionListener;
+ private LlapOomDebugDump parentDebugDump;

    public LowLevelLrfuCachePolicy(Configuration conf) {
      long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
@@ -170,6 +171,12 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
    }

    @Override
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ this.parentDebugDump = dumper;
+ }
+
+
+ @Override
    public long evictSomeBlocks(long memoryToReserve) {
      long evicted = 0;
      // In normal case, we evict the items from the list.
@@ -411,4 +418,13 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
      }
      return result.toString();
    }
+
+ @Override
+ public String debugDumpForOom() {
+ String result = debugDumpHeap();
+ if (parentDebugDump != null) {
+ result += "\n" + parentDebugDump.debugDumpForOom();
+ }
+ return result;
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index d454ec8..8e167ec 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -18,7 +18,7 @@

  package org.apache.hadoop.hive.llap.cache;

-public interface MemoryManager {
+public interface MemoryManager extends LlapOomDebugDump {
    boolean reserveMemory(long memoryToReserve, boolean waitForEviction);
    void releaseMemory(long memUsage);
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 2333331..761aefe 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -102,6 +102,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
        orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
        // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
        cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
+ cachePolicy.setParentDebugDumper(orcCache);
        orcCache.init();
      } else {
        cachePolicy.setEvictionListener(new EvictionDispatcher(null, metadataCache));

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 3bea70f..d35edb7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.fail;

  import java.util.Random;
+import java.util.concurrent.Callable;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
@@ -30,6 +31,7 @@ import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.cache.Allocator.LlapCacheOutOfMemoryException;
  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
  import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
  import org.junit.Test;
@@ -47,21 +49,26 @@ public class TestBuddyAllocator {
      @Override
      public void releaseMemory(long memUsage) {
      }
+
+ @Override
+ public String debugDumpForOom() {
+ return "";
+ }
    }

    @Test
- public void testVariableSizeAllocs() {
+ public void testVariableSizeAllocs() throws Exception {
      testVariableSizeInternal(1, 2, 1);
    }

    @Test
- public void testVariableSizeMultiAllocs() {
+ public void testVariableSizeMultiAllocs() throws Exception {
      testVariableSizeInternal(3, 2, 3);
      testVariableSizeInternal(5, 2, 5);
    }

    @Test
- public void testSameSizes() {
+ public void testSameSizes() throws Exception {
      int min = 3, max = 8, maxAlloc = 1 << max;
      Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, maxAlloc);
      BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
@@ -72,7 +79,7 @@ public class TestBuddyAllocator {
    }

    @Test
- public void testMultipleArenas() {
+ public void testMultipleArenas() throws Exception {
      int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
      Configuration conf = createConf(1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount);
      BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
@@ -88,26 +95,29 @@ public class TestBuddyAllocator {
          LlapDaemonCacheMetrics.create("test", "1"));
      ExecutorService executor = Executors.newFixedThreadPool(3);
      final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1);
- FutureTask<Object> upTask = new FutureTask<Object>(new Runnable() {
- public void run() {
+ FutureTask<Void> upTask = new FutureTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
          syncThreadStart(cdlIn, cdlOut);
          allocateUp(a, min, max, allocsPerSize, false);
          allocateUp(a, min, max, allocsPerSize, true);
+ return null;
        }
- }, null), downTask = new FutureTask<Object>(new Runnable() {
- public void run() {
+ }), downTask = new FutureTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
          syncThreadStart(cdlIn, cdlOut);
          allocateDown(a, min, max, allocsPerSize, false);
          allocateDown(a, min, max, allocsPerSize, true);
+ return null;
        }
- }, null), sameTask = new FutureTask<Object>(new Runnable() {
- public void run() {
+ }), sameTask = new FutureTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
          syncThreadStart(cdlIn, cdlOut);
          for (int i = min; i <= max; ++i) {
            allocSameSize(a, (1 << (max - i)) * allocsPerSize, i);
          }
+ return null;
        }
- }, null);
+ });
      executor.execute(sameTask);
      executor.execute(upTask);
      executor.execute(downTask);
@@ -131,7 +141,8 @@ public class TestBuddyAllocator {
      }
    }

- private void testVariableSizeInternal(int allocCount, int arenaSizeMult, int arenaCount) {
+ private void testVariableSizeInternal(
+ int allocCount, int arenaSizeMult, int arenaCount) throws Exception {
      int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult;
      Configuration conf = createConf(1 << min, maxAlloc, arenaSize, arenaSize * arenaCount);
      BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
@@ -144,7 +155,7 @@ public class TestBuddyAllocator {
      allocateDown(a, min, max, allocCount, true);
    }

- private void allocSameSize(BuddyAllocator a, int allocCount, int sizeLog2) {
+ private void allocSameSize(BuddyAllocator a, int allocCount, int sizeLog2) throws Exception {
      LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[allocCount][];
      long[][] testValues = new long[allocCount][];
      for (int j = 0; j < allocCount; ++j) {
@@ -153,8 +164,8 @@ public class TestBuddyAllocator {
      deallocUpOrDown(a, false, allocs, testValues);
    }

- private void allocateUp(
- BuddyAllocator a, int min, int max, int allocPerSize, boolean isSameOrderDealloc) {
+ private void allocateUp(BuddyAllocator a, int min, int max, int allocPerSize,
+ boolean isSameOrderDealloc) throws Exception {
      int sizes = max - min + 1;
      LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
      // Put in the beginning; relies on the knowledge of internal implementation. Pave?
@@ -165,8 +176,8 @@ public class TestBuddyAllocator {
      deallocUpOrDown(a, isSameOrderDealloc, allocs, testValues);
    }

- private void allocateDown(
- BuddyAllocator a, int min, int max, int allocPerSize, boolean isSameOrderDealloc) {
+ private void allocateDown(BuddyAllocator a, int min, int max, int allocPerSize,
+ boolean isSameOrderDealloc) throws Exception {
      int sizes = max - min + 1;
      LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
      // Put in the beginning; relies on the knowledge of internal implementation. Pave?
@@ -178,13 +189,15 @@ public class TestBuddyAllocator {
    }

    private void allocateAndUseBuffer(BuddyAllocator a, LlapMemoryBuffer[][] allocs,
- long[][] testValues, int allocCount, int index, int sizeLog2) {
+ long[][] testValues, int allocCount, int index, int sizeLog2) throws Exception {
      allocs[index] = new LlapMemoryBuffer[allocCount];
      testValues[index] = new long[allocCount];
      int size = (1 << sizeLog2) - 1;
- if (!a.allocateMultiple(allocs[index], size)) {
+ try {
+ a.allocateMultiple(allocs[index], size);
+ } catch (LlapCacheOutOfMemoryException ex) {
        LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.debugDump());
- fail();
+ throw ex;
      }
      // LOG.info("Allocated " + allocCount + " of " + size + "; " + a.debugDump());
      for (int j = 0; j < allocCount; ++j) {

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 47bdf1e..2c87ec1 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -54,13 +54,12 @@ public class TestLowLevelCacheImpl {

    private static class DummyAllocator implements EvictionAwareAllocator {
      @Override
- public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+ public void allocateMultiple(LlapMemoryBuffer[] dest, int size) {
        for (int i = 0; i < dest.length; ++i) {
          LlapDataBuffer buf = new LlapDataBuffer();
          buf.initialize(0, null, -1, size);
          dest[i] = buf;
        }
- return true;
      }

      @Override
@@ -101,6 +100,13 @@ public class TestLowLevelCacheImpl {

      public void setEvictionListener(EvictionListener listener) {
      }
+
+ public String debugDumpForOom() {
+ return "";
+ }
+
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ }
    }

    @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 9d769c5..513aedf 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
  import org.junit.Test;

  public class TestOrcMetadataCache {
- private static final Log LOG = LogFactory.getLog(TestOrcMetadataCache.class);
-
    private static class DummyCachePolicy implements LowLevelCachePolicy {
      public DummyCachePolicy() {
      }
@@ -49,6 +47,13 @@ public class TestOrcMetadataCache {

      public void setEvictionListener(EvictionListener listener) {
      }
+
+ public String debugDumpForOom() {
+ return "";
+ }
+
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ }
    }

    private static class DummyMemoryManager implements MemoryManager {
@@ -64,6 +69,11 @@ public class TestOrcMetadataCache {
      public void releaseMemory(long memUsage) {
        --allocs;
      }
+
+ @Override
+ public String debugDumpForOom() {
+ return "";
+ }
    }

    @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index 8dc62d8..fc58cf3 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -35,6 +35,10 @@ public class DebugUtils {
      return isTraceOrcEnabled; // TODO: temporary, should be hardcoded false
    }

+ public static boolean isTraceRangesEnabled() {
+ return true; // TODO: temporary, should be hardcoded false
+ }
+
    public static boolean isTraceLockingEnabled() {
      return false;
    }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
index 016f470..e931d09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
@@ -336,12 +336,14 @@ public class EncodedReaderImpl implements EncodedReader {

      // 2. Now, read all of the ranges from cache or disk.
      toRead = new DiskRangeListMutateHelper(listToRead.get());
- if (DebugUtils.isTraceOrcEnabled()) {
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
        LOG.info("Resulting disk ranges to read (file " + fileId + "): "
            + RecordReaderUtils.stringifyDiskRanges(toRead.next));
      }
      cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY);
- if (DebugUtils.isTraceOrcEnabled()) {
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
        LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
            + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
      }
@@ -350,7 +352,8 @@ public class EncodedReaderImpl implements EncodedReader {
      RecordReaderUtils.readDiskRanges(
          file, zcr, stripeOffset, toRead.next, cache.getAllocator().isDirectAlloc());

- if (DebugUtils.isTraceOrcEnabled()) {
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
        LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + stripeOffset
              + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
      }
@@ -369,7 +372,8 @@ public class EncodedReaderImpl implements EncodedReader {
            }
          }
        }
- if (DebugUtils.isTraceOrcEnabled()) {
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
          LOG.info("Disk ranges after pre-read (file " + fileId + ", base offset "
              + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
        }
@@ -454,8 +458,9 @@ public class EncodedReaderImpl implements EncodedReader {
      }
      releaseContexts(colCtxs);

- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Disk ranges after processing all the data "
+ if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+ && LOG.isInfoEnabled()) {
+ LOG.info("Disk ranges after preparing all the data "
            + RecordReaderUtils.stringifyDiskRanges(toRead.next));
      }


http://git-wip-us.apache.org/repos/asf/hive/blob/8a62fc9c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
index e04ee4d..b7633ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
@@ -756,10 +756,7 @@ public abstract class InStream extends InputStream {
        targetBuffers[ix] = chunk.buffer;
        ++ix;
      }
- boolean canAlloc = cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
- if (!canAlloc) {
- throw new AssertionError("Cannot allocate");
- }
+ cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);

      // 4. Now decompress (or copy) the data into cache buffers.
      for (ProcCacheChunk chunk : toDecompress) {
@@ -1017,11 +1014,8 @@ public abstract class InStream extends InputStream {
        cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
        ++ix;
      }
- boolean canAlloc = cache.getAllocator().allocateMultiple(
+ cache.getAllocator().allocateMultiple(
          targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
- if (!canAlloc) {
- throw new AssertionError("Cannot allocate");
- }

      // 4. Now copy the data into cache buffers.
      ix = 0;
@@ -1070,11 +1064,7 @@ public abstract class InStream extends InputStream {
      // We thought we had the entire part to cache, but we don't; convert start to
      // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
      singleAlloc[0] = null;
- boolean canAlloc = cache.getAllocator().allocateMultiple(
- singleAlloc, (int)(candidateEnd - partOffset));
- if (!canAlloc) {
- throw new AssertionError("Cannot allocate");
- }
+ cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));

      LlapMemoryBuffer buffer = singleAlloc[0];
      cache.notifyReused(buffer);
@@ -1088,11 +1078,7 @@ public abstract class InStream extends InputStream {
    private static TrackedCacheChunk copyAndReplaceUncompressedToNonCached(
        BufferChunk bc, LowLevelCache cache, LlapMemoryBuffer[] singleAlloc) {
      singleAlloc[0] = null;
- boolean canAlloc = cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
- if (!canAlloc) {
- throw new AssertionError("Cannot allocate");
- }
-
+ cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
      LlapMemoryBuffer buffer = singleAlloc[0];
      cache.notifyReused(buffer);
      ByteBuffer dest = buffer.getByteBufferRaw();

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedMay 6, '15 at 12:37a
activeMay 6, '15 at 12:37a
posts1
users1
websitehive.apache.org

1 user in discussion

Sershe: 1 post

People

Translate

site design / logo © 2021 Grokbase