FAQ
Author: prasanthj
Date: Sat Mar 7 01:16:49 2015
New Revision: 1664774

URL: http://svn.apache.org/r1664774
Log:
HIVE-9888: LLAP: Provide per query counters (Prasanth Jayachandran)

Added:
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
Modified:
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java?rev=1664774&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java Sat Mar 7 01:16:49 2015
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.counters;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Per query counters.
+ */
/**
 * Per query-fragment counters, updated concurrently by the LLAP IO read and
 * decode threads. All updates are atomic; instances are safe to share.
 */
public class QueryFragmentCounters {

  /** Counter keys tracked for each query fragment. */
  public static enum Counter {
    NUM_VECTOR_BATCHES,
    NUM_DECODED_BATCHES,
    SELECTED_ROWGROUPS,
    NUM_ERRORS,
    ROWS_EMITTED
  }

  private final String appId;
  private final Map<String, Long> counterMap;

  public QueryFragmentCounters() {
    this("Not Specified");
  }

  public QueryFragmentCounters(String applicationId) {
    this.appId = applicationId;
    this.counterMap = new ConcurrentHashMap<>();
  }

  /** Increments the given counter by 1. */
  public void incrCounter(Counter counter) {
    incrCounter(counter, 1);
  }

  /**
   * Atomically increments the given counter by delta. The previous
   * containsKey/get/put sequence was not atomic even on a ConcurrentHashMap,
   * so concurrent increments could be lost; this uses a putIfAbsent/replace
   * CAS loop instead.
   * @param counter counter to update
   * @param delta amount to add (may be negative)
   */
  public void incrCounter(Counter counter, long delta) {
    String name = counter.name();
    while (true) {
      Long prev = counterMap.get(name);
      if (prev == null) {
        if (counterMap.putIfAbsent(name, delta) == null) {
          return; // we installed the initial value
        }
      } else if (counterMap.replace(name, prev, prev + delta)) {
        return; // CAS succeeded
      }
      // Lost a race with a concurrent update; retry.
    }
  }

  /** Sets the counter to an absolute value, overwriting any previous value. */
  public void setCounter(Counter counter, long value) {
    counterMap.put(counter.name(), value);
  }

  @Override
  public String toString() {
    return "ApplicationId: " + appId + " Counters: " + counterMap;
  }
}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java Sat Mar 7 01:16:49 2015
@@ -167,7 +167,6 @@ public class ContainerRunnerImpl extends
        LOG.info("DEBUG: Registering request with the ShuffleHandler");
        ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());

-
        ContainerRunnerCallable callable = new ContainerRunnerCallable(request, new Configuration(getConfig()),
            new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
            workingDir, credentials, memoryPerExecutor);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java Sat Mar 7 01:16:49 2015
@@ -26,6 +26,7 @@ import java.util.List;
  import org.apache.hadoop.hive.llap.Consumer;
  import org.apache.hadoop.hive.llap.ConsumerFeedback;
  import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
  import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
  import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
  import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -110,12 +111,14 @@ public class LlapInputFormat
      /** Vector that is currently being processed by our user. */
      private boolean isDone = false, isClosed = false;
      private ConsumerFeedback<ColumnVectorBatch> feedback;
+ private final QueryFragmentCounters counters;

      public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols) {
        this.split = split;
        this.columnIds = includedCols;
        this.sarg = SearchArgumentFactory.createFromConf(job);
        this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
+ this.counters = new QueryFragmentCounters();
        try {
          rbCtx = new VectorizedRowBatchCtx();
          rbCtx.init(job, split);
@@ -175,7 +178,7 @@ public class LlapInputFormat

      private void startRead() {
        // Create the consumer of encoded data; it will coordinate decoding to CVBs.
- ReadPipeline rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames);
+ ReadPipeline rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters);
        feedback = rp;
        ListenableFuture<Void> future = executor.submit(rp.getReadCallable());
        // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most cases.
@@ -235,6 +238,7 @@ public class LlapInputFormat
          LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + isDone
              + ", err " + pendingError + ", pending " + pendingData.size());
        }
+ LlapIoImpl.LOG.info("QueryFragmentCounters: " + counters);
        feedback.stop();
        rethrowErrorIfAny();
      }
@@ -253,6 +257,7 @@ public class LlapInputFormat
          LlapIoImpl.LOG.info("setDone called; closed " + isClosed
            + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size());
        }
+ LlapIoImpl.LOG.info("DONE: QueryFragmentCounters: " + counters);
        synchronized (pendingData) {
          isDone = true;
          pendingData.notifyAll();
@@ -276,9 +281,11 @@ public class LlapInputFormat

      @Override
      public void setError(Throwable t) {
+ counters.incrCounter(QueryFragmentCounters.Counter.NUM_ERRORS);
        LlapIoImpl.LOG.info("setError called; closed " + isClosed
          + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size());
        assert t != null;
+ LlapIoImpl.LOG.info("ERROR: QueryFragmentCounters: " + counters);
        synchronized (pendingData) {
          pendingError = t;
          pendingData.notifyAll();

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Sat Mar 7 01:16:49 2015
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.llap.io.d
  import java.util.List;

  import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
  import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.mapred.InputSplit;
@@ -30,5 +31,6 @@ import org.apache.hadoop.mapred.InputSpl
   */
  public interface ColumnVectorProducer {
    ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, InputSplit split,
- List<Integer> columnIds, SearchArgument sarg, String[] columnNames);
+ List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ QueryFragmentCounters counters);
  }
\ No newline at end of file

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java Sat Mar 7 01:16:49 2015
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveC
  import org.apache.hadoop.hive.llap.Consumer;
  import org.apache.hadoop.hive.llap.cache.Cache;
  import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
  import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
  import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
  import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -43,7 +44,7 @@ public class OrcColumnVectorProducer imp
    private final Configuration conf;
    private boolean _skipCorrupt; // TODO: get rid of this
    private LlapDaemonCacheMetrics metrics;
-
+
    public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
        LowLevelCacheImpl lowLevelCache, Cache<OrcCacheKey> cache, Configuration conf,
        LlapDaemonCacheMetrics metrics) {
@@ -62,12 +63,13 @@ public class OrcColumnVectorProducer imp
    @Override
    public ReadPipeline createReadPipeline(
        Consumer<ColumnVectorBatch> consumer, InputSplit split,
- List<Integer> columnIds, SearchArgument sarg, String[] columnNames) {
+ List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ QueryFragmentCounters counters) {
      metrics.incrCacheReadRequests();
- OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
- consumer, columnIds.size(), _skipCorrupt);
- OrcEncodedDataReader reader = new OrcEncodedDataReader(
- lowLevelCache, cache, metadataCache, conf, split, columnIds, sarg, columnNames, edc);
+ OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
+ _skipCorrupt, counters);
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, cache, metadataCache,
+ conf, split, columnIds, sarg, columnNames, edc, counters);
      edc.init(reader, reader);
      return edc;
    }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java Sat Mar 7 01:16:49 2015
@@ -20,9 +20,9 @@ package org.apache.hadoop.hive.llap.io.d
  import java.io.IOException;

  import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
  import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
  import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
  import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
  import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
  import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BinaryStreamReader;
@@ -52,11 +52,14 @@ public class OrcEncodedDataConsumer exte
    private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
    private OrcStripeMetadata[] stripes;
    private final boolean skipCorrupt; // TODO: get rid of this
+ private final QueryFragmentCounters counters;

    public OrcEncodedDataConsumer(
- Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt) {
+ Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt,
+ QueryFragmentCounters counters) {
      super(consumer, colCount);
      this.skipCorrupt = skipCorrupt;
+ this.counters = counters;
    }

    public void setFileMetadata(OrcFileMetadata f) {
@@ -109,7 +112,10 @@ public class OrcEncodedDataConsumer exte

          // we are done reading a batch, send it to consumer for processing
          downstreamConsumer.consumeData(cvb);
+ counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, batchSize);
        }
+ counters.incrCounter(QueryFragmentCounters.Counter.NUM_VECTOR_BATCHES, maxBatchesRG);
+ counters.incrCounter(QueryFragmentCounters.Counter.NUM_DECODED_BATCHES);
      } catch (IOException e) {
        // Caller will return the batch.
        downstreamConsumer.setError(e);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java Sat Mar 7 01:16:49 2015
@@ -8,13 +8,13 @@ import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.CallableWithNdc;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.llap.Consumer;
  import org.apache.hadoop.hive.llap.ConsumerFeedback;
  import org.apache.hadoop.hive.llap.DebugUtils;
  import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
  import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
  import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
  import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
@@ -30,12 +30,12 @@ import org.apache.hadoop.hive.ql.io.orc.
  import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
  import org.apache.hadoop.hive.ql.io.orc.OrcFile;
  import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
  import org.apache.hadoop.hive.ql.io.orc.Reader;
  import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
  import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
  import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.mapred.FileSplit;
  import org.apache.hadoop.mapred.InputSplit;
@@ -52,7 +52,7 @@ public class OrcEncodedDataReader extend
    private final SearchArgument sarg;
    private final String[] columnNames;
    private final OrcEncodedDataConsumer consumer;
-
+ private final QueryFragmentCounters counters;

    // Read state.
    private int stripeIxFrom;
@@ -70,7 +70,7 @@ public class OrcEncodedDataReader extend
    public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
        OrcMetadataCache metadataCache, Configuration conf, InputSplit split,
        List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
- OrcEncodedDataConsumer consumer) {
+ OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
      this.lowLevelCache = lowLevelCache;
      this.metadataCache = metadataCache;
      this.cache = cache;
@@ -83,6 +83,7 @@ public class OrcEncodedDataReader extend
      this.sarg = sarg;
      this.columnNames = columnNames;
      this.consumer = consumer;
+ this.counters = counters;
    }

    @Override
@@ -459,6 +460,17 @@ public class OrcEncodedDataReader extend
          readState[stripeIxMod][j] = (rgsToRead == null) ? null :
            Arrays.copyOf(rgsToRead, rgsToRead.length);
        }
+
+ int count = 0;
+ if (rgsToRead != null) {
+ for (boolean b : rgsToRead) {
+ if (b)
+ count++;
+ }
+ } else {
+ count = rgCount;
+ }
+ counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count);
      }
    }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Mar 7, '15 at 1:16a
active: Mar 7, '15 at 1:16a
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Prasanthj: 1 post

People

Translate

site design / logo © 2021 Grokbase