Author: sseth
Date: Mon Mar 9 17:03:59 2015
New Revision: 1665305

URL: http://svn.apache.org/r1665305
Log:
HIVE-9807. LLAP: Add event logging for execution elements. (Siddharth Seth)

Added:
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
Modified:
     hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
     hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java?rev=1665305&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java Mon Mar 9 17:03:59 2015
@@ -0,0 +1,148 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+import org.apache.log4j.Logger;
+
+public class HistoryLogger {
+
+
+  private static final String HISTORY_EVENT_TYPE = "Event";
+  private static final String HISTORY_APPLICATION_ID = "ApplicationId";
+  private static final String HISTORY_CONTAINER_ID = "ContainerId";
+  private static final String HISTORY_SUBMIT_TIME = "SubmitTime";
+  private static final String HISTORY_START_TIME = "StartTime";
+  private static final String HISTORY_END_TIME = "EndTime";
+  private static final String HISTORY_DAG_NAME = "DagName";
+  private static final String HISTORY_VERTEX_NAME = "VertexName";
+  private static final String HISTORY_TASK_ID = "TaskId";
+  private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId";
+  private static final String HISTORY_HOSTNAME = "HostName";
+  private static final String HISTORY_SUCCEEDED = "Succeeded";
+
+  private static final String EVENT_TYPE_FRAGMENT_START = "FRAGMENT_START";
+  private static final String EVENT_TYPE_FRAGMENT_END = "FRAGMENT_END";
+
+  private static final Logger HISTORY_LOGGER = Logger.getLogger(HistoryLogger.class);
+
+  public static void logFragmentStart(String applicationIdStr, String containerIdStr,
+      String hostname,
+      String dagName, String vertexName, int taskId,
+      int attemptId) {
+    HISTORY_LOGGER.info(
+        constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName,
+            vertexName, taskId, attemptId));
+  }
+
+  public static void logFragmentEnd(String applicationIdStr, String containerIdStr, String hostname,
+      String dagName, String vertexName, int taskId, int attemptId,
+      long startTime, boolean failed) {
+    HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname,
+        dagName, vertexName, taskId, attemptId, startTime, failed));
+  }
+
+
+  private static String constructFragmentStartString(String applicationIdStr, String containerIdStr,
+      String hostname, String dagName,
+      String vertexName, int taskId, int attemptId) {
+    HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START);
+    lb.addHostName(hostname);
+    lb.addAppid(applicationIdStr);
+    lb.addContainerId(containerIdStr);
+    lb.addDagName(dagName);
+    lb.addVertexName(vertexName);
+    lb.addTaskId(taskId);
+    lb.addTaskAttemptId(attemptId);
+    lb.addTime(HISTORY_SUBMIT_TIME);
+    return lb.toString();
+  }
+
+  private static String constructFragmentEndString(String applicationIdStr, String containerIdStr,
+      String hostname, String dagName,
+      String vertexName, int taskId, int attemptId,
+      long startTime, boolean succeeded) {
+    HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END);
+    lb.addHostName(hostname);
+    lb.addAppid(applicationIdStr);
+    lb.addContainerId(containerIdStr);
+    lb.addDagName(dagName);
+    lb.addVertexName(vertexName);
+    lb.addTaskId(taskId);
+    lb.addTaskAttemptId(attemptId);
+    lb.addSuccessStatus(succeeded);
+    lb.addTime(HISTORY_START_TIME, startTime);
+    lb.addTime(HISTORY_END_TIME);
+    return lb.toString();
+  }
+
+  private static class HistoryLineBuilder {
+    private final StringBuilder sb = new StringBuilder();
+
+    HistoryLineBuilder(String eventType) {
+      sb.append(HISTORY_EVENT_TYPE).append("=").append(eventType);
+    }
+
+    HistoryLineBuilder addHostName(String hostname) {
+      return setKeyValue(HISTORY_HOSTNAME, hostname);
+    }
+
+    HistoryLineBuilder addAppid(String appId) {
+      return setKeyValue(HISTORY_APPLICATION_ID, appId);
+    }
+
+    HistoryLineBuilder addContainerId(String containerId) {
+      return setKeyValue(HISTORY_CONTAINER_ID, containerId);
+    }
+
+    HistoryLineBuilder addDagName(String dagName) {
+      return setKeyValue(HISTORY_DAG_NAME, dagName);
+    }
+
+    HistoryLineBuilder addVertexName(String vertexName) {
+      return setKeyValue(HISTORY_VERTEX_NAME, vertexName);
+    }
+
+    HistoryLineBuilder addTaskId(int taskId) {
+      return setKeyValue(HISTORY_TASK_ID, String.valueOf(taskId));
+    }
+
+    HistoryLineBuilder addTaskAttemptId(int attemptId) {
+      return setKeyValue(HISTORY_ATTEMPT_ID, String.valueOf(attemptId));
+    }
+
+    HistoryLineBuilder addTime(String timeParam, long millis) {
+      return setKeyValue(timeParam, String.valueOf(millis));
+    }
+
+    HistoryLineBuilder addTime(String timeParam) {
+      return setKeyValue(timeParam, String.valueOf(System.currentTimeMillis()));
+    }
+
+    HistoryLineBuilder addSuccessStatus(boolean status) {
+      return setKeyValue(HISTORY_SUCCEEDED, String.valueOf(status));
+    }
+
+
+    private HistoryLineBuilder setKeyValue(String key, String value) {
+      sb.append(", ").append(key).append("=").append(value);
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return sb.toString();
+    }
+  }
+}
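
(Illustration, not part of the commit: a minimal sketch of how the new logger is invoked.
The application/container IDs and hostname are placeholders; in this commit,
ContainerRunnerImpl passes null/-1 for the DAG, vertex, task and attempt fields.)

    import org.apache.hadoop.hive.llap.daemon.HistoryLogger;

    public class HistoryLoggerUsageSketch {
      public static void main(String[] args) {
        long submitTime = System.currentTimeMillis();
        // Writes one key=value line: Event=FRAGMENT_START, HostName=..., ApplicationId=...,
        // ContainerId=..., DagName=..., VertexName=..., TaskId=..., TaskAttemptId=..., SubmitTime=...
        HistoryLogger.logFragmentStart("application_1425920000000_0001",
            "container_1425920000000_0001_01_000002", "host1", null, null, -1, -1);

        // ... fragment executes ...

        // Writes the matching FRAGMENT_END line; the final boolean is logged as Succeeded=<value>.
        HistoryLogger.logFragmentEnd("application_1425920000000_0001",
            "container_1425920000000_0001_01_000002", "host1", null, null, -1, -1,
            submitTime, true);
      }
    }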

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1665305&r1=1665304&r2=1665305&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java Mon Mar 9 17:03:59 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.CallableWithNdc;
  import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
+import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto;
  import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
  import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
@@ -129,7 +130,9 @@ public class ContainerRunnerImpl extends

    @Override
    public void queueContainer(RunContainerRequestProto request) throws IOException {
- LOG.info("Queing container for execution: " + request);
+ HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(),
+ localAddress.get().getHostName(), null, null, -1, -1);
+ LOG.info("Queuing container for execution: " + request);
      // This is the start of container-annotated logging.
      NDC.push(request.getContainerIdString());
      try {
@@ -169,7 +172,7 @@ public class ContainerRunnerImpl extends

        ContainerRunnerCallable callable = new ContainerRunnerCallable(request, new Configuration(getConfig()),
            new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
-          workingDir, credentials, memoryPerExecutor);
+          workingDir, credentials, memoryPerExecutor, localAddress.get().getHostName());
        ListenableFuture<ContainerExecutionResult> future = executorService
            .submit(callable);
        Futures.addCallback(future, new ContainerRunnerCallback(request, callable));
@@ -194,12 +197,14 @@ public class ContainerRunnerImpl extends
      private final Credentials credentials;
      private final long memoryAvailable;
      private volatile TezChild tezChild;
+    private final String localHostname;
+    private volatile long startTime;


      ContainerRunnerCallable(RunContainerRequestProto request, Configuration conf,
                              ExecutionContext executionContext, Map<String, String> envMap,
                              String[] localDirs, String workingDir, Credentials credentials,
-                            long memoryAvailable) {
+                            long memoryAvailable, String localHostName) {
        this.request = request;
        this.conf = conf;
        this.executionContext = executionContext;
@@ -209,11 +214,13 @@ public class ContainerRunnerImpl extends
        this.objectRegistry = new ObjectRegistryImpl();
        this.credentials = credentials;
        this.memoryAvailable = memoryAvailable;
+      this.localHostname = localHostName;

      }

      @Override
      protected ContainerExecutionResult callInternal() throws Exception {
+      this.startTime = System.currentTimeMillis();
        Stopwatch sw = new Stopwatch().start();
        tezChild =
            new TezChild(conf, request.getAmHost(), request.getAmPort(),
@@ -270,6 +277,11 @@ public class ContainerRunnerImpl extends
            metrics.incrExecutorTotalAskedToDie();
            break;
        }
+      HistoryLogger
+          .logFragmentEnd(request.getApplicationIdString(),
+              request.getContainerIdString(),
+              localAddress.get().getHostName(), null, null, -1, -1,
+              containerRunnerCallable.startTime, true);
        metrics.decrExecutorNumQueuedRequests();
      }

@@ -282,6 +294,11 @@ public class ContainerRunnerImpl extends
        if (tezChild != null) {
          tezChild.shutdown();
        }
+      HistoryLogger
+          .logFragmentEnd(request.getApplicationIdString(),
+              request.getContainerIdString(),
+              localAddress.get().getHostName(), null, null, -1, -1,
+              containerRunnerCallable.startTime, false);
        metrics.decrExecutorNumQueuedRequests();
      }
    }

Modified: hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties?rev=1665305&r1=1665304&r2=1665305&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties (original)
+++ hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties Mon Mar 9 17:03:59 2015
@@ -19,27 +19,43 @@ llap.daemon.root.logger=INFO,console
  llap.daemon.log.dir=.
  llap.daemon.log.file=llapdaemon.log

+llap.daemon.historylog.file=llapdaemon_history.log
+log4j.logger.org.apache.hadoop.hive.llap.daemon.HistoryLogger=INFO,HISTORYAPPENDER
+
  # Define the root logger to the system property "llap.daemon.root.logger".
  log4j.rootLogger=${llap.daemon.root.logger}

  # Logging Threshold
  log4j.threshold=ALL

+
  # Null Appender
  log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

-#
+
+
+# History Events appender
+log4j.appender.HISTORYAPPENDER=org.apache.log4j.RollingFileAppender
+log4j.appender.HISTORYAPPENDER.File=${llap.daemon.log.dir}/${llap.daemon.historylog.file}
+log4j.appender.HISTORYAPPENDER.MaxFileSize=${llap.daemon.log.maxfilesize}
+log4j.appender.HISTORYAPPENDER.MaxBackupIndex=${llap.daemon.log.maxbackupindex}
+log4j.appender.HISTORYAPPENDER.layout=org.apache.log4j.EnhancedPatternLayout
+log4j.appender.HISTORYAPPENDER.layout.ConversionPattern=%m%n
+
+
+
  # Rolling File Appender - cap space usage at 5gb.
  #
  llap.daemon.log.maxfilesize=256MB
  llap.daemon.log.maxbackupindex=20
  log4j.appender.RFA=org.apache.log4j.RollingFileAppender
  log4j.appender.RFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file}
+log4j.appender.RFA.Append=true

  log4j.appender.RFA.MaxFileSize=${llap.daemon.log.maxfilesize}
  log4j.appender.RFA.MaxBackupIndex=${llap.daemon.log.maxbackupindex}

-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout=org.apache.log4j.EnhancedPatternLayout

  # Pattern format: Date LogLevel LoggerName LogMessage
  log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n
@@ -59,7 +75,7 @@ log4j.appender.DRFA.DatePattern=.yyyy-MM

  # 30-day backup
  #log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.DRFA.layout=org.apache.log4j.EnhancedPatternLayout

  # Pattern format: Date LogLevel LoggerName LogMessage
  log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n
@@ -74,5 +90,5 @@ log4j.appender.DRFA.layout.ConversionPat

  log4j.appender.console=org.apache.log4j.ConsoleAppender
  log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout=org.apache.log4j.EnhancedPatternLayout
  log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t(%x)] %p %c{2} : %m%n
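
(Illustration: with the %m%n layout above, HISTORYAPPENDER writes the bare HistoryLogger
messages to ${llap.daemon.log.dir}/${llap.daemon.historylog.file}, one key=value line per
event. The IDs and timestamps below are placeholders, and each entry is a single line,
wrapped here for readability.)

    Event=FRAGMENT_START, HostName=host1, ApplicationId=application_1425920000000_0001,
      ContainerId=container_1425920000000_0001_01_000002, DagName=null, VertexName=null,
      TaskId=-1, TaskAttemptId=-1, SubmitTime=1425920639000
    Event=FRAGMENT_END, HostName=host1, ApplicationId=application_1425920000000_0001,
      ContainerId=container_1425920000000_0001_01_000002, DagName=null, VertexName=null,
      TaskId=-1, TaskAttemptId=-1, Succeeded=true, StartTime=1425920639000, EndTime=1425920645000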
