FAQ
Author: brock
Date: Mon Nov 17 17:03:08 2014
New Revision: 1640190

URL: http://svn.apache.org/r1640190
Log:
HIVE-8852 - Update new spark progress API for local submitted job monitoring [Spark Branch] (Rui Li via Brock)

Added:
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
Removed:
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
Modified:
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
     hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Mon Nov 17 17:03:08 2014
@@ -32,14 +32,13 @@ import org.apache.commons.logging.LogFac
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.Context;
  import org.apache.hadoop.hive.ql.DriverContext;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
  import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
  import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
  import org.apache.hadoop.hive.ql.io.HiveKey;
  import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -52,7 +51,6 @@ import org.apache.spark.SparkContext;
  import org.apache.spark.api.java.JavaFutureAction;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ui.jobs.JobProgressListener;

  import scala.Tuple2;

@@ -113,17 +111,13 @@ public class SparkClient implements Seri

    private List<String> localFiles = new ArrayList<String>();

- private JobStateListener jobStateListener;
-
- private JobProgressListener jobProgressListener;
+ private JobMetricsListener jobMetricsListener;

    private SparkClient(Configuration hiveConf) {
      SparkConf sparkConf = initiateSparkConf(hiveConf);
      sc = new JavaSparkContext(sparkConf);
- jobStateListener = new JobStateListener();
- jobProgressListener = new JobProgressListener(sparkConf);
- sc.sc().listenerBus().addListener(jobStateListener);
- sc.sc().listenerBus().addListener(jobProgressListener);
+ jobMetricsListener = new JobMetricsListener();
+ sc.sc().listenerBus().addListener(jobMetricsListener);
    }

    private SparkConf initiateSparkConf(Configuration hiveConf) {
@@ -217,10 +211,11 @@ public class SparkClient implements Seri
      JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
      // We use Spark RDD async action to submit job as it's the only way to get jobId now.
      JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
- // As we always use foreach action to submit RDD graph, it would only trigger on job.
+ // As we always use foreach action to submit RDD graph, it would only trigger one job.
      int jobId = future.jobIds().get(0);
      SimpleSparkJobStatus sparkJobStatus =
- new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
+ new SimpleSparkJobStatus(sc, jobId, jobMetricsListener,
+ sparkCounters, future);
      return new SparkJobRef(jobId, sparkJobStatus);
    }


Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Mon Nov 17 17:03:08 2014
@@ -28,6 +28,7 @@ import java.util.TreeSet;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.spark.JobExecutionStatus;

  /**
   * SparkJobMonitor monitors a single Spark job status in a loop until job finished/failed/killed.
@@ -59,26 +60,18 @@ public class SparkJobMonitor {
      boolean done = false;
      int failedCounter = 0;
      int rc = 0;
- SparkJobState lastState = null;
+ JobExecutionStatus lastState = null;
      Map<String, SparkStageProgress> lastProgressMap = null;
      long startTime = 0;

      while (true) {
-
        try {
- Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
- SparkJobState state = sparkJobStatus.getState();
-
- if (state != lastState || state == SparkJobState.RUNNING) {
+ JobExecutionStatus state = sparkJobStatus.getState();
+ if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
            lastState = state;
+ Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();

            switch (state) {
- case SUBMITTED:
- console.printInfo("Status: Submitted");
- break;
- case INITING:
- console.printInfo("Status: Initializing");
- break;
            case RUNNING:
              if (!running) {
                // print job stages.
@@ -110,14 +103,7 @@ public class SparkJobMonitor {
              running = false;
              done = true;
              break;
- case KILLED:
- console.printInfo("Status: Killed");
- running = false;
- done = true;
- rc = 1;
- break;
            case FAILED:
- case ERROR:
              console.printError("Status: Failed");
              running = false;
              done = true;
@@ -187,17 +173,17 @@ public class SparkJobMonitor {
                String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
            }
          } else {
- double cost = progress.getCumulativeTime() / 1000.0;
            /* stage is waiting for input/slots or complete */
            if (failed > 0) {
              /* tasks finished but some failed */
              reportBuffer.append(
- String.format(
- "%s: %d(-%d)/%d Finished in %,.2fs\t", stageName, complete, failed, total, cost));
+ String.format(
+ "%s: %d(-%d)/%d Finished with failed tasks\t",
+ stageName, complete, failed, total));
            } else {
              if (complete == total) {
                reportBuffer.append(
- String.format("%s: %d/%d Finished in %,.2fs\t", stageName, complete, total, cost));
+ String.format("%s: %d/%d Finished\t", stageName, complete, total));
              } else {
                reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
              }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Mon Nov 17 17:03:08 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.s

  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
  import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.spark.JobExecutionStatus;

  import java.util.Map;

@@ -29,7 +30,7 @@ public interface SparkJobStatus {

    public int getJobId();

- public SparkJobState getState();
+ public JobExecutionStatus getState();

    public int[] getStageIds();


Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java Mon Nov 17 17:03:08 2014
@@ -23,23 +23,21 @@ public class SparkStageProgress {
    private int succeededTaskCount;
    private int runningTaskCount;
    private int failedTaskCount;
- private int killedTaskCount;
- private long cumulativeTime;
+ // TODO: remove the following two metrics as they're not available in current spark API,
+ // we can add them back once spark provides it
+// private int killedTaskCount;
+// private long cumulativeTime;

    public SparkStageProgress(
      int totalTaskCount,
      int succeededTaskCount,
      int runningTaskCount,
- int failedTaskCount,
- int killedTaskCount,
- long cumulativeTime) {
+ int failedTaskCount) {

      this.totalTaskCount = totalTaskCount;
      this.succeededTaskCount = succeededTaskCount;
      this.runningTaskCount = runningTaskCount;
      this.failedTaskCount = failedTaskCount;
- this.killedTaskCount = killedTaskCount;
- this.cumulativeTime = cumulativeTime;
    }

    public int getTotalTaskCount() {
@@ -58,14 +56,6 @@ public class SparkStageProgress {
      return failedTaskCount;
    }

- public int getKilledTaskCount() {
- return killedTaskCount;
- }
-
- public long getCumulativeTime() {
- return cumulativeTime;
- }
-
    @Override
    public boolean equals(Object obj) {
      if (obj instanceof SparkStageProgress) {
@@ -73,8 +63,7 @@ public class SparkStageProgress {
        return getTotalTaskCount() == other.getTotalTaskCount()
          && getSucceededTaskCount() == other.getSucceededTaskCount()
          && getRunningTaskCount() == other.getRunningTaskCount()
- && getFailedTaskCount() == other.getFailedTaskCount()
- && getKilledTaskCount() == other.getKilledTaskCount();
+ && getFailedTaskCount() == other.getFailedTaskCount();
      }
      return false;
    }
@@ -90,10 +79,6 @@ public class SparkStageProgress {
      sb.append(getRunningTaskCount());
      sb.append(" Failed: ");
      sb.append(getFailedTaskCount());
- sb.append(" Killed: ");
- sb.append(getKilledTaskCount());
- sb.append(" CumulativeTime: ");
- sb.append(getCumulativeTime() + "ms");
      return sb.toString();
    }
  }

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java?rev=1640190&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java Mon Nov 17 17:03:08 2014
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.JobSucceeded;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+/**
+ * Spark listener that accumulates per-task {@link TaskMetrics} for each
+ * submitted job, so Hive can aggregate job-level statistics after the fact
+ * (see {@link #getJobMetric(int)}).
+ *
+ * Metrics are grouped first by jobId, then by a stage-attempt key of the
+ * form "stageId_stageAttemptId". The mutating callbacks and accessors are
+ * synchronized because Spark's listener bus delivers events on its own
+ * thread while Hive's job monitor polls from another.
+ */
+public class JobMetricsListener implements SparkListener {
+
+ private final static Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
+ // jobId -> stage ids belonging to that job (populated in onJobStart).
+ private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+ // stageId -> owning jobId; reverse index used to route task metrics.
+ private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+ // jobId -> ("stageId_attemptId" -> metrics of each completed task).
+ private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+
+ // Only job-start and task-end events carry the information this listener
+ // needs; the remaining SparkListener callbacks are intentionally no-ops.
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+ }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+ }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+ }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+ }
+
+ /**
+ * Records the finished task's metrics under its job and stage attempt.
+ * Tasks whose stage was never registered via onJobStart (e.g. events for
+ * jobs submitted outside this listener's lifetime) are logged and dropped.
+ */
+ @Override
+ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ int stageId = taskEnd.stageId();
+ int stageAttemptId = taskEnd.stageAttemptId();
+ String stageIdentifier = stageId + "_" + stageAttemptId;
+ Integer jobId = stageIdToJobId.get(stageId);
+ if (jobId == null) {
+ LOG.warn("Can not find job id for stage[" + stageId + "].");
+ } else {
+ // Lazily create the per-job and per-stage-attempt containers.
+ Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+ if (jobMetrics == null) {
+ jobMetrics = Maps.newHashMap();
+ allJobMetrics.put(jobId, jobMetrics);
+ }
+ List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+ if (stageMetrics == null) {
+ stageMetrics = Lists.newLinkedList();
+ jobMetrics.put(stageIdentifier, stageMetrics);
+ }
+ stageMetrics.add(taskEnd.taskMetrics());
+ }
+ }
+
+ /**
+ * Builds the forward (job -> stages) and reverse (stage -> job) indexes
+ * for a newly started job, so later task-end events can be attributed.
+ */
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ int jobId = jobStart.jobId();
+ int size = jobStart.stageIds().size();
+ int[] intStageIds = new int[size];
+ for(int i=0; i< size; i++) {
+ // stageIds() is a Scala Seq[Object]; each element is a boxed Integer.
+ Integer stageId = (Integer) jobStart.stageIds().apply(i);
+ intStageIds[i] = stageId;
+ stageIdToJobId.put(stageId, jobId);
+ }
+ jobIdToStageId.put(jobId, intStageIds);
+ }
+
+ @Override
+ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+
+ }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+ }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+ }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+ }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+ }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+ }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+ }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+ }
+
+ /**
+ * Returns the collected metrics for the given job, keyed by
+ * "stageId_attemptId", or null if no task of the job has finished yet.
+ * NOTE(review): the returned map is the live internal structure, not a
+ * copy — callers should not mutate it; confirm callers treat it read-only.
+ */
+ public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+ return allJobMetrics.get(jobId);
+ }
+
+ /**
+ * Drops all bookkeeping for a finished job: its metrics, its stage list,
+ * and every reverse stage->job mapping pointing at it.
+ */
+ public synchronized void cleanup(int jobId) {
+ allJobMetrics.remove(jobId);
+ jobIdToStageId.remove(jobId);
+ // Remove via the iterator to avoid ConcurrentModificationException.
+ Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Integer> entry = iterator.next();
+ if (entry.getValue() == jobId) {
+ iterator.remove();
+ }
+ }
+ }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Mon Nov 17 17:03:08 2014
@@ -18,7 +18,6 @@
  package org.apache.hadoop.hive.ql.exec.spark.status.impl;

  import java.util.HashMap;
-import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;

@@ -26,42 +25,35 @@ import com.google.common.collect.Maps;
  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
  import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
  import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
  import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
  import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.executor.ShuffleReadMetrics;
  import org.apache.spark.executor.ShuffleWriteMetrics;
  import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.StageInfo;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.spark.ui.jobs.UIData;

  import scala.Option;
-import scala.Tuple2;
-
-import static scala.collection.JavaConversions.bufferAsJavaList;
-import static scala.collection.JavaConversions.mutableMapAsJavaMap;

  public class SimpleSparkJobStatus implements SparkJobStatus {

+ private final JavaSparkContext sparkContext;
    private int jobId;
- private JobStateListener jobStateListener;
- private JobProgressListener jobProgressListener;
+ // After SPARK-2321, we only use JobMetricsListener to get job metrics
+ // TODO: remove it when the new API provides equivalent functionality
+ private JobMetricsListener jobMetricsListener;
    private SparkCounters sparkCounters;
    private JavaFutureAction<Void> future;

- public SimpleSparkJobStatus(
- int jobId,
- JobStateListener stateListener,
- JobProgressListener progressListener,
- SparkCounters sparkCounters,
- JavaFutureAction<Void> future) {
-
+ public SimpleSparkJobStatus(JavaSparkContext sparkContext, int jobId,
+ JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
+ JavaFutureAction<Void> future) {
+ this.sparkContext = sparkContext;
      this.jobId = jobId;
- this.jobStateListener = stateListener;
- this.jobProgressListener = progressListener;
+ this.jobMetricsListener = jobMetricsListener;
      this.sparkCounters = sparkCounters;
      this.future = future;
    }
@@ -72,62 +64,39 @@ public class SimpleSparkJobStatus implem
    }

    @Override
- public SparkJobState getState() {
+ public JobExecutionStatus getState() {
      // For spark job with empty source data, it's not submitted actually, so we would never
      // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
      // job state.
      if (future.isDone()) {
- return SparkJobState.SUCCEEDED;
+ return JobExecutionStatus.SUCCEEDED;
      } else {
- return jobStateListener.getJobState(jobId);
+ // SparkJobInfo may not be available yet
+ SparkJobInfo sparkJobInfo = getJobInfo();
+ return sparkJobInfo == null ? null : sparkJobInfo.status();
      }
    }

    @Override
    public int[] getStageIds() {
- return jobStateListener.getStageIds(jobId);
+ SparkJobInfo sparkJobInfo = getJobInfo();
+ return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds();
    }

    @Override
    public Map<String, SparkStageProgress> getSparkStageProgress() {
      Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
- int[] stageIds = jobStateListener.getStageIds(jobId);
- if (stageIds != null) {
- for (int stageId : stageIds) {
- List<StageInfo> stageInfos = getStageInfo(stageId);
- for (StageInfo stageInfo : stageInfos) {
- Tuple2<Object, Object> tuple2 = new Tuple2<Object, Object>(stageInfo.stageId(),
- stageInfo.attemptId());
- UIData.StageUIData uiData = jobProgressListener.stageIdToData().get(tuple2).get();
- if (uiData != null) {
- int runningTaskCount = uiData.numActiveTasks();
- int completedTaskCount = uiData.numCompleteTasks();
- int failedTaskCount = uiData.numFailedTasks();
- int totalTaskCount = stageInfo.numTasks();
- int killedTaskCount = 0;
- long costTime;
- Option<Object> startOption = stageInfo.submissionTime();
- Option<Object> completeOption = stageInfo.completionTime();
- if (startOption.isEmpty()) {
- costTime = 0;
- } else if (completeOption.isEmpty()) {
- long startTime = (Long)startOption.get();
- costTime = System.currentTimeMillis() - startTime;
- } else {
- long startTime = (Long)startOption.get();
- long completeTime = (Long)completeOption.get();
- costTime = completeTime - startTime;
- }
- SparkStageProgress stageProgress = new SparkStageProgress(
- totalTaskCount,
- completedTaskCount,
- runningTaskCount,
- failedTaskCount,
- killedTaskCount,
- costTime);
- stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress);
- }
- }
+ for (int stageId : getStageIds()) {
+ SparkStageInfo sparkStageInfo = getStageInfo(stageId);
+ if (sparkStageInfo != null) {
+ int runningTaskCount = sparkStageInfo.numActiveTasks();
+ int completedTaskCount = sparkStageInfo.numCompletedTasks();
+ int failedTaskCount = sparkStageInfo.numFailedTasks();
+ int totalTaskCount = sparkStageInfo.numTasks();
+ SparkStageProgress sparkStageProgress = new SparkStageProgress(
+ totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+ stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
+ sparkStageInfo.currentAttemptId(), sparkStageProgress);
        }
      }
      return stageProgresses;
@@ -145,7 +114,7 @@ public class SimpleSparkJobStatus implem
      sparkStatisticsBuilder.add(sparkCounters);
      // add spark job metrics.
      String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
- Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+ Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
      if (jobMetric == null) {
        return null;
      }
@@ -160,7 +129,7 @@ public class SimpleSparkJobStatus implem

    @Override
    public void cleanup() {
- jobStateListener.cleanup(jobId);
+ jobMetricsListener.cleanup(jobId);
    }

    private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
@@ -242,29 +211,11 @@ public class SimpleSparkJobStatus implem
      return results;
    }

- private List<StageInfo> getStageInfo(int stageId) {
- List<StageInfo> stageInfos = new LinkedList<StageInfo>();
-
- Map<Object, StageInfo> activeStages = mutableMapAsJavaMap(jobProgressListener.activeStages());
- List<StageInfo> completedStages = bufferAsJavaList(jobProgressListener.completedStages());
- List<StageInfo> failedStages = bufferAsJavaList(jobProgressListener.failedStages());
-
- if (activeStages.containsKey(stageId)) {
- stageInfos.add(activeStages.get(stageId));
- } else {
- for (StageInfo stageInfo : completedStages) {
- if (stageInfo.stageId() == stageId) {
- stageInfos.add(stageInfo);
- }
- }
-
- for (StageInfo stageInfo : failedStages) {
- if (stageInfo.stageId() == stageId) {
- stageInfos.add(stageInfo);
- }
- }
- }
+ private SparkJobInfo getJobInfo() {
+ return sparkContext.statusTracker().getJobInfo(jobId);
+ }

- return stageInfos;
+ private SparkStageInfo getStageInfo(int stageId) {
+ return sparkContext.statusTracker().getStageInfo(stageId);
    }
  }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedNov 17, '14 at 5:03p
activeNov 17, '14 at 5:03p
posts1
users1
websitehive.apache.org

1 user in discussion

Brock: 1 post

People

Translate

site design / logo © 2022 Grokbase