svn commit: r1663518
Author: sershe
Date: Tue Mar 3 02:58:22 2015
New Revision: 1663518

URL: http://svn.apache.org/r1663518
Log:
HIVE-9729p1 : LLAP: design and implement proper metadata cache - make sure Java metadata objects are not accessed via the cache from the decode side (so they can be evicted); also clean up some interfaces

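For context, a minimal sketch of the read path after this change (a hypothetical caller fragment, not part of the commit; it assumes the ReadPipeline and ColumnVectorProducer types introduced below are on the classpath and mirrors LlapRecordReader.startRead() in this diff): LlapInputFormat asks the ColumnVectorProducer for a ReadPipeline, keeps it as its ConsumerFeedback, and submits the pipeline's read callable to the shared executor.

    // Hypothetical caller, modeled on LlapRecordReader.startRead() below.
    ReadPipeline pipeline = cvp.createReadPipeline(consumer, split, columnIds, sarg, columnNames);
    ConsumerFeedback<ColumnVectorBatch> feedback = pipeline;              // used to stop reading / return batches
    ListenableFuture<Void> future = executor.submit(pipeline.getReadCallable());
    Futures.addCallback(future, errorHandler);                            // surfaces unhandled reader errors
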
Modified:
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
     hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
     hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Tue Mar 3 02:58:22 2015
@@ -43,31 +43,32 @@ public class LowLevelCacheImpl implement
    private final Allocator allocator;

    private AtomicInteger newEvictions = new AtomicInteger(0);
- private final Thread cleanupThread;
+ private Thread cleanupThread = null;
    private final ConcurrentHashMap<String, FileCache> cache =
        new ConcurrentHashMap<String, FileCache>();
- private final LowLevelCachePolicyBase cachePolicy;
+ private final LowLevelCachePolicy cachePolicy;
+ private final long cleanupInterval;

    public LowLevelCacheImpl(
- Configuration conf, LowLevelCachePolicyBase cachePolicy, Allocator allocator) {
+ Configuration conf, LowLevelCachePolicy cachePolicy, Allocator allocator) {
      this(conf, cachePolicy, allocator, 600);
    }

    @VisibleForTesting
    LowLevelCacheImpl(Configuration conf,
- LowLevelCachePolicyBase cachePolicy, Allocator allocator, long cleanupInterval) {
+ LowLevelCachePolicy cachePolicy, Allocator allocator, long cleanupInterval) {
      if (LlapIoImpl.LOGL.isInfoEnabled()) {
        LlapIoImpl.LOG.info("Low level cache; cleanup interval " + cleanupInterval + "sec");
      }
      this.cachePolicy = cachePolicy;
- this.cachePolicy.setEvictionListener(this);
      this.allocator = allocator;
- if (cleanupInterval >= 0) {
- cleanupThread = new CleanupThread(cleanupInterval);
- cleanupThread.start();
- } else {
- cleanupThread = null;
- }
+ this.cleanupInterval = cleanupInterval;
+ }
+
+ public void init() {
+ if (cleanupInterval < 0) return;
+ cleanupThread = new CleanupThread(cleanupInterval);
+ cleanupThread.start();
    }

    @Override

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Tue Mar 3 02:58:22 2015
@@ -22,4 +22,6 @@ public interface LowLevelCachePolicy {
    void cache(LlapCacheableBuffer buffer);
    void notifyLock(LlapCacheableBuffer buffer);
    void notifyUnlock(LlapCacheableBuffer buffer);
+ long evictSomeBlocks(long memoryToReserve);
+ void setEvictionListener(EvictionListener listener);
  }

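A minimal caller-side sketch of the extended policy contract (names are taken from this diff; the concrete wiring, including LowLevelCacheMemoryManager actually triggering evictions, is in LlapIoImpl.createLowLevelCache below): the eviction listener is registered once, after which callers can ask the policy to free memory.

    // Illustrative fragment only; assumes LowLevelCacheImpl implements EvictionListener,
    // as implied by cachePolicy.setEvictionListener(orcCache) later in this commit.
    LowLevelCachePolicy policy = new LowLevelFifoCachePolicy(conf);
    policy.setEvictionListener(cache);                       // cache gets notifyEvicted() callbacks
    long freed = policy.evictSomeBlocks(16L * 1024 * 1024);  // ask the policy to free ~16MB
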
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java Tue Mar 3 02:58:22 2015
@@ -30,12 +30,12 @@ import org.apache.hadoop.hive.conf.HiveC
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;

-public class LowLevelFifoCachePolicy extends LowLevelCachePolicyBase {
+public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
    private final Lock lock = new ReentrantLock();
    private final LinkedList<LlapCacheableBuffer> buffers;
+ private EvictionListener evictionListener;

    public LowLevelFifoCachePolicy(Configuration conf) {
- super(HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
      if (LlapIoImpl.LOGL.isInfoEnabled()) {
        LlapIoImpl.LOG.info("FIFO cache policy");
      }
@@ -63,7 +63,12 @@ public class LowLevelFifoCachePolicy ext
    }

    @Override
- protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+ public void setEvictionListener(EvictionListener listener) {
+ this.evictionListener = listener;
+ }
+
+ @Override
+ public long evictSomeBlocks(long memoryToReserve) {
      long evicted = 0;
      lock.lock();
      try {
@@ -73,7 +78,7 @@ public class LowLevelFifoCachePolicy ext
          if (candidate.invalidate()) {
            iter.remove();
            evicted += candidate.byteBuffer.remaining();
- listener.notifyEvicted(candidate);
+ evictionListener.notifyEvicted(candidate);
          }
        }
      } finally {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Tue Mar 3 02:58:22 2015
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.llap.io.ap
   * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
   * Additionally, buffer locking has to be handled (locked buffer cannot be evicted).
   */
-public class LowLevelLrfuCachePolicy extends LowLevelCachePolicyBase {
+public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
    private final double lambda;
    private final double f(long x) {
      return Math.pow(0.5, lambda * x);
@@ -62,9 +62,10 @@ public class LowLevelLrfuCachePolicy ext
    private LlapCacheableBuffer listHead, listTail;
    /** Number of elements. */
    private int heapSize = 0;
+ private EvictionListener evictionListener;

    public LowLevelLrfuCachePolicy(Configuration conf) {
- super(HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+ long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
      int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
      lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
      int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
@@ -154,7 +155,12 @@ public class LowLevelLrfuCachePolicy ext
    }

    @Override
- protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+ public void setEvictionListener(EvictionListener listener) {
+ this.evictionListener = listener;
+ }
+
+ @Override
+ public long evictSomeBlocks(long memoryToReserve) {
      long evicted = 0;
      // In normal case, we evict the items from the list.
      LlapCacheableBuffer nextCandidate, firstCandidate;
@@ -189,7 +195,7 @@ public class LowLevelLrfuCachePolicy ext
        listLock.unlock();
      }
      while (firstCandidate != nextCandidate) {
- listener.notifyEvicted(firstCandidate);
+ evictionListener.notifyEvicted(firstCandidate);
        firstCandidate = firstCandidate.prev;
      }
      if (evicted >= memoryToReserve) return evicted;
@@ -203,7 +209,7 @@ public class LowLevelLrfuCachePolicy ext
        }
        if (buffer == null) return evicted;
        evicted += buffer.byteBuffer.remaining();
- listener.notifyEvicted(buffer);
+ evictionListener.notifyEvicted(buffer);
      }
      return evicted;
    }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java Tue Mar 3 02:58:22 2015
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.llap.Consu
  import org.apache.hadoop.hive.llap.ConsumerFeedback;
  import org.apache.hadoop.hive.llap.DebugUtils;
  import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -44,16 +45,26 @@ import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.RecordReader;
  import org.apache.hadoop.mapred.Reporter;

+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
  public class LlapInputFormat
    implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface {
- private final LlapIoImpl llapIo;
- private InputFormat sourceInputFormat;
-
- LlapInputFormat(LlapIoImpl llapIo, InputFormat sourceInputFormat) {
+ @SuppressWarnings("rawtypes")
+ private final InputFormat sourceInputFormat;
+ private final ColumnVectorProducer cvp;
+ private final ListeningExecutorService executor;
+
+ @SuppressWarnings("rawtypes")
+ LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
+ ListeningExecutorService executor) {
      // TODO: right now, we do nothing with source input format, ORC-only in the first cut.
      // We'd need to plumb it thru and use it to get data to cache/etc.
      assert sourceInputFormat instanceof OrcInputFormat;
- this.llapIo = llapIo;
+ this.executor = executor;
+ this.cvp = cvp;
      this.sourceInputFormat = sourceInputFormat;
    }

@@ -73,7 +84,7 @@ public class LlapInputFormat
        if (includedCols.isEmpty()) {
          includedCols = null; // Also means read all columns? WTF?
        }
- return new LlapRecordReader(job, fileSplit, includedCols, llapIo.getCvp());
+ return new LlapRecordReader(job, fileSplit, includedCols);
      } catch (Exception ex) {
        throw new IOException(ex);
      }
@@ -84,13 +95,12 @@ public class LlapInputFormat
      return sourceInputFormat.getSplits(job, numSplits);
    }

- private static class LlapRecordReader
+ private class LlapRecordReader
        implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
      private final InputSplit split;
      private final List<Integer> columnIds;
      private final SearchArgument sarg;
      private final String[] columnNames;
- private final ColumnVectorProducer<?> cvp;
      private final VectorizedRowBatchCtx rbCtx;

      private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
@@ -101,11 +111,9 @@ public class LlapInputFormat
      private boolean isDone = false, isClosed = false;
      private ConsumerFeedback<ColumnVectorBatch> feedback;

- public LlapRecordReader(
- JobConf job, FileSplit split, List<Integer> includedCols, ColumnVectorProducer<?> cvp) {
+ public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols) {
        this.split = split;
        this.columnIds = includedCols;
- this.cvp = cvp;
        this.sarg = SearchArgumentFactory.createFromConf(job);
        this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
        try {
@@ -126,7 +134,7 @@ public class LlapInputFormat
          // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
          if (isFirst) {
            rbCtx.addPartitionColsToBatch(value);
- feedback = cvp.read(split, columnIds, sarg, columnNames, this);
+ startRead();
            isFirst = false;
          }
          ColumnVectorBatch cvb = nextCvb();
@@ -150,6 +158,30 @@ public class LlapInputFormat
        return true;
      }

+
+ private final class UncaughtErrorHandler implements FutureCallback<Void> {
+ @Override
+ public void onSuccess(Void result) {
+ // Successful execution of reader is supposed to call setDone.
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // Reader is not supposed to throw AFTER calling setError.
+ LlapIoImpl.LOG.error("Unhandled error from reader thread " + t.getMessage());
+ setError(t);
+ }
+ }
+
+ private void startRead() {
+ // Create the consumer of encoded data; it will coordinate decoding to CVBs.
+ ReadPipeline rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames);
+ feedback = rp;
+ ListenableFuture<Void> future = executor.submit(rp.getReadCallable());
+ // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most cases.
+ Futures.addCallback(future, new UncaughtErrorHandler());
+ }
+
      ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
        // TODO: if some collection is needed, return previous ColumnVectorBatch here
        ColumnVectorBatch current = null;

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Tue Mar 3 02:58:22 2015
@@ -19,7 +19,6 @@
  package org.apache.hadoop.hive.llap.io.api.impl;

  import java.io.IOException;
-import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  import org.apache.commons.logging.Log;
@@ -31,7 +30,8 @@ import org.apache.hadoop.hive.llap.cache
  import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
  import org.apache.hadoop.hive.llap.cache.Cache;
  import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
-import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicyBase;
+import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
+import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
  import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
  import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
  import org.apache.hadoop.hive.llap.cache.NoopCache;
@@ -39,17 +39,20 @@ import org.apache.hadoop.hive.llap.io.ap
  import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
  import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
  import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
-import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
+import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapred.InputFormat;

+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
  public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
    public static final Log LOG = LogFactory.getLog(LlapIoImpl.class);
    public static final LogLevels LOGL = new LogLevels(LOG);

- private final OrcColumnVectorProducer cvp;
- private final OrcEncodedDataProducer edp;
+ private final ColumnVectorProducer cvp;
+ private final ListeningExecutorService executor;

    private LlapIoImpl(Configuration conf) throws IOException {
      boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
@@ -58,31 +61,39 @@ public class LlapIoImpl implements LlapI
        LOG.info("Initializing LLAP IO" + (useLowLevelCache ? " with low level cache" : ""));
      }
      Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
- LowLevelCacheImpl orcCache = null;
- if (useLowLevelCache) {
- boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
- LowLevelCachePolicyBase cachePolicy =
- useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
- Allocator allocator = new BuddyAllocator(conf, cachePolicy);
- orcCache = new LowLevelCacheImpl(conf, cachePolicy, allocator);
- }
- // TODO: arbitrary thread pool
- ExecutorService threadPool = Executors.newFixedThreadPool(10);
+ LowLevelCacheImpl orcCache = createLowLevelCache(conf, useLowLevelCache);
+ OrcMetadataCache metadataCache = OrcMetadataCache.getInstance();
+ // Arbitrary thread pool. Listening is used for unhandled errors for now (TODO: remove?)
+ executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
+
      // TODO: this should depends on input format and be in a map, or something.
- this.edp = new OrcEncodedDataProducer(orcCache, cache, conf);
- this.cvp = new OrcColumnVectorProducer(threadPool, edp, conf);
+ this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf);
      if (LOGL.isInfoEnabled()) {
        LOG.info("LLAP IO initialized");
      }
    }

+ private LowLevelCacheImpl createLowLevelCache(Configuration conf, boolean useLowLevelCache) {
+ if (!useLowLevelCache) return null;
+ boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
+ LowLevelCachePolicy cachePolicy =
+ useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
+ // Memory manager uses cache policy to trigger evictions.
+ LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(conf, cachePolicy);
+ // Allocator uses memory manager to request memory.
+ Allocator allocator = new BuddyAllocator(conf, memManager);
+ // Cache uses allocator to allocate and deallocate.
+ LowLevelCacheImpl orcCache = new LowLevelCacheImpl(conf, cachePolicy, allocator);
+ // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
+ cachePolicy.setEvictionListener(orcCache);
+ orcCache.init();
+ return orcCache;
+ }
+
+ @SuppressWarnings("rawtypes")
    @Override
    public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
        InputFormat sourceInputFormat) {
- return new LlapInputFormat(this, sourceInputFormat);
- }
-
- public ColumnVectorProducer<?> getCvp() {
- return cvp;
+ return new LlapInputFormat(sourceInputFormat, cvp, executor);
    }
  }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Tue Mar 3 02:58:22 2015
@@ -18,88 +18,17 @@

  package org.apache.hadoop.hive.llap.io.decode;

-import java.io.IOException;
  import java.util.List;
-import java.util.concurrent.ExecutorService;

  import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
  import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
-import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
-import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.mapred.InputSplit;

-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/** Middle layer - gets encoded blocks, produces proto-VRBs */
-public abstract class ColumnVectorProducer<BatchKey> {
- private final ListeningExecutorService executor;
-
- public ColumnVectorProducer(ExecutorService executor) {
- this.executor = (executor instanceof ListeningExecutorService) ?
- (ListeningExecutorService)executor : MoreExecutors.listeningDecorator(executor);
- }
-
- private final class UncaughtErrorHandler implements FutureCallback<Void> {
- private final EncodedDataConsumer<BatchKey> edc;
-
- private UncaughtErrorHandler(EncodedDataConsumer edc) {
- this.edc = edc;
- }
-
- @Override
- public void onSuccess(Void result) {
- // Successful execution of reader is supposed to call setDone.
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Reader is not supposed to throw AFTER calling setError.
- LlapIoImpl.LOG.error("Unhandled error from reader thread " + t.getMessage());
- edc.setError(t);
- }
- }
-
- /**
- * Reads ColumnVectorBatch-es.
- * @param consumer Consumer that will receive the batches asynchronously.
- * @return Feedback that can be used to stop reading, and should be used
- * to return consumed batches.
- * @throws IOException
- */
- public ConsumerFeedback<ColumnVectorBatch> read(InputSplit split, List<Integer> columnIds,
- SearchArgument sarg, String[] columnNames, Consumer<ColumnVectorBatch> consumer)
- throws IOException {
- // Get the source of encoded data.
- EncodedDataProducer<BatchKey> edp = getEncodedDataProducer();
- // Create the consumer of encoded data; it will coordinate decoding to CVBs.
- final EncodedDataConsumer<BatchKey> edc = createConsumer(this, consumer, columnIds.size());
- // Then, get the specific reader of encoded data out of the producer.
- EncodedDataReader<BatchKey> reader = edp.createReader(
- split, columnIds, sarg, columnNames, edc);
- // Set the encoded data reader as upstream feedback for encoded data consumer, and start.
- edc.init(reader);
- // This is where we send execution on separate thread; the only threading boundary for now.
- // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most cases.
- UncaughtErrorHandler errorHandler = new UncaughtErrorHandler(edc);
- ListenableFuture<Void> future = executor.submit(reader);
- Futures.addCallback(future, errorHandler);
- return edc;
- }
-
- protected abstract EncodedDataConsumer<BatchKey> createConsumer(
- ColumnVectorProducer<BatchKey> cvp, Consumer<ColumnVectorBatch> consumer, int size);
-
- protected abstract EncodedDataProducer<BatchKey> getEncodedDataProducer();
-
- protected abstract void decodeBatch(EncodedDataConsumer<BatchKey> context,
- EncodedColumnBatch<BatchKey> batch, Consumer<ColumnVectorBatch> downstreamConsumer);
-}
+/**
+ * Entry point used by LlapInputFormat to create read pipeline to get data.
+ */
+public interface ColumnVectorProducer {
+ ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, InputSplit split,
+ List<Integer> columnIds, SearchArgument sarg, String[] columnNames);
+}
\ No newline at end of file

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java Tue Mar 3 02:58:22 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.io.d
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
+import java.util.concurrent.Callable;

  import org.apache.hadoop.hive.llap.Consumer;
  import org.apache.hadoop.hive.llap.ConsumerFeedback;
@@ -30,25 +31,30 @@ import org.apache.hadoop.hive.llap.io.ap
  /**
   *
   */
-public abstract class EncodedDataConsumer<BatchKey> implements ConsumerFeedback<ColumnVectorBatch>,
- Consumer<EncodedColumnBatch<BatchKey>> {
+public abstract class EncodedDataConsumer<BatchKey> implements
+ Consumer<EncodedColumnBatch<BatchKey>>, ReadPipeline {
    private volatile boolean isStopped = false;
    // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
    private final HashMap<BatchKey, EncodedColumnBatch<BatchKey>> pendingData = new HashMap<>();
    private ConsumerFeedback<EncodedColumnBatch.StreamBuffer> upstreamFeedback;
    private final Consumer<ColumnVectorBatch> downstreamConsumer;
+ private Callable<Void> readCallable;
    private final int colCount;
- private ColumnVectorProducer<BatchKey> cvp;

- public EncodedDataConsumer(ColumnVectorProducer<BatchKey> cvp,
- Consumer<ColumnVectorBatch> consumer, int colCount) {
+ public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, int colCount) {
      this.downstreamConsumer = consumer;
      this.colCount = colCount;
- this.cvp = cvp;
    }

- public void init(ConsumerFeedback<EncodedColumnBatch.StreamBuffer> upstreamFeedback) {
+ public void init(ConsumerFeedback<EncodedColumnBatch.StreamBuffer> upstreamFeedback,
+ Callable<Void> readCallable) {
      this.upstreamFeedback = upstreamFeedback;
+ this.readCallable = readCallable;
+ }
+
+ @Override
+ public Callable<Void> getReadCallable() {
+ return readCallable;
    }

    @Override
@@ -92,12 +98,15 @@ public abstract class EncodedDataConsume
        return;
      }
      if (0 == targetBatch.colsRemaining) {
- cvp.decodeBatch(this, targetBatch, downstreamConsumer);
+ decodeBatch(targetBatch, downstreamConsumer);
        // Batch has been decoded; unlock the buffers in cache
        returnProcessed(targetBatch.columnData);
      }
    }

+ protected abstract void decodeBatch(EncodedColumnBatch<BatchKey> batch,
+ Consumer<ColumnVectorBatch> downstreamConsumer);
+
    protected void returnProcessed(EncodedColumnBatch.StreamBuffer[][] data) {
      for (EncodedColumnBatch.StreamBuffer[] sbs : data) {
        for (EncodedColumnBatch.StreamBuffer sb : sbs) {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java Tue Mar 3 02:58:22 2015
@@ -18,436 +18,52 @@

  package org.apache.hadoop.hive.llap.io.decode;

-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
+import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
  import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
  import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BinaryStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BooleanStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ByteStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.CharacterStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DateStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DecimalStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DoubleStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.FloatStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.IntStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.LongStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ShortStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.StringStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.TimestampStreamReader;
-import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
-import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
-import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
+import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
  import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.mapred.InputSplit;

-public class OrcColumnVectorProducer extends ColumnVectorProducer<OrcBatchKey> {
- private final OrcEncodedDataProducer _edp;
- private final OrcMetadataCache _metadataCache;
- private boolean _skipCorrupt;
+public class OrcColumnVectorProducer implements ColumnVectorProducer {

- public OrcColumnVectorProducer(ExecutorService executor, OrcEncodedDataProducer edp,
- Configuration conf) {
- super(executor);
+ private final OrcMetadataCache metadataCache;
+ private final Cache<OrcCacheKey> cache;
+ private final LowLevelCache lowLevelCache;
+ private final Configuration conf;
+ private boolean _skipCorrupt; // TODO: get rid of this
+
+ public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
+ LowLevelCacheImpl lowLevelCache, Cache<OrcCacheKey> cache, Configuration conf) {
      if (LlapIoImpl.LOGL.isInfoEnabled()) {
        LlapIoImpl.LOG.info("Initializing ORC column vector producer");
      }

- this._edp = edp;
- this._metadataCache = OrcMetadataCache.getInstance();
+ this.metadataCache = metadataCache;
+ this.lowLevelCache = lowLevelCache;
+ this.cache = cache;
+ this.conf = conf;
      this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
    }

    @Override
- protected EncodedDataProducer<OrcBatchKey> getEncodedDataProducer() {
- return _edp;
+ public ReadPipeline createReadPipeline(
+ Consumer<ColumnVectorBatch> consumer, InputSplit split,
+ List<Integer> columnIds, SearchArgument sarg, String[] columnNames) {
+ OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
+ consumer, columnIds.size(), _skipCorrupt);
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(
+ lowLevelCache, cache, metadataCache, conf, split, columnIds, sarg, columnNames, edc);
+ edc.init(reader, reader);
+ return edc;
    }
-
- @Override
- protected void decodeBatch(EncodedDataConsumer<OrcBatchKey> context,
- EncodedColumnBatch<OrcBatchKey> batch,
- Consumer<ColumnVectorBatch> downstreamConsumer) {
- OrcEncodedDataConsumer oedc = (OrcEncodedDataConsumer)context;
- String fileName = batch.batchKey.file;
- int currentStripeIndex = batch.batchKey.stripeIx;
- if (oedc.getPreviousStripeIndex() == -1) {
- oedc.setPreviousStripeIndex(currentStripeIndex);
- }
- boolean sameStripe = currentStripeIndex == oedc.getPreviousStripeIndex();
-
- // OrcEncodedDataProducer should have just loaded cache entries from this file.
- // The default LRU algorithm shouldn't have dropped the entries. To make it
- // safe, untie the code from EDP into separate class and make use of loading cache. The current
- // assumption is that entries for the current file exists in metadata cache.
- try {
- OrcFileMetadata fileMetadata = _metadataCache.getFileMetadata(fileName);
- OrcBatchKey stripeKey = batch.batchKey.clone();
-
- // To get stripe metadata we only need to know the stripe number. Oddly, stripe metadata
- // accepts BatchKey as key. We need to keep the row group index in batch key the same to
- // retrieve the stripe metadata properly. To make sure we get the correct stripe
- // metadata, set row group index to 0. That's how it is cached. See OrcEncodedDataProducer
- stripeKey.rgIx = 0;
- OrcStripeMetadata stripeMetadata = _metadataCache.getStripeMetadata(stripeKey);
-
- // Get non null row count from root column, to get max vector batches
- int rgIdx = batch.batchKey.rgIx;
- OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexes()[0].getEntry(rgIdx);
- long nonNullRowCount = getRowCount(rowIndex);
- int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
- int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
- int numCols = batch.columnIxs.length;
- if (oedc.getColumnReaders() == null || !sameStripe) {
- RecordReaderImpl.TreeReader[] columnReaders = createTreeReaders(numCols, batch,
- fileMetadata, stripeMetadata);
- oedc.setColumnReaders(columnReaders);
- } else {
- repositionInStreams(oedc.getColumnReaders(), batch, sameStripe, numCols, fileMetadata,
- stripeMetadata);
- }
- oedc.setPreviousStripeIndex(currentStripeIndex);
-
- for (int i = 0; i < maxBatchesRG; i++) {
- ColumnVectorBatch cvb = new ColumnVectorBatch(batch.columnIxs.length);
-
- // for last batch in row group, adjust the batch size
- if (i == maxBatchesRG - 1) {
- batchSize = (int) (nonNullRowCount % VectorizedRowBatch.DEFAULT_SIZE);
- cvb.size = batchSize;
- }
-
- for (int idx = 0; idx < batch.columnIxs.length; idx++) {
- cvb.cols[idx] = (ColumnVector) oedc.getColumnReaders()[idx].nextVector(null, batchSize);
- }
-
- // we are done reading a batch, send it to consumer for processing
- downstreamConsumer.consumeData(cvb);
- }
- } catch (IOException | CloneNotSupportedException e) {
- downstreamConsumer.setError(e);
- }
- }
-
- private RecordReaderImpl.TreeReader[] createTreeReaders(int numCols,
- EncodedColumnBatch<OrcBatchKey> batch,
- OrcFileMetadata fileMetadata,
- OrcStripeMetadata stripeMetadata) throws IOException {
- String file = batch.batchKey.file;
- RecordReaderImpl.TreeReader[] treeReaders = new RecordReaderImpl.TreeReader[numCols];
-
- for (int i = 0; i < numCols; i++) {
- int columnIndex = batch.columnIxs[i];
- int rowGroupIndex = batch.batchKey.rgIx;
- EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
- OrcProto.Type columnType = fileMetadata.getTypes().get(columnIndex);
-
- // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
- // But we need to know if the original data is compressed or not. This is used to skip
- // positions in row index properly. If the file is originally compressed,
- // then 1st position (compressed offset) in row index should be skipped to get
- // uncompressed offset, else 1st position should not be skipped.
- CompressionCodec codec = fileMetadata.getCompressionCodec();
- OrcProto.ColumnEncoding columnEncoding = stripeMetadata.getEncodings().get(columnIndex);
- OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex];
- OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex);
-
- // stream buffers are arranged in enum order of stream kind
- EncodedColumnBatch.StreamBuffer present = null;
- EncodedColumnBatch.StreamBuffer data = null;
- EncodedColumnBatch.StreamBuffer dictionary = null;
- EncodedColumnBatch.StreamBuffer lengths = null;
- EncodedColumnBatch.StreamBuffer secondary = null;
- for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
- switch(streamBuffer.streamKind) {
- case 0:
- // PRESENT stream
- present = streamBuffer;
- break;
- case 1:
- // DATA stream
- data = streamBuffer;
- break;
- case 2:
- // LENGTH stream
- lengths = streamBuffer;
- break;
- case 3:
- // DICTIONARY_DATA stream
- dictionary = streamBuffer;
- break;
- case 5:
- // SECONDARY stream
- secondary = streamBuffer;
- break;
- default:
- throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
- }
- }
-
- switch (columnType.getKind()) {
- case BINARY:
- treeReaders[i] = BinaryStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setLengthStream(lengths)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case BOOLEAN:
- treeReaders[i] = BooleanStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .build();
- break;
- case BYTE:
- treeReaders[i] = ByteStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .build();
- break;
- case SHORT:
- treeReaders[i] = ShortStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case INT:
- treeReaders[i] = IntStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case LONG:
- treeReaders[i] = LongStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .skipCorrupt(_skipCorrupt)
- .build();
- break;
- case FLOAT:
- treeReaders[i] = FloatStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .build();
- break;
- case DOUBLE:
- treeReaders[i] = DoubleStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .build();
- break;
- case CHAR:
- case VARCHAR:
- treeReaders[i] = CharacterStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setMaxLength(columnType.getMaximumLength())
- .setCharacterType(columnType)
- .setPresentStream(present)
- .setDataStream(data)
- .setLengthStream(lengths)
- .setDictionaryStream(dictionary)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case STRING:
- treeReaders[i] = StringStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setLengthStream(lengths)
- .setDictionaryStream(dictionary)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case DECIMAL:
- treeReaders[i] = DecimalStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPrecision(columnType.getPrecision())
- .setScale(columnType.getScale())
- .setPresentStream(present)
- .setValueStream(data)
- .setScaleStream(secondary)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case TIMESTAMP:
- treeReaders[i] = TimestampStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setSecondsStream(data)
- .setNanosStream(secondary)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .skipCorrupt(_skipCorrupt)
- .build();
- break;
- case DATE:
- treeReaders[i] = DateStreamReader.builder()
- .setFileName(file)
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- default:
- throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
- }
- treeReaders[i].seek(StreamUtils.getPositionProvider(rowIndexEntry));
- }
- return treeReaders;
- }
-
- private void repositionInStreams(RecordReaderImpl.TreeReader[] columnReaders,
- EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe, int numCols,
- OrcFileMetadata fileMetadata, OrcStripeMetadata stripeMetadata) throws IOException {
- for (int i = 0; i < numCols; i++) {
- int columnIndex = batch.columnIxs[i];
- int rowGroupIndex = batch.batchKey.rgIx;
- EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
- OrcProto.Type columnType = fileMetadata.getTypes().get(columnIndex);
- OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex];
- OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex);
-
- // stream buffers are arranged in enum order of stream kind
- EncodedColumnBatch.StreamBuffer present = null;
- EncodedColumnBatch.StreamBuffer data = null;
- EncodedColumnBatch.StreamBuffer dictionary = null;
- EncodedColumnBatch.StreamBuffer lengths = null;
- EncodedColumnBatch.StreamBuffer secondary = null;
- for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
- switch(streamBuffer.streamKind) {
- case 0:
- // PRESENT stream
- present = streamBuffer;
- break;
- case 1:
- // DATA stream
- data = streamBuffer;
- break;
- case 2:
- // LENGTH stream
- lengths = streamBuffer;
- break;
- case 3:
- // DICTIONARY_DATA stream
- dictionary = streamBuffer;
- break;
- case 5:
- // SECONDARY stream
- secondary = streamBuffer;
- break;
- default:
- throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
- }
- }
-
- switch (columnType.getKind()) {
- case BINARY:
- ((BinaryStreamReader)columnReaders[i]).setBuffers(present, data, lengths);
- break;
- case BOOLEAN:
- ((BooleanStreamReader)columnReaders[i]).setBuffers(present, data);
- break;
- case BYTE:
- ((ByteStreamReader)columnReaders[i]).setBuffers(present, data);
- break;
- case SHORT:
- ((ShortStreamReader)columnReaders[i]).setBuffers(present, data);
- break;
- case INT:
- ((IntStreamReader)columnReaders[i]).setBuffers(present, data);
- break;
- case LONG:
- ((LongStreamReader)columnReaders[i]).setBuffers(present, data);
- break;
- case FLOAT:
- ((FloatStreamReader)columnReaders[i]).setBuffers(present, data);
- break;
- case DOUBLE:
- ((DoubleStreamReader)columnReaders[i]).setBuffers(present, data);
- break;
- case CHAR:
- case VARCHAR:
- ((CharacterStreamReader)columnReaders[i]).setBuffers(present, data, lengths, dictionary,
- sameStripe);
- break;
- case STRING:
- ((StringStreamReader)columnReaders[i]).setBuffers(present, data, lengths, dictionary,
- sameStripe);
- break;
- case DECIMAL:
- ((DecimalStreamReader)columnReaders[i]).setBuffers(present, data, secondary);
- break;
- case TIMESTAMP:
- ((TimestampStreamReader)columnReaders[i]).setBuffers(present, data, secondary);
- break;
- case DATE:
- ((DateStreamReader)columnReaders[i]).setBuffers(present, data);
- break;
- default:
- throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
- }
-
- columnReaders[i].seek(StreamUtils.getPositionProvider(rowIndexEntry));
- }
- }
-
- private long getRowCount(OrcProto.RowIndexEntry rowIndexEntry) {
- return rowIndexEntry.getStatistics().getNumberOfValues();
- }
-
- @Override
- protected EncodedDataConsumer<OrcBatchKey> createConsumer(ColumnVectorProducer<OrcBatchKey> cvp,
- Consumer<ColumnVectorBatch> consumer, int colCount) {
- return new OrcEncodedDataConsumer(cvp, consumer, colCount);
- }
-
  }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java Tue Mar 3 02:58:22 2015
@@ -17,34 +17,408 @@
   */
  package org.apache.hadoop.hive.llap.io.decode;

+import java.io.IOException;
+
  import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
  import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
  import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BinaryStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BooleanStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ByteStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.CharacterStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DateStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DecimalStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DoubleStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.FloatStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.IntStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.LongStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ShortStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.StringStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.TimestampStreamReader;
+import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
  import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;

  public class OrcEncodedDataConsumer extends EncodedDataConsumer<OrcBatchKey> {
- private int previousStripeIndex;
    private RecordReaderImpl.TreeReader[] columnReaders;
+ private int previousStripeIndex = -1;
+ private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
+ private OrcStripeMetadata[] stripes;
+ private final boolean skipCorrupt; // TODO: get rid of this

    public OrcEncodedDataConsumer(
- ColumnVectorProducer<OrcBatchKey> cvp, Consumer<ColumnVectorBatch> consumer, int colCount) {
- super(cvp, consumer, colCount);
- this.previousStripeIndex = -1;
+ Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt) {
+ super(consumer, colCount);
+ this.skipCorrupt = skipCorrupt;
+ }
+
+ public void setFileMetadata(OrcFileMetadata f) {
+ assert fileMetadata == null;
+ fileMetadata = f;
+ stripes = new OrcStripeMetadata[f.getStripes().size()];
+ }
+
+ public void setStripeMetadata(OrcStripeMetadata m) {
+ assert stripes != null;
+ LlapIoImpl.LOG.error("TODO# got metadata " + m.getStripeIx());
+ stripes[m.getStripeIx()] = m;
    }

- public int getPreviousStripeIndex() {
- return previousStripeIndex;
+ @Override
+ protected void decodeBatch(EncodedColumnBatch<OrcBatchKey> batch,
+ Consumer<ColumnVectorBatch> downstreamConsumer) {
+ int currentStripeIndex = batch.batchKey.stripeIx;
+ LlapIoImpl.LOG.error("TODO# got data " + currentStripeIndex);
+
+ boolean sameStripe = currentStripeIndex == previousStripeIndex;
+
+ try {
+ OrcStripeMetadata stripeMetadata = stripes[currentStripeIndex];
+ // Get non null row count from root column, to get max vector batches
+ int rgIdx = batch.batchKey.rgIx;
+ OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexes()[0].getEntry(rgIdx);
+ long nonNullRowCount = getRowCount(rowIndex);
+ int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
+ int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
+ int numCols = batch.columnIxs.length;
+ if (columnReaders == null || !sameStripe) {
+ this.columnReaders = createTreeReaders(numCols, batch, fileMetadata, stripeMetadata);
+ } else {
+ repositionInStreams(this.columnReaders, batch, sameStripe, numCols, fileMetadata,
+ stripeMetadata);
+ }
+ previousStripeIndex = currentStripeIndex;
+
+ for (int i = 0; i < maxBatchesRG; i++) {
+ ColumnVectorBatch cvb = new ColumnVectorBatch(batch.columnIxs.length);
+
+ // for last batch in row group, adjust the batch size
+ if (i == maxBatchesRG - 1) {
+ batchSize = (int) (nonNullRowCount % VectorizedRowBatch.DEFAULT_SIZE);
+ cvb.size = batchSize;
+ }
+
+ for (int idx = 0; idx < batch.columnIxs.length; idx++) {
+ cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(null, batchSize);
+ }
+
+ // we are done reading a batch, send it to consumer for processing
+ downstreamConsumer.consumeData(cvb);
+ }
+ } catch (IOException e) {
+ // Caller will return the batch.
+ downstreamConsumer.setError(e);
+ }
    }

- public void setPreviousStripeIndex(int previousStripeIndex) {
- this.previousStripeIndex = previousStripeIndex;
+ RecordReaderImpl.TreeReader[] createTreeReaders(int numCols,
+ EncodedColumnBatch<OrcBatchKey> batch,
+ OrcFileMetadata fileMetadata,
+ OrcStripeMetadata stripeMetadata) throws IOException {
+ String file = batch.batchKey.file;
+ RecordReaderImpl.TreeReader[] treeReaders = new RecordReaderImpl.TreeReader[numCols];
+
+ for (int i = 0; i < numCols; i++) {
+ int columnIndex = batch.columnIxs[i];
+ int rowGroupIndex = batch.batchKey.rgIx;
+ EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
+ OrcProto.Type columnType = fileMetadata.getTypes().get(columnIndex);
+
+ // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
+ // But we need to know if the original data is compressed or not. This is used to skip
+ // positions in row index properly. If the file is originally compressed,
+ // then 1st position (compressed offset) in row index should be skipped to get
+ // uncompressed offset, else 1st position should not be skipped.
+ CompressionCodec codec = fileMetadata.getCompressionCodec();
+ OrcProto.ColumnEncoding columnEncoding = stripeMetadata.getEncodings().get(columnIndex);
+ OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex];
+ OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex);
+
+ // stream buffers are arranged in enum order of stream kind
+ EncodedColumnBatch.StreamBuffer present = null;
+ EncodedColumnBatch.StreamBuffer data = null;
+ EncodedColumnBatch.StreamBuffer dictionary = null;
+ EncodedColumnBatch.StreamBuffer lengths = null;
+ EncodedColumnBatch.StreamBuffer secondary = null;
+ for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
+ switch(streamBuffer.streamKind) {
+ case 0:
+ // PRESENT stream
+ present = streamBuffer;
+ break;
+ case 1:
+ // DATA stream
+ data = streamBuffer;
+ break;
+ case 2:
+ // LENGTH stream
+ lengths = streamBuffer;
+ break;
+ case 3:
+ // DICTIONARY_DATA stream
+ dictionary = streamBuffer;
+ break;
+ case 5:
+ // SECONDARY stream
+ secondary = streamBuffer;
+ break;
+ default:
+ throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
+ }
+ }
+
+ switch (columnType.getKind()) {
+ case BINARY:
+ treeReaders[i] = BinaryStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setLengthStream(lengths)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ break;
+ case BOOLEAN:
+ treeReaders[i] = BooleanStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .build();
+ break;
+ case BYTE:
+ treeReaders[i] = ByteStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .build();
+ break;
+ case SHORT:
+ treeReaders[i] = ShortStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ break;
+ case INT:
+ treeReaders[i] = IntStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ break;
+ case LONG:
+ treeReaders[i] = LongStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .skipCorrupt(skipCorrupt)
+ .build();
+ break;
+ case FLOAT:
+ treeReaders[i] = FloatStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .build();
+ break;
+ case DOUBLE:
+ treeReaders[i] = DoubleStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .build();
+ break;
+ case CHAR:
+ case VARCHAR:
+ treeReaders[i] = CharacterStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setMaxLength(columnType.getMaximumLength())
+ .setCharacterType(columnType)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setLengthStream(lengths)
+ .setDictionaryStream(dictionary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ break;
+ case STRING:
+ treeReaders[i] = StringStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setLengthStream(lengths)
+ .setDictionaryStream(dictionary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ break;
+ case DECIMAL:
+ treeReaders[i] = DecimalStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPrecision(columnType.getPrecision())
+ .setScale(columnType.getScale())
+ .setPresentStream(present)
+ .setValueStream(data)
+ .setScaleStream(secondary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ break;
+ case TIMESTAMP:
+ treeReaders[i] = TimestampStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setSecondsStream(data)
+ .setNanosStream(secondary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .skipCorrupt(skipCorrupt)
+ .build();
+ break;
+ case DATE:
+ treeReaders[i] = DateStreamReader.builder()
+ .setFileName(file)
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ break;
+ default:
+ throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
+ }
+ treeReaders[i].seek(StreamUtils.getPositionProvider(rowIndexEntry));
+ }
+ return treeReaders;
    }

- public void setColumnReaders(RecordReaderImpl.TreeReader[] columnReaders) {
- this.columnReaders = columnReaders;
+ private void repositionInStreams(RecordReaderImpl.TreeReader[] columnReaders,
+ EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe, int numCols,
+ OrcFileMetadata fileMetadata, OrcStripeMetadata stripeMetadata) throws IOException {
+ for (int i = 0; i < numCols; i++) {
+ int columnIndex = batch.columnIxs[i];
+ int rowGroupIndex = batch.batchKey.rgIx;
+ EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
+ OrcProto.Type columnType = fileMetadata.getTypes().get(columnIndex);
+ OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex];
+ OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex);
+
+ // stream buffers are arranged in enum order of stream kind
+ EncodedColumnBatch.StreamBuffer present = null;
+ EncodedColumnBatch.StreamBuffer data = null;
+ EncodedColumnBatch.StreamBuffer dictionary = null;
+ EncodedColumnBatch.StreamBuffer lengths = null;
+ EncodedColumnBatch.StreamBuffer secondary = null;
+ for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
+ switch(streamBuffer.streamKind) {
+ case 0:
+ // PRESENT stream
+ present = streamBuffer;
+ break;
+ case 1:
+ // DATA stream
+ data = streamBuffer;
+ break;
+ case 2:
+ // LENGTH stream
+ lengths = streamBuffer;
+ break;
+ case 3:
+ // DICTIONARY_DATA stream
+ dictionary = streamBuffer;
+ break;
+ case 5:
+ // SECONDARY stream
+ secondary = streamBuffer;
+ break;
+ default:
+ throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
+ }
+ }
+
+ switch (columnType.getKind()) {
+ case BINARY:
+ ((BinaryStreamReader)columnReaders[i]).setBuffers(present, data, lengths);
+ break;
+ case BOOLEAN:
+ ((BooleanStreamReader)columnReaders[i]).setBuffers(present, data);
+ break;
+ case BYTE:
+ ((ByteStreamReader)columnReaders[i]).setBuffers(present, data);
+ break;
+ case SHORT:
+ ((ShortStreamReader)columnReaders[i]).setBuffers(present, data);
+ break;
+ case INT:
+ ((IntStreamReader)columnReaders[i]).setBuffers(present, data);
+ break;
+ case LONG:
+ ((LongStreamReader)columnReaders[i]).setBuffers(present, data);
+ break;
+ case FLOAT:
+ ((FloatStreamReader)columnReaders[i]).setBuffers(present, data);
+ break;
+ case DOUBLE:
+ ((DoubleStreamReader)columnReaders[i]).setBuffers(present, data);
+ break;
+ case CHAR:
+ case VARCHAR:
+ ((CharacterStreamReader)columnReaders[i]).setBuffers(present, data, lengths, dictionary,
+ sameStripe);
+ break;
+ case STRING:
+ ((StringStreamReader)columnReaders[i]).setBuffers(present, data, lengths, dictionary,
+ sameStripe);
+ break;
+ case DECIMAL:
+ ((DecimalStreamReader)columnReaders[i]).setBuffers(present, data, secondary);
+ break;
+ case TIMESTAMP:
+ ((TimestampStreamReader)columnReaders[i]).setBuffers(present, data, secondary);
+ break;
+ case DATE:
+ ((DateStreamReader)columnReaders[i]).setBuffers(present, data);
+ break;
+ default:
+ throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
+ }
+
+ columnReaders[i].seek(StreamUtils.getPositionProvider(rowIndexEntry));
+ }
    }

- public RecordReaderImpl.TreeReader[] getColumnReaders() {
- return columnReaders;
+ private long getRowCount(OrcProto.RowIndexEntry rowIndexEntry) {
+ return rowIndexEntry.getStatistics().getNumberOfValues();
    }
  }
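
The per-column dispatch above sorts each column's StreamBuffers into PRESENT/DATA/LENGTH/DICTIONARY_DATA/SECONDARY slots by numeric stream kind before building or repositioning the tree readers. Below is a minimal, self-contained sketch of that dispatch, assuming the numeric cases mirror the usual OrcProto.Stream.Kind ordering (PRESENT=0, DATA=1, LENGTH=2, DICTIONARY_DATA=3, DICTIONARY_COUNT=4, SECONDARY=5); the classes in the sketch are local stand-ins, not the patch's types.

  import java.io.IOException;

  public class StreamKindDispatchSketch {
    // Local stand-in for the protobuf enum; declaration order supplies the assumed ordinals.
    enum StreamKind { PRESENT, DATA, LENGTH, DICTIONARY_DATA, DICTIONARY_COUNT, SECONDARY }

    // Stand-in for EncodedColumnBatch.StreamBuffer with only the field the switch inspects.
    static class StreamBuffer {
      final int streamKind;
      StreamBuffer(int streamKind) { this.streamKind = streamKind; }
    }

    static void dispatch(StreamBuffer[] streamBuffers) throws IOException {
      StreamBuffer present = null, data = null, lengths = null, dictionary = null, secondary = null;
      for (StreamBuffer sb : streamBuffers) {
        int k = sb.streamKind;
        if (k < 0 || k >= StreamKind.values().length || k == StreamKind.DICTIONARY_COUNT.ordinal()) {
          throw new IOException("Unexpected stream kind: " + k);
        }
        switch (StreamKind.values()[k]) {
          case PRESENT:         present = sb;    break;
          case DATA:            data = sb;       break;
          case LENGTH:          lengths = sb;    break;
          case DICTIONARY_DATA: dictionary = sb; break;
          case SECONDARY:       secondary = sb;  break;
          default: break; // unreachable given the bounds check above
        }
      }
      System.out.println("present=" + (present != null) + " data=" + (data != null)
          + " lengths=" + (lengths != null) + " dictionary=" + (dictionary != null)
          + " secondary=" + (secondary != null));
    }

    public static void main(String[] args) throws IOException {
      // A dictionary-encoded column typically carries PRESENT (0), DATA (1), LENGTH (2), DICTIONARY_DATA (3).
      dispatch(new StreamBuffer[] { new StreamBuffer(0), new StreamBuffer(1),
          new StreamBuffer(2), new StreamBuffer(3) });
    }
  }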

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java Tue Mar 3 02:58:22 2015
@@ -26,7 +26,7 @@ import com.google.common.cache.Cache;
  import com.google.common.cache.CacheBuilder;

  /**
- * ORC-specific metadata cache.
+ * TODO#: doc ORC-specific metadata cache.
   * TODO: should be merged with main cache somehow if we find this takes too much memory
   */
  public class OrcMetadataCache {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java Tue Mar 3 02:58:22 2015
@@ -29,12 +29,14 @@ import org.apache.hadoop.hive.ql.io.orc.
  import org.apache.hadoop.hive.ql.io.orc.StripeInformation;

  public class OrcStripeMetadata {
- List<ColumnEncoding> encodings;
- List<Stream> streams;
- RecordReaderImpl.Index rowIndex;
+ private final int stripeIx;
+ private final List<ColumnEncoding> encodings;
+ private final List<Stream> streams;
+ private RecordReaderImpl.Index rowIndex;

- public OrcStripeMetadata(MetadataReader mr, StripeInformation stripe,
+ public OrcStripeMetadata(int stripeIx, MetadataReader mr, StripeInformation stripe,
        boolean[] includes, boolean[] sargColumns) throws IOException {
+ this.stripeIx = stripeIx;
      StripeFooter footer = mr.readStripeFooter(stripe);
      streams = footer.getStreamsList();
      encodings = footer.getColumnsList();
@@ -55,6 +57,10 @@ public class OrcStripeMetadata {
          sargColumns, rowIndex.getBloomFilterIndex());
    }

+ public int getStripeIx() {
+ return stripeIx;
+ }
+
    public RowIndex[] getRowIndexes() {
      return rowIndex.getRowGroupIndex();
    }
@@ -67,16 +73,8 @@ public class OrcStripeMetadata {
      return encodings;
    }

- public void setEncodings(List<ColumnEncoding> encodings) {
- this.encodings = encodings;
- }
-
    public List<Stream> getStreams() {
      return streams;
    }

- public void setStreams(List<Stream> streams) {
- this.streams = streams;
- }
-
  }
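
With the setters removed and the stripe index captured at construction, a cached OrcStripeMetadata can be handed to the decode side as an effectively read-only object. A rough usage sketch, not taken from the patch: `reader`, `mr`, `includes`, and `sargColumns` are assumed to be in scope, with reader.getStripes() being the standard ORC accessor returning the file's StripeInformation list.

  // Illustrative fragment: build one metadata object per stripe up front, then only read from it.
  List<StripeInformation> stripes = reader.getStripes();
  OrcStripeMetadata[] perStripe = new OrcStripeMetadata[stripes.size()];
  for (int stripeIx = 0; stripeIx < stripes.size(); ++stripeIx) {
    perStripe[stripeIx] = new OrcStripeMetadata(
        stripeIx, mr, stripes.get(stripeIx), includes, sargColumns);
  }
  // Decode side: pure reads, no mutation of reader-side objects.
  RowIndex[] rowIndexes = perStripe[0].getRowIndexes();
  List<Stream> streams = perStripe[0].getStreams();
  int whichStripe = perStripe[0].getStripeIx();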

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Tue Mar 3 02:58:22 2015
@@ -63,9 +63,8 @@ public class TestLowLevelCacheImpl {
      }
    }

- private static class DummyCachePolicy extends LowLevelCachePolicyBase {
- public DummyCachePolicy(long maxSize) {
- super(maxSize);
+ private static class DummyCachePolicy implements LowLevelCachePolicy {
+ public DummyCachePolicy() {
      }

      public void cache(LlapCacheableBuffer buffer) {
@@ -77,16 +76,19 @@ public class TestLowLevelCacheImpl {
      public void notifyUnlock(LlapCacheableBuffer buffer) {
      }

- protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+ public long evictSomeBlocks(long memoryToReserve) {
        return memoryToReserve;
      }
+
+ public void setEvictionListener(EvictionListener listener) {
+ }
    }

    @Test
    public void testGetPut() {
      Configuration conf = createConf();
      LowLevelCacheImpl cache = new LowLevelCacheImpl(
- conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread
+ conf, new DummyCachePolicy(), new DummyAllocator(), -1); // no cleanup thread
      String fn1 = "file1".intern(), fn2 = "file2".intern();
      LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() };
      verifyRefcount(fakes, 1, 1, 1, 1, 1, 1);
@@ -144,7 +146,7 @@ public class TestLowLevelCacheImpl {
    public void testMultiMatch() {
      Configuration conf = createConf();
      LowLevelCacheImpl cache = new LowLevelCacheImpl(
- conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread
+ conf, new DummyCachePolicy(), new DummyAllocator(), -1); // no cleanup thread
      String fn = "file1".intern();
      LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb() };
      assertNull(cache.putFileData(fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0));
@@ -162,7 +164,7 @@ public class TestLowLevelCacheImpl {
    public void testStaleValueGet() {
      Configuration conf = createConf();
      LowLevelCacheImpl cache = new LowLevelCacheImpl(
- conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread
+ conf, new DummyCachePolicy(), new DummyAllocator(), -1); // no cleanup thread
      String fn1 = "file1".intern(), fn2 = "file2".intern();
      LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb() };
      assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0));
@@ -181,7 +183,7 @@ public class TestLowLevelCacheImpl {
    public void testStaleValueReplace() {
      Configuration conf = createConf();
      LowLevelCacheImpl cache = new LowLevelCacheImpl(
- conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread
+ conf, new DummyCachePolicy(), new DummyAllocator(), -1); // no cleanup thread
      String fn1 = "file1".intern(), fn2 = "file2".intern();
      LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] {
          fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() };
@@ -200,7 +202,7 @@ public class TestLowLevelCacheImpl {
    public void testMTTWithCleanup() {
      Configuration conf = createConf();
      final LowLevelCacheImpl cache = new LowLevelCacheImpl(
- conf, new DummyCachePolicy(10), new DummyAllocator(), 1);
+ conf, new DummyCachePolicy(), new DummyAllocator(), 1);
      final String fn1 = "file1".intern(), fn2 = "file2".intern();
      final int offsetsToUse = 8;
      final CountDownLatch cdlIn = new CountDownLatch(4), cdlOut = new CountDownLatch(1);
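
The DummyCachePolicy stub now implements the LowLevelCachePolicy interface directly instead of extending the old LowLevelCachePolicyBase, and evictSomeBlocks no longer takes an EvictionListener because the listener is set separately. The interface shape implied by the stub is sketched here for reference; the real declaration may include further methods (e.g. a lock notification).

  // Shape implied by DummyCachePolicy above; illustrative, not the actual source file.
  public interface LowLevelCachePolicy {
    void cache(LlapCacheableBuffer buffer);
    void notifyUnlock(LlapCacheableBuffer buffer);
    // Returns the amount of memory actually evicted; the dummy pretends everything asked for was freed.
    long evictSomeBlocks(long memoryToReserve);
    void setEvictionListener(EvictionListener listener);
  }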

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java?rev=1663518&r1=1663517&r2=1663518&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java Tue Mar 3 02:58:22 2015
@@ -72,10 +72,11 @@ public class TestLowLevelLrfuCachePolicy
      conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
      EvictionTracker et = new EvictionTracker();
      LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf);
+ LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lfu);
      lfu.setEvictionListener(et);
      for (int i = 0; i < heapSize; ++i) {
        LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
- assertTrue(cache(lfu, et, buffer));
+ assertTrue(cache(mm, lfu, et, buffer));
        inserted.add(buffer);
      }
      Collections.shuffle(inserted, rdm);
@@ -87,7 +88,7 @@ public class TestLowLevelLrfuCachePolicy
          lfu.notifyUnlock(inserted.get(i));
        }
      }
- verifyOrder(lfu, et, inserted);
+ verifyOrder(mm, lfu, et, inserted);
    }

    private Configuration createConf(int min, int heapSize) {
@@ -107,10 +108,11 @@ public class TestLowLevelLrfuCachePolicy
      conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
      EvictionTracker et = new EvictionTracker();
      LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf);
+ LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lru);
      lru.setEvictionListener(et);
      for (int i = 0; i < heapSize; ++i) {
        LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
- assertTrue(cache(lru, et, buffer));
+ assertTrue(cache(mm, lru, et, buffer));
        inserted.add(buffer);
      }
      Collections.shuffle(inserted, rdm);
@@ -121,7 +123,7 @@ public class TestLowLevelLrfuCachePolicy
          lru.notifyUnlock(inserted.get(i));
        }
      }
- verifyOrder(lru, et, inserted);
+ verifyOrder(mm, lru, et, inserted);
    }

    @Test
@@ -130,17 +132,19 @@ public class TestLowLevelLrfuCachePolicy
      LOG.info("Testing deadlock resolution");
      ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
      EvictionTracker et = new EvictionTracker();
- LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(createConf(1, heapSize));
+ Configuration conf = createConf(1, heapSize);
+ LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf);
+ LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu);
      lrfu.setEvictionListener(et);
      for (int i = 0; i < heapSize; ++i) {
        LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
- assertTrue(cache(lrfu, et, buffer));
+ assertTrue(cache(mm, lrfu, et, buffer));
        inserted.add(buffer);
      }
      // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
      LlapCacheableBuffer locked = inserted.get(0);
      lock(lrfu, locked);
- lrfu.reserveMemory(1, false);
+ mm.reserveMemory(1, false);
      LlapCacheableBuffer evicted = et.evicted.get(0);
      assertNotNull(evicted);
      assertTrue(evicted.isInvalid());
@@ -149,9 +153,9 @@ public class TestLowLevelLrfuCachePolicy
    }

    // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
- public boolean cache(
+ public boolean cache(LowLevelCacheMemoryManager mm,
        LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapCacheableBuffer buffer) {
- if (!lrfu.reserveMemory(1, false)) {
+ if (!mm.reserveMemory(1, false)) {
        return false;
      }
      buffer.incRef();
@@ -185,6 +189,7 @@ public class TestLowLevelLrfuCachePolicy
      conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements
      EvictionTracker et = new EvictionTracker();
      LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf);
+ LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu);
      lrfu.setEvictionListener(et);
      // Insert the number of elements plus 2, to trigger 2 evictions.
      int toEvict = 2;
@@ -193,7 +198,7 @@ public class TestLowLevelLrfuCachePolicy
      Assume.assumeTrue(toEvict <= heapSize);
      for (int i = 0; i < heapSize + toEvict; ++i) {
        LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
- assertTrue(cache(lrfu, et, buffer));
+ assertTrue(cache(mm, lrfu, et, buffer));
        LlapCacheableBuffer evictedBuf = getOneEvictedBuffer(et);
        if (i < toEvict) {
          evicted[i] = buffer;
@@ -215,7 +220,7 @@ public class TestLowLevelLrfuCachePolicy
      for (LlapCacheableBuffer buf : inserted) {
        lock(lrfu, buf);
      }
- assertFalse(lrfu.reserveMemory(1, false));
+ assertFalse(mm.reserveMemory(1, false));
      if (!et.evicted.isEmpty()) {
        assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
      }
@@ -231,19 +236,19 @@ public class TestLowLevelLrfuCachePolicy
          lrfu.notifyUnlock(buf);
        }
      }
- verifyOrder(lrfu, et, inserted);
+ verifyOrder(mm, lrfu, et, inserted);
    }

- private void verifyOrder(LowLevelLrfuCachePolicy lrfu,
+ private void verifyOrder(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy lrfu,
        EvictionTracker et, ArrayList<LlapCacheableBuffer> inserted) {
      LlapCacheableBuffer block;
      // Evict all blocks.
      et.evicted.clear();
      for (int i = 0; i < inserted.size(); ++i) {
- assertTrue(lrfu.reserveMemory(1, false));
+ assertTrue(mm.reserveMemory(1, false));
      }
      // The map should now be empty.
- assertFalse(lrfu.reserveMemory(1, false));
+ assertFalse(mm.reserveMemory(1, false));
      for (int i = 0; i < inserted.size(); ++i) {
        block = et.evicted.get(i);
        assertTrue(block.isInvalid());
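
Throughout this test, memory reservation now goes through a LowLevelCacheMemoryManager built on top of the policy, so reserveMemory() disappears from the policy's test surface. A condensed sketch of the wiring the updated tests use, condensed from the hunks above rather than new API:

  // Illustrative condensation of the test setup above.
  Configuration conf = createConf(1, heapSize);                    // test helper from this file
  LowLevelLrfuCachePolicy policy = new LowLevelLrfuCachePolicy(conf);
  LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, policy);
  EvictionTracker et = new EvictionTracker();
  policy.setEvictionListener(et);                                  // et records evicted buffers

  LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
  if (mm.reserveMemory(1, false)) {    // accounting and eviction requests go through the manager
    buffer.incRef();
    policy.cache(buffer);              // the policy only orders buffers and picks eviction victims
  }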
