Author: sershe
Date: Wed Feb 11 01:44:14 2015
New Revision: 1658861

URL: http://svn.apache.org/r1658861
Log:
HIVE-9418p8 : Refactor code out of RecordReader

Added:
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
Modified:
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
     hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
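
For orientation, a minimal sketch (not part of this commit) of how the refactored pieces fit together: stripe metadata now comes from the new MetadataReader, and encoded column data from the new EncodedReader. OrcFile.createReader and the placeholder arguments (fs, path, cache, myConsumer, includes, colRgs) are assumptions for illustration only.

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.llap.Consumer;
    import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
    import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
    import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
    import org.apache.hadoop.hive.ql.io.orc.*;

    public class EncodedReadSketch {
      public static void readStripe(FileSystem fs, Path path, LowLevelCache cache,
          Consumer<EncodedColumnBatch<OrcBatchKey>> myConsumer,
          boolean[] includes, boolean[][] colRgs, int stripeIx) throws Exception {
        Reader orcReader = OrcFile.createReader(fs, path); // assumed existing factory method
        StripeInformation si = orcReader.getStripes().get(stripeIx);

        // Stripe-level metadata via the new MetadataReader.
        MetadataReader metadata = orcReader.metadata();
        OrcProto.StripeFooter footer = metadata.readStripeFooter(si);
        OrcProto.RowIndex[] indexes = metadata.readRowIndex(si, footer, includes, null);

        // Encoded column data via the new EncodedReader; results go to the consumer.
        EncodedReader encoded = orcReader.encodedReader(cache, myConsumer);
        encoded.readEncodedColumns(stripeIx, si, indexes,
            footer.getColumnsList(), footer.getStreamsList(), includes, colRgs);

        encoded.close();
        metadata.close();
      }
    }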

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1658861&r1=1658860&r2=1658861&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Wed Feb 11 01:44:14 2015
@@ -39,12 +39,13 @@ import org.apache.hadoop.hive.llap.io.ap
  import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
  import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
  import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.ql.io.orc.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
  import org.apache.hadoop.hive.ql.io.orc.OrcFile;
  import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
  import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
  import org.apache.hadoop.hive.ql.io.orc.Reader;
-import org.apache.hadoop.hive.ql.io.orc.RecordReader;
  import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.mapred.FileSplit;
@@ -70,6 +71,7 @@ public class OrcEncodedDataProducer impl
      // Read state.
      private int stripeIxFrom;
      private Reader orcReader;
+ private MetadataReader metadataReader;
      private final String internedFilePath;
      /**
       * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be
@@ -138,26 +140,24 @@ public class OrcEncodedDataProducer impl
        int stride = metadata.getRowIndexStride();
        ArrayList<OrcStripeMetadata> stripeMetadatas = null;
        boolean[] globalIncludes = null;
- RecordReader[] stripeReaders = null;
        try {
          globalIncludes = OrcInputFormat.genIncludedColumns(metadata.getTypes(), columnIds, true);
- stripeReaders = new RecordReader[readState.length];
          if (sarg != null && stride != 0) {
            // If SARG is present, get relevant stripe metadata from cache or readers.
- stripeMetadatas = readStripesMetadata(metadata, globalIncludes, stripeReaders);
+ stripeMetadatas = readStripesMetadata(metadata, globalIncludes);
          }

          // Now, apply SARG if any; w/o sarg, this will just initialize readState.
          determineRgsToRead(metadata.getStripes(), metadata.getTypes(),
              globalIncludes, stride, stripeMetadatas);
        } catch (Throwable t) {
- cleanupReaders(stripeReaders);
+ cleanupReaders(null);
          consumer.setError(t);
          return null;
        }

        if (isStopped) {
- cleanupReaders(stripeReaders);
+ cleanupReaders(null);
          return null;
        }

@@ -171,48 +171,54 @@ public class OrcEncodedDataProducer impl
          } catch (Throwable t) {
            // produceDataFromCache handles its own cleanup.
            consumer.setError(t);
- cleanupReaders(stripeReaders);
+ cleanupReaders(null);
            return null;
          }
        }
        // readState has been modified for column x rgs that were fetched from cache.

- // 5. Create the readers for each stripe and prepare to read. We will create reader
- // with global column list and then separately pass stripe-specific includes below.
+ // 5. Create encoded data reader.
+ ensureOrcReader();
+ // In case if we have high-level cache, we will intercept the data and add it there;
+ // otherwise just pass the data directly to the consumer.
+ Consumer<EncodedColumnBatch<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
+ EncodedReader stripeReader = null;
        try {
- for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
- List<Integer> cols = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
- stripeReaders[stripeIxMod] = prepareStripeReader(stripeIxFrom + stripeIxMod,
- metadata, globalIncludes, cols, stripeReaders[stripeIxMod]);
- }
+ stripeReader = orcReader.encodedReader(lowLevelCache, consumer);
        } catch (Throwable t) {
+ // produceDataFromCache handles its own cleanup.
          consumer.setError(t);
- cleanupReaders(stripeReaders);
+ cleanupReaders(null);
          return null;
        }

- // 6. We now have one reader per stripe that needs to be read. Read data.
+ // 6. Read data.
        // TODO: I/O threadpool could be here - one thread per stripe; for now, linear.
        OrcBatchKey stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
- for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+ for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
          try {
- RecordReader stripeReader = stripeReaders[stripeIxMod];
- if (stripeReader == null) continue; // No need to read this stripe, see above.
- List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
+ List<Integer> cols = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
+ if (cols != null && cols.isEmpty()) continue; // No need to read this stripe.
+ int stripeIx = stripeIxFrom + stripeIxMod;
+ StripeInformation si = metadata.getStripes().get(stripeIx);
+
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Reading stripe " + stripeIx + ": "
+ + si.getOffset() + ", " + si.getLength());
+ }
            boolean[] stripeIncludes = null;
            boolean[][] colRgs = readState[stripeIxMod];
- int stripeIx = stripeIxMod + stripeIxFrom;

            // 6.1. Determine the columns to read (usually the same as requested).
- if (colsToRead == null || colsToRead.size() == colRgs.length) {
- colsToRead = columnIds;
+ if (cols == null || cols.size() == colRgs.length) {
+ cols = columnIds;
              stripeIncludes = globalIncludes;
            } else {
              // We are reading subset of the original columns, remove unnecessary bitmasks/etc.
              // This will never happen w/o high-level cache.
              stripeIncludes = OrcInputFormat.genIncludedColumns(
- metadata.getTypes(), colsToRead, true);
- boolean[][] colRgs2 = new boolean[colsToRead.size()][];
+ metadata.getTypes(), cols, true);
+ boolean[][] colRgs2 = new boolean[cols.size()][];
              for (int i = 0, i2 = -1; i < colRgs.length; ++i) {
                if (colRgs[i] == null) continue;
              colRgs2[++i2] = colRgs[i];
@@ -229,7 +235,8 @@ public class OrcEncodedDataProducer impl
              stripeKey.stripeIx = stripeIx;
              stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
              if (stripeMetadata == null) {
- stripeMetadata = new OrcStripeMetadata(stripeReader, stripeIncludes);
+ ensureMetadataReader();
+ stripeMetadata = new OrcStripeMetadata(metadataReader, si, stripeIncludes);
                if (DebugUtils.isTraceOrcEnabled()) {
                  LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
                      + " metadata with includes: " + DebugUtils.toString(stripeIncludes));
@@ -243,24 +250,19 @@ public class OrcEncodedDataProducer impl
                LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx
                    + " metadata for includes: " + DebugUtils.toString(stripeIncludes));
              }
- updateLoadedIndexes(stripeMetadata, stripeReader, stripeIncludes);
+ ensureMetadataReader();
+ updateLoadedIndexes(stripeMetadata, si, stripeIncludes);
            }
- // Set stripe metadata externally in the reader.
- stripeReader.setMetadata(stripeMetadata.getRowIndexes(),
- stripeMetadata.getEncodings(), stripeMetadata.getStreams());
            // 6.3. Finally, hand off to the stripe reader to produce the data.
            // This is a sync call that will feed data to the consumer.
- // In case if we have high-level cache, we will intercept the data and add it there;
- // otherwise just pass the data directly to the consumer.
- Consumer<EncodedColumnBatch<OrcBatchKey>> consumer =
- (cache == null) ? this.consumer : this;
            // TODO: readEncodedColumns is not supposed to throw; errors should be propagated thru
            // consumer. It is potentially holding locked buffers, and must perform its own cleanup.
- stripeReader.readEncodedColumns(stripeIx, stripeIncludes, colRgs, lowLevelCache, consumer);
+ stripeReader.readEncodedColumns(stripeIx, si, stripeMetadata.getRowIndexes(),
+ stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes, colRgs);
            stripeReader.close();
          } catch (Throwable t) {
            consumer.setError(t);
- cleanupReaders(stripeReaders);
+ cleanupReaders(stripeReader);
            return null;
          }
        }
@@ -289,55 +291,33 @@ public class OrcEncodedDataProducer impl
       * the missing one. This is a temporary cludge until real metadata cache becomes available.
       */
      private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata,
- RecordReader stripeReader, boolean[] stripeIncludes) throws IOException {
+ StripeInformation stripe, boolean[] stripeIncludes) throws IOException {
        // We only synchronize on write for now - design of metadata cache is very temporary;
        // we pre-allocate the array and never remove entries; so readers should be safe.
        synchronized (stripeMetadata) {
          if (stripeMetadata.hasAllIndexes(stripeIncludes)) return;
- stripeMetadata.loadMissingIndexes(stripeReader, stripeIncludes);
+ stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes);
        }
      }

      /**
       * Closes the stripe readers (on error).
       */
- private void cleanupReaders(RecordReader[] stripeReaders) {
- for (RecordReader reader : stripeReaders) {
- if (reader == null) continue;
+ private void cleanupReaders(EncodedReader er) {
+ if (metadataReader != null) {
          try {
- reader.close();
+ metadataReader.close();
          } catch (IOException ex) {
            // Ignore.
          }
        }
- }
-
- /**
- * Creates a reader to read single stripe, if there are any columns to read.
- * @return The reader; null if stripe does not need to be read.
- */
- private RecordReader prepareStripeReader(int stripeIx, OrcFileMetadata metadata,
- boolean[] globalIncludes, List<Integer> stripeColsToRead, RecordReader existingReader)
- throws IOException {
- if (stripeColsToRead != null && stripeColsToRead.isEmpty()) {
- if (existingReader != null) {
- existingReader.close();
+ if (er != null) {
+ try {
+ er.close();
+ } catch (IOException ex) {
+ // Ignore.
          }
- return null;
        }
-
- if (existingReader != null) return existingReader;
- // Create RecordReader that will be used to read only this stripe.
- // TODO: use separate class instead.
- StripeInformation si = metadata.getStripes().get(stripeIx);
- ensureOrcReader();
- if (DebugUtils.isTraceOrcEnabled()) {
- LlapIoImpl.LOG.info("Creating stripe reader " + stripeIx + ": "
- + si.getOffset() + ", " + si.getLength());
- }
- existingReader = orcReader.rows(si.getOffset(), si.getLength(), globalIncludes);
- existingReader.prepareEncodedColumnRead();
- return existingReader;
      }

      /**
@@ -369,24 +349,22 @@ public class OrcEncodedDataProducer impl
       * Reads the metadata for all stripes in the file.
       * @param stripeReaders Array to preserve the readers used.
       */
- private ArrayList<OrcStripeMetadata> readStripesMetadata(OrcFileMetadata metadata,
- boolean[] globalInc, RecordReader[] stripeReaders) throws IOException {
- ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(stripeReaders.length);
+ private ArrayList<OrcStripeMetadata> readStripesMetadata(
+ OrcFileMetadata metadata, boolean[] globalInc) throws IOException {
+ ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length);
        OrcBatchKey stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
- for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+ for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
          stripeKey.stripeIx = stripeIxMod + stripeIxFrom;
          OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey);
          if (value == null || !value.hasAllIndexes(globalInc)) {
- ensureOrcReader();
+ ensureMetadataReader();
            StripeInformation si = metadata.getStripes().get(stripeKey.stripeIx);
            if (DebugUtils.isTraceOrcEnabled()) {
              LlapIoImpl.LOG.info("Creating stripe reader " + stripeKey.stripeIx + ": "
                  + si.getOffset() + ", " + si.getLength());
            }
- stripeReaders[stripeIxMod] = orcReader.rows(si.getOffset(), si.getLength(), globalInc);
- stripeReaders[stripeIxMod].prepareEncodedColumnRead();
            if (value == null) {
- value = new OrcStripeMetadata(stripeReaders[stripeIxMod], globalInc);
+ value = new OrcStripeMetadata(metadataReader, si, globalInc);
              metadataCache.putStripeMetadata(stripeKey, value);
              if (DebugUtils.isTraceOrcEnabled()) {
                LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -399,7 +377,7 @@ public class OrcEncodedDataProducer impl
                LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx
                    + " metadata for includes: " + DebugUtils.toString(globalInc));
              }
- updateLoadedIndexes(value, stripeReaders[stripeIxMod], globalInc);
+ updateLoadedIndexes(value, si, globalInc);
            }
          }
          result.add(value);
@@ -407,6 +385,12 @@ public class OrcEncodedDataProducer impl
        return result;
      }

+ private void ensureMetadataReader() throws IOException {
+ ensureOrcReader();
+ if (metadataReader != null) return;
+ metadataReader = orcReader.metadata();
+ }
+
      @Override
      public void returnData(StreamBuffer data) {
        lowLevelCache.releaseBuffers(data.cacheBuffers);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java?rev=1658861&r1=1658860&r2=1658861&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java Wed Feb 11 01:44:14 2015
@@ -20,22 +20,24 @@ package org.apache.hadoop.hive.llap.io.m
  import java.io.IOException;
  import java.util.List;

-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
-import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeFooter;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;

  public class OrcStripeMetadata {
    List<ColumnEncoding> encodings;
    List<Stream> streams;
    RowIndex[] rowIndexes;

- public OrcStripeMetadata(RecordReader reader, boolean[] includes) throws IOException {
- rowIndexes = new OrcProto.RowIndex[includes.length];
- reader.getCurrentRowIndexEntries(includes, rowIndexes);
- streams = reader.getCurrentStreams();
- encodings = reader.getCurrentColumnEncodings();
+ public OrcStripeMetadata(
+ MetadataReader mr, StripeInformation stripe, boolean[] includes) throws IOException {
+ StripeFooter footer = mr.readStripeFooter(stripe);
+ streams = footer.getStreamsList();
+ encodings = footer.getColumnsList();
+ rowIndexes = mr.readRowIndex(stripe, footer, includes, null);
    }

    public boolean hasAllIndexes(boolean[] includes) {
@@ -45,8 +47,10 @@ public class OrcStripeMetadata {
      return true;
    }

- public void loadMissingIndexes(RecordReader reader, boolean[] includes) throws IOException {
- reader.getCurrentRowIndexEntries(includes, rowIndexes);
+ public void loadMissingIndexes(
+ MetadataReader mr, StripeInformation stripe, boolean[] includes) throws IOException {
+ // TODO: should we save footer to avoid a read here?
+ rowIndexes = mr.readRowIndex(stripe, null, includes, rowIndexes);
    }

    public RowIndex[] getRowIndexes() {
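
A small usage sketch (not in this commit) of the updated OrcStripeMetadata API above; the caller-supplied metadataReader, stripe, and include masks are placeholders, mirroring how OrcEncodedDataProducer uses it.

    import java.io.IOException;

    import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
    import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
    import org.apache.hadoop.hive.ql.io.orc.StripeInformation;

    class StripeMetadataSketch {
      static OrcStripeMetadata load(MetadataReader metadataReader, StripeInformation stripe,
          boolean[] includes, boolean[] widerIncludes) throws IOException {
        // Build stripe metadata from the MetadataReader instead of a per-stripe RecordReader.
        OrcStripeMetadata md = new OrcStripeMetadata(metadataReader, stripe, includes);
        // If a wider column set is requested later, top up only the missing row indexes.
        if (!md.hasAllIndexes(widerIncludes)) {
          md.loadMissingIndexes(metadataReader, stripe, widerIncludes);
        }
        return md;
      }
    }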

Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java?rev=1658861&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java Wed Feb 11 01:44:14 2015
@@ -0,0 +1,20 @@
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
+
+public interface EncodedReader {
+ void readEncodedColumns(int stripeIx, StripeInformation stripe,
+ RowIndex[] index, List<ColumnEncoding> encodings, List<Stream> streams,
+ boolean[] included, boolean[][] colRgs) throws IOException;
+
+ void close() throws IOException;
+}
\ No newline at end of file
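
Since EncodedReader pushes results through the LLAP Consumer rather than returning them, here is a hypothetical consumer sketch. Only consumeData and setError appear in this patch (setDone is referenced in a comment); the exact shape of the Consumer interface is an assumption.

    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.hadoop.hive.llap.Consumer;
    import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
    import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;

    class CountingConsumer implements Consumer<EncodedColumnBatch<OrcBatchKey>> {
      private final AtomicInteger batches = new AtomicInteger();

      @Override
      public void consumeData(EncodedColumnBatch<OrcBatchKey> batch) {
        // Each call delivers the encoded streams for one row group of one stripe.
        batches.incrementAndGet();
      }

      @Override
      public void setDone() {
        System.out.println("Producer finished after " + batches.get() + " batches");
      }

      @Override
      public void setError(Throwable t) {
        t.printStackTrace();
      }
    }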

Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java?rev=1658861&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java Wed Feb 11 01:44:14 2015
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+
+
+public class EncodedReaderImpl implements EncodedReader {
+ private static final Log LOG = LogFactory.getLog(EncodedReaderImpl.class);
+
+ private final String fileName;
+ private final FSDataInputStream file;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ private final List<OrcProto.Type> types;
+ private final ZeroCopyReaderShim zcr;
+ private final long rowIndexStride;
+ private final LowLevelCache cache;
+ private final ByteBufferAllocatorPool pool;
+ // For now, one consumer for all calls.
+ private final Consumer<EncodedColumnBatch<OrcBatchKey>> consumer;
+
+ public EncodedReaderImpl(FileSystem fileSystem, Path path, boolean useZeroCopy,
+ List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate,
+ LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer)
+ throws IOException {
+ this.fileName = path.toString().intern(); // should we normalize this, like DFS would?
+ this.file = fileSystem.open(path);
+ this.codec = codec;
+ this.types = types;
+ this.bufferSize = bufferSize;
+ this.pool = useZeroCopy ? new ByteBufferAllocatorPool() : null;
+ this.zcr = useZeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null;
+ this.rowIndexStride = strideRate;
+ this.cache = cache;
+ this.consumer = consumer;
+ }
+
+
+ /** Helper context for each column being read */
+ private static final class ColumnReadContext {
+ public ColumnReadContext(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
+ this.encoding = encoding;
+ this.rowIndex = rowIndex;
+ this.colIx = colIx;
+ }
+ public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
+ /** The number of streams that are part of this column. */
+ int streamCount = 0;
+ final StreamContext[] streams = new StreamContext[MAX_STREAMS];
+ /** Column encoding. */
+ final ColumnEncoding encoding;
+ /** Column rowindex. */
+ final OrcProto.RowIndex rowIndex;
+ /** Column index in the file. */
+ final int colIx;
+
+ public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
+ streams[streamCount++] = new StreamContext(stream, offset, indexIx);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(" column_index: ").append(colIx);
+ sb.append(" encoding: ").append(encoding);
+ sb.append(" stream_count: ").append(streamCount);
+ int i = 0;
+ for (StreamContext sc : streams) {
+ if (sc != null) {
+ sb.append(" stream_").append(i).append(":").append(sc.toString());
+ }
+ i++;
+ }
+ return sb.toString();
+ }
+ }
+
+ private static final class StreamContext {
+ public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
+ this.kind = stream.getKind();
+ this.length = stream.getLength();
+ this.offset = streamOffset;
+ this.streamIndexOffset = streamIndexOffset;
+ }
+ /** Offsets of each stream in the column. */
+ public final long offset, length;
+ public final int streamIndexOffset;
+ public final OrcProto.Stream.Kind kind;
+ /** Iterators for the buffers; used to maintain position in per-rg reading. */
+ ListIterator<DiskRange> bufferIter;
+ /** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
+ StreamBuffer stripeLevelStream;
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(" kind: ").append(kind);
+ sb.append(" offset: ").append(offset);
+ sb.append(" length: ").append(length);
+ sb.append(" index_offset: ").append(streamIndexOffset);
+ return sb.toString();
+ }
+ }
+
+ @Override
+ public void readEncodedColumns(int stripeIx, StripeInformation stripe,
+ RowIndex[] indexes, List<ColumnEncoding> encodings, List<Stream> streamList,
+ boolean[] included, boolean[][] colRgs) throws IOException {
+ // Note: for now we don't have to setError here, caller will setError if we throw.
+ // We are also not supposed to call setDone, since we are only part of the operation.
+ long stripeOffset = stripe.getOffset();
+ // 1. Figure out what we have to read.
+ LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
+ long offset = 0; // Stream offset in relation to the stripe.
+ // 1.1. Figure out which columns have a present stream
+ boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("The following columns have PRESENT streams: " + DebugUtils.toString(hasNull));
+ }
+ DiskRange lastRange = null;
+
+ // We assume stream list is sorted by column and that non-data
+ // streams do not interleave data streams for the same column.
+ // 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream).
+ int colRgIx = -1, lastColIx = -1;
+ ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
+ boolean[] includedRgs = null;
+ boolean isCompressed = (codec != null);
+ for (OrcProto.Stream stream : streamList) {
+ long length = stream.getLength();
+ int colIx = stream.getColumn();
+ OrcProto.Stream.Kind streamKind = stream.getKind();
+ if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) {
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Skipping stream: " + streamKind + " at " + offset + ", " + length);
+ }
+ offset += length;
+ continue;
+ }
+ ColumnReadContext ctx = null;
+ if (lastColIx != colIx) {
+ ++colRgIx;
+ assert colCtxs[colRgIx] == null;
+ lastColIx = colIx;
+ includedRgs = colRgs[colRgIx];
+ ctx = colCtxs[colRgIx] = new ColumnReadContext(
+ colIx, encodings.get(colIx), indexes[colIx]);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
+ }
+ } else {
+ ctx = colCtxs[colRgIx];
+ assert ctx != null;
+ }
+ int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
+ types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
+ ctx.addStream(offset, stream, indexIx);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
+ + ", " + length + ", index position " + indexIx);
+ }
+ if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
+ lastRange = RecordReaderUtils.addEntireStreamToRanges(
+ offset, length, lastRange, rangesToRead);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Will read whole stream " + streamKind + "; added to " + lastRange);
+ }
+ } else {
+ lastRange = RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs,
+ codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx),
+ bufferSize, hasNull[colIx], offset, length, lastRange, rangesToRead);
+ }
+ offset += length;
+ }
+
+ // 2. Now, read all of the ranges from cache or disk.
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Resulting disk ranges to read: "
+ + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+ }
+ if (cache != null) {
+ cache.getFileData(fileName, rangesToRead, stripeOffset);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Disk ranges after cache (base offset " + stripeOffset
+ + "): " + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+ }
+ }
+ // Force direct buffers, since we will be decompressing to cache.
+ RecordReaderUtils.readDiskRanges(file, zcr, stripeOffset, rangesToRead, true);
+
+ // 2.1. Separate buffers (relative to stream offset) for each stream from the data we have.
+ // TODO: given how we read, we could potentially get rid of this step?
+ for (ColumnReadContext colCtx : colCtxs) {
+ for (int i = 0; i < colCtx.streamCount; ++i) {
+ StreamContext sctx = colCtx.streams[i];
+ List<DiskRange> sb = RecordReaderUtils.getStreamBuffers(
+ rangesToRead, sctx.offset, sctx.length);
+ sctx.bufferIter = sb.listIterator();
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Column " + colCtx.colIx + " stream " + sctx.kind + " at " + sctx.offset + ","
+ + sctx.length + " got ranges (relative to stream) "
+ + RecordReaderUtils.stringifyDiskRanges(sb));
+ }
+ }
+ }
+
+ // 3. Finally, decompress data, map per RG, and return to caller.
+ // We go by RG and not by column because that is how data is processed.
+ int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
+ for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
+ boolean isLastRg = rgCount - rgIx - 1 == 0;
+ // Create the batch we will use to return data for this RG.
+ EncodedColumnBatch<OrcBatchKey> ecb = new EncodedColumnBatch<OrcBatchKey>(
+ new OrcBatchKey(fileName, stripeIx, rgIx), colRgs.length, 0);
+ boolean isRGSelected = true;
+ for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
+ if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) {
+ isRGSelected = false;
+ continue;
+ } // RG x col filtered.
+ ColumnReadContext ctx = colCtxs[colIxMod];
+ RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
+ nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
+ ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
+ for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+ StreamContext sctx = ctx.streams[streamIx];
+ long absStreamOffset = stripeOffset + sctx.offset;
+ StreamBuffer cb = null;
+ if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
+ // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
+ }
+ cb = getStripeLevelStream(absStreamOffset, sctx, cache, isLastRg);
+ } else {
+ // This stream can be separated by RG using index. Let's do that.
+ long cOffset = index.getPositions(sctx.streamIndexOffset),
+ endCOffset = RecordReaderUtils.estimateRgEndOffset(isCompressed, isLastRg,
+ isLastRg ? sctx.length : nextIndex.getPositions(sctx.streamIndexOffset),
+ sctx.length, bufferSize);
+ cb = new StreamBuffer(sctx.kind.getNumber());
+ cb.incRef();
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Getting data for column "+ ctx.colIx + " " + (isLastRg ? "last " : "")
+ + "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", "
+ + sctx.length + " index position " + sctx.streamIndexOffset + ": compressed ["
+ + cOffset + ", " + endCOffset + ")");
+ }
+ InStream.uncompressStream(fileName, absStreamOffset, zcr, sctx.bufferIter,
+ codec, bufferSize, cache, cOffset, endCOffset, cb);
+ }
+ ecb.setStreamData(colIxMod, streamIx, cb);
+ }
+ }
+ if (isRGSelected) {
+ consumer.consumeData(ecb);
+ }
+ }
+ }
+
+ /**
+ * Reads the entire stream for a column (e.g. a dictionary stream), or gets it from context.
+ * @param isLastRg Whether the stream is being read for last RG in stripe.
+ * @return StreamBuffer that contains the entire stream.
+ */
+ private StreamBuffer getStripeLevelStream(long baseOffset, StreamContext ctx,
+ LowLevelCache cache, boolean isLastRg) throws IOException {
+ if (ctx.stripeLevelStream == null) {
+ ctx.stripeLevelStream = new StreamBuffer(ctx.kind.getNumber());
+ // We will be using this for each RG while also sending RGs to processing.
+ // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+ // it when building the last RG, so each RG processing will decref once, and the
+ // last one will unlock the buffers.
+ ctx.stripeLevelStream.incRef();
+ InStream.uncompressStream(fileName, baseOffset, zcr,
+ ctx.bufferIter, codec, bufferSize, cache, -1, -1, ctx.stripeLevelStream);
+ ctx.bufferIter = null;
+ }
+ if (!isLastRg) {
+ ctx.stripeLevelStream.incRef();
+ }
+ return ctx.stripeLevelStream;
+ }
+
+ @Override
+ public void close() throws IOException {
+ file.close();
+ if (pool != null) {
+ pool.clear();
+ }
+ }
+}
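
The "run refcount one ahead" trick in getStripeLevelStream above is easy to get wrong, so here is a self-contained toy illustration of the invariant (plain Java, no Hive types): the shared stream takes one reference up front plus one per additional row group, so exactly the last consumer's release drops the count to zero.

    /** Toy stand-in for a refcounted stream buffer. */
    class RefCounted {
      private int refs = 0;
      void incRef() { ++refs; }
      /** Returns true when the last reference is released (buffers may be unlocked). */
      boolean decRef() { return --refs == 0; }
    }

    public class RefCountAheadDemo {
      public static void main(String[] args) {
        int rgCount = 4;
        RefCounted dictStream = null;
        for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
          boolean isLastRg = (rgIx == rgCount - 1);
          if (dictStream == null) {
            dictStream = new RefCounted();
            dictStream.incRef(); // count runs one ahead of the consumers
          }
          if (!isLastRg) {
            dictStream.incRef(); // one extra reference per additional RG that shares it
          }
          // ... hand dictStream to the RG's consumer here ...
        }
        // Each of the rgCount consumers releases once; only the last release unlocks.
        for (int i = 0; i < rgCount; ++i) {
          System.out.println("consumer " + i + " released, unlocked=" + dictStream.decRef());
        }
      }
    }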

Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java?rev=1658861&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java Wed Feb 11 01:44:14 2015
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
+
+import com.google.common.collect.Lists;
+
+public class MetadataReader {
+ private final FSDataInputStream file;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ private final int typeCount;
+
+ public MetadataReader(FileSystem fileSystem, Path path,
+ CompressionCodec codec, int bufferSize, int typeCount) throws IOException {
+ this(fileSystem.open(path), codec, bufferSize, typeCount);
+ }
+
+ public MetadataReader(FSDataInputStream file,
+ CompressionCodec codec, int bufferSize, int typeCount) {
+ this.file = file;
+ this.codec = codec;
+ this.bufferSize = bufferSize;
+ this.typeCount = typeCount;
+ }
+
+ public OrcProto.RowIndex[] readRowIndex(StripeInformation stripe, OrcProto.StripeFooter footer,
+ boolean[] included, OrcProto.RowIndex[] indexes) throws IOException {
+ if (footer == null) {
+ footer = readStripeFooter(stripe);
+ }
+ if (indexes == null) {
+ indexes = new OrcProto.RowIndex[typeCount];
+ }
+ long offset = stripe.getOffset();
+ for (OrcProto.Stream stream : footer.getStreamsList()) {
+ if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) {
+ int col = stream.getColumn();
+ if ((included != null && !included[col]) || indexes[col] != null) continue;
+ byte[] buffer = new byte[(int) stream.getLength()];
+ file.seek(offset);
+ file.readFully(buffer);
+ indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index",
+ Lists.<DiskRange>newArrayList(new BufferChunk(ByteBuffer.wrap(buffer), 0)),
+ stream.getLength(), codec, bufferSize, null));
+ }
+ offset += stream.getLength();
+ }
+ return indexes;
+ }
+
+ public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+ long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+ int tailLength = (int) stripe.getFooterLength();
+
+ // read the footer
+ ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+ file.seek(offset);
+ file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+ return OrcProto.StripeFooter.parseFrom(InStream.create(null, "footer",
+ Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+ tailLength, codec, bufferSize, null));
+ }
+
+ public void close() throws IOException {
+ file.close();
+ }
+}
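
A brief usage sketch (not from the commit) for MetadataReader above. readStripeFooter is a separate call so the same footer can be passed back into readRowIndex and reused, avoiding a second footer read; reader, stripe, and the include masks are placeholders.

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
    import org.apache.hadoop.hive.ql.io.orc.OrcProto;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.StripeInformation;

    class RowIndexSketch {
      static OrcProto.RowIndex[] readIndexes(Reader reader, StripeInformation stripe,
          boolean[] includes, boolean[] moreIncludes) throws IOException {
        MetadataReader mr = reader.metadata();
        try {
          OrcProto.StripeFooter footer = mr.readStripeFooter(stripe);
          // Passing the footer in avoids re-reading it; passing null would read it again.
          OrcProto.RowIndex[] indexes = mr.readRowIndex(stripe, footer, includes, null);
          // A later call can top up the same array for additional columns.
          return mr.readRowIndex(stripe, footer, moreIncludes, indexes);
        } finally {
          mr.close();
        }
      }
    }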

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1658861&r1=1658860&r2=1658861&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Wed Feb 11 01:44:14 2015
@@ -22,6 +22,10 @@ import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.List;

+import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@@ -318,4 +322,9 @@ public interface Reader {
                      boolean[] include, SearchArgument sarg,
                      String[] neededColumns) throws IOException;

+ MetadataReader metadata() throws IOException;
+
+ EncodedReader encodedReader(LowLevelCache lowLevelCache,
+ Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException;
+
  }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1658861&r1=1658860&r2=1658861&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Wed Feb 11 01:44:14 2015
@@ -18,6 +18,8 @@

  package org.apache.hadoop.hive.ql.io.orc;

+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY;
+
  import java.io.IOException;
  import java.io.InputStream;
  import java.nio.ByteBuffer;
@@ -34,6 +36,11 @@ import org.apache.hadoop.fs.FSDataInputS
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
@@ -692,4 +699,17 @@ public class ReaderImpl implements Reade
    public List<UserMetadataItem> getOrcProtoUserMetadata() {
      return footer.getMetadataList();
    }
+
+ @Override
+ public MetadataReader metadata() throws IOException {
+ return new MetadataReader(fileSystem, path, codec, bufferSize, footer.getTypesCount());
+ }
+
+ @Override
+ public EncodedReader encodedReader(LowLevelCache lowLevelCache,
+ Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException {
+ boolean useZeroCopy = (conf != null) && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY));
+ return new EncodedReaderImpl(fileSystem, path, useZeroCopy, footer.getTypesList(),
+ codec, bufferSize, footer.getRowIndexStride(), lowLevelCache, consumer);
+ }
  }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1658861&r1=1658860&r2=1658861&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Wed Feb 11 01:44:14 2015
@@ -83,30 +83,4 @@ public interface RecordReader {
     * Seek to a particular row number.
     */
    void seekToRow(long rowCount) throws IOException;
-
- void prepareEncodedColumnRead() throws IOException;
-
- // TODO: maybe all of this should be moved to LLAP-specific class.
- // See also comment in RecordReaderImpl; class doesn't even have to be RecordReader
- /**
- * TODO: assumes the reader is for one stripe, otherwise the signature makes no sense.
- * Also has no columns passed, because that is in ctor.
- * @param stripeIncludes Includes to use for this call. This method ignores reader's includes.
- * @param colRgs What RGs are to be read. Has # of elements equal to the number of
- * included columns; then each boolean is rgCount long.
- * @param cache Cache to get/put data and allocate memory.
- * @param consumer Consumer to pass the results too.
- * @throws IOException
- */
- void readEncodedColumns(int stripeIx, boolean[] stripeIncludes, boolean[][] colRgs,
- LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException;
-
- void getCurrentRowIndexEntries(boolean[] included, RowIndex[] indexes) throws IOException;
-
- List<ColumnEncoding> getCurrentColumnEncodings() throws IOException;
-
- List<OrcProto.Stream> getCurrentStreams() throws IOException;
-
- void setMetadata(RowIndex[] index,
- List<ColumnEncoding> encodings, List<OrcProto.Stream> streams);
  }
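
The javadoc removed above describes the shape of the colRgs argument that the new EncodedReader.readEncodedColumns takes: one entry per included column, each a boolean per row group. A minimal sketch of building such a mask (sizes are placeholders):

    import java.util.Arrays;

    class ColRgSketch {
      /** One row per included column, one boolean per row group in the stripe. */
      static boolean[][] allRowGroups(int includedColCount, int rgCount) {
        boolean[][] colRgs = new boolean[includedColCount][];
        for (int c = 0; c < includedColCount; ++c) {
          colRgs[c] = new boolean[rgCount];
          Arrays.fill(colRgs[c], true); // select every row group for this column
        }
        // EncodedReaderImpl above treats a null per-column entry as "no row-group filtering".
        return colRgs;
      }
    }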

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1658861&r1=1658860&r2=1658861&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Feb 11 01:44:14 2015
@@ -31,12 +31,9 @@ import java.util.HashMap;
  import java.util.LinkedHashMap;
  import java.util.LinkedList;
  import java.util.List;
-import java.util.ListIterator;
  import java.util.Map;
-import java.util.TreeMap;

  import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.builder.HashCodeBuilder;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
@@ -46,13 +43,8 @@ import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.DiskRange;
  import org.apache.hadoop.hive.common.type.HiveDecimal;
  import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
  import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
  import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -61,10 +53,7 @@ import org.apache.hadoop.hive.ql.exec.ve
  import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
  import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -78,9 +67,7 @@ import org.apache.hadoop.hive.serde2.io.
  import org.apache.hadoop.hive.serde2.io.ShortWritable;
  import org.apache.hadoop.hive.serde2.io.TimestampWritable;
  import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
-import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
  import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
-import org.apache.hadoop.hive.shims.ShimLoader;
  import org.apache.hadoop.io.BooleanWritable;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.FloatWritable;
@@ -88,12 +75,11 @@ import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;

-import com.google.common.collect.ComparisonChain;
  import com.google.common.collect.Lists;

  public class RecordReaderImpl implements RecordReader {

- private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
+ static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
    private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();

    private final String fileName;
@@ -117,95 +103,15 @@ public class RecordReaderImpl implements
    List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
    private final TreeReader reader;
    private final OrcProto.RowIndex[] indexes;
- private List<OrcProto.ColumnEncoding> encodings;
- private List<OrcProto.Stream> streamList;
    private final SargApplier sargApp;
    // an array about which row groups aren't skipped
    private boolean[] includedRowGroups = null;
    private final Configuration conf;
+ private final MetadataReader metadata;

    private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
    private final ZeroCopyReaderShim zcr;

- // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
- // which lacks a clear()/clean() operation
- public final static class ByteBufferAllocatorPool implements ByteBufferPoolShim {
- private static final class Key implements Comparable<Key> {
- private final int capacity;
- private final long insertionGeneration;
-
- Key(int capacity, long insertionGeneration) {
- this.capacity = capacity;
- this.insertionGeneration = insertionGeneration;
- }
-
- @Override
- public int compareTo(Key other) {
- return ComparisonChain.start().compare(capacity, other.capacity)
- .compare(insertionGeneration, other.insertionGeneration).result();
- }
-
- @Override
- public boolean equals(Object rhs) {
- if (rhs == null) {
- return false;
- }
- try {
- Key o = (Key) rhs;
- return (compareTo(o) == 0);
- } catch (ClassCastException e) {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(capacity).append(insertionGeneration)
- .toHashCode();
- }
- }
-
- private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
-
- private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
-
- private long currentGeneration = 0;
-
- private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
- return direct ? directBuffers : buffers;
- }
-
- public void clear() {
- buffers.clear();
- directBuffers.clear();
- }
-
- @Override
- public ByteBuffer getBuffer(boolean direct, int length) {
- TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
- Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
- if (entry == null) {
- return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
- .allocate(length);
- }
- tree.remove(entry.getKey());
- return entry.getValue();
- }
-
- @Override
- public void putBuffer(ByteBuffer buffer) {
- TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
- while (true) {
- Key key = new Key(buffer.capacity(), currentGeneration++);
- if (!tree.containsKey(key)) {
- tree.put(key, buffer);
- return;
- }
- // Buffers are indexed by (capacity, generation).
- // If our key is not unique on the first try, we try again
- }
- }
- }

    /**
     * Given a list of column names, find the given column and return the index.
@@ -263,6 +169,7 @@ public class RecordReaderImpl implements
      this.included = options.getInclude();
      this.conf = conf;
      this.rowIndexStride = strideRate;
+ this.metadata = new MetadataReader(file, codec, bufferSize, types.size());
      SearchArgument sarg = options.getSearchArgument();
      if (sarg != null && strideRate != 0) {
        sargApp = new SargApplier(sarg, options.getColumnNames(), strideRate);
@@ -285,15 +192,7 @@ public class RecordReaderImpl implements

      final boolean zeroCopy = (conf != null)
          && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY));
-
- if (zeroCopy
- && (codec == null || ((codec instanceof DirectDecompressionCodec)
- && ((DirectDecompressionCodec) codec).isAvailable()))) {
- /* codec is null or is available */
- this.zcr = ShimLoader.getHadoopShims().getZeroCopyReader(file, pool);
- } else {
- this.zcr = null;
- }
+ zcr = zeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null;

      firstRow = skippedRows;
      totalRowCount = rows;
@@ -2289,19 +2188,8 @@ public class RecordReaderImpl implements
      }
    }

- OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
- ) throws IOException {
- long offset = stripe.getOffset() + stripe.getIndexLength() +
- stripe.getDataLength();
- int tailLength = (int) stripe.getFooterLength();
-
- // read the footer
- ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
- file.seek(offset);
- file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
- return OrcProto.StripeFooter.parseFrom(InStream.create(null, "footer",
- Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
- tailLength, codec, bufferSize, null));
+ OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+ return metadata.readStripeFooter(stripe);
    }

    static enum Location {
@@ -2749,7 +2637,7 @@ public class RecordReaderImpl implements
      if (this.cache != null) {
        cache.getFileData(fileName, rangesToRead, stripe.getOffset());
      }
- readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead, false);
+ RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead, false);
      bufferChunks = rangesToRead;
      List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
      createStreams(
@@ -2837,96 +2725,6 @@ public class RecordReaderImpl implements
      }
    }

- private static final int BYTE_STREAM_POSITIONS = 1;
- private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
- private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
- private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
-
- /**
- * Get the offset in the index positions for the column that the given
- * stream starts.
- * @param columnEncoding the encoding of the column
- * @param columnType the type of the column
- * @param streamType the kind of the stream
- * @param isCompressed is the file compressed
- * @param hasNulls does the column have a PRESENT stream?
- * @return the number of positions that will be used for that stream
- */
- public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
- OrcProto.Type.Kind columnType,
- OrcProto.Stream.Kind streamType,
- boolean isCompressed,
- boolean hasNulls) {
- if (streamType == OrcProto.Stream.Kind.PRESENT) {
- return 0;
- }
- int compressionValue = isCompressed ? 1 : 0;
- int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
- switch (columnType) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- case DATE:
- case STRUCT:
- case MAP:
- case LIST:
- case UNION:
- return base;
- case CHAR:
- case VARCHAR:
- case STRING:
- if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
- columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
- return base;
- } else {
- if (streamType == OrcProto.Stream.Kind.DATA) {
- return base;
- } else {
- return base + BYTE_STREAM_POSITIONS + compressionValue;
- }
- }
- case BINARY:
- if (streamType == OrcProto.Stream.Kind.DATA) {
- return base;
- }
- return base + BYTE_STREAM_POSITIONS + compressionValue;
- case DECIMAL:
- if (streamType == OrcProto.Stream.Kind.DATA) {
- return base;
- }
- return base + BYTE_STREAM_POSITIONS + compressionValue;
- case TIMESTAMP:
- if (streamType == OrcProto.Stream.Kind.DATA) {
- return base;
- }
- return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
- default:
- throw new IllegalArgumentException("Unknown type " + columnType);
- }
- }
-
- // for uncompressed streams, what is the most overlap with the following set
- // of rows (long vint literal group).
- static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
-
- /**
- * Is this stream part of a dictionary?
- * @return is this part of a dictionary?
- */
- static boolean isDictionary(OrcProto.Stream.Kind kind,
- OrcProto.ColumnEncoding encoding) {
- assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
- OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
- return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
- (kind == OrcProto.Stream.Kind.LENGTH &&
- (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
- encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
- }
-
    /**
     * Plan the ranges of the file that we need to read given the list of
     * columns and row groups.
@@ -2952,7 +2750,7 @@ public class RecordReaderImpl implements
      LinkedList<DiskRange> result = new LinkedList<DiskRange>();
      long offset = 0;
      // figure out which columns have a present stream
- boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
+ boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
      DiskRange lastRange = null;
      for (OrcProto.Stream stream : streamList) {
        long length = stream.getLength();
@@ -2960,10 +2758,11 @@ public class RecordReaderImpl implements
        OrcProto.Stream.Kind streamKind = stream.getKind();
        if (StreamName.getArea(streamKind) == StreamName.Area.DATA && includedColumns[column]) {
          // if we aren't filtering or it is a dictionary, load it.
- if (includedRowGroups == null || isDictionary(streamKind, encodings.get(column))) {
- lastRange = addEntireStreamToResult(offset, length, lastRange, result);
+ if (includedRowGroups == null
+ || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
+ lastRange = RecordReaderUtils.addEntireStreamToRanges(offset, length, lastRange, result);
          } else {
- lastRange = addRgFilteredStreamToResult(stream, includedRowGroups,
+ lastRange = RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
                isCompressed, indexes[column], encodings.get(column), types.get(column),
                compressionSize, hasNull[column], offset, length, lastRange, result);
          }
@@ -2973,71 +2772,6 @@ public class RecordReaderImpl implements
      return result;
    }

- private static DiskRange addEntireStreamToResult(long offset, long length,
- DiskRange lastRange, LinkedList<DiskRange> result) {
- long end = offset + length;
- if (lastRange != null && overlap(lastRange.offset, lastRange.end, offset, end)) {
- lastRange.offset = Math.min(lastRange.offset, offset);
- lastRange.end = Math.max(lastRange.end, end);
- } else {
- lastRange = new DiskRange(offset, end);
- result.add(lastRange);
- }
- return lastRange;
- }
-
- private static boolean[] findPresentStreamsByColumn(List<OrcProto.Stream> streamList,
- List<OrcProto.Type> types) {
- boolean[] hasNull = new boolean[types.size()];
- for(OrcProto.Stream stream: streamList) {
- if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
- hasNull[stream.getColumn()] = true;
- }
- }
- return hasNull;
- }
-
- private static DiskRange addRgFilteredStreamToResult(OrcProto.Stream stream,
- boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
- OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
- long offset, long length, DiskRange lastRange, LinkedList<DiskRange> result) {
- for (int group = 0; group < includedRowGroups.length; ++group) {
- if (!includedRowGroups[group]) continue;
- int posn = getIndexPosition(
- encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
- long start = index.getEntry(group).getPositions(posn);
- final long nextGroupOffset;
- boolean isLast = group == (includedRowGroups.length - 1);
- nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
-
- start += offset;
- long end = offset + estimateRgEndOffset(
- isCompressed, isLast, nextGroupOffset, length, compressionSize);
- if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
- lastRange.offset = Math.min(lastRange.offset, start);
- lastRange.end = Math.max(lastRange.end, end);
- } else {
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Creating new range for RG read; last range (which can include some "
- + "previous RGs) was " + lastRange);
- }
- lastRange = new DiskRange(start, end);
- result.add(lastRange);
- }
- }
- return lastRange;
- }
-
- private static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
- long nextGroupOffset, long streamLength, int bufferSize) {
- // figure out the worst case last location
- // if adjacent groups have the same compressed block offset then stretch the slop
- // by factor of 2 to safely accommodate the next compression block.
- // One for the current compression block and another for the next compression block.
- long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
- return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
- }
-
    /**
     * Update the disk ranges to collapse adjacent or overlapping ranges. It
     * assumes that the ranges are sorted.
@@ -3047,7 +2781,7 @@ public class RecordReaderImpl implements
      DiskRange prev = null;
      for(int i=0; i < ranges.size(); ++i) {
        DiskRange current = ranges.get(i);
- if (prev != null && overlap(prev.offset, prev.end,
+ if (prev != null && RecordReaderUtils.overlap(prev.offset, prev.end,
            current.offset, current.end)) {
          prev.offset = Math.min(prev.offset, current.offset);
          prev.end = Math.max(prev.end, current.end);
@@ -3059,99 +2793,6 @@ public class RecordReaderImpl implements
      }
    }
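
For intuition, a minimal standalone sketch of the same collapse rule over sorted [offset, end) pairs; the class name and the numbers are made up, and the real method mutates the DiskRange list in place rather than building a new one:

  import java.util.ArrayList;
  import java.util.List;

  public class MergeRangesSketch {
    // Collapse sorted, adjacent or overlapping [offset, end) pairs, mirroring mergeDiskRanges.
    static List<long[]> merge(List<long[]> ranges) {
      List<long[]> out = new ArrayList<long[]>();
      long[] prev = null;
      for (long[] cur : ranges) {
        if (prev != null && cur[0] <= prev[1]) {   // touching or overlapping the previous range
          prev[1] = Math.max(prev[1], cur[1]);
        } else {
          prev = new long[] { cur[0], cur[1] };
          out.add(prev);
        }
      }
      return out;
    }

    public static void main(String[] args) {
      List<long[]> in = new ArrayList<long[]>();
      in.add(new long[] { 0, 100 });
      in.add(new long[] { 90, 200 });    // overlaps the previous range
      in.add(new long[] { 200, 250 });   // adjacent to the previous range
      in.add(new long[] { 400, 500 });   // disjoint
      for (long[] r : merge(in)) {
        System.out.println("[" + r[0] + ", " + r[1] + ")");   // [0, 250) and [400, 500)
      }
    }
  }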

- /**
- * Read the list of ranges from the file.
- * @param file the file to read
- * @param base the base of the stripe
- * @param ranges the disk ranges within the stripe to read
- * @return the bytes read for each disk range, which is the same length as
- * ranges
- * @throws IOException
- */
- static void readDiskRanges(FSDataInputStream file,
- ZeroCopyReaderShim zcr,
- long base,
- LinkedList<DiskRange> ranges,
- boolean doForceDirect) throws IOException {
- ListIterator<DiskRange> rangeIter = ranges.listIterator();
- while (rangeIter.hasNext()) {
- DiskRange range = rangeIter.next();
- if (range.hasData()) continue;
- int len = (int) (range.end - range.offset);
- long off = range.offset;
- file.seek(base + off);
- if (zcr != null) {
- boolean hasReplaced = false;
- while (len > 0) {
- ByteBuffer partial = zcr.readBuffer(len, false);
- BufferChunk bc = new BufferChunk(partial, off);
- if (!hasReplaced) {
- rangeIter.set(bc);
- hasReplaced = true;
- } else {
- rangeIter.add(bc);
- }
- int read = partial.remaining();
- len -= read;
- off += read;
- }
- } else if (doForceDirect) {
- ByteBuffer directBuf = ByteBuffer.allocateDirect(len);
- try {
- while (directBuf.remaining() > 0) {
- int count = file.read(directBuf);
- if (count < 0) throw new EOFException();
- directBuf.position(directBuf.position() + count);
- }
- } catch (UnsupportedOperationException ex) {
- LOG.error("Stream does not support direct read; we will copy.");
- byte[] buffer = new byte[len];
- file.readFully(buffer, 0, buffer.length);
- directBuf.put(buffer);
- }
- directBuf.position(0);
- rangeIter.set(new BufferChunk(directBuf, range.offset));
- } else {
- byte[] buffer = new byte[len];
- file.readFully(buffer, 0, buffer.length);
- rangeIter.set(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
- }
- }
- }
-
- /**
- * Does region A overlap region B? The end points are inclusive on both sides.
- * @param leftA A's left point
- * @param rightA A's right point
- * @param leftB B's left point
- * @param rightB B's right point
- * @return Does region A overlap region B?
- */
- static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
- if (leftA <= leftB) {
- return rightA >= leftB;
- }
- return rightB >= leftA;
- }
-
- /**
- * Build a string representation of a list of disk ranges.
- * @param ranges ranges to stringify
- * @return the resulting string
- */
- static String stringifyDiskRanges(List<DiskRange> ranges) {
- StringBuilder buffer = new StringBuilder();
- buffer.append("[");
- for(int i=0; i < ranges.size(); ++i) {
- if (i != 0) {
- buffer.append(", ");
- }
- buffer.append(ranges.get(i).toString());
- }
- buffer.append("]");
- return buffer.toString();
- }
-
    void createStreams(List<OrcProto.Stream> streamDescriptions,
                              List<DiskRange> ranges,
                              boolean[] includeColumn,
@@ -3167,7 +2808,8 @@ public class RecordReaderImpl implements
          streamOffset += streamDesc.getLength();
          continue;
        }
- List<DiskRange> buffers = getStreamBuffers(ranges, streamOffset, streamDesc.getLength());
+ List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+ ranges, streamOffset, streamDesc.getLength());
        StreamName name = new StreamName(column, streamDesc.getKind());
        streams.put(name, InStream.create(fileName, name.toString(), buffers,
            streamDesc.getLength(), codec, bufferSize, cache));
@@ -3175,44 +2817,6 @@ public class RecordReaderImpl implements
      }
    }

- private List<DiskRange> getStreamBuffers(List<DiskRange> ranges, long offset, long length) {
- // This assumes sorted ranges (as do many other parts of ORC code.
- ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
- long streamEnd = offset + length;
- boolean inRange = false;
- for (DiskRange range : ranges) {
- if (!inRange) {
- if (range.end <= offset) continue; // Skip until we are in range.
- inRange = true;
- if (range.offset < offset) {
- // Partial first buffer, add a slice of it.
- DiskRange partial = range.slice(offset, Math.min(streamEnd, range.end));
- partial.shiftBy(-offset);
- buffers.add(partial);
- if (range.end >= streamEnd) break; // Partial first buffer is also partial last buffer.
- continue;
- }
- } else if (range.offset >= streamEnd) {
- break;
- }
- if (range.end > streamEnd) {
- // Partial last buffer (may also be the first buffer), add a slice of it.
- DiskRange partial = range.slice(range.offset, streamEnd);
- partial.shiftBy(-offset);
- buffers.add(partial);
- break;
- }
- // Buffer that belongs entirely to one stream.
- // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
- // because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
- DiskRange full = range.slice(range.offset, range.end);
- full.shiftBy(-offset);
- buffers.add(full);
- if (range.end == streamEnd) break;
- }
- return buffers;
- }
-
    private LowLevelCache cache = null;
    public void setCache(LowLevelCache cache) {
      this.cache = cache;
@@ -3225,16 +2829,16 @@ public class RecordReaderImpl implements
              indexes, included, includedRowGroups, codec != null,
              stripeFooter.getColumnsList(), types, bufferSize);
      if (LOG.isDebugEnabled()) {
- LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead));
+ LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
      }
      mergeDiskRanges(rangesToRead);
      if (this.cache != null) {
        cache.getFileData(fileName, rangesToRead, stripe.getOffset());
      }
- readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead, false);
+ RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead, false);
      bufferChunks = rangesToRead;
      if (LOG.isDebugEnabled()) {
- LOG.debug("merge = " + stringifyDiskRanges(rangesToRead));
+ LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
      }

      createStreams(streamList, bufferChunks, included, codec, bufferSize, streams, cache);
@@ -3416,31 +3020,14 @@ public class RecordReaderImpl implements

    OrcProto.RowIndex[] readRowIndex(
        int stripeIndex, boolean[] included, OrcProto.RowIndex[] indexes) throws IOException {
- long offset = stripes.get(stripeIndex).getOffset();
- OrcProto.StripeFooter stripeFooter;
+ StripeInformation stripe = stripes.get(stripeIndex);
+ OrcProto.StripeFooter stripeFooter = null;
      // if this is the current stripe, use the cached objects.
      if (stripeIndex == currentStripe) {
        stripeFooter = this.stripeFooter;
        indexes = indexes == null ? this.indexes : indexes;
- } else {
- stripeFooter = readStripeFooter(stripes.get(stripeIndex));
- indexes = indexes == null ? new OrcProto.RowIndex[this.indexes.length] : indexes;
      }
- for(OrcProto.Stream stream: stripeFooter.getStreamsList()) {
- if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) {
- int col = stream.getColumn();
- if ((included == null || included[col]) && indexes[col] == null) {
- byte[] buffer = new byte[(int) stream.getLength()];
- file.seek(offset);
- file.readFully(buffer);
- indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index",
- Lists.<DiskRange>newArrayList(new BufferChunk(ByteBuffer.wrap(buffer), 0)),
- stream.getLength(), codec, bufferSize, null));
- }
- }
- offset += stream.getLength();
- }
- return indexes;
+ return metadata.readRowIndex(stripe, stripeFooter, included, indexes);
    }

    private void seekToRowEntry(TreeReader reader, int rowEntry) throws IOException {
@@ -3476,297 +3063,4 @@ public class RecordReaderImpl implements
      // if we aren't to the right row yet, advance in the stripe.
      advanceToNextRow(reader, rowNumber, true);
    }
-
- @Override
- public void prepareEncodedColumnRead() throws IOException {
- assert currentStripe < 1 : "Reader is supposed to be per stripe";
- if (currentStripe == 0) return;
- ++currentStripe;
- beginReadStripe();
- }
-
- /** Helper context for each column being read */
- private static final class ColumnReadContext {
- public ColumnReadContext(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
- this.encoding = encoding;
- this.rowIndex = rowIndex;
- this.colIx = colIx;
- }
- public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
- /** The number of streams that are part of this column. */
- int streamCount = 0;
- final StreamContext[] streams = new StreamContext[MAX_STREAMS];
- /** Column encoding. */
- final ColumnEncoding encoding;
- /** Column rowindex. */
- final OrcProto.RowIndex rowIndex;
- /** Column index in the file. */
- final int colIx;
-
- public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
- streams[streamCount++] = new StreamContext(stream, offset, indexIx);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(" column_index: ").append(colIx);
- sb.append(" encoding: ").append(encoding);
- sb.append(" stream_count: ").append(streamCount);
- int i = 0;
- for (StreamContext sc : streams) {
- if (sc != null) {
- sb.append(" stream_").append(i).append(":").append(sc.toString());
- }
- i++;
- }
- return sb.toString();
- }
- }
-
- private static final class StreamContext {
- public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
- this.kind = stream.getKind();
- this.length = stream.getLength();
- this.offset = streamOffset;
- this.streamIndexOffset = streamIndexOffset;
- }
- /** Offsets of each stream in the column. */
- public final long offset, length;
- public final int streamIndexOffset;
- public final OrcProto.Stream.Kind kind;
- /** Iterators for the buffers; used to maintain position in per-rg reading. */
- ListIterator<DiskRange> bufferIter;
- /** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
- StreamBuffer stripeLevelStream;
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(" kind: ").append(kind);
- sb.append(" offset: ").append(offset);
- sb.append(" length: ").append(length);
- sb.append(" index_offset: ").append(streamIndexOffset);
- return sb.toString();
- }
- }
-
- /*
- * TODO: this method could be made static or moved to separate class unrelated to RecordReader.
- * It's not very well integrated into RecordReader, and violates many RR usage patterns.
- * The following fields are used; and how to get rid of them:
- * currentStripe - always 0 in current usage.
- * stripes, fileName, codec, zcr, bufferSize - available externally or thru parent Reader
- * rowCountInStripe, file - derived
- * types, encodings, indexes - available externally from cache (for initial caching, reader
- * is needed to get footer and indexes; or that can be refactored out)
- */
- @Override
- // TODO#: HERE
- public void readEncodedColumns(int stripeIx, boolean[] included, boolean[][] colRgs,
- LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException {
- // Note: for now we don't have to setError here, caller will setError if we throw.
- // We are also not supposed to call setDone, since we are only part of the operation.
- StripeInformation stripe = stripes.get(currentStripe);
- long stripeOffset = stripe.getOffset();
- // TODO: we should have avoided reading the footer if we got metadata from cache.
- List<OrcProto.Stream> streamList =
- this.streamList != null ? this.streamList : stripeFooter.getStreamsList();
- List<ColumnEncoding> encodings =
- this.encodings != null ? this.encodings : stripeFooter.getColumnsList();
-
- // 1. Figure out what we have to read.
- LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
- long offset = 0; // Stream offset in relation to the stripe.
- // 1.1. Figure out which columns have a present stream
- boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("The following columns have PRESENT streams: " + DebugUtils.toString(hasNull));
- }
- DiskRange lastRange = null;
-
- // We assume stream list is sorted by column and that non-data
- // streams do not interleave data streams for the same column.
- // 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream).
- int colRgIx = -1, lastColIx = -1;
- ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
- boolean[] includedRgs = null;
- boolean isCompressed = (codec != null);
- for (OrcProto.Stream stream : streamList) {
- long length = stream.getLength();
- int colIx = stream.getColumn();
- OrcProto.Stream.Kind streamKind = stream.getKind();
- if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) {
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Skipping stream: " + streamKind + " at " + offset + ", " + length);
- }
- offset += length;
- continue;
- }
- ColumnReadContext ctx = null;
- if (lastColIx != colIx) {
- ++colRgIx;
- assert colCtxs[colRgIx] == null;
- lastColIx = colIx;
- includedRgs = colRgs[colRgIx];
- ctx = colCtxs[colRgIx] = new ColumnReadContext(
- colIx, encodings.get(colIx), indexes[colIx]);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
- }
- } else {
- ctx = colCtxs[colRgIx];
- assert ctx != null;
- }
- int indexIx = getIndexPosition(ctx.encoding.getKind(),
- types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
- ctx.addStream(offset, stream, indexIx);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
- + ", " + length + ", index position " + indexIx);
- }
- if (includedRgs == null || isDictionary(streamKind, encodings.get(colIx))) {
- lastRange = addEntireStreamToResult(offset, length, lastRange, rangesToRead);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Will read whole stream " + streamKind + "; added to " + lastRange);
- }
- } else {
- lastRange = addRgFilteredStreamToResult(stream, includedRgs,
- codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx),
- bufferSize, hasNull[colIx], offset, length, lastRange, rangesToRead);
- }
- offset += length;
- }
-
- // 2. Now, read all of the ranges from cache or disk.
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Resulting disk ranges to read: " + stringifyDiskRanges(rangesToRead));
- }
- if (cache != null) {
- cache.getFileData(fileName, rangesToRead, stripeOffset);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Disk ranges after cache (base offset " + stripeOffset
- + "): " + stringifyDiskRanges(rangesToRead));
- }
- }
- // Force direct buffers, since we will be decompressing to cache.
- readDiskRanges(file, zcr, stripeOffset, rangesToRead, true);
-
- // 2.1. Separate buffers (relative to stream offset) for each stream from the data we have.
- // TODO: given how we read, we could potentially get rid of this step?
- for (ColumnReadContext colCtx : colCtxs) {
- for (int i = 0; i < colCtx.streamCount; ++i) {
- StreamContext sctx = colCtx.streams[i];
- List<DiskRange> sb = getStreamBuffers(rangesToRead, sctx.offset, sctx.length);
- sctx.bufferIter = sb.listIterator();
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Column " + colCtx.colIx + " stream " + sctx.kind + " at " + sctx.offset + ","
- + sctx.length + " got ranges (relative to stream) " + stringifyDiskRanges(sb));
- }
- }
- }
-
- // 3. Finally, decompress data, map per RG, and return to caller.
- // We go by RG and not by column because that is how data is processed.
- int rgCount = (int)Math.ceil((double)rowCountInStripe / rowIndexStride);
- for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
- boolean isLastRg = rgCount - rgIx - 1 == 0;
- // Create the batch we will use to return data for this RG.
- EncodedColumnBatch<OrcBatchKey> ecb = new EncodedColumnBatch<OrcBatchKey>(
- new OrcBatchKey(fileName, stripeIx, rgIx), colRgs.length, 0);
- boolean isRGSelected = true;
- for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
- if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) {
- isRGSelected = false;
- continue;
- } // RG x col filtered.
- ColumnReadContext ctx = colCtxs[colIxMod];
- RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
- nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
- ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
- for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
- StreamContext sctx = ctx.streams[streamIx];
- long absStreamOffset = stripeOffset + sctx.offset;
- StreamBuffer cb = null;
- if (isDictionary(sctx.kind, ctx.encoding)) {
- // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
- + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
- }
- cb = getStripeLevelStream(absStreamOffset, sctx, cache, isLastRg);
- } else {
- // This stream can be separated by RG using index. Let's do that.
- long cOffset = index.getPositions(sctx.streamIndexOffset),
- endCOffset = estimateRgEndOffset(isCompressed, isLastRg, isLastRg
- ? sctx.length : nextIndex.getPositions(sctx.streamIndexOffset),
- sctx.length, bufferSize);
- cb = new StreamBuffer(sctx.kind.getNumber());
- cb.incRef();
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Getting data for column "+ ctx.colIx + " " + (isLastRg ? "last " : "")
- + "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", "
- + sctx.length + " index position " + sctx.streamIndexOffset + ": compressed ["
- + cOffset + ", " + endCOffset + ")");
- }
- InStream.uncompressStream(fileName, absStreamOffset, zcr, sctx.bufferIter,
- codec, bufferSize, cache, cOffset, endCOffset, cb);
- }
- ecb.setStreamData(colIxMod, streamIx, cb);
- }
- }
- if (isRGSelected) {
- consumer.consumeData(ecb);
- }
- }
- }
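
Step 3 above walks row groups, of which there are ceil(rowCountInStripe / rowIndexStride); a quick worked example of that count with made-up stripe numbers:

  public class RowGroupCountSketch {
    public static void main(String[] args) {
      long rowCountInStripe = 1000000;   // hypothetical number of rows in the stripe
      int rowIndexStride = 10000;        // a typical ORC row index stride
      int rgCount = (int) Math.ceil((double) rowCountInStripe / rowIndexStride);
      System.out.println(rgCount);       // 100; 1000001 rows would give 101 (partial last RG)
    }
  }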
-
- /**
- * Reads the entire stream for a column (e.g. a dictionary stream), or gets it from context.
- * @param isLastRg Whether the stream is being read for last RG in stripe.
- * @return StreamBuffer that contains the entire stream.
- */
- private StreamBuffer getStripeLevelStream(long baseOffset, StreamContext ctx,
- LowLevelCache cache, boolean isLastRg) throws IOException {
- if (ctx.stripeLevelStream == null) {
- ctx.stripeLevelStream = new StreamBuffer(ctx.kind.getNumber());
- // We will be using this for each RG while also sending RGs to processing.
- // To avoid buffers being unlocked, run refcount one ahead; we will not increase
- // it when building the last RG, so each RG processing will decref once, and the
- // last one will unlock the buffers.
- ctx.stripeLevelStream.incRef();
- InStream.uncompressStream(fileName, baseOffset, zcr,
- ctx.bufferIter, codec, bufferSize, cache, -1, -1, ctx.stripeLevelStream);
- ctx.bufferIter = null;
- }
- if (!isLastRg) {
- ctx.stripeLevelStream.incRef();
- }
- return ctx.stripeLevelStream;
- }
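
The refcount-one-ahead bookkeeping in getStripeLevelStream can be checked with plain arithmetic; below, a hypothetical counter stands in for the StreamBuffer refcount:

  public class StripeStreamRefcountSketch {
    public static void main(String[] args) {
      int rgCount = 4;      // hypothetical number of row groups sharing the stripe-level stream
      int refCount = 0;
      for (int rg = 0; rg < rgCount; ++rg) {
        boolean isLastRg = (rg == rgCount - 1);
        if (rg == 0) {
          refCount++;       // incRef on creation: the "one ahead" reference
        }
        if (!isLastRg) {
          refCount++;       // incRef for every RG except the last
        }
      }
      refCount -= rgCount;  // each RG consumer decrefs once when it is done
      System.out.println(refCount);   // 0: the buffers unlock only after the last RG finishes
    }
  }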
-
- @Override
- public List<ColumnEncoding> getCurrentColumnEncodings() throws IOException {
- return stripeFooter.getColumnsList();
- }
-
- @Override
- public void getCurrentRowIndexEntries(
- boolean[] included, RowIndex[] indexes) throws IOException {
- readRowIndex(currentStripe, included, indexes);
- }
-
- @Override
- public List<Stream> getCurrentStreams() throws IOException {
- return stripeFooter.getStreamsList();
- }
-
- @Override
- public void setMetadata(
- RowIndex[] index, List<ColumnEncoding> encodings, List<Stream> streams) {
- assert index.length == indexes.length;
- System.arraycopy(index, 0, indexes, 0, index.length);
- this.streamList = streams;
- this.encodings = encodings;
- }
  }

  • Sershe at Feb 11, 2015 at 1:44 am
    Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
    URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java?rev=1658861&view=auto
    ==============================================================================
    --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java (added)
    +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java Wed Feb 11 01:44:14 2015
    @@ -0,0 +1,422 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hive.ql.io.orc;
    +
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.TreeMap;
    +
    +import org.apache.commons.lang.builder.HashCodeBuilder;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.hive.common.DiskRange;
    +import org.apache.hadoop.hive.llap.DebugUtils;
    +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
    +import org.apache.hadoop.hive.shims.ShimLoader;
    +import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
    +import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
    +
    +import com.google.common.collect.ComparisonChain;
    +
    +/**
    + * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
    + */
    +class RecordReaderUtils {
    + static boolean[] findPresentStreamsByColumn(
    + List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
    + boolean[] hasNull = new boolean[types.size()];
    + for(OrcProto.Stream stream: streamList) {
    + if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
    + hasNull[stream.getColumn()] = true;
    + }
    + }
    + return hasNull;
    + }
    +
    + /**
    + * Does region A overlap region B? The end points are inclusive on both sides.
    + * @param leftA A's left point
    + * @param rightA A's right point
    + * @param leftB B's left point
    + * @param rightB B's right point
    + * @return Does region A overlap region B?
    + */
    + static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
    + if (leftA <= leftB) {
    + return rightA >= leftB;
    + }
    + return rightB >= leftA;
    + }
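
    Since both endpoints are inclusive, ranges that merely touch count as overlapping. A few spot checks (the same cases exercised by TestRecordReaderImpl below), assuming a caller in the same package because overlap is package-private:

      package org.apache.hadoop.hive.ql.io.orc;

      public class OverlapSketch {
        public static void main(String[] args) {
          System.out.println(RecordReaderUtils.overlap(0, 10, 10, 11));    // true: shared endpoint 10
          System.out.println(RecordReaderUtils.overlap(0, 10, -10, -1));   // false: B ends before A starts
          System.out.println(RecordReaderUtils.overlap(0, 10, -1, 11));    // true: B spans all of A
        }
      }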
    +
    +
    + static DiskRange addEntireStreamToRanges(long offset, long length,
    + DiskRange lastRange, LinkedList<DiskRange> result) {
    + long end = offset + length;
    + if (lastRange != null && overlap(lastRange.offset, lastRange.end, offset, end)) {
    + lastRange.offset = Math.min(lastRange.offset, offset);
    + lastRange.end = Math.max(lastRange.end, end);
    + } else {
    + lastRange = new DiskRange(offset, end);
    + result.add(lastRange);
    + }
    + return lastRange;
    + }
    +
    + static DiskRange addRgFilteredStreamToRanges(OrcProto.Stream stream,
    + boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
    + OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
    + long offset, long length, DiskRange lastRange, LinkedList<DiskRange> result) {
    + for (int group = 0; group < includedRowGroups.length; ++group) {
    + if (!includedRowGroups[group]) continue;
    + int posn = getIndexPosition(
    + encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
    + long start = index.getEntry(group).getPositions(posn);
    + final long nextGroupOffset;
    + boolean isLast = group == (includedRowGroups.length - 1);
    + nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
    +
    + start += offset;
    + long end = offset + estimateRgEndOffset(
    + isCompressed, isLast, nextGroupOffset, length, compressionSize);
    + if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
    + lastRange.offset = Math.min(lastRange.offset, start);
    + lastRange.end = Math.max(lastRange.end, end);
    + } else {
    + if (DebugUtils.isTraceOrcEnabled()) {
    + RecordReaderImpl.LOG.info("Creating new range for RG read; last range (which can "
    + + "include some previous RGs) was " + lastRange);
    + }
    + lastRange = new DiskRange(start, end);
    + result.add(lastRange);
    + }
    + }
    + return lastRange;
    + }
    +
    + static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
    + long nextGroupOffset, long streamLength, int bufferSize) {
    + // figure out the worst case last location
    + // if adjacent groups have the same compressed block offset then stretch the slop
    + // by factor of 2 to safely accommodate the next compression block.
    + // One for the current compression block and another for the next compression block.
    + long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
    + return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
    + }
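
    A quick worked example of the slop, with hypothetical sizes; for a compressed, non-last group the end is the next group's offset plus twice (compression header + buffer size), capped at the stream length:

      package org.apache.hadoop.hive.ql.io.orc;

      public class RgEndOffsetSketch {
        public static void main(String[] args) {
          int bufferSize = 262144;        // hypothetical 256KB compression buffer
          long streamLength = 10000000;   // hypothetical stream length
          long nextGroupOffset = 500000;  // offset of the next row group within the stream

          // Compressed, not last: nextGroupOffset + 2 * (OutStream.HEADER_SIZE + bufferSize).
          System.out.println(RecordReaderUtils.estimateRgEndOffset(
              true, false, nextGroupOffset, streamLength, bufferSize));

          // Uncompressed, not last: nextGroupOffset + WORST_UNCOMPRESSED_SLOP (2 + 8 * 512).
          System.out.println(RecordReaderUtils.estimateRgEndOffset(
              false, false, nextGroupOffset, streamLength, bufferSize));

          // Last group: simply the stream length.
          System.out.println(RecordReaderUtils.estimateRgEndOffset(
              true, true, nextGroupOffset, streamLength, bufferSize));
        }
      }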
    +
    + private static final int BYTE_STREAM_POSITIONS = 1;
    + private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
    + private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
    + private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
    +
    + /**
    + * Get the offset within the column's row index positions at which the
    + * given stream's recorded positions start.
    + * @param columnEncoding the encoding of the column
    + * @param columnType the type of the column
    + * @param streamType the kind of the stream
    + * @param isCompressed is the file compressed
    + * @param hasNulls does the column have a PRESENT stream?
    + * @return the offset of the stream's first entry within the row index positions
    + */
    + public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
    + OrcProto.Type.Kind columnType,
    + OrcProto.Stream.Kind streamType,
    + boolean isCompressed,
    + boolean hasNulls) {
    + if (streamType == OrcProto.Stream.Kind.PRESENT) {
    + return 0;
    + }
    + int compressionValue = isCompressed ? 1 : 0;
    + int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
    + switch (columnType) {
    + case BOOLEAN:
    + case BYTE:
    + case SHORT:
    + case INT:
    + case LONG:
    + case FLOAT:
    + case DOUBLE:
    + case DATE:
    + case STRUCT:
    + case MAP:
    + case LIST:
    + case UNION:
    + return base;
    + case CHAR:
    + case VARCHAR:
    + case STRING:
    + if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
    + columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
    + return base;
    + } else {
    + if (streamType == OrcProto.Stream.Kind.DATA) {
    + return base;
    + } else {
    + return base + BYTE_STREAM_POSITIONS + compressionValue;
    + }
    + }
    + case BINARY:
    + if (streamType == OrcProto.Stream.Kind.DATA) {
    + return base;
    + }
    + return base + BYTE_STREAM_POSITIONS + compressionValue;
    + case DECIMAL:
    + if (streamType == OrcProto.Stream.Kind.DATA) {
    + return base;
    + }
    + return base + BYTE_STREAM_POSITIONS + compressionValue;
    + case TIMESTAMP:
    + if (streamType == OrcProto.Stream.Kind.DATA) {
    + return base;
    + }
    + return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
    + default:
    + throw new IllegalArgumentException("Unknown type " + columnType);
    + }
    + }
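
    For example, in a compressed file a column with nulls devotes the first four index positions to the PRESENT stream, so the DATA stream of an INT column starts at position 4; the cases below mirror TestRecordReaderImpl.testGetIndexPosition further down:

      package org.apache.hadoop.hive.ql.io.orc;

      public class IndexPositionSketch {
        public static void main(String[] args) {
          // PRESENT always starts at position 0.
          System.out.println(RecordReaderUtils.getIndexPosition(
              OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.INT,
              OrcProto.Stream.Kind.PRESENT, true, true));   // 0

          // Compressed INT with nulls: DATA starts after the four PRESENT positions.
          System.out.println(RecordReaderUtils.getIndexPosition(
              OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.INT,
              OrcProto.Stream.Kind.DATA, true, true));      // 4

          // Compressed BINARY with nulls: LENGTH follows the DATA positions.
          System.out.println(RecordReaderUtils.getIndexPosition(
              OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.BINARY,
              OrcProto.Stream.Kind.LENGTH, true, true));    // 6
        }
      }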
    +
    + // For uncompressed streams, the worst-case overlap into the following set
    + // of rows (a long vint literal group).
    + static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
    +
    + /**
    + * Is this stream part of a dictionary?
    + * @return is this part of a dictionary?
    + */
    + static boolean isDictionary(OrcProto.Stream.Kind kind,
    + OrcProto.ColumnEncoding encoding) {
    + assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
    + OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
    + return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
    + (kind == OrcProto.Stream.Kind.LENGTH &&
    + (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
    + encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
    + }
    +
    + /**
    + * Build a string representation of a list of disk ranges.
    + * @param ranges ranges to stringify
    + * @return the resulting string
    + */
    + static String stringifyDiskRanges(List<DiskRange> ranges) {
    + StringBuilder buffer = new StringBuilder();
    + buffer.append("[");
    + for(int i=0; i < ranges.size(); ++i) {
    + if (i != 0) {
    + buffer.append(", ");
    + }
    + buffer.append(ranges.get(i).toString());
    + }
    + buffer.append("]");
    + return buffer.toString();
    + }
    +
    + /**
    + * Read the list of ranges from the file.
    + * @param file the file to read
    + * @param base the base of the stripe
    + * @param ranges the disk ranges within the stripe to read
    + * Each pending range in the list is replaced in place with one or more
    + * BufferChunk entries holding the bytes read.
    + * @throws IOException
    + */
    + static void readDiskRanges(FSDataInputStream file,
    + ZeroCopyReaderShim zcr,
    + long base,
    + LinkedList<DiskRange> ranges,
    + boolean doForceDirect) throws IOException {
    + ListIterator<DiskRange> rangeIter = ranges.listIterator();
    + while (rangeIter.hasNext()) {
    + DiskRange range = rangeIter.next();
    + if (range.hasData()) continue;
    + int len = (int) (range.end - range.offset);
    + long off = range.offset;
    + file.seek(base + off);
    + if (zcr != null) {
    + boolean hasReplaced = false;
    + while (len > 0) {
    + ByteBuffer partial = zcr.readBuffer(len, false);
    + BufferChunk bc = new BufferChunk(partial, off);
    + if (!hasReplaced) {
    + rangeIter.set(bc);
    + hasReplaced = true;
    + } else {
    + rangeIter.add(bc);
    + }
    + int read = partial.remaining();
    + len -= read;
    + off += read;
    + }
    + } else if (doForceDirect) {
    + ByteBuffer directBuf = ByteBuffer.allocateDirect(len);
    + try {
    + while (directBuf.remaining() > 0) {
    + int count = file.read(directBuf);
    + if (count < 0) throw new EOFException();
    + directBuf.position(directBuf.position() + count);
    + }
    + } catch (UnsupportedOperationException ex) {
    + RecordReaderImpl.LOG.error("Stream does not support direct read; we will copy.");
    + byte[] buffer = new byte[len];
    + file.readFully(buffer, 0, buffer.length);
    + directBuf.put(buffer);
    + }
    + directBuf.position(0);
    + rangeIter.set(new BufferChunk(directBuf, range.offset));
    + } else {
    + byte[] buffer = new byte[len];
    + file.readFully(buffer, 0, buffer.length);
    + rangeIter.set(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
    + }
    + }
    + }
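
    A minimal usage sketch of the plain heap-buffer path (zcr null, doForceDirect false): each pending range is read with seek + readFully and replaced by a BufferChunk. The path, offsets and lengths here are hypothetical:

      package org.apache.hadoop.hive.ql.io.orc;

      import java.util.LinkedList;

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FSDataInputStream;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.common.DiskRange;

      public class ReadDiskRangesSketch {
        public static void main(String[] args) throws Exception {
          Path path = new Path("/tmp/example.orc");    // hypothetical file
          long stripeOffset = 3;                       // hypothetical stripe base offset
          FileSystem fs = path.getFileSystem(new Configuration());
          FSDataInputStream file = fs.open(path);

          // Offsets are relative to the stripe base; ranges that already have data are skipped.
          LinkedList<DiskRange> ranges = new LinkedList<DiskRange>();
          ranges.add(new DiskRange(0, 1000));
          ranges.add(new DiskRange(5000, 6000));

          RecordReaderUtils.readDiskRanges(file, null, stripeOffset, ranges, false);
          System.out.println(RecordReaderUtils.stringifyDiskRanges(ranges));
          file.close();
        }
      }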
    +
    +
    + static List<DiskRange> getStreamBuffers(List<DiskRange> ranges, long offset, long length) {
    + // This assumes sorted ranges (as do many other parts of the ORC code).
    + ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
    + long streamEnd = offset + length;
    + boolean inRange = false;
    + for (DiskRange range : ranges) {
    + if (!inRange) {
    + if (range.end <= offset) continue; // Skip until we are in range.
    + inRange = true;
    + if (range.offset < offset) {
    + // Partial first buffer, add a slice of it.
    + DiskRange partial = range.slice(offset, Math.min(streamEnd, range.end));
    + partial.shiftBy(-offset);
    + buffers.add(partial);
    + if (range.end >= streamEnd) break; // Partial first buffer is also partial last buffer.
    + continue;
    + }
    + } else if (range.offset >= streamEnd) {
    + break;
    + }
    + if (range.end > streamEnd) {
    + // Partial last buffer (may also be the first buffer), add a slice of it.
    + DiskRange partial = range.slice(range.offset, streamEnd);
    + partial.shiftBy(-offset);
    + buffers.add(partial);
    + break;
    + }
    + // Buffer that belongs entirely to one stream.
    + // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
    + // because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
    + DiskRange full = range.slice(range.offset, range.end);
    + full.shiftBy(-offset);
    + buffers.add(full);
    + if (range.end == streamEnd) break;
    + }
    + return buffers;
    + }
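
    The slicing can be pictured with plain numbers. The standalone sketch below (it does not call the real method, which operates on already-read buffers) carves a stream at [1000, 1500) out of merged ranges and shifts the pieces to stream-relative offsets:

      import java.util.ArrayList;
      import java.util.List;

      public class StreamBufferSliceSketch {
        public static void main(String[] args) {
          long offset = 1000, length = 500, streamEnd = offset + length;
          // Merged, sorted on-disk ranges as [start, end) pairs (hypothetical).
          long[][] ranges = { { 0, 1200 }, { 1200, 1400 }, { 1400, 2000 } };

          List<long[]> buffers = new ArrayList<long[]>();
          for (long[] r : ranges) {
            if (r[1] <= offset || r[0] >= streamEnd) continue;   // outside the stream
            long sliceStart = Math.max(r[0], offset), sliceEnd = Math.min(r[1], streamEnd);
            buffers.add(new long[] { sliceStart - offset, sliceEnd - offset });   // shift by -offset
          }
          for (long[] b : buffers) {
            System.out.println("[" + b[0] + ", " + b[1] + ")");   // [0, 200), [200, 400), [400, 500)
          }
        }
      }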
    +
    + static ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
    + CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
    + if ((codec == null || ((codec instanceof DirectDecompressionCodec)
    + && ((DirectDecompressionCodec) codec).isAvailable()))) {
    + /* codec is null or is available */
    + return ShimLoader.getHadoopShims().getZeroCopyReader(file, pool);
    + }
    + return null;
    + }
    +
    + // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
    + // which lacks a clear()/clean() operation
    + public final static class ByteBufferAllocatorPool implements ByteBufferPoolShim {
    + private static final class Key implements Comparable<Key> {
    + private final int capacity;
    + private final long insertionGeneration;
    +
    + Key(int capacity, long insertionGeneration) {
    + this.capacity = capacity;
    + this.insertionGeneration = insertionGeneration;
    + }
    +
    + @Override
    + public int compareTo(Key other) {
    + return ComparisonChain.start().compare(capacity, other.capacity)
    + .compare(insertionGeneration, other.insertionGeneration).result();
    + }
    +
    + @Override
    + public boolean equals(Object rhs) {
    + if (rhs == null) {
    + return false;
    + }
    + try {
    + Key o = (Key) rhs;
    + return (compareTo(o) == 0);
    + } catch (ClassCastException e) {
    + return false;
    + }
    + }
    +
    + @Override
    + public int hashCode() {
    + return new HashCodeBuilder().append(capacity).append(insertionGeneration)
    + .toHashCode();
    + }
    + }
    +
    + private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
    +
    + private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
    +
    + private long currentGeneration = 0;
    +
    + private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
    + return direct ? directBuffers : buffers;
    + }
    +
    + public void clear() {
    + buffers.clear();
    + directBuffers.clear();
    + }
    +
    + @Override
    + public ByteBuffer getBuffer(boolean direct, int length) {
    + TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
    + Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
    + if (entry == null) {
    + return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
    + .allocate(length);
    + }
    + tree.remove(entry.getKey());
    + return entry.getValue();
    + }
    +
    + @Override
    + public void putBuffer(ByteBuffer buffer) {
    + TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
    + while (true) {
    + Key key = new Key(buffer.capacity(), currentGeneration++);
    + if (!tree.containsKey(key)) {
    + tree.put(key, buffer);
    + return;
    + }
    + // Buffers are indexed by (capacity, generation).
    + // If our key is not unique on the first try, we try again
    + }
    + }
    + }
    +}
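
    A short usage sketch of the pool: a returned buffer is keyed by (capacity, insertion generation), and a later request is satisfied by any pooled buffer of at least the requested capacity, otherwise a fresh one is allocated. The sketch assumes a caller in the same package, since RecordReaderUtils is package-private:

      package org.apache.hadoop.hive.ql.io.orc;

      import java.nio.ByteBuffer;

      public class BufferPoolSketch {
        public static void main(String[] args) {
          RecordReaderUtils.ByteBufferAllocatorPool pool =
              new RecordReaderUtils.ByteBufferAllocatorPool();

          // Nothing pooled yet, so this allocates a fresh direct buffer.
          ByteBuffer first = pool.getBuffer(true, 4096);

          // Hand it back to the pool.
          pool.putBuffer(first);

          // A request for the same or a smaller size reuses the pooled buffer.
          ByteBuffer reused = pool.getBuffer(true, 1024);
          System.out.println(reused == first);   // true
          System.out.println(reused.capacity()); // 4096

          pool.clear();   // drop everything that is pooled
        }
      }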

    Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
    URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1658861&r1=1658860&r2=1658861&view=diff
    ==============================================================================
    --- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java (original)
    +++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java Wed Feb 11 01:44:14 2015
    @@ -759,15 +759,15 @@ public class TestRecordReaderImpl {

        @Test
        public void testOverlap() throws Exception {
    - assertTrue(!RecordReaderImpl.overlap(0, 10, -10, -1));
    - assertTrue(RecordReaderImpl.overlap(0, 10, -1, 0));
    - assertTrue(RecordReaderImpl.overlap(0, 10, -1, 1));
    - assertTrue(RecordReaderImpl.overlap(0, 10, 2, 8));
    - assertTrue(RecordReaderImpl.overlap(0, 10, 5, 10));
    - assertTrue(RecordReaderImpl.overlap(0, 10, 10, 11));
    - assertTrue(RecordReaderImpl.overlap(0, 10, 0, 10));
    - assertTrue(RecordReaderImpl.overlap(0, 10, -1, 11));
    - assertTrue(!RecordReaderImpl.overlap(0, 10, 11, 12));
    + assertTrue(!RecordReaderUtils.overlap(0, 10, -10, -1));
    + assertTrue(RecordReaderUtils.overlap(0, 10, -1, 0));
    + assertTrue(RecordReaderUtils.overlap(0, 10, -1, 1));
    + assertTrue(RecordReaderUtils.overlap(0, 10, 2, 8));
    + assertTrue(RecordReaderUtils.overlap(0, 10, 5, 10));
    + assertTrue(RecordReaderUtils.overlap(0, 10, 10, 11));
    + assertTrue(RecordReaderUtils.overlap(0, 10, 0, 10));
    + assertTrue(RecordReaderUtils.overlap(0, 10, -1, 11));
    + assertTrue(!RecordReaderUtils.overlap(0, 10, 11, 12));
        }

        private static List<DiskRange> diskRanges(Integer... points) {
    @@ -806,55 +806,55 @@ public class TestRecordReaderImpl {

        @Test
        public void testGetIndexPosition() throws Exception {
    - assertEquals(0, RecordReaderImpl.getIndexPosition
    + assertEquals(0, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.INT,
                  OrcProto.Stream.Kind.PRESENT, true, true));
    - assertEquals(4, RecordReaderImpl.getIndexPosition
    + assertEquals(4, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.INT,
                  OrcProto.Stream.Kind.DATA, true, true));
    - assertEquals(3, RecordReaderImpl.getIndexPosition
    + assertEquals(3, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.INT,
                  OrcProto.Stream.Kind.DATA, false, true));
    - assertEquals(0, RecordReaderImpl.getIndexPosition
    + assertEquals(0, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.INT,
                  OrcProto.Stream.Kind.DATA, true, false));
    - assertEquals(4, RecordReaderImpl.getIndexPosition
    + assertEquals(4, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DICTIONARY, OrcProto.Type.Kind.STRING,
                  OrcProto.Stream.Kind.DATA, true, true));
    - assertEquals(4, RecordReaderImpl.getIndexPosition
    + assertEquals(4, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.BINARY,
                  OrcProto.Stream.Kind.DATA, true, true));
    - assertEquals(3, RecordReaderImpl.getIndexPosition
    + assertEquals(3, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.BINARY,
                  OrcProto.Stream.Kind.DATA, false, true));
    - assertEquals(6, RecordReaderImpl.getIndexPosition
    + assertEquals(6, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.BINARY,
                  OrcProto.Stream.Kind.LENGTH, true, true));
    - assertEquals(4, RecordReaderImpl.getIndexPosition
    + assertEquals(4, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.BINARY,
                  OrcProto.Stream.Kind.LENGTH, false, true));
    - assertEquals(4, RecordReaderImpl.getIndexPosition
    + assertEquals(4, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.DECIMAL,
                  OrcProto.Stream.Kind.DATA, true, true));
    - assertEquals(3, RecordReaderImpl.getIndexPosition
    + assertEquals(3, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.DECIMAL,
                  OrcProto.Stream.Kind.DATA, false, true));
    - assertEquals(6, RecordReaderImpl.getIndexPosition
    + assertEquals(6, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.DECIMAL,
                  OrcProto.Stream.Kind.SECONDARY, true, true));
    - assertEquals(4, RecordReaderImpl.getIndexPosition
    + assertEquals(4, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.DECIMAL,
                  OrcProto.Stream.Kind.SECONDARY, false, true));
    - assertEquals(4, RecordReaderImpl.getIndexPosition
    + assertEquals(4, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.TIMESTAMP,
                  OrcProto.Stream.Kind.DATA, true, true));
    - assertEquals(3, RecordReaderImpl.getIndexPosition
    + assertEquals(3, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.TIMESTAMP,
                  OrcProto.Stream.Kind.DATA, false, true));
    - assertEquals(7, RecordReaderImpl.getIndexPosition
    + assertEquals(7, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.TIMESTAMP,
                  OrcProto.Stream.Kind.SECONDARY, true, true));
    - assertEquals(5, RecordReaderImpl.getIndexPosition
    + assertEquals(5, RecordReaderUtils.getIndexPosition
              (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.TIMESTAMP,
                  OrcProto.Stream.Kind.SECONDARY, false, true));
        }
    @@ -932,9 +932,9 @@ public class TestRecordReaderImpl {
          result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
              columns, rowGroups, false, encodings, types, 32768);
          assertThat(result, is(diskRanges(0, 1000, 100, 1000, 400, 1000,
    - 1000, 11000 + RecordReaderImpl.WORST_UNCOMPRESSED_SLOP,
    - 11000, 21000 + RecordReaderImpl.WORST_UNCOMPRESSED_SLOP,
    - 41000, 51000 + RecordReaderImpl.WORST_UNCOMPRESSED_SLOP)));
    + 1000, 11000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
    + 11000, 21000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
    + 41000, 51000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));

          // if we read no rows, don't read any bytes
          rowGroups = new boolean[]{false, false, false, false, false, false};
    @@ -955,7 +955,7 @@ public class TestRecordReaderImpl {
          result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
              columns, rowGroups, false, encodings, types, 32768);
          assertThat(result, is(diskRanges(100100, 102000,
    - 112000, 122000 + RecordReaderImpl.WORST_UNCOMPRESSED_SLOP)));
    + 112000, 122000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));

          rowGroups = new boolean[]{false, false, false, false, false, true};
          indexes[1] = indexes[2];
    @@ -1128,8 +1128,8 @@ public class TestRecordReaderImpl {
          result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
              columns, rowGroups, false, encodings, types, 32768);
          assertThat(result, is(diskRanges(100, 1000, 400, 1000, 500, 1000,
    - 11000, 21000 + RecordReaderImpl.WORST_UNCOMPRESSED_SLOP,
    - 41000, 51000 + RecordReaderImpl.WORST_UNCOMPRESSED_SLOP,
    + 11000, 21000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
    + 41000, 51000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
              51000, 95000, 95000, 97000, 97000, 100000)));
        }
      }
