Author: sershe
Date: Tue Mar 31 22:28:46 2015
New Revision: 1670501

URL: http://svn.apache.org/r1670501
Log:
HIVE-10092 : LLAP: improve how buffers are locked for split (Sergey Shelukhin)

Modified:
     hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
     hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
     hive/branches/llap/ql/src/test/results/clientpositive/orc_llap.q.out

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1670501&r1=1670500&r2=1670501&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Tue Mar 31 22:28:46 2015
@@ -49,7 +49,8 @@ public interface LowLevelCache {
     * Some sort of InvalidCacheChunk could be placed to avoid them. TODO
     * @param base base offset for the ranges (stripe/stream offset in case of ORC).
     */
- DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset);
+ DiskRangeList getFileData(
+ long fileId, DiskRangeList range, long baseOffset, CacheChunkFactory factory);

    /**
     * Puts file data into cache.
@@ -73,7 +74,11 @@ public interface LowLevelCache {

    LlapMemoryBuffer createUnallocated();

- void notifyReused(LlapMemoryBuffer buffer);
+ boolean notifyReused(LlapMemoryBuffer buffer);

    boolean isDirectAlloc();
+
+ public interface CacheChunkFactory {
+ DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long startOffset, long endOffset);
+ }
  }
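
For illustration, a minimal caller-side sketch of the new factory parameter (not part of the patch; myCache, myFileId, myRanges and myBaseOffset are hypothetical names, and the imports from the files in this commit are assumed). The point of the change is that the cache no longer hard-codes the CacheChunk type: the caller's factory decides which DiskRangeList subclass represents a cache hit.

  // Sketch only: produce plain CacheChunk instances, as the unit test below does;
  // the LLAP read path supplies a TrackedCacheChunk factory instead (see InStream.java).
  LowLevelCache.CacheChunkFactory chunkFactory = new LowLevelCache.CacheChunkFactory() {
    @Override
    public DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long startOffset, long endOffset) {
      return new RecordReaderImpl.CacheChunk(buffer, startOffset, endOffset);
    }
  };
  // The factory is now passed on every lookup:
  DiskRangeList withCacheHits = myCache.getFileData(myFileId, myRanges, myBaseOffset, chunkFactory);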

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1670501&r1=1670500&r2=1670501&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Tue Mar 31 22:28:46 2015
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.llap.io.ap
  import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
  import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
  import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;

  import com.google.common.annotations.VisibleForTesting;

@@ -79,7 +78,8 @@ public class LowLevelCacheImpl implement
    }

    @Override
- public DiskRangeList getFileData(long fileId, DiskRangeList ranges, long baseOffset) {
+ public DiskRangeList getFileData(
+ long fileId, DiskRangeList ranges, long baseOffset, CacheChunkFactory factory) {
      if (ranges == null) return null;
      FileCache subCache = cache.get(fileId);
      if (subCache == null || !subCache.incRef()) {
@@ -96,7 +96,7 @@ public class LowLevelCacheImpl implement
          metrics.incrCacheRequestedBytes(current.getLength());
          // We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
          DiskRangeList next = current.next;
- getOverlappingRanges(baseOffset, current, subCache.cache);
+ getOverlappingRanges(baseOffset, current, subCache.cache, factory);
          current = next;
        }
        return prev.next;
@@ -106,7 +106,7 @@ public class LowLevelCacheImpl implement
    }

    private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCached,
- ConcurrentSkipListMap<Long, LlapDataBuffer> cache) {
+ ConcurrentSkipListMap<Long, LlapDataBuffer> cache, CacheChunkFactory factory) {
      long absOffset = currentNotCached.getOffset() + baseOffset;
      if (!doAssumeGranularBlocks) {
        // This currently only happens in tests. See getFileData comment on the interface.
@@ -140,7 +140,7 @@ public class LowLevelCacheImpl implement
              + cacheOffset + ", " + (cacheOffset + buffer.declaredLength) + ")");
        }
        cacheEnd = cacheOffset + buffer.declaredLength;
- CacheChunk currentCached = new CacheChunk(buffer,
+ DiskRangeList currentCached = factory.createCacheChunk(buffer,
            cacheOffset - baseOffset, cacheEnd - baseOffset);
        currentNotCached = addCachedBufferToIter(currentNotCached, currentCached);
        metrics.incrCacheHitBytes(Math.min(requestedLength, currentCached.getLength()));
@@ -154,7 +154,7 @@ public class LowLevelCacheImpl implement
     * @return The new currentNotCached pointer, following the cached buffer insertion.
     */
    private DiskRangeList addCachedBufferToIter(
- DiskRangeList currentNotCached, CacheChunk currentCached) {
+ DiskRangeList currentNotCached, DiskRangeList currentCached) {
      if (currentNotCached.getOffset() >= currentCached.getOffset()) {
        if (currentNotCached.getEnd() <= currentCached.getEnd()) { // we assume it's always "==" now
          // Replace the entire current DiskRange with new cached range.
@@ -454,10 +454,10 @@ public class LowLevelCacheImpl implement
    }

    @Override
- public void notifyReused(LlapMemoryBuffer buffer) {
+ public boolean notifyReused(LlapMemoryBuffer buffer) {
      // notifyReused implies that buffer is already locked; it's also called once for new
      // buffers that are not cached yet. Don't notify cache policy.
- lockBuffer(((LlapDataBuffer)buffer), false);
+ return lockBuffer(((LlapDataBuffer)buffer), false);
    }

    @Override
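
The notifyReused change above turns a fire-and-forget notification into a lock attempt whose outcome the caller can check. A hedged sketch of the caller-side contract (cache and buffer are hypothetical locals; the assert mirrors the one added in InStream.java further down):

  // notifyReused now reports whether the buffer could be re-locked. With the extra
  // "+1 for the fetching thread" refcount (see the EncodedReaderImpl comment below),
  // reuse within a single read is expected to always succeed, hence the assert.
  boolean canReuse = cache.notifyReused(buffer);
  assert canReuse : "buffer could not be re-locked (presumably evicted)";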

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1670501&r1=1670500&r2=1670501&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Tue Mar 31 22:28:46 2015
@@ -37,6 +37,8 @@ import org.apache.hadoop.hive.common.Dis
  import org.apache.hadoop.hive.common.DiskRangeList;
  import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheChunkFactory;
  import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
  import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
  import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
@@ -45,6 +47,12 @@ import org.junit.Test;
  public class TestLowLevelCacheImpl {
    private static final Log LOG = LogFactory.getLog(TestLowLevelCacheImpl.class);

+ private static final CacheChunkFactory testFactory = new CacheChunkFactory() {
+ public DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
+ return new CacheChunk(buffer, offset, end);
+ }
+ };
+
    private static class DummyAllocator implements Allocator {
      @Override
      public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
@@ -131,7 +139,7 @@ public class TestLowLevelCacheImpl {
        } else if (intCount >= 0) {
          assertTrue(intCount == 0);
          intCount = -1;
- iter = cache.getFileData(fileId, list.get(), 0);
+ iter = cache.getFileData(fileId, list.get(), 0, testFactory);
          assertEquals(resultCount, iter.listSize());
        }
        assertTrue(iter != null);
@@ -255,13 +263,13 @@ public class TestLowLevelCacheImpl {
      assertEquals(0, metrics.getCacheHitBytes());
      list = new DiskRangeListCreateHelper();
      list.addOrMerge(0, 1000, true, false);
- cache.getFileData(fn, list.get(), 0);
+ cache.getFileData(fn, list.get(), 0, testFactory);
      assertEquals(1000, metrics.getCacheRequestedBytes());
      assertEquals(500, metrics.getCacheHitBytes());

      list = new DiskRangeListCreateHelper();
      list.addOrMerge(0, 100, true, false);
- cache.getFileData(fn, list.get(), 0);
+ cache.getFileData(fn, list.get(), 0, testFactory);
      assertEquals(1100, metrics.getCacheRequestedBytes());
      assertEquals(600, metrics.getCacheHitBytes());

@@ -269,14 +277,14 @@ public class TestLowLevelCacheImpl {
      list.addOrMerge(0, 100, true, false);
      list.addOrMerge(300, 500, true, false);
      list.addOrMerge(800, 1000, true, false);
- cache.getFileData(fn, list.get(), 0);
+ cache.getFileData(fn, list.get(), 0, testFactory);
      assertEquals(1600, metrics.getCacheRequestedBytes());
      assertEquals(1100, metrics.getCacheHitBytes());

      list = new DiskRangeListCreateHelper();
      list.addOrMerge(300, 500, true, false);
      list.addOrMerge(1000, 2000, true, false);
- cache.getFileData(fn, list.get(), 0);
+ cache.getFileData(fn, list.get(), 0, testFactory);
      assertEquals(2800, metrics.getCacheRequestedBytes());
      assertEquals(1300, metrics.getCacheHitBytes());
    }
@@ -308,7 +316,7 @@ public class TestLowLevelCacheImpl {
                  list.addOrMerge(offsets[j], offsets[j] + 1, true, false);
                }

- DiskRangeList iter = cache.getFileData(fileName, list.get(), 0);
+ DiskRangeList iter = cache.getFileData(fileName, list.get(), 0, testFactory);
                int j = -1;
                while (iter != null) {
                  ++j;
@@ -374,7 +382,7 @@ public class TestLowLevelCacheImpl {
            DiskRangeList head = new DiskRangeList(0, offsetsToUse + 1);
            isFirstFile = !isFirstFile;
            long fileId = isFirstFile ? fn1 : fn2;
- head = cache.getFileData(fileId, head, 0);
+ head = cache.getFileData(fileId, head, 0, testFactory);
            DiskRange[] results = head.listToArray();
            int startIndex = rdm.nextInt(results.length), index = startIndex;
            LlapDataBuffer victim = null;

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java?rev=1670501&r1=1670500&r2=1670501&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java Tue Mar 31 22:28:46 2015
@@ -32,7 +32,7 @@ public class DebugUtils {

    private final static boolean isTraceOrcEnabled = EncodedReaderImpl.LOG.isDebugEnabled();
    public static boolean isTraceOrcEnabled() {
- return true; // TODO: temporary, should be hardcoded false
+ return isTraceOrcEnabled; // TODO: temporary, should be hardcoded false
    }

    public static boolean isTraceLockingEnabled() {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java?rev=1670501&r1=1670500&r2=1670501&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java Tue Mar 31 22:28:46 2015
@@ -35,6 +35,8 @@ import org.apache.hadoop.hive.llap.io.ap
  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
  import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
  import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunkFactory;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
@@ -44,6 +46,34 @@ import org.apache.hadoop.hive.ql.io.orc.
  import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;


+/**
+ * Encoded reader implementation.
+ *
+ * Note about refcounts on cache blocks.
+ * When we get or put blocks into cache, they are "locked" (refcount++). After that, we send the
+ * blocks out to processor as part of RG data; one block can be used for multiple RGs. In some
+ * cases, one block is sent for ALL RGs (e.g. a dictionary for a string column). This is how we
+ * deal with it:
+ * For non-dictionary case:
+ * 1) At all times, every buffer has +1 refcount for each time we sent this block to processing.
+ * 2) When processor is done with an RG, it decrefs for all the blocks involved.
+ * 3) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we send
+ * the block to processor, and the latter decrefs it, the block won't be evicted when we want
+ * to reuse it for some other RG, forcing us to do an extra disk read or cache lookup.
+ * 4) As we read (we always read RGs in order, and assume they are stored in physical order in the
+ * file, plus that RGs are not shared between streams, AND that we read each stream from the
+ * beginning), we note which blocks cannot possibly be reused anymore (next RG starts in the
+ * next CB). We decref for the refcount from (3) in such case.
+ * 5) Since the RG end boundary in ORC is an estimate, we may request data from cache and then
+ * not use it; at the end we go through all the blocks and release those not released by (4).
+ * For dictionary case:
+ * 1) We have a separate refcount on the ColumnBuffer object we send to the processor. In the above
+ * case, it's always 1, so when processor is done it goes directly to decrefing cache buffers.
+ * 2) In the dictionary case, it's increased per RG, and processors don't touch cache buffers if
+ * they do not happen to decref this counter to 0.
+ * 3) This is done because dictionary can have many buffers; decrefing all of them for all RGs
+ * is more expensive; plus, decrefing in cache may be more expensive due to cache policy/etc.
+ */
  public class EncodedReaderImpl implements EncodedReader {
    public static final Log LOG = LogFactory.getLog(EncodedReaderImpl.class);

@@ -58,6 +88,8 @@ public class EncodedReaderImpl implement
    private final ByteBufferAllocatorPool pool;
    // For now, one consumer for all calls.
    private final Consumer<EncodedColumnBatch<OrcBatchKey>> consumer;
+ // TODO: if used as a pool, pass in externally
+ private final TrackedCacheChunkFactory cacheChunkFactory = new TrackedCacheChunkFactory();

    public EncodedReaderImpl(FileSystem fileSystem, Path path, long fileId, boolean useZeroCopy,
        List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate,
@@ -228,7 +260,7 @@ public class EncodedReaderImpl implement
            + RecordReaderUtils.stringifyDiskRanges(toRead.next));
      }
      if (cache != null) {
- cache.getFileData(fileId, toRead.next, stripeOffset);
+ cache.getFileData(fileId, toRead.next, stripeOffset, cacheChunkFactory);
        if (DebugUtils.isTraceOrcEnabled()) {
          LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
              + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -279,9 +311,11 @@ public class EncodedReaderImpl implement
                // it when building the last RG, so each RG processing will decref once, and the
                // last one will unlock the buffers.
                sctx.stripeLevelStream.incRef();
+ // For stripe-level streams we don't need the extra refcount on the block. See class comment about refcounts.
+ long unlockUntilCOffset = sctx.offset + sctx.length;
                DiskRangeList lastCached = InStream.uncompressStream(fileId, stripeOffset, iter,
- sctx.offset,sctx.offset + sctx.length, zcr, codec, bufferSize, cache,
- sctx.stripeLevelStream);
+ sctx.offset, sctx.offset + sctx.length, zcr, codec, bufferSize, cache,
+ sctx.stripeLevelStream, unlockUntilCOffset);
                if (lastCached != null) {
                  iter = lastCached;
                }
@@ -296,6 +330,8 @@ public class EncodedReaderImpl implement
                  nextCOffset = isLastRg ? sctx.length : nextIndex.getPositions(sctx.streamIndexOffset),
                  endCOffset = RecordReaderUtils.estimateRgEndOffset(
                      isCompressed, isLastRg, nextCOffset, sctx.length, bufferSize) + sctx.offset;
+ // See class comment about refcounts.
+ long unlockUntilCOffset = nextCOffset;
              cb = new StreamBuffer(sctx.kind.getNumber());
              cb.incRef();
              if (DebugUtils.isTraceOrcEnabled()) {
@@ -307,7 +343,7 @@ public class EncodedReaderImpl implement
              boolean isStartOfStream = sctx.bufferIter == null;
              DiskRangeList range = isStartOfStream ? iter : sctx.bufferIter;
              DiskRangeList lastCached = InStream.uncompressStream(fileId, stripeOffset, range,
- cOffset, endCOffset, zcr, codec, bufferSize, cache, cb);
+ cOffset, endCOffset, zcr, codec, bufferSize, cache, cb, unlockUntilCOffset);
              if (lastCached != null) {
                sctx.bufferIter = iter = lastCached; // Reset iter just to ensure it's valid
              }
@@ -324,18 +360,20 @@ public class EncodedReaderImpl implement
            + RecordReaderUtils.stringifyDiskRanges(toRead.next));
      }

- // TODO: this is not good; we hold all the blocks until we send them all.
- // Hard to avoid due to sharing by RGs... perhaps we can still do better.
- DiskRangeList toFree = toRead.next;
- while (toFree != null) {
- if (toFree instanceof CacheChunk) {
- LlapMemoryBuffer buffer = ((CacheChunk)toFree).buffer;
- if (DebugUtils.isTraceLockingEnabled()) {
- LOG.info("Unlocking " + buffer + " at the end of readEncodedColumns");
- }
- cache.releaseBuffer(buffer);
+ // Release the unreleased buffers. See class comment about refcounts.
+ DiskRangeList current = toRead.next;
+ while (current != null) {
+ DiskRangeList toFree = current;
+ current = current.next;
+ if (!(toFree instanceof TrackedCacheChunk)) continue;
+ TrackedCacheChunk cc = (TrackedCacheChunk)toFree;
+ if (cc.isReleased) continue;
+ LlapMemoryBuffer buffer = ((CacheChunk)toFree).buffer;
+ if (DebugUtils.isTraceLockingEnabled()) {
+ LOG.info("Unlocking " + buffer + " for the fetching thread at the end");
        }
- toFree = toFree.next;
+ cache.releaseBuffer(buffer);
+ cc.isReleased = true;
      }
    }
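
The cleanup loop added at the end of readEncodedColumns can be summarized as a small helper. This is an illustrative sketch only (the helper name is hypothetical; the logic mirrors the loop in the hunk above):

  // Release the "+1 for the fetching thread" refcount on every cached chunk that
  // ponderReleaseInitialRefcount did not already release while streaming the RGs.
  private static void releaseRemainingChunks(DiskRangeList head, LowLevelCache cache) {
    DiskRangeList current = head;
    while (current != null) {
      DiskRangeList toFree = current;
      current = current.next;                      // advance before touching the chunk
      if (!(toFree instanceof TrackedCacheChunk)) continue;
      TrackedCacheChunk cc = (TrackedCacheChunk) toFree;
      if (cc.isReleased) continue;                 // already unlocked for this thread
      cache.releaseBuffer(cc.buffer);              // drop the fetching thread's refcount
      cc.isReleased = true;
    }
  }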


Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1670501&r1=1670500&r2=1670501&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Tue Mar 31 22:28:46 2015
@@ -540,13 +540,30 @@ public abstract class InStream extends I
      }
    }

+ public static class TrackedCacheChunkFactory implements LowLevelCache.CacheChunkFactory {
+ // TODO: in future, this can also be used as a pool
+ @Override
+ public DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
+ return new TrackedCacheChunk(buffer, offset, end);
+ }
+ }
+
+ /** Cache chunk which tracks whether it has been fully read. See
+ EncodedReaderImpl class comment about refcounts. */
+ public static class TrackedCacheChunk extends CacheChunk {
+ public boolean isReleased = false;
+ public TrackedCacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
+ super(buffer, offset, end);
+ }
+ }
+
    /**
     * CacheChunk that is pre-created for new cache data; initially, it contains an original disk
     * buffer and an unallocated LlapMemoryBuffer object. Before we expose it, the LMB is allocated,
     * the data is decompressed, and original compressed data is discarded. The chunk lives on in
     * the DiskRange list created for the request, and everyone treats it like regular CacheChunk.
     */
- private static class ProcCacheChunk extends CacheChunk {
+ private static class ProcCacheChunk extends TrackedCacheChunk {
      public ProcCacheChunk(long cbStartOffset, long cbEndOffset, boolean isCompressed,
          ByteBuffer originalData, LlapMemoryBuffer targetBuffer, int originalCbIndex) {
        super(targetBuffer, cbStartOffset, cbEndOffset);
@@ -573,12 +590,16 @@ public abstract class InStream extends I
     * @param bufferSize Compressed buffer (CB) size.
     * @param cache Low-level cache to cache new data.
     * @param streamBuffer Stream buffer, to add the results.
+ * @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as
+ * they will not be used in future calls (see the class comment in
+ * EncodedReaderImpl about refcounts).
    * @return Last buffer cached during decompression. Cache buffers are never removed from
     * the master list, so they are safe to keep as iterators for various streams.
     */
- public static DiskRangeList uncompressStream(long fileId, long baseOffset,
- DiskRangeList start, long cOffset, long endCOffset, ZeroCopyReaderShim zcr,
- CompressionCodec codec, int bufferSize, LowLevelCache cache, StreamBuffer streamBuffer)
+ // TODO#: move to EncodedReaderImpl
+ public static DiskRangeList uncompressStream(long fileId, long baseOffset, DiskRangeList start,
+ long cOffset, long endCOffset, ZeroCopyReaderShim zcr, CompressionCodec codec,
+ int bufferSize, LowLevelCache cache, StreamBuffer streamBuffer, long unlockUntilCOffset)
            throws IOException {
      streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
      if (cOffset == endCOffset) return null;
@@ -599,22 +620,24 @@ public abstract class InStream extends I
        current = current.split(cOffset).next;
      }
      long currentCOffset = cOffset;
- DiskRangeList lastCached = null;
+ TrackedCacheChunk lastCached = null;
      while (true) {
        DiskRangeList next = null;
- if (current instanceof CacheChunk) {
+ if (current instanceof TrackedCacheChunk) {
          // 2a. This is a cached compression buffer, add as is.
- CacheChunk cc = (CacheChunk)current;
+ TrackedCacheChunk cc = (TrackedCacheChunk)current;
          if (DebugUtils.isTraceLockingEnabled()) {
            LOG.info("Locking " + cc.buffer + " due to reuse");
          }
- cache.notifyReused(cc.buffer);
+ boolean canReuse = cache.notifyReused(cc.buffer);
+ assert canReuse;
          streamBuffer.cacheBuffers.add(cc.buffer);
          currentCOffset = cc.getEnd();
          if (DebugUtils.isTraceOrcEnabled()) {
            LOG.info("Adding an already-uncompressed buffer " + cc.buffer);
          }
- lastCached = current;
+ ponderReleaseInitialRefcount(cache, unlockUntilCOffset, cc);
+ lastCached = cc;
          next = current.next;
        } else {
          // 2b. This is a compressed buffer. We need to uncompress it; the buffer can comprise
@@ -630,6 +653,7 @@ public abstract class InStream extends I
          next = (lastCached != null) ? lastCached.next : null;
          currentCOffset = (next != null) ? next.getOffset() : originalOffset;
        }
+
        if ((endCOffset >= 0 && currentCOffset >= endCOffset) || next == null) {
          break;
        }
@@ -698,9 +722,26 @@ public abstract class InStream extends I
          fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL);
      processCacheCollisions(
          cache, collisionMask, toDecompress, targetBuffers, streamBuffer.cacheBuffers);
+
+ // 7. It may happen that we only use new compression buffers once. Release initial refcounts.
+ for (ProcCacheChunk chunk : toDecompress) {
+ ponderReleaseInitialRefcount(cache, unlockUntilCOffset, chunk);
+ }
+
      return lastCached;
    }

+ private static void ponderReleaseInitialRefcount(LowLevelCache cache,
+ long unlockUntilCOffset, TrackedCacheChunk cc) {
+ if (cc.getEnd() > unlockUntilCOffset) return;
+ // This is the last RG for which this buffer will be used. Remove the initial refcount
+ if (DebugUtils.isTraceLockingEnabled()) {
+ LOG.info("Unlocking " + cc.buffer + " for the fetching thread");
+ }
+ cache.releaseBuffer(cc.buffer);
+ cc.isReleased = true;
+ }
+
    private static void processCacheCollisions(LowLevelCache cache, long[] collisionMask,
        List<ProcCacheChunk> toDecompress, LlapMemoryBuffer[] targetBuffers,
        List<LlapMemoryBuffer> cacheBuffers) {
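
The new unlockUntilCOffset parameter of uncompressStream drives ponderReleaseInitialRefcount above. As a summary, the decision it makes can be stated as a one-line predicate (illustrative only; the method name is hypothetical):

  // A chunk keeps the fetching thread's refcount only while a later row group in this
  // stream may still reuse it, i.e. while it extends past the current unlock boundary.
  static boolean keepsInitialRefcount(TrackedCacheChunk cc, long unlockUntilCOffset) {
    return cc.getEnd() > unlockUntilCOffset; // otherwise ponderReleaseInitialRefcount releases it
  }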

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1670501&r1=1670500&r2=1670501&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Mar 31 22:28:46 2015
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.common.Dis
  import org.apache.hadoop.hive.common.type.HiveDecimal;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
  import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -89,7 +88,6 @@ public class RecordReaderImpl implements
    private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    private final Path path;
    private final FileSystem fileSystem;
- private long fileId;
    private final FSDataInputStream file;
    private final long firstRow;
    private final List<StripeInformation> stripes =
@@ -116,7 +114,6 @@ public class RecordReaderImpl implements
    private boolean[] includedRowGroups = null;
    private final Configuration conf;
    private final MetadataReader metadata;
- private LowLevelCache cache = null;

    private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
    private final ZeroCopyReaderShim zcr;
@@ -3095,13 +3092,9 @@ public class RecordReaderImpl implements
      long end = start + stripe.getDataLength();
      // explicitly trigger 1 big read
      DiskRangeList toRead = new DiskRangeList(start, end);
- if (this.cache != null) {
- toRead = cache.getFileData(fileId, toRead, stripe.getOffset());
- }
      bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false);
      List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
- createStreams(
- streamDescriptions, bufferChunks, null, codec, bufferSize, streams, cache);
+ createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
      // TODO: decompressed data from streams should be put in cache
    }

@@ -3235,8 +3228,7 @@ public class RecordReaderImpl implements
                              boolean[] includeColumn,
                              CompressionCodec codec,
                              int bufferSize,
- Map<StreamName, InStream> streams,
- LowLevelCache cache) throws IOException {
+ Map<StreamName, InStream> streams) throws IOException {
      long streamOffset = 0;
      for (OrcProto.Stream streamDesc: streamDescriptions) {
        int column = streamDesc.getColumn();
@@ -3249,18 +3241,12 @@ public class RecordReaderImpl implements
        List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
            ranges, streamOffset, streamDesc.getLength());
        StreamName name = new StreamName(column, streamDesc.getKind());
- streams.put(name, InStream.create(fileId, name.toString(), buffers,
- streamDesc.getLength(), codec, bufferSize, cache));
+ streams.put(name, InStream.create(null, name.toString(), buffers,
+ streamDesc.getLength(), codec, bufferSize, null));
        streamOffset += streamDesc.getLength();
      }
    }

- public void setCache(LowLevelCache cache) throws IOException {
- this.cache = cache;
- // TODO: if this is actually used, get fileId from split, like main LLAP path.
- this.fileId = RecordReaderUtils.getFileId(fileSystem, path);
- }
-
    private void readPartialDataStreams(StripeInformation stripe) throws IOException {
      List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
      DiskRangeList toRead = planReadPartialDataStreams(streamList,
@@ -3269,15 +3255,12 @@ public class RecordReaderImpl implements
      if (LOG.isDebugEnabled()) {
        LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
      }
- if (this.cache != null) {
- toRead = cache.getFileData(fileId, toRead, stripe.getOffset());
- }
      bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false);
      if (LOG.isDebugEnabled()) {
        LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
      }

- createStreams(streamList, bufferChunks, included, codec, bufferSize, streams, cache);
+ createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
    }

    @Override

Modified: hive/branches/llap/ql/src/test/results/clientpositive/orc_llap.q.out
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/results/clientpositive/orc_llap.q.out?rev=1670501&r1=1670500&r2=1670501&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/results/clientpositive/orc_llap.q.out (original)
+++ hive/branches/llap/ql/src/test/results/clientpositive/orc_llap.q.out Tue Mar 31 22:28:46 2015
@@ -536,17 +536,17 @@ STAGE PLANS:
            TableScan
              alias: orc_llap
              filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
- Statistics: Num rows: 99862 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 99864 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: ((cint > 10) and cbigint is not null) (type: boolean)
- Statistics: Num rows: 16644 Data size: 266305 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 16644 Data size: 266304 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint)
                  outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 16644 Data size: 266305 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 16644 Data size: 266304 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
- Statistics: Num rows: 16644 Data size: 266305 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 16644 Data size: 266304 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -606,17 +606,17 @@ STAGE PLANS:
            TableScan
              alias: orc_llap
              filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
- Statistics: Num rows: 4993 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 4993 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: ((cint > 10) and cbigint is not null) (type: boolean)
- Statistics: Num rows: 832 Data size: 266246 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 832 Data size: 266251 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: ctinyint (type: tinyint), csmallint (type: smallint), cint (type: int), cbigint (type: bigint), cfloat (type: float), cdouble (type: double), cstring1 (type: string), cstring2 (type: string), ctimestamp1 (type: timestamp), ctimestamp2 (type: timestamp), cboolean1 (type: boolean), cboolean2 (type: boolean)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
- Statistics: Num rows: 832 Data size: 266246 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 832 Data size: 266251 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
- Statistics: Num rows: 832 Data size: 266246 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 832 Data size: 266251 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -676,17 +676,17 @@ STAGE PLANS:
            TableScan
              alias: orc_llap
              filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
- Statistics: Num rows: 15363 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 15363 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: ((cint > 5) and (cint < 10)) (type: boolean)
- Statistics: Num rows: 1707 Data size: 177533 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1707 Data size: 177536 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: cstring2 (type: string)
                  outputColumnNames: _col0
- Statistics: Num rows: 1707 Data size: 177533 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1707 Data size: 177536 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
- Statistics: Num rows: 1707 Data size: 177533 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1707 Data size: 177536 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -745,22 +745,22 @@ STAGE PLANS:
        Map Operator Tree:
            TableScan
              alias: orc_llap
- Statistics: Num rows: 7989 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 7989 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: cstring1 (type: string), cstring2 (type: string)
                outputColumnNames: _col0, _col1
- Statistics: Num rows: 7989 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 7989 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
                Group By Operator
                  aggregations: count()
                  keys: _col0 (type: string), _col1 (type: string)
                  mode: hash
                  outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 7989 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 7989 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
                  Reduce Output Operator
                    key expressions: _col0 (type: string), _col1 (type: string)
                    sort order: ++
                    Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
- Statistics: Num rows: 7989 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 7989 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
                    value expressions: _col2 (type: bigint)
        Execution mode: vectorized
        LLAP IO: all inputs
@@ -770,10 +770,10 @@ STAGE PLANS:
            keys: KEY._col0 (type: string), KEY._col1 (type: string)
            mode: mergepartial
            outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 3994 Data size: 798800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 3994 Data size: 798813 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
- Statistics: Num rows: 3994 Data size: 798800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 3994 Data size: 798813 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -837,10 +837,10 @@ STAGE PLANS:
            TableScan
              alias: o1
              filterExpr: (csmallint is not null and cbigint is not null) (type: boolean)
- Statistics: Num rows: 14266 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 14266 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: (csmallint is not null and cbigint is not null) (type: boolean)
- Statistics: Num rows: 3567 Data size: 399506 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 3567 Data size: 399513 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 csmallint (type: smallint)
@@ -852,10 +852,10 @@ STAGE PLANS:
            TableScan
              alias: o2
              filterExpr: (csmallint is not null and cbigint is not null) (type: boolean)
- Statistics: Num rows: 14266 Data size: 1597800 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 14266 Data size: 1597828 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: (csmallint is not null and cbigint is not null) (type: boolean)
- Statistics: Num rows: 3567 Data size: 399506 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 3567 Data size: 399513 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
@@ -863,14 +863,14 @@ STAGE PLANS:
                    0 csmallint (type: smallint)
                    1 csmallint (type: smallint)
                  outputColumnNames: _col6, _col22
- Statistics: Num rows: 3923 Data size: 439456 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 3923 Data size: 439464 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col6 (type: string), _col22 (type: string)
                    outputColumnNames: _col0, _col1
- Statistics: Num rows: 3923 Data size: 439456 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 3923 Data size: 439464 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
- Statistics: Num rows: 3923 Data size: 439456 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 3923 Data size: 439464 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
