Repository: hive
Updated Branches:
   refs/heads/master 97bf32a12 -> 22fa9216d


HIVE-11595 : refactor ORC footer reading to make it usable from outside (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/22fa9216
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/22fa9216
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/22fa9216

Branch: refs/heads/master
Commit: 22fa9216d4e32d7681d3c1be8cbedc8c7999e56d
Parents: 97bf32a
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Fri Aug 28 18:23:05 2015 -0700
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Fri Aug 28 18:23:05 2015 -0700

----------------------------------------------------------------------
  .../apache/hadoop/hive/ql/io/orc/Reader.java | 6 +
  .../hadoop/hive/ql/io/orc/ReaderImpl.java | 281 +++++++++++++------
  2 files changed, 204 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
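In effect, footer parsing no longer requires a live file handle: the new public static ReaderImpl.extractMetaInfoFromFooter(ByteBuffer, Path) parses footer bytes obtained elsewhere, and the new Reader.getSerializedFileFooter() exposes the bytes to store. A minimal sketch of the intended round trip, assuming a hypothetical footerCache map and a Configuration conf in scope (neither is part of this patch):

  // Open a reader normally; the reader keeps the raw metadata + footer +
  // postscript bytes when it had to read them from disk.
  Path path = new Path("/warehouse/tbl/000000_0");
  Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));

  // May be null if the reader was constructed from split-provided FileMetaInfo.
  ByteBuffer serialized = reader.getSerializedFileFooter();
  if (serialized != null) {
    footerCache.put(path.toString(), serialized);
  }

  // Later: parse the cached bytes without touching the filesystem. duplicate()
  // keeps the cached buffer's position/limit unchanged.
  ReaderImpl.FooterInfo info = ReaderImpl.extractMetaInfoFromFooter(
      footerCache.get(path.toString()).duplicate(), path);
  OrcProto.Footer footer = info.getFooter();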


http://git-wip-us.apache.org/repos/asf/hive/blob/22fa9216/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 7bddefc..187924d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.List;

+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@@ -358,4 +359,9 @@ public interface Reader {
                      String[] neededColumns) throws IOException;

    MetadataReader metadata() throws IOException;
+
+  /** Gets serialized file metadata read from disk for the purposes of caching, etc. */
+  ByteBuffer getSerializedFileFooter();
+
+  Footer getFooter();
  }
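
Together, the two additions expose both forms of the footer: the serialized bytes for caching and the parsed protobuf for inspection. A hedged illustration of the parsed handle (reader as in the sketch above):

  OrcProto.Footer footer = reader.getFooter();
  long rows = footer.getNumberOfRows();       // same value reader.getNumberOfRows() reports
  int stripeCount = footer.getStripesCount(); // protobuf view of the stripe list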

http://git-wip-us.apache.org/repos/asf/hive/blob/22fa9216/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index c990d85..ab539c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.DiskRange;
  import org.apache.hadoop.hive.ql.io.FileFormatException;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
  import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
@@ -74,6 +76,9 @@ public class ReaderImpl implements Reader {
  // will help avoid cpu cycles spent in deserializing at cost of increased
  // memory footprint.
    private final ByteBuffer footerByteBuffer;
+  // Same for metastore cache - maintains the same background buffer, but includes postscript.
+  // This will only be set if the file footer/metadata was read from disk.
+  private final ByteBuffer footerMetaAndPsBuffer;

    static class StripeInformationImpl
        implements StripeInformation {
@@ -166,11 +171,7 @@ public class ReaderImpl implements Reader {

    @Override
    public List<StripeInformation> getStripes() {
-    List<StripeInformation> result = new ArrayList<StripeInformation>();
-    for(OrcProto.StripeInformation info: footer.getStripesList()) {
-      result.add(new StripeInformationImpl(info));
-    }
-    return result;
+    return convertProtoStripesToStripes(footer.getStripesList());
    }

    @Override
@@ -274,7 +275,7 @@ public class ReaderImpl implements Reader {
     * Check to see if this ORC file is from a future version and if so,
     * warn the user that we may not be able to read all of the column encodings.
     * @param log the logger to write any error message to
-   * @param path the filename for error messages
+   * @param path the data source path for error messages
     * @param version the version of hive that wrote the file.
     */
    static void checkOrcVersion(Log log, Path path, List<Integer> version) {
@@ -287,8 +288,7 @@ public class ReaderImpl implements Reader {
        if (major > OrcFile.Version.CURRENT.getMajor() ||
            (major == OrcFile.Version.CURRENT.getMajor() &&
             minor > OrcFile.Version.CURRENT.getMinor())) {
- log.warn("ORC file " + path +
- " was written by a future Hive version " +
+ log.warn(path + " was written by a future Hive version " +
                   versionString(version) +
                   ". This file may not be readable by this version of Hive.");
        }
@@ -313,9 +313,11 @@ public class ReaderImpl implements Reader {
      FileMetaInfo footerMetaData;
      if (options.getFileMetaInfo() != null) {
        footerMetaData = options.getFileMetaInfo();
+      this.footerMetaAndPsBuffer = null;
      } else {
        footerMetaData = extractMetaInfoFromFooter(fs, path,
            options.getMaxLength());
+      this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
      }
      MetaInfoObjExtractor rInfo =
          new MetaInfoObjExtractor(footerMetaData.compressionType,
@@ -349,6 +351,111 @@ public class ReaderImpl implements Reader {
      return OrcFile.WriterVersion.ORIGINAL;
    }

+  /** Extracts the necessary metadata from an externally stored buffer (fullFooterBuffer). */
+  public static FooterInfo extractMetaInfoFromFooter(
+      ByteBuffer bb, Path srcPath) throws IOException {
+    // Read the PostScript. Be very careful as some parts of this historically use bb position
+    // and some use absolute offsets that have to take position into account.
+    int baseOffset = bb.position();
+    int lastByteAbsPos = baseOffset + bb.remaining() - 1;
+    int psLen = bb.get(lastByteAbsPos) & 0xff;
+    int psAbsPos = lastByteAbsPos - psLen;
+    OrcProto.PostScript ps = extractPostScript(bb, srcPath, psLen, psAbsPos);
+    assert baseOffset == bb.position();
+
+    // Extract PS information.
+    int footerSize = (int)ps.getFooterLength(), metadataSize = (int)ps.getMetadataLength(),
+        footerAbsPos = psAbsPos - footerSize, metadataAbsPos = footerAbsPos - metadataSize;
+    String compressionType = ps.getCompression().toString();
+    CompressionCodec codec = WriterImpl.createCodec(CompressionKind.valueOf(compressionType));
+    int bufferSize = (int)ps.getCompressionBlockSize();
+    bb.position(metadataAbsPos);
+    bb.mark();
+
+    // Extract metadata and footer.
+    Metadata metadata = new Metadata(extractMetadata(
+        bb, metadataAbsPos, metadataSize, codec, bufferSize));
+    OrcProto.Footer footer = extractFooter(bb, footerAbsPos, footerSize, codec, bufferSize);
+    bb.position(metadataAbsPos);
+    bb.limit(psAbsPos);
+    // TODO: do we need footer buffer here? FileInfo/FileMetaInfo is a mess...
+    FileMetaInfo fmi = new FileMetaInfo(
+        compressionType, bufferSize, metadataSize, bb, extractWriterVersion(ps));
+    return new FooterInfo(metadata, footer, fmi);
+  }
+
+  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
+      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+    bb.position(footerAbsPos);
+    bb.limit(footerAbsPos + footerSize);
+    InputStream instream = InStream.create("footer", Lists.<DiskRange>newArrayList(
+        new BufferChunk(bb, 0)), footerSize, codec, bufferSize);
+    return OrcProto.Footer.parseFrom(instream);
+  }
+
+  private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
+      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+    bb.position(metadataAbsPos);
+    bb.limit(metadataAbsPos + metadataSize);
+    InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
+        new BufferChunk(bb, 0)), metadataSize, codec, bufferSize);
+    CodedInputStream in = CodedInputStream.newInstance(instream);
+    int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT;
+    OrcProto.Metadata meta = null;
+    do {
+      try {
+        in.setSizeLimit(msgLimit);
+        meta = OrcProto.Metadata.parseFrom(in);
+      } catch (InvalidProtocolBufferException e) {
+        if (e.getMessage().contains("Protocol message was too large")) {
+          LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" +
+              " size of the coded input stream.");
+
+          msgLimit = msgLimit << 1;
+          if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) {
+            LOG.error("Metadata section exceeds max protobuf message size of " +
+                PROTOBUF_MESSAGE_MAX_LIMIT + " bytes.");
+            throw e;
+          }
+
+          // we must have failed in the middle of reading instream and instream doesn't support
+          // resetting the stream
+          instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
+              new BufferChunk(bb, 0)), metadataSize, codec, bufferSize);
+          in = CodedInputStream.newInstance(instream);
+        } else {
+          throw e;
+        }
+      }
+    } while (meta == null);
+    return meta;
+  }
+
+  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
+      int psLen, int psAbsOffset) throws IOException {
+    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+    assert bb.hasArray();
+    CodedInputStream in = CodedInputStream.newInstance(
+        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
+    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+    checkOrcVersion(LOG, path, ps.getVersionList());
+
+    // Check compression codec.
+    switch (ps.getCompression()) {
+      case NONE:
+        break;
+      case ZLIB:
+        break;
+      case SNAPPY:
+        break;
+      case LZO:
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown compression");
+    }
+    return ps;
+  }
+
    private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
                                                          Path path,
                                                          long maxFileLength
@@ -367,44 +474,24 @@ public class ReaderImpl implements Reader {
      int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
      file.seek(size - readSize);
      ByteBuffer buffer = ByteBuffer.allocate(readSize);
-    file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
-        buffer.remaining());
+    assert buffer.position() == 0;
+    file.readFully(buffer.array(), buffer.arrayOffset(), readSize);
+    buffer.position(0);

      //read the PostScript
      //get length of PostScript
      int psLen = buffer.get(readSize - 1) & 0xff;
      ensureOrcFooter(file, path, psLen, buffer);
      int psOffset = readSize - 1 - psLen;
-    CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
-        buffer.arrayOffset() + psOffset, psLen);
-    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
-
-    checkOrcVersion(LOG, path, ps.getVersionList());
+    OrcProto.PostScript ps = extractPostScript(buffer, path, psLen, psOffset);

      int footerSize = (int) ps.getFooterLength();
      int metadataSize = (int) ps.getMetadataLength();
-    OrcFile.WriterVersion writerVersion;
-    if (ps.hasWriterVersion()) {
-      writerVersion = getWriterVersion(ps.getWriterVersion());
-    } else {
-      writerVersion = OrcFile.WriterVersion.ORIGINAL;
-    }
+    OrcFile.WriterVersion writerVersion = extractWriterVersion(ps);

-    //check compression codec
-    switch (ps.getCompression()) {
-      case NONE:
-        break;
-      case ZLIB:
-        break;
-      case SNAPPY:
-        break;
-      case LZO:
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown compression");
-    }

      //check if extra bytes need to be read
+    ByteBuffer fullFooterBuffer = null;
      int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
      if (extra > 0) {
        //more bytes need to be read, seek back to the right place and read extra bytes
@@ -417,10 +504,12 @@ public class ReaderImpl implements Reader {
        extraBuf.put(buffer);
        buffer = extraBuf;
        buffer.position(0);
+      fullFooterBuffer = buffer.slice();
        buffer.limit(footerSize + metadataSize);
      } else {
        //footer is already in the bytes in buffer, just adjust position, length
        buffer.position(psOffset - footerSize - metadataSize);
+      fullFooterBuffer = buffer.slice();
        buffer.limit(psOffset);
      }

@@ -435,11 +524,24 @@ public class ReaderImpl implements Reader {
          (int) ps.getMetadataLength(),
          buffer,
          ps.getVersionList(),
-        writerVersion
+        writerVersion,
+        fullFooterBuffer
          );
    }

+  private static OrcFile.WriterVersion extractWriterVersion(OrcProto.PostScript ps) {
+    return (ps.hasWriterVersion()
+        ? getWriterVersion(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL);
+  }

+  private static List<StripeInformation> convertProtoStripesToStripes(
+      List<OrcProto.StripeInformation> stripes) {
+    List<StripeInformation> result = new ArrayList<StripeInformation>(stripes.size());
+    for (OrcProto.StripeInformation info : stripes) {
+      result.add(new StripeInformationImpl(info));
+    }
+    return result;
+  }

    /**
     * MetaInfoObjExtractor - has logic to create the values for the fields in ReaderImpl
@@ -467,46 +569,10 @@ public class ReaderImpl implements Reader {

        int position = footerBuffer.position();
        int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
-      footerBuffer.limit(position + metadataSize);
-
-      InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
-          new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize);
-      CodedInputStream in = CodedInputStream.newInstance(instream);
-      int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT;
-      OrcProto.Metadata meta = null;
-      do {
-        try {
-          in.setSizeLimit(msgLimit);
-          meta = OrcProto.Metadata.parseFrom(in);
-        } catch (InvalidProtocolBufferException e) {
-          if (e.getMessage().contains("Protocol message was too large")) {
-            LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" +
-                " size of the coded input stream.");
-
-            msgLimit = msgLimit << 1;
-            if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) {
-              LOG.error("Metadata section exceeds max protobuf message size of " +
-                  PROTOBUF_MESSAGE_MAX_LIMIT + " bytes.");
-              throw e;
-            }
-
-            // we must have failed in the middle of reading instream and instream doesn't support
-            // resetting the stream
-            instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
-                new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize);
-            in = CodedInputStream.newInstance(instream);
-          } else {
-            throw e;
-          }
-        }
-      } while (meta == null);
-      this.metadata = meta;

-      footerBuffer.position(position + metadataSize);
-      footerBuffer.limit(position + metadataSize + footerBufferSize);
-      instream = InStream.create("footer", Lists.<DiskRange>newArrayList(
-          new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize);
-      this.footer = OrcProto.Footer.parseFrom(instream);
+      this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
+      this.footer = extractFooter(
+          footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);

        footerBuffer.position(position);
        this.inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
@@ -518,7 +584,8 @@ public class ReaderImpl implements Reader {
     * that is useful for Reader implementation
     *
     */
-  static class FileMetaInfo{
+  static class FileMetaInfo {
+    private ByteBuffer footerMetaAndPsBuffer;
      final String compressionType;
      final int bufferSize;
      final int metadataSize;
@@ -526,30 +593,68 @@ public class ReaderImpl implements Reader {
      final List<Integer> versionList;
      final OrcFile.WriterVersion writerVersion;

+    /** Ctor used when reading splits - no version list or full footer buffer. */
     FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
         ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) {
       this(compressionType, bufferSize, metadataSize, footerBuffer, null,
-          writerVersion);
+          writerVersion, null);
     }

-    FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
-        ByteBuffer footerBuffer, List<Integer> versionList,
-        OrcFile.WriterVersion writerVersion){
+    /** Ctor used when creating file info during init and when getting a new one. */
+    public FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
+        ByteBuffer footerBuffer, List<Integer> versionList, WriterVersion writerVersion,
+        ByteBuffer fullFooterBuffer) {
       this.compressionType = compressionType;
       this.bufferSize = bufferSize;
       this.metadataSize = metadataSize;
       this.footerBuffer = footerBuffer;
       this.versionList = versionList;
       this.writerVersion = writerVersion;
+      this.footerMetaAndPsBuffer = fullFooterBuffer;
     }
   }

-  public FileMetaInfo getFileMetaInfo(){
+  public FileMetaInfo getFileMetaInfo() {
     return new FileMetaInfo(compressionKind.toString(), bufferSize,
-        metadataSize, footerByteBuffer, versionList, writerVersion);
+        metadataSize, footerByteBuffer, versionList, writerVersion, footerMetaAndPsBuffer);
    }

+  /** Same as FileMetaInfo, but with extra fields. FileMetaInfo is serialized for splits
+   * and so we don't just add fields to it, it's already messy and confusing. */
+  public static final class FooterInfo {
+    private final OrcProto.Footer footer;
+    private final Metadata metadata;
+    private final List<StripeInformation> stripes;
+    private final FileMetaInfo fileMetaInfo;
+
+    private FooterInfo(Metadata metadata, OrcProto.Footer footer, FileMetaInfo fileMetaInfo) {
+      this.metadata = metadata;
+      this.footer = footer;
+      this.fileMetaInfo = fileMetaInfo;
+      this.stripes = convertProtoStripesToStripes(footer.getStripesList());
+    }
+
+    public OrcProto.Footer getFooter() {
+      return footer;
+    }
+
+    public Metadata getMetadata() {
+      return metadata;
+    }
+
+    public FileMetaInfo getFileMetaInfo() {
+      return fileMetaInfo;
+    }
+
+    public List<StripeInformation> getStripes() {
+      return stripes;
+    }
+  }
+
+  @Override
+  public ByteBuffer getSerializedFileFooter() {
+    return footerMetaAndPsBuffer;
+  }

    @Override
    public RecordReader rows() throws IOException {
@@ -609,14 +714,19 @@ public class ReaderImpl implements Reader {

    @Override
    public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+    return getRawDataSizeFromColIndices(colIndices, footer);
+  }
+
+  public static long getRawDataSizeFromColIndices(
+      List<Integer> colIndices, OrcProto.Footer footer) {
     long result = 0;
     for (int colIdx : colIndices) {
-      result += getRawDataSizeOfColumn(colIdx);
+      result += getRawDataSizeOfColumn(colIdx, footer);
     }
     return result;
   }

-  private long getRawDataSizeOfColumn(int colIdx) {
+  private static long getRawDataSizeOfColumn(int colIdx, OrcProto.Footer footer) {
      OrcProto.ColumnStatistics colStat = footer.getStatistics(colIdx);
      long numVals = colStat.getNumberOfValues();
      Type type = footer.getTypes(colIdx);
@@ -738,4 +848,9 @@ public class ReaderImpl implements Reader {
    public MetadataReader metadata() throws IOException {
      return new MetadataReader(fileSystem, path, codec, bufferSize, footer.getTypesCount());
    }
+
+  @Override
+  public Footer getFooter() {
+    return footer;
+  }
  }
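
Since getRawDataSizeFromColIndices now takes an explicit OrcProto.Footer and is static, size estimates can be computed from a cached footer without constructing a Reader. A small sketch under the same assumptions as the earlier one (the column indices are illustrative):

  OrcProto.Footer cachedFooter = info.getFooter();  // e.g. from a FooterInfo parsed above
  long rawSize = ReaderImpl.getRawDataSizeFromColIndices(
      java.util.Arrays.asList(1, 2), cachedFooter);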
