FAQ
Repository: hive
Updated Branches:
   refs/heads/master 187829fa9 -> 8d22a60c8


HIVE-12556 : Ctrl-C in beeline doesn't kill Tez query on HS2 (Sergey Shelukhin, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d22a60c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d22a60c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d22a60c

Branch: refs/heads/master
Commit: 8d22a60c8c4a247aa396b8c3841b6ebdce51f508
Parents: 187829f
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Wed Dec 2 16:16:35 2015 -0800
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Wed Dec 2 16:16:59 2015 -0800

----------------------------------------------------------------------
  .../java/org/apache/hadoop/hive/ql/Driver.java | 5 +
  .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 21 +++-
  .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 109 ++++++++++++++++++-
  3 files changed, 124 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 8fafd61..62b608c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1901,6 +1901,11 @@ public class Driver implements CommandProcessor {

    public int close() {
      try {
+ try {
+ releaseResources();
+ } catch (Exception e) {
+ LOG.info("Exception while releasing resources", e);
+ }
        if (fetchTask != null) {
          try {
            fetchTask.clearFetch();

http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 59e9d29..f6bc19c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -22,6 +22,7 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
  import static org.fusesource.jansi.Ansi.ansi;

  import java.io.IOException;
+import java.io.InterruptedIOException;
  import java.io.PrintStream;
  import java.text.DecimalFormat;
  import java.text.NumberFormat;
@@ -135,9 +136,7 @@ public class TezJobMonitor {
      Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
- for (DAGClient c: shutdownList) {
- TezJobMonitor.killRunningJobs();
- }
+ TezJobMonitor.killRunningJobs();
          try {
            for (TezSessionState s : TezSessionPoolManager.getInstance().getOpenSessions()) {
              System.err.println("Shutting down tez session.");
@@ -346,8 +345,8 @@ public class TezJobMonitor {
          }
        } catch (Exception e) {
          console.printInfo("Exception: " + e.getMessage());
- if (++failedCounter % maxRetryInterval / checkInterval == 0
- || e instanceof InterruptedException) {
+ boolean isInterrupted = hasInterruptedException(e);
+ if (isInterrupted || (++failedCounter % maxRetryInterval / checkInterval == 0)) {
            try {
              console.printInfo("Killing DAG...");
              dagClient.tryKillDAG();
@@ -376,10 +375,22 @@ public class TezJobMonitor {
          }
        }
      }
+
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
      return rc;
    }

+ private static boolean hasInterruptedException(Throwable e) {
+ // Hadoop IPC wraps InterruptedException. GRRR.
+ while (e != null) {
+ if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+ return true;
+ }
+ e = e.getCause();
+ }
+ return false;
+ }
+
    /**
     * killRunningJobs tries to terminate execution of all
     * currently running tez queries. No guarantees, best effort only.

http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a6d911d..a2060da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,6 +18,7 @@

  package org.apache.hadoop.hive.ql.exec.tez;

+import java.io.IOException;
  import java.util.Arrays;
  import java.util.Collection;
  import java.util.Collections;
@@ -29,6 +30,9 @@ import java.util.List;
  import java.util.Map;
  import java.util.Set;

+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
@@ -52,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
  import org.apache.hadoop.hive.ql.session.SessionState;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
  import org.apache.hadoop.yarn.api.records.LocalResource;
  import org.apache.hadoop.yarn.api.records.LocalResourceType;
  import org.apache.tez.client.CallerContext;
@@ -64,10 +69,13 @@ import org.apache.tez.dag.api.DAG;
  import org.apache.tez.dag.api.Edge;
  import org.apache.tez.dag.api.GroupInputEdge;
  import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
  import org.apache.tez.dag.api.Vertex;
  import org.apache.tez.dag.api.VertexGroup;
  import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
  import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
  import org.json.JSONObject;

  /**
@@ -86,6 +94,8 @@ public class TezTask extends Task<TezWork> {

    private final DagUtils utils;

+ private DAGClient dagClient = null;
+
    Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
    Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();

@@ -107,7 +117,6 @@ public class TezTask extends Task<TezWork> {
      int rc = 1;
      boolean cleanContext = false;
      Context ctx = null;
- DAGClient client = null;
      TezSessionState session = null;

      try {
@@ -177,12 +186,12 @@ public class TezTask extends Task<TezWork> {
        addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);

        // submit will send the job to the cluster and start executing
- client = submit(jobConf, dag, scratchDir, appJarLr, session,
+ dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
            additionalLr, inputOutputJars, inputOutputLocalResources);

        // finally monitor will print progress until the job is done
        TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
- rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag);
+ rc = monitor.monitorExecution(dagClient, ctx.getHiveTxnManager(), conf, dag);
        if (rc != 0) {
          this.setException(new HiveException(monitor.getDiagnostics()));
        }
@@ -190,7 +199,7 @@ public class TezTask extends Task<TezWork> {
        // fetch the counters
        try {
          Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
+ counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
        } catch (Exception err) {
          // Don't fail execution due to counters - just don't print summary info
          LOG.error("Failed to get counters: " + err, err);
@@ -231,7 +240,7 @@ public class TezTask extends Task<TezWork> {
          }
        }
        // need to either move tmp files or remove them
- if (client != null) {
+ if (dagClient != null) {
          // rc will only be overwritten if close errors out
          rc = close(work, rc);
        }
@@ -462,7 +471,7 @@ public class TezTask extends Task<TezWork> {
      }

      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
- return dagClient;
+ return new SyncDagClient(dagClient);
    }

    /*
@@ -544,4 +553,92 @@ public class TezTask extends Task<TezWork> {

      return ((ReduceWork)children.get(0)).getReducer();
    }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ if (dagClient != null) {
+ LOG.info("Shutting down Tez task " + this);
+ try {
+ dagClient.tryKillDAG();
+ LOG.info("Waiting for Tez task to shut down: " + this);
+ dagClient.waitForCompletion();
+ } catch (Exception ex) {
+ LOG.info("Failed to shut down TezTask" + this, ex);
+ }
+ }
+ }
+
+ /** DAG client that does dumb global sync on all the method calls;
+ * Tez DAG client is not thread safe and getting the 2nd one is not recommended. */
+ public class SyncDagClient extends DAGClient {
+ private final DAGClient dagClient;
+
+ public SyncDagClient(DAGClient dagClient) {
+ super();
+ this.dagClient = dagClient;
+ }
+
+ @Override
+ public void close() throws IOException {
+ dagClient.close(); // Don't sync.
+ }
+
+ @Override
+ public String getExecutionContext() {
+ return dagClient.getExecutionContext(); // Don't sync.
+ }
+
+ @Override
+ @Private
+ protected ApplicationReport getApplicationReportInternal() {
+ throw new UnsupportedOperationException(); // The method is not exposed, and we don't use it.
+ }
+
+ @Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException {
+ synchronized (dagClient) {
+ return dagClient.getDAGStatus(statusOptions);
+ }
+ }
+
+ @Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout) throws IOException, TezException {
+ synchronized (dagClient) {
+ return dagClient.getDAGStatus(statusOptions, timeout);
+ }
+ }
+
+ @Override
+ public VertexStatus getVertexStatus(String vertexName,
+ Set<StatusGetOpts> statusOptions) throws IOException, TezException {
+ synchronized (dagClient) {
+ return dagClient.getVertexStatus(vertexName, statusOptions);
+ }
+ }
+
+ @Override
+ public void tryKillDAG() throws IOException, TezException {
+ synchronized (dagClient) {
+ dagClient.tryKillDAG();
+ }
+ }
+
+ @Override
+ public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
+ synchronized (dagClient) {
+ return dagClient.waitForCompletion();
+ }
+ }
+
+ @Override
+ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts)
+ throws IOException, TezException, InterruptedException {
+ synchronized (dagClient) {
+ return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
+ }
+ }
+ }
  }

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedDec 3, '15 at 12:17a
activeDec 3, '15 at 12:17a
posts1
users1
websitehive.apache.org

1 user in discussion

Sershe: 1 post

People

Translate

site design / logo © 2021 Grokbase