Author: sershe
Date: Tue Feb 10 01:29:01 2015
New Revision: 1658607

URL: http://svn.apache.org/r1658607
Log:
HIVE-9418p7 : Additional logging; also fix build break from some previous commit

Added:
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
Removed:
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
Modified:
     hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
     hive/branches/llap/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
     hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1658607&r1=1658606&r2=1658607&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Feb 10 01:29:01 2015
@@ -1896,7 +1896,7 @@ public class HiveConf extends Configurat
          "and use it to run queries."),

      // Vectorization enabled
- HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", true,
+ HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false,
          "This flag should be set to true to enable vectorized mode of query execution.\n" +
          "The default value is false."),
      HIVE_VECTORIZATION_REDUCE_ENABLED("hive.vectorized.execution.reduce.enabled", true,
@@ -2003,7 +2003,7 @@ public class HiveConf extends Configurat
          "hive.tez.exec.inplace.progress",
          true,
          "Updates tez job execution progress in-place in the terminal."),
- LLAP_IO_ENABLED("hive.llap.io.enabled", true, ""),
+ LLAP_IO_ENABLED("hive.llap.io.enabled", false, ""),
      LLAP_LOW_LEVEL_CACHE("hive.llap.io.use.lowlevel.cache", true, ""),
      LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.io.cache.orc.alloc.min", 128 * 1024, ""),
      LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.io.cache.orc.alloc.max", 16 * 1024 * 1024, ""),
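For reference, a minimal sketch (not part of this commit, class name illustrative) of turning the two flipped defaults back on from application code; it assumes the standard HiveConf setters and reaches the LLAP IO flag through its key string:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class LlapIoDefaultsSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Vectorized execution now defaults to false on this branch; turn it back on explicitly.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
        // LLAP IO also defaults to false after this change; set via its key string.
        conf.setBoolean("hive.llap.io.enabled", true);
        System.out.println("vectorization enabled: "
            + conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED));
      }
    }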

Modified: hive/branches/llap/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1658607&r1=1658606&r2=1658607&view=diff
==============================================================================
--- hive/branches/llap/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/branches/llap/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Tue Feb 10 01:29:01 2015
@@ -242,10 +242,10 @@ public class QTestUtil {
    }

    public QTestUtil(String outDir, String logDir, String initScript,
- String cleanupScript, String tezDirectory) throws
- Exception {
+ String cleanupScript, String tezDirectory) throws
+ Exception {
      this(outDir, logDir, MiniClusterType.none, null, "0.20", initScript,
- cleanupScript, tezDirectory);
+ cleanupScript, tezDirectory);
    }

    public String getOutputDirectory() {
@@ -348,8 +348,8 @@ public class QTestUtil {
    }

    public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
- String confDir, String hadoopVer, String initScript,
- String cleanupScript, String tezDirectory)
+ String confDir, String hadoopVer, String initScript,
+ String cleanupScript, String tezDirectory)
      throws Exception {

      this.outDir = outDir;
@@ -398,10 +398,10 @@ public class QTestUtil {
        String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
        if (clusterType == MiniClusterType.tez) {
          mr = shims.getMiniTezCluster(conf, 4, uriString, 1, false,
- tezDir + "/staging"););
+ tezDir + "/staging");
        } else if (clusterType == MiniClusterType.tezlocal) {
- mr = shims.getMiniTezCluster(conf, 4, uriString, 1, true,
- tezDir + "/staging");
+ mr = shims.getMiniTezCluster(conf, 4, uriString, 1, true,
+ tezDir + "/staging");
        } else if (clusterType == MiniClusterType.miniSparkOnYarn) {
          mr = shims.getMiniSparkCluster(conf, 4, uriString, 1);
        } else {
@@ -1581,7 +1581,7 @@ public class QTestUtil {
        // close it first.
        SessionState ss = SessionState.get();
        if (ss != null && ss.out != null && ss.out != System.out) {
- ss.out.close();
+ ss.out.close();
        }

        String inSorted = inFileName + SORT_SUFFIX;

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1658607&r1=1658606&r2=1658607&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Tue Feb 10 01:29:01 2015
@@ -230,11 +230,19 @@ public class OrcEncodedDataProducer impl
              stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
              if (stripeMetadata == null) {
                stripeMetadata = new OrcStripeMetadata(stripeReader, stripeIncludes);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
+ + " metadata with includes: " + DebugUtils.toString(stripeIncludes));
+ }
                metadataCache.putStripeMetadata(stripeKey, stripeMetadata);
                stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
              }
            }
            if (!stripeMetadata.hasAllIndexes(stripeIncludes)) {
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx
+ + " metadata for includes: " + DebugUtils.toString(stripeIncludes));
+ }
              updateLoadedIndexes(stripeMetadata, stripeReader, stripeIncludes);
            }
            // Set stripe metadata externally in the reader.
@@ -320,8 +328,13 @@ public class OrcEncodedDataProducer impl

        if (existingReader != null) return existingReader;
        // Create RecordReader that will be used to read only this stripe.
+ // TODO: use separate class instead.
        StripeInformation si = metadata.getStripes().get(stripeIx);
        ensureOrcReader();
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Creating stripe reader " + stripeIx + ": "
+ + si.getOffset() + ", " + si.getLength());
+ }
        existingReader = orcReader.rows(si.getOffset(), si.getLength(), globalIncludes);
        existingReader.prepareEncodedColumnRead();
        return existingReader;
@@ -366,14 +379,26 @@ public class OrcEncodedDataProducer impl
          if (value == null || !value.hasAllIndexes(globalInc)) {
            ensureOrcReader();
            StripeInformation si = metadata.getStripes().get(stripeKey.stripeIx);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Creating stripe reader " + stripeKey.stripeIx + ": "
+ + si.getOffset() + ", " + si.getLength());
+ }
            stripeReaders[stripeIxMod] = orcReader.rows(si.getOffset(), si.getLength(), globalInc);
            stripeReaders[stripeIxMod].prepareEncodedColumnRead();
            if (value == null) {
              value = new OrcStripeMetadata(stripeReaders[stripeIxMod], globalInc);
              metadataCache.putStripeMetadata(stripeKey, value);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
+ + " metadata with includes: " + DebugUtils.toString(globalInc));
+ }
              // Create new key object to reuse for gets; we've used the old one to put in cache.
              stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
            } else {
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx
+ + " metadata for includes: " + DebugUtils.toString(globalInc));
+ }
              updateLoadedIndexes(value, stripeReaders[stripeIxMod], globalInc);
            }
          }
@@ -404,13 +429,21 @@ public class OrcEncodedDataProducer impl
        }
        // readState should have been initialized by this time with an empty array.
        for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
- int originalStripeIx = stripeIxMod + stripeIxFrom;
- StripeInformation stripe = stripes.get(originalStripeIx);
+ int stripeIx = stripeIxMod + stripeIxFrom;
+ StripeInformation stripe = stripes.get(stripeIx);
          int rgCount = getRgCount(stripe, rowIndexStride);
          boolean[] rgsToRead = null;
          if (sargApp != null) {
            rgsToRead = sargApp.pickRowGroups(stripe, metadata.get(stripeIxMod).getRowIndexes());
          }
+ if (DebugUtils.isTraceOrcEnabled()) {
+ if (rgsToRead != null ) {
+ LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": "
+ + DebugUtils.toString(rgsToRead));
+ } else {
+ LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " + stripeIx);
+ }
+ }
          assert rgsToRead == null || rgsToRead.length == rgCount;
          readState[stripeIxMod] = new boolean[columnIds.size()][];
          for (int j = 0; j < columnIds.size(); ++j) {
@@ -445,16 +478,16 @@ public class OrcEncodedDataProducer impl
          long stripeStart = stripe.getOffset();
          if (offset > stripeStart) continue;
          if (stripeIxFrom == -1) {
- if (DebugUtils.isTraceEnabled()) {
- LlapIoImpl.LOG.info("Including from " + stripeIx
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Including stripes from " + stripeIx
                  + " (" + stripeStart + " >= " + offset + ")");
            }
            stripeIxFrom = stripeIx;
          }
          if (stripeStart >= maxOffset) {
- if (DebugUtils.isTraceEnabled()) {
- LlapIoImpl.LOG.info("Including until " + stripeIxTo
- + " (" + stripeStart + " >= " + maxOffset + ")");
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Including stripes until " + stripeIxTo + " (" + stripeStart
+ + " >= " + maxOffset + "); " + (stripeIxTo - stripeIxFrom) + " stripes");
            }
            stripeIxTo = stripeIx;
            break;
@@ -462,10 +495,11 @@ public class OrcEncodedDataProducer impl
          ++stripeIx;
        }
        if (stripeIxTo == -1) {
- if (DebugUtils.isTraceEnabled()) {
- LlapIoImpl.LOG.info("Including until " + stripeIx + " (end of file)");
- }
          stripeIxTo = stripeIx;
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Including stripes until " + stripeIx + " (end of file); "
+ + (stripeIxTo - stripeIxFrom) + " stripes");
+ }
        }
        readState = new boolean[stripeIxTo - stripeIxFrom][][];
      }

Added: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java?rev=1658607&view=auto
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java (added)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java Tue Feb 10 01:29:01 2015
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+/**
+ * A class that contains debug methods; also allows enabling the logging of various
+ * trace messages with low runtime cost, in order to investigate reproducible bugs.
+ */
+public class DebugUtils {
+
+ public static boolean isTraceEnabled() {
+ return false;
+ }
+
+ public static boolean isTraceOrcEnabled() {
+ return true;
+ }
+
+ public static boolean isTraceLockingEnabled() {
+ return false;
+ }
+
+ public static boolean isTraceMttEnabled() {
+ return false;
+ }
+
+ public static boolean isTraceCachingEnabled() {
+ return false;
+ }
+
+ public static String toString(long[] a, int offset, int len) {
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = offset; i < offset + len; ++i) {
+ b.append(a[i]);
+ b.append(", ");
+ }
+ b.append(']');
+ return b.toString();
+ }
+
+ public static String toString(byte[] a, int offset, int len) {
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = offset; i < offset + len; ++i) {
+ b.append(a[i]);
+ b.append(", ");
+ }
+ b.append(']');
+ return b.toString();
+ }
+
+ public static String toString(boolean[] a) {
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = 0; i < a.length; ++i) {
+ b.append(a[i] ? "1" : "0");
+ }
+ b.append(']');
+ return b.toString();
+ }
+}
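A hypothetical caller (not from the commit), mirroring the guard pattern added at the ORC call sites above: each isTrace*Enabled() method returns a constant, so the message concatenation is only reached when the corresponding flag is switched to true (isTraceOrcEnabled() is on in this commit), and toString(boolean[]) renders an include mask as digits, e.g. [1011]. The commons-logging setup is an assumption:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hive.llap.DebugUtils;

    public class DebugUtilsUsageSketch {
      private static final Log LOG = LogFactory.getLog(DebugUtilsUsageSketch.class);

      public static void main(String[] args) {
        boolean[] includes = new boolean[] { true, false, true, true };
        // Guard message construction the same way the ORC readers do, so the string
        // building costs nothing unless the flag in DebugUtils is set to true.
        if (DebugUtils.isTraceOrcEnabled()) {
          LOG.info("Included columns: " + DebugUtils.toString(includes)); // prints [1011]
        }
      }
    }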

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1658607&r1=1658606&r2=1658607&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Tue Feb 10 01:29:01 2015
@@ -21,7 +21,6 @@ import java.io.IOException;
  import java.io.InputStream;
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
-import java.util.LinkedList;
  import java.util.List;
  import java.util.ListIterator;

@@ -31,6 +30,7 @@ import org.apache.hadoop.hive.common.Dis
  import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
  import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
  import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.hadoop.hive.llap.DebugUtils;
  import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
  import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
@@ -537,6 +537,9 @@ abstract class InStream extends InputStr
      // want to be, or just before. However, RGs can overlap due to encoding, so we may have
      // to return to a previous block.
      DiskRange current = findCompressedPosition(ranges, cOffset);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Starting uncompressStream for [" + cOffset + "," + endCOffset + ") at " + current);
+ }

      // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
      if (cOffset >= 0 && cOffset != current.offset) {
@@ -555,6 +558,9 @@ abstract class InStream extends InputStr
          }
          colBuffer.cacheBuffers.add(cc.buffer);
          currentCOffset = cc.end;
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Adding an already-uncompressed buffer " + cc.buffer);
+ }
        } else {
          // 2b. This is a compressed buffer. We need to uncompress it; the buffer can comprise
          // several disk ranges, so we might need to combine them.
@@ -692,12 +698,19 @@ abstract class InStream extends InputStr
      int consumedLength = chunkLength + OutStream.HEADER_SIZE;
      long cbEndOffset = cbStartOffset + consumedLength;
      boolean isUncompressed = ((b0 & 0x01) == 1);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total "
+ + consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed");
+ }
      if (compressed.remaining() >= chunkLength) {
        // Simple case - CB fits entirely in the disk range.
        slice = compressed.slice();
        slice.limit(chunkLength);
        addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset, cbEndOffset,
- compressed, chunkLength, ranges, cache, toDecompress, cacheBuffers);
+ chunkLength, ranges, current, cache, toDecompress, cacheBuffers);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Adjusting " + current + " to consume " + consumedLength);
+ }
        current.offset += consumedLength;
        if (compressed.remaining() <= 0 && zcr != null) {
          toRelease.add(compressed);
@@ -709,10 +722,14 @@ abstract class InStream extends InputStr
      // We need to consolidate 2 or more buffers into one to decompress.
      ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
      int remaining = chunkLength - compressed.remaining();
+ int originalPos = compressed.position();
      copy.put(compressed);
      ranges.remove();
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Removing " + current + " from ranges");
+ }
      if (zcr != null) {
- if (compressed.position() == 0) {
+ if (originalPos == 0) {
          zcr.releaseBuffer(compressed); // We copied the entire buffer.
        } else {
          toRelease.add(compressed); // There might be slices depending on this buffer.
@@ -731,7 +748,10 @@ abstract class InStream extends InputStr
          slice.limit(remaining);
          copy.put(slice);
          addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset,
- cbEndOffset, compressed, remaining, ranges, cache, toDecompress, cacheBuffers);
+ cbEndOffset, remaining, ranges, current, cache, toDecompress, cacheBuffers);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Adjusting " + range + " to consume " + remaining);
+ }
          range.offset += remaining;
          if (compressed.remaining() <= 0 && zcr != null) {
            zcr.releaseBuffer(compressed); // We copied the entire buffer.
@@ -743,6 +763,9 @@ abstract class InStream extends InputStr
        if (zcr != null) {
          zcr.releaseBuffer(compressed); // We copied the entire buffer.
        }
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Removing " + range + " from ranges");
+ }
        ranges.remove();
      }
      throw new IOException("EOF in while trying to read "
@@ -758,13 +781,15 @@ abstract class InStream extends InputStr
     * @param lastRange The buffer from which the last (or all) bytes of fCB come.
     * @param lastPartLength The number of bytes consumed from lastRange into fCB.
     * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange.
+ * @param lastChunk
     * @param toDecompress See addOneCompressionBuffer.
     * @param cacheBuffers See addOneCompressionBuffer.
     */
    private static void addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
- boolean isUncompressed, long cbStartOffset, long cbEndOffset, ByteBuffer lastRange,
- int lastPartLength, ListIterator<DiskRange> ranges, LowLevelCache cache,
- List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) {
+ boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastPartLength,
+ ListIterator<DiskRange> ranges, BufferChunk lastChunk,
+ LowLevelCache cache, List<ProcCacheChunk> toDecompress,
+ List<LlapMemoryBuffer> cacheBuffers) {
      // Prepare future cache buffer.
      LlapMemoryBuffer futureAlloc = cache.createUnallocated();
      // Add it to result in order we are processing.
@@ -774,15 +799,21 @@ abstract class InStream extends InputStr
          cbStartOffset, cbEndOffset, !isUncompressed, fullCompressionBlock, futureAlloc);
      toDecompress.add(cc);
      // Adjust the compression block position.
- lastRange.position(lastRange.position() + lastPartLength);
+ lastChunk.chunk.position(lastChunk.chunk.position() + lastPartLength);
      // Finally, put it in the ranges list for future use (if shared between RGs).
      // Before anyone else accesses it, it would have been allocated and decompressed locally.
- if (lastRange.remaining() <= 0) {
+ if (lastChunk.chunk.remaining() <= 0) {
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
+ }
        ranges.set(cc);
      } else {
- ranges.previous();
+ DiskRange before = ranges.previous();
        ranges.add(cc);
        ranges.next(); // TODO: This is really stupid.
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Adding " + cc + " before " + before + " in the buffers");
+ }
      }
    }


Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1658607&r1=1658606&r2=1658607&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Feb 10 01:29:01 2015
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.common.Dis
  import org.apache.hadoop.hive.common.type.HiveDecimal;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.DebugUtils;
  import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
  import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
@@ -2770,8 +2771,9 @@ public class RecordReaderImpl implements

      @Override
      public final String toString() {
- return "range start: " + offset + " size: " + chunk.remaining() + " type: "
- + (chunk.isDirect() ? "direct" : "array-backed");
+ boolean makesSense = chunk.remaining() == (end - offset);
+ return "data range [" + offset + ", " + end + "), size: " + chunk.remaining()
+ + (makesSense ? "" : "(!)") + " type: " + (chunk.isDirect() ? "direct" : "array-backed");
      }

      @Override
@@ -2823,6 +2825,11 @@ public class RecordReaderImpl implements
        isReused = true;
        return result;
      }
+
+ @Override
+ public String toString() {
+ return "start: " + offset + " end: " + end + " cache buffer: " + buffer;
+ }
    }


@@ -3008,6 +3015,10 @@ public class RecordReaderImpl implements
          lastRange.offset = Math.min(lastRange.offset, start);
          lastRange.end = Math.max(lastRange.end, end);
        } else {
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Creating new range for RG read; last range (which can include some "
+ + "previous RGs) was " + lastRange);
+ }
          lastRange = new DiskRange(start, end);
          result.add(lastRange);
        }
@@ -3474,7 +3485,7 @@ public class RecordReaderImpl implements

    /** Helper context for each column being read */
    private static final class ColumnReadContext {
- public ColumnReadContext(long offset, int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
+ public ColumnReadContext(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
        this.encoding = encoding;
        this.rowIndex = rowIndex;
        this.colIx = colIx;
@@ -3523,6 +3534,7 @@ public class RecordReaderImpl implements
     * is needed to get footer and indexes; or that can be refactored out)
     */
    @Override
+ // TODO#: HERE
    public void readEncodedColumns(int stripeIx, boolean[] included, boolean[][] colRgs,
        LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException {
      // Note: for now we don't have to setError here, caller will setError if we throw.
@@ -3537,9 +3549,12 @@ public class RecordReaderImpl implements

      // 1. Figure out what we have to read.
      LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
- long offset = 0;
+ long offset = 0; // Stream offset in relation to the stripe.
      // 1.1. Figure out which columns have a present stream
      boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("The following columns have PRESENT streams: " + DebugUtils.toString(hasNull));
+ }
      DiskRange lastRange = null;

      // We assume stream list is sorted by column and that non-data
@@ -3554,6 +3569,9 @@ public class RecordReaderImpl implements
        int colIx = stream.getColumn();
        OrcProto.Stream.Kind streamKind = stream.getKind();
        if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) {
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Skipping stream: " + streamKind + " at " + offset + ", " + length);
+ }
          offset += length;
          continue;
        }
@@ -3564,7 +3582,11 @@ public class RecordReaderImpl implements
          lastColIx = colIx;
          includedRgs = colRgs[colRgIx];
          ctx = colCtxs[colRgIx] = new ColumnReadContext(
- offset, colIx, encodings.get(colIx), indexes[colIx]);
+ colIx, encodings.get(colIx), indexes[colIx]);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Creating context " + colRgIx + " for column " + colIx + " with encoding "
+ + encodings.get(colIx) + " and rowIndex " + indexes[colIx]);
+ }
        } else {
          ctx = colCtxs[colRgIx];
          assert ctx != null;
@@ -3572,8 +3594,15 @@ public class RecordReaderImpl implements
        int indexIx = getIndexPosition(ctx.encoding.getKind(),
            types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
        ctx.addStream(offset, stream, indexIx);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
+ + ", " + length + ", index position " + indexIx);
+ }
        if (includedRgs == null || isDictionary(streamKind, encodings.get(colIx))) {
          lastRange = addEntireStreamToResult(offset, length, lastRange, rangesToRead);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Will read whole stream " + streamKind + "; added to " + lastRange);
+ }
        } else {
          lastRange = addRgFilteredStreamToResult(stream, includedRgs,
              codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx),
@@ -3583,22 +3612,34 @@ public class RecordReaderImpl implements
      }

      // 2. Now, read all of the ranges from cache or disk.
- if (LOG.isDebugEnabled()) {
- LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead));
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Resulting disk ranges to read: " + stringifyDiskRanges(rangesToRead));
      }
- mergeDiskRanges(rangesToRead);
      if (cache != null) {
        cache.getFileData(fileName, rangesToRead, stripeOffset);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Disk ranges after cache (base offset " + stripeOffset
+ + "): " + stringifyDiskRanges(rangesToRead));
+ }
      }
      // Force direct buffers, since we will be decompressing to cache.
- readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead, true);
+ readDiskRanges(file, zcr, stripeOffset, rangesToRead, true);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Disk ranges after disk read (" + (zcr == null ? "no " : "") + " zero-copy, base"
+ + " offset " + stripeOffset + "): " + stringifyDiskRanges(rangesToRead));
+ }

- // 2.1. Separate buffers for each stream from the data we have.
+ // 2.1. Separate buffers (relative to stream offset) for each stream from the data we have.
      // TODO: given how we read, we could potentially get rid of this step?
      for (ColumnReadContext colCtx : colCtxs) {
        for (int i = 0; i < colCtx.streamCount; ++i) {
          StreamContext sctx = colCtx.streams[i];
- sctx.bufferIter = getStreamBuffers(rangesToRead, sctx.offset, sctx.length).listIterator();
+ List<DiskRange> sb = getStreamBuffers(rangesToRead, sctx.offset, sctx.length);
+ sctx.bufferIter = sb.listIterator();
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Column " + colCtx.colIx + " stream " + sctx.kind + " at " + sctx.offset + ","
+ + sctx.length + " got ranges (relative to stream) " + stringifyDiskRanges(sb));
+ }
        }
      }

@@ -3622,6 +3663,10 @@ public class RecordReaderImpl implements
            StreamBuffer cb = null;
            if (isDictionary(sctx.kind, ctx.encoding)) {
              // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
+ }
              cb = getStripeLevelStream(absStreamOffset, sctx, cache, isLastRg);
            } else {
              // This stream can be separated by RG using index. Let's do that.
@@ -3631,6 +3676,12 @@ public class RecordReaderImpl implements
                      sctx.length, bufferSize);
              cb = new StreamBuffer();
              cb.incRef();
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Getting data for column "+ ctx.colIx + " " + (isLastRg ? "last " : "")
+ + "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", "
+ + sctx.length + " index position " + sctx.streamIndexOffset + ": compressed ["
+ + cOffset + ", " + endCOffset + ")");
+ }
              InStream.uncompressStream(fileName, absStreamOffset, zcr, sctx.bufferIter,
                  codec, bufferSize, cache, cOffset, endCOffset, cb);
            }
