FAQ
Author: sershe
Date: Wed Jan 14 02:47:08 2015
New Revision: 1651559

URL: http://svn.apache.org/r1651559
Log:
Finish reworking LRFU policy for low-level cache (not clear if it's a good pick due to concurrency); tests; some pipeline adjustments

Added:
     hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/
     hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
Removed:
     hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java
Modified:
     hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
     hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
     hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 14 02:47:08 2015
@@ -1969,10 +1969,11 @@ public class HiveConf extends Configurat
          "Updates tez job execution progress in-place in the terminal."),

      LLAP_ENABLED("hive.llap.enabled", true, ""),
- LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.minalloc", 128 * 1024, ""),
- LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.minalloc", 16 * 1024 * 1024, ""),
- LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.minalloc", 128L * 1024 * 1024, ""),
- LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.minalloc", 1024L * 1024 * 1024, ""),
+ LLAP_LOW_LEVEL_CACHE("hive.llap.use.lowlevel.cache", true, ""),
+ LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.alloc.min", 128 * 1024, ""),
+ LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.alloc.max", 16 * 1024 * 1024, ""),
+ LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.arena.size", 128 * 1024 * 1024, ""),
+ LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.size", 1024L * 1024 * 1024, ""),
      LLAP_REQUEST_THREAD_COUNT("hive.llap.request.thread.count", 16, ""),
      LLAP_USE_LRFU("hive.llap.use.lrfu", true, ""),
      LLAP_LRFU_LAMBDA("hive.llap.lrfu.lambda", 0.01f, "")

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Wed Jan 14 02:47:08 2015
@@ -18,10 +18,17 @@

  package org.apache.hadoop.hive.llap.io.api;

+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+
  public class EncodedColumn<BatchKey> {
    // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
    // generality, and ability to not copy data from underlying low-level cached buffers.
- public static class ColumnBuffer {}
+ public static class ColumnBuffer {
+ // TODO: given how ORC will allocate, it might make sense to share array between all
+ // returned encodedColumn-s, and store index and length in the array.
+ public LlapMemoryBuffer[] cacheBuffers;
+ public int firstOffset, lastLength;
+ }
    public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) {
      this.batchKey = batchKey;
      this.columnIndex = columnIndex;

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Wed Jan 14 02:47:08 2015
@@ -42,4 +42,7 @@ public interface LowLevelCache {
     * Allocate dest.length new blocks of size into dest.
     */
    void allocateMultiple(LlapMemoryBuffer[] dest, int size);
+
+ void releaseBuffers(LlapMemoryBuffer[] cacheBuffers);
+
  }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Wed Jan 14 02:47:08 2015
@@ -41,8 +41,9 @@ public final class LlapCacheableBuffer e
    // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object.
    public double priority;
    public long lastUpdate = -1;
- public int indexInHeap = -1;
- public boolean isLockedInHeap; // TODO#: this flag is invalid and not thread safe
+ public LlapCacheableBuffer prev = null, next = null;
+ public int indexInHeap = NOT_IN_CACHE;
+ public static final int IN_LIST = -2, NOT_IN_CACHE = -1;

    @Override
    public int hashCode() {
@@ -60,15 +61,16 @@ public final class LlapCacheableBuffer e
          && this.offset == other.offset && this.length == other.length;
    }

- int lock() {
- int oldRefCount = -1;
+ int incRef() {
+ int newRefCount = -1;
      while (true) {
- oldRefCount = refCount.get();
+ int oldRefCount = refCount.get();
        if (oldRefCount == EVICTED_REFCOUNT) return -1;
        assert oldRefCount >= 0;
- if (refCount.compareAndSet(oldRefCount, oldRefCount + 1)) break;
+ newRefCount = oldRefCount + 1;
+ if (refCount.compareAndSet(oldRefCount, newRefCount)) break;
      }
- return oldRefCount;
+ return newRefCount;
    }

    public boolean isLocked() {
@@ -81,7 +83,7 @@ public final class LlapCacheableBuffer e
      return refCount.get() == EVICTED_REFCOUNT;
    }

- int unlock() {
+ int decRef() {
      int newRefCount = refCount.decrementAndGet();
      if (newRefCount < 0) {
        throw new AssertionError("Unexpected refCount " + newRefCount);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java Wed Jan 14 02:47:08 2015
@@ -33,8 +33,9 @@ import org.apache.hadoop.hive.llap.io.ap
  import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
  import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;

+// TODO: refactor the cache and allocator parts?
  public class LowLevelBuddyCache implements LowLevelCache, EvictionListener {
- private final ArrayList<arena> arenas;
+ private final ArrayList<Arena> arenas;
    private AtomicInteger newEvictions = new AtomicInteger(0);
    private final Thread cleanupThread;
    private final ConcurrentHashMap<String, FileCache> cache =
@@ -44,21 +45,21 @@ public class LowLevelBuddyCache implemen
    // Config settings
    private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;

- private final int minAllocation, maxAllocation;
- private final long maxSize, arenaSize;
-
+ private final int minAllocation, maxAllocation, arenaSize;
+ private final long maxSize;
+
    public LowLevelBuddyCache(Configuration conf) {
      minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
      maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
- arenaSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+ arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
      maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
- if (maxSize < arenaSize || arenaSize > maxAllocation || maxAllocation < minAllocation) {
+ if (maxSize < arenaSize || arenaSize < maxAllocation || maxAllocation < minAllocation) {
        throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
            + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSize);
      }
      if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
          || (Long.bitCount(arenaSize) != 1) || (minAllocation == 1)) {
- // TODO: technically, arena size is not required to be so; needs to be divisible by maxAlloc
+ // TODO: technically, arena size only needs to be divisible by maxAlloc
        throw new AssertionError("Allocation and arena sizes must be powers of two > 1: "
            + minAllocation + ", " + maxAllocation + ", " + arenaSize);
      }
@@ -70,11 +71,11 @@ public class LowLevelBuddyCache implemen
      maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
      arenaSizeLog2 = 31 - Long.numberOfLeadingZeros(arenaSize);
      maxArenas = (int)(maxSize / arenaSize);
- arenas = new ArrayList<arena>(maxArenas);
+ arenas = new ArrayList<Arena>(maxArenas);
      for (int i = 0; i < maxArenas; ++i) {
- arenas.add(new arena());
+ arenas.add(new Arena());
      }
- arenas.get(0).init();
+ arenas.get(0).init(arenaSize, maxAllocation, arenaSizeLog2, minAllocLog2, maxAllocLog2);
      cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
          ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this)
          : new LowLevelFifoCachePolicy(minAllocation, maxSize, this);
@@ -91,7 +92,7 @@ public class LowLevelBuddyCache implemen
      freeListIndex = Math.max(freeListIndex - minAllocLog2, 0);
      int allocationSize = 1 << (freeListIndex + minAllocLog2);
      int total = dest.length * allocationSize;
- cachePolicy.reserveMemory(total);
+ cachePolicy.reserveMemory(total, true);

      int ix = 0;
      for (int i = 0; i < dest.length; ++i) {
@@ -99,27 +100,27 @@ public class LowLevelBuddyCache implemen
        dest[i] = new LlapCacheableBuffer(null, -1, -1); // TODO: pool of objects?
      }
      // TODO: instead of waiting, loop only ones we haven't tried w/tryLock?
- for (arena block : arenas) {
+ for (Arena block : arenas) {
        int newIx = allocateFast(block, freeListIndex, dest, ix, allocationSize);
        if (newIx == -1) break;
        if (newIx == dest.length) return;
        ix = newIx;
      }
      // Then try to split bigger blocks.
- for (arena block : arenas) {
+ for (Arena block : arenas) {
        int newIx = allocateWithSplit(block, freeListIndex, dest, ix, allocationSize);
        if (newIx == -1) break;
        if (newIx == dest.length) return;
        ix = newIx;
      }
      // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
- for (arena block : arenas) {
+ for (Arena block : arenas) {
        ix = allocateWithExpand(block, freeListIndex, dest, ix, allocationSize);
        if (ix == dest.length) return;
      }
    }

- private int allocateFast(arena block,
+ private int allocateFast(Arena block,
        int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
      if (block.data == null) return -1; // not allocated yet
      FreeList freeList = block.freeLists[freeListIndex];
@@ -133,7 +134,7 @@ public class LowLevelBuddyCache implemen
    }

    private int allocateWithSplit(
- arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+ Arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) {
      if (arena.data == null) return -1; // not allocated yet
      FreeList freeList = arena.freeLists[freeListIndex];
      int remaining = -1;
@@ -206,7 +207,7 @@ public class LowLevelBuddyCache implemen
      return lastSplitNextHeader << minAllocLog2;
    }

- public int allocateFromFreeListUnderLock(arena block, FreeList freeList,
+ public int allocateFromFreeListUnderLock(Arena block, FreeList freeList,
        int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
      int current = freeList.listHead;
      while (current >= 0 && ix < dest.length) {
@@ -222,15 +223,15 @@ public class LowLevelBuddyCache implemen
    }

    private int allocateWithExpand(
- arena block, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
- if (block.data != null) return ix; // already allocated
- synchronized (block) {
+ Arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+ if (arena.data != null) return ix; // already allocated
+ synchronized (arena) {
        // Never goes from non-null to null, so this is the only place we need sync.
- if (block.data == null) {
- block.init();
+ if (arena.data == null) {
+ arena.init(arenaSize, maxAllocation, arenaSizeLog2, minAllocLog2, maxAllocLog2);
        }
      }
- return allocateWithSplit(block, freeListIndex, dest, ix, size);
+ return allocateWithSplit(arena, freeListIndex, dest, ix, size);
    }

    @Override
@@ -262,8 +263,8 @@ public class LowLevelBuddyCache implemen
    }

    private boolean lockBuffer(LlapCacheableBuffer buffer) {
- int rc = buffer.lock();
- if (rc == 0) {
+ int rc = buffer.incRef();
+ if (rc == 1) {
        cachePolicy.notifyLock(buffer);
      }
      return rc >= 0;
@@ -282,7 +283,11 @@ public class LowLevelBuddyCache implemen
          assert buffer.isLocked();
          while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
            LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
- if (oldVal == null) break; // Cached successfully.
+ if (oldVal == null) {
+ // Cached successfully, add to policy.
+ cachePolicy.cache(buffer);
+ break;
+ }
            if (DebugUtils.isTraceCachingEnabled()) {
              LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
                  + fileName + "@" + offset + "; old " + oldVal + ", new " + buffer);
@@ -297,7 +302,7 @@ public class LowLevelBuddyCache implemen
              result[i >>> 6] |= (1 << (i & 63)); // indicate that we've replaced the value
              break;
            }
- // We found some old value but couldn't lock it; remove it.
+ // We found some old value but couldn't incRef it; remove it.
            subCache.cache.remove(offset, oldVal);
          }
        }
@@ -349,15 +354,22 @@ public class LowLevelBuddyCache implemen
      releaseBufferInternal((LlapCacheableBuffer)buffer);
    }

+ @Override
+ public void releaseBuffers(LlapMemoryBuffer[] cacheBuffers) {
+ for (int i = 0; i < cacheBuffers.length; ++i) {
+ releaseBufferInternal((LlapCacheableBuffer)cacheBuffers[i]);
+ }
+ }
+
    public void releaseBufferInternal(LlapCacheableBuffer buffer) {
- if (buffer.unlock() == 0) {
+ if (buffer.decRef() == 0) {
        cachePolicy.notifyUnlock(buffer);
        unblockEviction();
      }
    }

    public static LlapCacheableBuffer allocateFake() {
- return new LlapCacheableBuffer(null, -1, -1);
+ return new LlapCacheableBuffer(null, -1, 1);
    }

    public void unblockEviction() {
@@ -446,9 +458,9 @@ public class LowLevelBuddyCache implemen
      }
    }

- private class arena {
- void init() {
- data = ByteBuffer.allocateDirect(maxAllocation);
+ private static class Arena {
+ void init(int arenaSize, int maxAlloc, int arenaSizeLog2, int minAllocLog2, int maxAllocLog2) {
+ data = ByteBuffer.allocateDirect(arenaSize);
        int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
        headers = new byte[maxMinAllocs];
        int allocLog2Diff = maxAllocLog2 - minAllocLog2;
@@ -459,7 +471,7 @@ public class LowLevelBuddyCache implemen
        int maxMaxAllocs = 1 << (arenaSizeLog2 - maxAllocLog2),
            headerIndex = 0, headerIncrement = 1 << allocLog2Diff;
        freeLists[maxAllocLog2 - 1].listHead = 0;
- for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
+ for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAlloc) {
          // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
          headers[headerIndex] = (byte)(allocLog2Diff << 1); // Maximum allocation size
          data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerIncrement));
@@ -482,7 +494,6 @@ public class LowLevelBuddyCache implemen
      // However, we are trying to increase fragmentation now, since we cater to single-size.
    }

- // TODO##: separate the classes?
    private static class FileCache {
      private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
      // TODO: given the specific data, perhaps the nested thing should not be CHM

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -22,5 +22,5 @@ public interface LowLevelCachePolicy {
    void cache(LlapCacheableBuffer buffer);
    void notifyLock(LlapCacheableBuffer buffer);
    void notifyUnlock(LlapCacheableBuffer buffer);
- void reserveMemory(long total);
+ boolean reserveMemory(long memoryToReserve, boolean oneEviction);
  }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Wed Jan 14 02:47:08 2015
@@ -32,7 +32,7 @@ public abstract class LowLevelCachePolic
    }

    @Override
- public void reserveMemory(long memoryToReserve) {
+ public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) {
      // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
      while (memoryToReserve > 0) {
        long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve;
@@ -42,16 +42,18 @@ public abstract class LowLevelCachePolic
        }
        // TODO: for one-block case, we could move notification for the last block out of the loop.
        long evicted = evictSomeBlocks(memoryToReserve, evictionListener);
+ if (!waitForEviction && evicted == 0) return false;
        // Adjust the memory - we have to account for what we have just evicted.
        while (true) {
          long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted);
- if (usedMemory.compareAndSet(usedMem, usedMem + reserveWithEviction)) {
+ if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reserveWithEviction)) {
            memoryToReserve -= reserveWithEviction;
            break;
          }
          usedMem = usedMemory.get();
        }
      }
+ return true;
    }

    protected abstract long evictSomeBlocks(long memoryToReserve, EvictionListener listener);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -18,8 +18,8 @@

  package org.apache.hadoop.hive.llap.cache;

-import java.util.Iterator;
  import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;

  import org.apache.commons.lang.StringUtils;
  import org.apache.hadoop.conf.Configuration;
@@ -27,14 +27,11 @@ import org.apache.hadoop.hive.conf.HiveC
  import org.apache.hadoop.hive.llap.DebugUtils;
  import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;

-import com.google.common.annotations.VisibleForTesting;
-
  /**
- * Implementation of the "simple" algorithm from "On the Existence of a Spectrum of Policies
+ * Implementation of the algorithm from "On the Existence of a Spectrum of Policies
   * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
- * TODO: fix this, no longer true; with ORC as is, 4k buffers per gig of cache
- * We expect the number of buffers to be relatively small (1000s), so we just use one heap.
- **/
+ * Additionally, buffer locking has to be handled (locked buffer cannot be evicted).
+ */
  public class LowLevelLrfuCachePolicy extends LowLevelCachePolicyBase {
    private final double lambda;
    private final double f(long x) {
@@ -50,124 +47,162 @@ public class LowLevelLrfuCachePolicy ext

    private final AtomicLong timer = new AtomicLong(0);
    /**
- * The heap. Currently synchronized on itself; there is a number of papers out there
- * with various lock-free/efficient priority queues which we can use if needed.
+ * The heap and list. Currently synchronized on the object, which is not good. If this becomes
+ * a problem (which it probably will), we can partition the cache policy, or use some better
+ * structure. Heap should not be locked while holding the lock on list.
+ * As of now, eviction in most cases will only need the list; locking doesn't do anything;
+ * unlocking actually places item in evictable cache - unlocking is done after processing,
+ * so this most expensive part (and only access to heap in most cases) will not affect it.
+ * Perhaps we should use ConcurrentDoubleLinkedList (in public domain).
+ * ONLY LIST REMOVAL is allowed under list lock.
     */
    private final LlapCacheableBuffer[] heap;
+ private final ReentrantLock listLock = new ReentrantLock();
+ private LlapCacheableBuffer listHead, listTail;
    /** Number of elements. */
    private int heapSize = 0;

    public LowLevelLrfuCachePolicy(Configuration conf,
        long minBufferSize, long maxCacheSize, EvictionListener listener) {
      super(maxCacheSize, listener);
- heap = new LlapCacheableBuffer[(int)Math.ceil((maxCacheSize * 1.0) / minBufferSize)];
      lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+ int maxBuffers = (int)Math.ceil((maxCacheSize * 1.0) / minBufferSize);
+ int maxHeapSize = -1;
+ if (lambda == 0) {
+ maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case
+ } else {
+ int lrfuThreshold = (int)((Math.log(1 - Math.pow(0.5, lambda)) / Math.log(0.5)) / lambda);
+ maxHeapSize = Math.min(lrfuThreshold, maxBuffers);
+ }
+ heap = new LlapCacheableBuffer[maxHeapSize];
+ listHead = listTail = null;
    }

    @Override
    public void cache(LlapCacheableBuffer buffer) {
- buffer.lastUpdate = timer.incrementAndGet();
- buffer.priority = F0;
+ // LRFU cache policy doesn't store locked blocks. When we cache, the block is locked, so
+ // we simply do nothing here. The fact that it was never updated will allow us to add it
+ // properly on the first notifyUnlock.
      assert buffer.isLocked();
- buffer.isLockedInHeap = true;
- synchronized (heap) {
- // Ensured by reserveMemory.
- assert heapSize < heap.length : heap.length + " >= " + heapSize;
- buffer.indexInHeap = heapSize;
- heapifyUpUnderLock(buffer, buffer.lastUpdate);
- if (DebugUtils.isTraceEnabled()) {
- LlapIoImpl.LOG.info(buffer + " inserted at " + buffer.lastUpdate);
- }
- ++heapSize;
- }
    }

    @Override
    public void notifyLock(LlapCacheableBuffer buffer) {
- long time = timer.get();
- synchronized (heap) {
- buffer.isLockedInHeap = true;
- heapifyDownUnderLock(buffer, time);
- }
+ // We do not proactively remove locked items from the heap, and opportunistically try to
+ // remove from the list (since eviction is mostly from the list). If eviction stumbles upon
+ // a locked item in either, it will remove it from cache; when we unlock, we are going to
+ // put it back or update it, depending on whether this has happened. This should cause
+ // most of the expensive cache update work to happen in unlock, not blocking processing.
+ if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
+ if (!listLock.tryLock()) return;
+ removeFromListAndUnlock(buffer);
    }

    @Override
    public void notifyUnlock(LlapCacheableBuffer buffer) {
      long time = timer.incrementAndGet();
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
+ }
      synchronized (heap) {
- if (DebugUtils.isTraceCachingEnabled()) {
- LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
- }
- buffer.priority = touchPriority(time, buffer.lastUpdate, buffer.priority);
+ // First, update buffer priority - we have just been using it.
+ buffer.priority = (buffer.lastUpdate == -1) ? F0
+ : touchPriority(time, buffer.lastUpdate, buffer.priority);
        buffer.lastUpdate = time;
- buffer.isLockedInHeap = false;
- // Buffer's priority just decreased from boosted lock priority, so move up.
- heapifyUpUnderLock(buffer, time);
+ // Then, if the buffer was in the list, remove it.
+ if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) {
+ listLock.lock();
+ removeFromListAndUnlock(buffer);
+ }
+ // The only concurrent change that can happen when we hold the heap lock is list removal;
+ // we have just ensured the item is not in the list, so we have a definite state now.
+ if (buffer.indexInHeap >= 0) {
+ // The buffer has lived in the heap all along. Restore heap property.
+ heapifyDownUnderLock(buffer, time);
+ } else if (heapSize == heap.length) {
+ // The buffer is not in the (full) heap. Demote the top item of the heap into the list.
+ LlapCacheableBuffer demoted = heap[0];
+ synchronized (listLock) {
+ demoted.indexInHeap = LlapCacheableBuffer.IN_LIST;
+ demoted.prev = null;
+ if (listHead != null) {
+ demoted.next = listHead;
+ listHead.prev = demoted;
+ listHead = demoted;
+ } else {
+ listHead = listTail = demoted;
+ demoted.next = null;
+ }
+ }
+ // Now insert the buffer in its place and restore heap property.
+ buffer.indexInHeap = 0;
+ heapifyDownUnderLock(buffer, time);
+ } else {
+ // Heap is not full, add the buffer to the heap and restore heap property up.
+ assert heapSize < heap.length : heap.length + " < " + heapSize;
+ buffer.indexInHeap = heapSize;
+ heapifyUpUnderLock(buffer, time);
+ ++heapSize;
+ }
      }
    }

- private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
- if (heapSize == 0) return null;
- LlapCacheableBuffer result = heap[0];
- if (!result.invalidate()) {
- // We boost the priority of locked buffers to a very large value;
- // this means entire heap is locked. TODO: need to work around that for small pools?
- if (DebugUtils.isTraceCachingEnabled()) {
- LlapIoImpl.LOG.info("Failed to invalidate head " + result.toString() + "; size = " + heapSize);
+ @Override
+ protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+ long evicted = 0;
+ // In normal case, we evict the items from the list.
+ LlapCacheableBuffer nextCandidate, firstCandidate;
+ listLock.lock();
+ try {
+ nextCandidate = firstCandidate = listTail;
+ while (evicted < memoryToReserve && nextCandidate != null) {
+ if (!nextCandidate.invalidate()) {
+ // Locked buffer was in the list - just drop it; will be re-added on unlock.
+ LlapCacheableBuffer lockedBuffer = nextCandidate;
+ nextCandidate = nextCandidate.prev;
+ removeFromListUnderLock(lockedBuffer);
+ continue;
+ }
+ // Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us.
+ // TODO#: double check this is valid!
+ nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+ evicted += nextCandidate.length;
        }
- return null;
- }
- if (DebugUtils.isTraceCachingEnabled()) {
- LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
- }
- result.indexInHeap = -1;
- --heapSize;
- LlapCacheableBuffer newRoot = heap[heapSize];
- newRoot.indexInHeap = 0;
- if (newRoot.lastUpdate != time && !newRoot.isLockedInHeap) {
- newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
- newRoot.lastUpdate = time;
+ if (firstCandidate != nextCandidate) {
+ if (nextCandidate == null) {
+ listHead = listTail = null; // We have evicted the entire list.
+ } else {
+ // Splice the section that we have evicted out of the list.
+ removeFromListUnderLock(nextCandidate.next, firstCandidate);
+ }
+ }
+ } finally {
+ listLock.unlock();
      }
- heapifyDownUnderLock(newRoot, time);
- return result;
- }
-
- private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
- // Relative positions of the blocks don't change over time; priorities we expire can only
- // decrease; we only have one block that could have broken heap rule and we always move it
- // down; therefore, we can update priorities of other blocks as we go for part of the heap -
- // we correct any discrepancy w/the parent after expiring priority, and any block we expire
- // the priority for already has lower priority than that of its children.
- // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
- int ix = buffer.indexInHeap;
- double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
- while (true) {
- int leftIx = (ix << 1) + 1, rightIx = leftIx + 1;
- if (leftIx >= heapSize) break; // Buffer is at the leaf node.
- LlapCacheableBuffer left = heap[leftIx], right = null;
- if (rightIx < heapSize) {
- right = heap[rightIx];
- }
- double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
- if (priority <= leftPri && priority <= rightPri) break;
- if (leftPri <= rightPri) { // prefer left, cause right might be missing
- heap[ix] = left;
- left.indexInHeap = ix;
- ix = leftIx;
- } else {
- heap[ix] = right;
- right.indexInHeap = ix;
- ix = rightIx;
+ while (firstCandidate != nextCandidate) {
+ listener.notifyEvicted(firstCandidate);
+ firstCandidate = firstCandidate.prev;
+ }
+ if (evicted >= memoryToReserve) return evicted;
+ // This should not happen unless we are evicting a lot at once, or buffers are large (so
+ // there's a small number of buffers and they all live in the heap).
+ long time = timer.get();
+ while (evicted < memoryToReserve) {
+ LlapCacheableBuffer buffer = null;
+ synchronized (heap) {
+ buffer = evictFromHeapUnderLock(time);
        }
+ if (buffer == null) return evicted;
+ evicted += buffer.length;
+ listener.notifyEvicted(buffer);
      }
- buffer.indexInHeap = ix;
- heap[ix] = buffer;
+ return evicted;
    }

    private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) {
      // See heapifyDown comment.
      int ix = buffer.indexInHeap;
- double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+ double priority = buffer.priority;
      while (true) {
        if (ix == 0) break; // Buffer is at the top of the heap.
        int parentIx = (ix - 1) >>> 1;
@@ -182,19 +217,140 @@ public class LowLevelLrfuCachePolicy ext
      heap[ix] = buffer;
    }

+ // Note: almost never called (unless buffers are very large or we evict a lot).
+ private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
+ while (true) {
+ if (heapSize == 0) return null;
+ LlapCacheableBuffer result = heap[0];
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
+ }
+ result.indexInHeap = -1;
+ --heapSize;
+ boolean canEvict = result.invalidate();
+ if (heapSize > 0) {
+ LlapCacheableBuffer newRoot = heap[heapSize];
+ newRoot.indexInHeap = 0;
+ if (newRoot.lastUpdate != time) {
+ newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
+ newRoot.lastUpdate = time;
+ }
+ heapifyDownUnderLock(newRoot, time);
+ }
+ if (canEvict) return result;
+ // Otherwise we just removed a locked item from heap; unlock will re-add it, we continue.
+ }
+ }
+
+ private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
+ // Relative positions of the blocks don't change over time; priorities we expire can only
+ // decrease; we only have one block that could have broken heap rule and we always move it
+ // down; therefore, we can update priorities of other blocks as we go for part of the heap -
+ // we correct any discrepancy w/the parent after expiring priority, and any block we expire
+ // the priority for already has lower priority than that of its children.
+ // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
+ int ix = buffer.indexInHeap;
+ double priority = buffer.priority;
+ while (true) {
+ int newIx = moveMinChildUp(ix, time, priority);
+ if (newIx == -1) break;
+ ix = newIx;
+ }
+ buffer.indexInHeap = ix;
+ heap[ix] = buffer;
+ }
+
+ /**
+ * Moves the minimum child of targetPos block up to targetPos; if comparePri is non-negative,
+ * does nothing when comparePri is no greater than the priorities of both children.
+ * @return the index of the child that was moved up; -1 if nothing was moved due to absence
+ * of the children, or a failed priority check.
+ */
+ private int moveMinChildUp(int targetPos, long time, double comparePri) {
+ int leftIx = (targetPos << 1) + 1, rightIx = leftIx + 1;
+ if (leftIx >= heapSize) return -1; // Buffer is at the leaf node.
+ LlapCacheableBuffer left = heap[leftIx], right = null;
+ if (rightIx < heapSize) {
+ right = heap[rightIx];
+ }
+ double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
+ if (comparePri >= 0 && comparePri <= leftPri && comparePri <= rightPri) {
+ return -1;
+ }
+ if (leftPri <= rightPri) { // prefer left, cause right might be missing
+ heap[targetPos] = left;
+ left.indexInHeap = targetPos;
+ return leftIx;
+ } else {
+ heap[targetPos] = right;
+ right.indexInHeap = targetPos;
+ return rightIx;
+ }
+ }
+
    private double getHeapifyPriority(LlapCacheableBuffer buf, long time) {
- if (buf == null || buf.isLockedInHeap) return Double.MAX_VALUE;
- if (buf.lastUpdate != time) {
+ if (buf == null) return Double.MAX_VALUE;
+ if (buf.lastUpdate != time && time >= 0) {
        buf.priority = expirePriority(time, buf.lastUpdate, buf.priority);
        buf.lastUpdate = time;
      }
      return buf.priority;
    }

+ private void removeFromListAndUnlock(LlapCacheableBuffer buffer) {
+ try {
+ if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) return;
+ removeFromListUnderLock(buffer);
+ buffer.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+ } finally {
+ listLock.unlock();
+ }
+ }
+
+ private void removeFromListUnderLock(LlapCacheableBuffer buffer) {
+ if (buffer == listTail) {
+ listTail = buffer.prev;
+ } else {
+ buffer.next.prev = buffer.prev;
+ }
+ if (buffer == listHead) {
+ listHead = buffer.next;
+ } else {
+ buffer.prev.next = buffer.next;
+ }
+ }
+
+ private void removeFromListUnderLock(LlapCacheableBuffer from, LlapCacheableBuffer to) {
+ if (to == listTail) {
+ listTail = from.prev;
+ } else {
+ to.next.prev = from.prev;
+ }
+ if (from == listHead) {
+ listHead = to.next;
+ } else {
+ from.prev.next = to.next;
+ }
+ }
+
    public String debugDumpHeap() {
- if (heapSize == 0) return "<empty>";
+ StringBuilder result = new StringBuilder("List: ");
+ if (listHead == null) {
+ result.append("<empty>");
+ } else {
+ LlapCacheableBuffer listItem = listHead;
+ while (listItem != null) {
+ result.append(listItem.toStringForCache()).append(" -> ");
+ listItem = listItem.next;
+ }
+ }
+ result.append("\nHeap:");
+ if (heapSize == 0) {
+ result.append(" <empty>\n");
+ return result.toString();
+ }
+ result.append("\n");
      int levels = 32 - Integer.numberOfLeadingZeros(heapSize);
- StringBuilder result = new StringBuilder();
      int ix = 0;
      int spacesCount = heap[0].toStringForCache().length() + 3;
      String full = StringUtils.repeat(" ", spacesCount),
@@ -230,23 +386,4 @@ public class LowLevelLrfuCachePolicy ext
      }
      return result.toString();
    }
-
- @VisibleForTesting
- public LlapCacheableBuffer evictOneMoreBlock() {
- synchronized (heap) {
- return evictFromHeapUnderLock(timer.get());
- }
- }
-
- @Override
- protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
- long evicted = 0;
- while (evicted < memoryToReserve) {
- LlapCacheableBuffer buffer = evictOneMoreBlock();
- if (buffer == null) return evicted;
- evicted += buffer.length;
- listener.notifyEvicted(buffer);
- }
- return evicted;
- }
  }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java Wed Jan 14 02:47:08 2015
@@ -33,7 +33,6 @@ public interface VectorReader {
    public static class ColumnVectorBatch {
      public ColumnVector[] cols;
      public int size;
- public List<ColumnBuffer> lockedBuffers;
    }
    public ColumnVectorBatch next() throws InterruptedException, IOException;
    public void close() throws IOException;

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Wed Jan 14 02:47:08 2015
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configurable;
  import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.llap.cache.Cache;
  import org.apache.hadoop.hive.llap.cache.LowLevelBuddyCache;
  import org.apache.hadoop.hive.llap.cache.NoopCache;
@@ -52,8 +53,11 @@ public class LlapIoImpl implements LlapI

    private LlapIoImpl(Configuration conf) throws IOException {
      this.conf = conf;
- Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>(); // High-level cache not supported yet.
- this.edp = new OrcEncodedDataProducer(new LowLevelBuddyCache(conf), cache, conf);
+ boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
+ // High-level cache not supported yet.
+ Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
+ LowLevelBuddyCache orcCache = useLowLevelCache ? new LowLevelBuddyCache(conf) : null;
+ this.edp = new OrcEncodedDataProducer(orcCache, cache, conf);
      this.cvp = new OrcColumnVectorProducer(edp, conf);
    }


Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Wed Jan 14 02:47:08 2015
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.llap.Consu
  import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
  import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
  import org.apache.hadoop.hive.llap.io.api.VectorReader.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
  import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
  import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -105,6 +106,10 @@ public abstract class ColumnVectorProduc
        }
        if (0 == colsRemaining) {
          ColumnVectorProducer.this.decodeBatch(data.batchKey, targetBatch, downstreamConsumer);
+ // Batch has been decoded; unlock the buffers in cache
+ for (ColumnBuffer cb : targetBatch.columnDatas) {
+ upstreamFeedback.returnData(cb);
+ }
        }
      }

@@ -133,10 +138,7 @@ public abstract class ColumnVectorProduc

      @Override
      public void returnData(ColumnVectorBatch data) {
- // TODO#: this should happen earlier, when data is decoded buffers are not needed
- for (ColumnBuffer lockedBuffer : data.lockedBuffers) {
- upstreamFeedback.returnData(lockedBuffer);
- }
+ // TODO: column vectors could be added to object pool here
      }

      private void dicardPendingData(boolean isStopped) {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Wed Jan 14 02:47:08 2015
@@ -48,10 +48,11 @@ import org.apache.hadoop.mapred.InputSpl

  public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> {
    private FileSystem cachedFs = null;
- private final LowLevelCache lowLevelCache;
    private Configuration conf;
    private OrcMetadataCache metadataCache;
+ // TODO: it makes zero sense to have both at the same time and duplicate data. Add "cache mode".
    private final Cache<OrcCacheKey> cache;
+ private final LowLevelCache lowLevelCache;

    private class OrcEncodedDataReader implements EncodedDataReader<OrcBatchKey>,
      Consumer<EncodedColumn<OrcBatchKey>> {
@@ -112,11 +113,11 @@ public class OrcEncodedDataProducer impl
        }
        determineWhatToRead(stripes);
        if (isStopped) return;
- List<Integer>[] stripeColumnsToRead = produceDataFromCache();
+ List<Integer>[] stripeColsToRead = produceDataFromCache();
        // readState now contains some 1s for column x rgs that were fetched from cache.
        // TODO: I/O threadpool would be here; for now, linear and inefficient
        for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
- List<Integer> colsToRead = stripeColumnsToRead[stripeIxMod];
+ List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
          long[][] colRgs = readState[stripeIxMod];
          if (colsToRead == null) {
            colsToRead = columnIds;
@@ -139,8 +140,10 @@ public class OrcEncodedDataProducer impl
            orcReader = createOrcReader(split);
          }
          RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes);
- // We pass in the already-filtered RGs, as well as sarg. ORC can apply additional filtering.
- stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, lowLevelCache);
+ // If we have a high-level cache, we intercept the data and add it there;
+ // otherwise just pass the data directly to the consumer.
+ Consumer<EncodedColumn<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
+ stripeReader.readEncodedColumns(colRgs, rgCount, consumer, lowLevelCache);
          stripeReader.close();
        }

@@ -152,13 +155,11 @@ public class OrcEncodedDataProducer impl

      @Override
      public void returnData(ColumnBuffer data) {
- // TODO#: return the data to cache (unlock)
+ lowLevelCache.releaseBuffers(data.cacheBuffers);
      }

      private void determineWhatToRead(List<StripeInformation> stripes) {
- // The unit of caching for ORC is (stripe x column) (see OrcBatchKey). Note that we do not use
- // SARG anywhere, because file-level filtering on sarg is already performed during split
- // generation, and stripe-level filtering to get row groups is not very helpful right now.
+ // The unit of caching for ORC is (stripe x column) (see OrcBatchKey).
        long offset = split.getStart(), maxOffset = offset + split.getLength();
        stripeIxFrom = stripeIxTo = -1;
        int stripeIx = 0;
@@ -208,6 +209,7 @@ public class OrcEncodedDataProducer impl
            readState[i][j] = new long[bitmaskSize];
          }
        }
+ // TODO: HERE, we need to apply sargs and mark RGs that are filtered as 1s
        rgsPerStripe = new int[stripeRgCounts.size()];
        for (int i = 0; i < rgsPerStripe.length; ++i) {
           rgsPerStripe[i] = stripeRgCounts.get(i);
@@ -215,11 +217,10 @@ public class OrcEncodedDataProducer impl
      }

      // TODO: split by stripe? we do everything by stripe, and it might be faster
- // TODO: return type provisional depending on ORC API
      private List<Integer>[] produceDataFromCache() {
- // Assumes none of the columns are fetched, because we always do this before reading.
+ if (cache == null) return null;
        OrcCacheKey key = new OrcCacheKey(internedFilePath, -1, -1, -1);
- @SuppressWarnings("unchecked") // Grr, no generics arrays, "J" in "Java" stands for "joke".
+ @SuppressWarnings("unchecked") // No generics arrays - "J" in "Java" stands for "joke".
        List<Integer>[] stripeColsNotInCache = new List[readState.length];
        for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
          key.stripeIx = stripeIxFrom + stripeIxMod;
@@ -230,6 +231,8 @@ public class OrcEncodedDataProducer impl
            long[] doneMask = cols[colIxMod];
            boolean areAllRgsInCache = true;
            for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
+ int maskIndex = rgIx >>> 6, maskBit = 1 << (rgIx & 63);
+ if ((doneMask[maskIndex] & maskBit) != 0) continue; // RG eliminated by SARG
              key.rgIx = rgIx;
              ColumnBuffer cached = cache.get(key);
              if (cached == null) {
@@ -240,7 +243,7 @@ public class OrcEncodedDataProducer impl
              EncodedColumn<OrcBatchKey> col = new EncodedColumn<OrcBatchKey>(
                  key.copyToPureBatchKey(), key.colIx, cached);
              consumer.consumeData(col);
- doneMask[rgIx >>> 6] |= 1 << (rgIx & 63);
+ doneMask[maskIndex] = doneMask[maskIndex] | maskBit;
            }
            boolean hasFetchList = stripeColsNotInCache[stripeIxMod] != null;
            if (areAllRgsInCache) {
@@ -273,10 +276,11 @@ public class OrcEncodedDataProducer impl
      @Override
      public void consumeData(EncodedColumn<OrcBatchKey> data) {
        // Store object in cache; create new key object - cannot be reused.
+ assert cache != null;
        OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
        ColumnBuffer cached = cache.cacheOrGet(key, data.columnData);
        if (data.columnData != cached) {
- // TODO: deallocate columnData
+ lowLevelCache.releaseBuffers(data.columnData.cacheBuffers);
          data.columnData = cached;
        }
        consumer.consumeData(data);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Wed Jan 14 02:47:08 2015
@@ -98,7 +98,7 @@ public class LLAPRecordReaderImpl extend
    }

    @Override
- public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+ public void readEncodedColumns(long[][] colRgs, int rgCount,
        Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) {

    }

Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java?rev=1651559&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assume;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestLowLevelLrfuCachePolicy {
+ private static final Log LOG = LogFactory.getLog(TestLowLevelLrfuCachePolicy.class);
+
+ @Test
+ public void testHeapSize2() {
+ testHeapSize(2);
+ }
+
+ @Test
+ public void testHeapSize7() {
+ testHeapSize(7);
+ }
+
+ @Test
+ public void testHeapSize8() {
+ testHeapSize(8);
+ }
+
+ @Test
+ public void testHeapSize30() {
+ testHeapSize(30);
+ }
+
+ private class EvictionTracker implements EvictionListener {
+ public List<LlapCacheableBuffer> evicted = new ArrayList<LlapCacheableBuffer>();
+
+ @Override
+ public void notifyEvicted(LlapCacheableBuffer buffer) {
+ evicted.add(buffer);
+ }
+ }
+
+ @Test
+ public void testLfuExtreme() {
+ int heapSize = 4;
+ LOG.info("Testing lambda 0 (LFU)");
+ Random rdm = new Random(1234);
+ HiveConf conf = new HiveConf();
+ ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+ conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
+ EvictionTracker et = new EvictionTracker();
+ LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+ for (int i = 0; i < heapSize; ++i) {
+ LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+ assertTrue(cache(lfu, et, buffer));
+ inserted.add(buffer);
+ }
+ Collections.shuffle(inserted, rdm);
+ // LFU extreme, order of accesses should be ignored, only frequency matters.
+ // We touch first elements later, but do it fewer times, so they will be evicted first.
+ for (int i = inserted.size() - 1; i >= 0; --i) {
+ for (int j = 0; j < i + 1; ++j) {
+ lfu.notifyLock(inserted.get(i));
+ lfu.notifyUnlock(inserted.get(i));
+ }
+ }
+ verifyOrder(lfu, et, inserted);
+ }
+
+ @Test
+ public void testLruExtreme() {
+ int heapSize = 4;
+ LOG.info("Testing lambda 1 (LRU)");
+ Random rdm = new Random(1234);
+ HiveConf conf = new HiveConf();
+ ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+ conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+ EvictionTracker et = new EvictionTracker();
+ LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+ for (int i = 0; i < heapSize; ++i) {
+ LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+ assertTrue(cache(lru, et, buffer));
+ inserted.add(buffer);
+ }
+ Collections.shuffle(inserted, rdm);
+ // LRU extreme, frequency of accesses should be ignored, only order matters.
+ for (int i = 0; i < inserted.size(); ++i) {
+ for (int j = 0; j < (inserted.size() - i); ++j) {
+ lru.notifyLock(inserted.get(i));
+ lru.notifyUnlock(inserted.get(i));
+ }
+ }
+ verifyOrder(lru, et, inserted);
+ }
+
+ @Test
+ public void testDeadlockResolution() {
+ int heapSize = 4;
+ LOG.info("Testing deadlock resolution");
+ ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+ EvictionTracker et = new EvictionTracker();
+ LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(new HiveConf(), 1, heapSize, et);
+ for (int i = 0; i < heapSize; ++i) {
+ LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+ assertTrue(cache(lrfu, et, buffer));
+ inserted.add(buffer);
+ }
+ // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
+ LlapCacheableBuffer locked = inserted.get(0);
+ lock(lrfu, locked);
+ lrfu.reserveMemory(1, false);
+ LlapCacheableBuffer evicted = et.evicted.get(0);
+ assertNotNull(evicted);
+ assertTrue(evicted.isInvalid());
+ assertNotSame(locked, evicted);
+ unlock(lrfu, locked);
+ }
+
+ private static final LlapCacheableBuffer CANNOT_EVICT = LowLevelBuddyCache.allocateFake();
+ // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
+ public boolean cache(
+ LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapCacheableBuffer buffer) {
+ if (!lrfu.reserveMemory(1, false)) {
+ return false;
+ }
+ buffer.incRef();
+ lrfu.cache(buffer);
+ buffer.decRef();
+ lrfu.notifyUnlock(buffer);
+ return true;
+ }
+
+ private LlapCacheableBuffer getOneEvictedBuffer(EvictionTracker et) {
+ assertTrue(et.evicted.size() == 0 || et.evicted.size() == 1); // test-specific
+ LlapCacheableBuffer result = et.evicted.isEmpty() ? null : et.evicted.get(0);
+ et.evicted.clear();
+ return result;
+ }
+
+ private static void lock(LowLevelLrfuCachePolicy lrfu, LlapCacheableBuffer locked) {
+ locked.incRef();
+ lrfu.notifyLock(locked);
+ }
+
+ private static void unlock(LowLevelLrfuCachePolicy lrfu, LlapCacheableBuffer locked) {
+ locked.decRef();
+ lrfu.notifyUnlock(locked);
+ }
+
+ private void testHeapSize(int heapSize) {
+ LOG.info("Testing heap size " + heapSize);
+ Random rdm = new Random(1234);
+ HiveConf conf = new HiveConf();
+ conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.05f); // very small heap? TODO#
+ EvictionTracker et = new EvictionTracker();
+ LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+ // Insert the number of elements plus 2, to trigger 2 evictions.
+ int toEvict = 2;
+ ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+ LlapCacheableBuffer[] evicted = new LlapCacheableBuffer[toEvict];
+ Assume.assumeTrue(toEvict <= heapSize);
+ for (int i = 0; i < heapSize + toEvict; ++i) {
+ LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+ assertTrue(cache(lrfu, et, buffer));
+ LlapCacheableBuffer evictedBuf = getOneEvictedBuffer(et);
+ if (i < toEvict) {
+ evicted[i] = buffer;
+ } else {
+ if (i >= heapSize) {
+ assertSame(evicted[i - heapSize], evictedBuf);
+ assertTrue(evictedBuf.isInvalid());
+ } else {
+ assertNull(evictedBuf);
+ }
+ inserted.add(buffer);
+ }
+ }
+ LOG.info("Inserted " + dumpInserted(inserted));
+ // We will touch all blocks in random order.
+ Collections.shuffle(inserted, rdm);
+ LOG.info("Touch order " + dumpInserted(inserted));
+ // Lock entire heap; heap is still full; we should not be able to evict or insert.
+ for (LlapCacheableBuffer buf : inserted) {
+ lock(lrfu, buf);
+ }
+ assertFalse(lrfu.reserveMemory(1, false));
+ if (!et.evicted.isEmpty()) {
+ assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
+ }
+ for (LlapCacheableBuffer buf : inserted) {
+ unlock(lrfu, buf);
+ }
+ // To make (almost) sure we get a definite order, touch blocks in order a large number of times.
+ for (LlapCacheableBuffer buf : inserted) {
+ // TODO: this seems to indicate that priorities change too little...
+ // perhaps we need to adjust the policy.
+ for (int j = 0; j < 10; ++j) {
+ lrfu.notifyLock(buf);
+ lrfu.notifyUnlock(buf);
+ }
+ }
+ verifyOrder(lrfu, et, inserted);
+ }
+
+ private void verifyOrder(LowLevelLrfuCachePolicy lrfu,
+ EvictionTracker et, ArrayList<LlapCacheableBuffer> inserted) {
+ LlapCacheableBuffer block;
+ // Evict all blocks.
+ et.evicted.clear();
+ for (int i = 0; i < inserted.size(); ++i) {
+ assertTrue(lrfu.reserveMemory(1, false));
+ }
+ // The map should now be empty.
+ assertFalse(lrfu.reserveMemory(1, false));
+ for (int i = 0; i < inserted.size(); ++i) {
+ block = et.evicted.get(i);
+ assertTrue(block.isInvalid());
+ assertSame(inserted.get(i), block);
+ }
+ }
+
+ private String dumpInserted(ArrayList<LlapCacheableBuffer> inserted) {
+ String debugStr = "";
+ for (int i = 0; i < inserted.size(); ++i) {
+ if (i != 0) debugStr += ", ";
+ debugStr += inserted.get(i);
+ }
+ return debugStr;
+ }
+}

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Wed Jan 14 02:47:08 2015
@@ -96,6 +96,6 @@ public interface RecordReader {
     * @param consumer Consumer to pass the results too.
     * @param allocator Allocator to allocate memory.
     */
- void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+ void readEncodedColumns(long[][] colRgs, int rgCount,
        Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache);
  }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Jan 14 02:47:08 2015
@@ -3302,7 +3302,7 @@ public class RecordReaderImpl implements
    }

    @Override
- public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+ public void readEncodedColumns(long[][] colRgs, int rgCount,
        Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache allocator) {
      // TODO: HERE read encoded data
    }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Jan 14, '15 at 2:47a
active: Jan 14, '15 at 2:47a
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Sershe: 1 post

People

Translate

site design / logo © 2021 Grokbase