Author: hashutosh
Date: Thu Mar 13 22:46:57 2014
New Revision: 1577360

URL: http://svn.apache.org/r1577360
Log:
HIVE-6609 : Doing Ctrl-C on hive cli doesn't kill running MR jobs on hadoop-2 (Ashutosh Chauhan via Jason Dere)
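
For readers skimming the diff: the old kill path stored a tracking-URL kill URI per job and POSTed to it, which presumably stopped working on hadoop-2 because YARN's tracking URL does not serve the old JobTracker "&action=kill" web action. The patch instead keeps the live RunningJob handles and kills them through the MapReduce client API from a JVM shutdown hook. A minimal, self-contained sketch of that pattern (the class name JobKillSketch is hypothetical; the list and hook mirror what the patch adds to HadoopJobExecHelper):

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.mapred.RunningJob;

// Hypothetical demo class illustrating the mechanism introduced by this patch.
public class JobKillSketch {

  // synchronizedList makes add()/remove() thread-safe; iteration still needs
  // an explicit synchronized block, as in killRunningJobs() below.
  public static final List<RunningJob> runningJobs =
      Collections.synchronizedList(new LinkedList<RunningJob>());

  static {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        synchronized (runningJobs) {
          for (RunningJob rj : runningJobs) {
            try {
              // Kill through the MapReduce client API instead of the web UI.
              rj.killJob();
            } catch (Exception e) {
              // Best effort: the JVM is shutting down anyway.
            }
          }
        }
      }
    });
  }
}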

Modified:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1577360&r1=1577359&r2=1577360&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Mar 13 22:46:57 2014
@@ -182,6 +182,7 @@ public class ExecDriver extends Task<Map
     *
     * @return true if fatal errors happened during job execution, false otherwise.
     */
+  @Override
    public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
       Counters.Counter cntr = ctrs.findCounter(
          HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP),
@@ -450,7 +451,7 @@ public class ExecDriver extends Task<Map
            if (returnVal != 0) {
              rj.killJob();
            }
-          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobs.remove(rj);
            jobID = rj.getID().toString();
          }
        } catch (Exception e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1577360&r1=1577359&r2=1577360&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Thu Mar 13 22:46:57 2014
@@ -26,6 +26,7 @@ import java.util.Calendar;
  import java.util.Collections;
  import java.util.Enumeration;
  import java.util.HashMap;
+import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;

@@ -50,6 +51,7 @@ import org.apache.hadoop.mapred.Counters
  import org.apache.hadoop.mapred.Counters.Counter;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
  import org.apache.hadoop.mapred.JobStatus;
  import org.apache.hadoop.mapred.RunningJob;
  import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -67,9 +69,9 @@ public class HadoopJobExecHelper {

    protected transient int mapProgress = 0;
    protected transient int reduceProgress = 0;
-  public transient String jobId;
-  private LogHelper console;
-  private HadoopJobExecHook callBackObj;
+  public transient JobID jobId;
+  private final LogHelper console;
+  private final HadoopJobExecHook callBackObj;

    /**
     * Update counters relevant to this task.
@@ -89,7 +91,7 @@ public class HadoopJobExecHelper {
     * @param jobId
     * @return
     */
-  private static String getJobStartMsg(String jobId) {
+  private static String getJobStartMsg(JobID jobId) {
      return "Starting Job = " + jobId;
    }

@@ -99,7 +101,7 @@ public class HadoopJobExecHelper {
     * @param jobId
     * @return the job end message
     */
-  public static String getJobEndMsg(String jobId) {
+  public static String getJobEndMsg(JobID jobId) {
      return "Ended Job = " + jobId;
    }

@@ -120,11 +122,11 @@ public class HadoopJobExecHelper {
    }


-  public String getJobId() {
+  public JobID getJobId() {
      return jobId;
    }

-  public void setJobId(String jobId) {
+  public void setJobId(JobID jobId) {
      this.jobId = jobId;
    }

@@ -148,8 +150,8 @@ public class HadoopJobExecHelper {
     * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
     * still jobs running.
     */
-  public static Map<String, String> runningJobKillURIs = Collections
-      .synchronizedMap(new HashMap<String, String>());
+  public static List<RunningJob> runningJobs = Collections
+      .synchronizedList(new LinkedList<RunningJob>());


    /**
@@ -161,32 +163,23 @@ public class HadoopJobExecHelper {
     *
     */
    static {
-    if (new org.apache.hadoop.conf.Configuration()
-        .getBoolean("webinterface.private.actions", false)) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            killRunningJobs();
          }
        });
-    }
    }

    public static void killRunningJobs() {
-    synchronized (runningJobKillURIs) {
-      for (String uri : runningJobKillURIs.values()) {
+    synchronized (runningJobs) {
+      for (RunningJob rj : runningJobs) {
         try {
-          System.err.println("killing job with: " + uri);
-          java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
-              .openConnection();
-          conn.setRequestMethod("POST");
-          int retCode = conn.getResponseCode();
-          if (retCode != 200) {
-            System.err.println("Got an error trying to kill job with URI: " + uri + " = "
-                + retCode);
-          }
+          System.err.println("killing job with: " + rj.getID());
+          rj.killJob();
         } catch (Exception e) {
-          System.err.println("trying to kill job, caught: " + e);
+          LOG.warn(e);
+          System.err.println("Failed to kill job: "+ rj.getID());
           // do nothing
         }
       }
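
A subtlety in the loop above: Collections.synchronizedList() only guards individual calls such as add() and remove(), so iterating is a compound action that must hold the list's monitor explicitly, which is why killRunningJobs() wraps its loop in synchronized (runningJobs). A minimal standalone illustration (the String elements and job id value are placeholders):

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class SynchronizedIterationSketch {
  public static void main(String[] args) {
    List<String> jobIds = Collections.synchronizedList(new LinkedList<String>());
    jobIds.add("job_0001"); // placeholder id

    // Without this block, a concurrent add()/remove() from another thread
    // could trigger a ConcurrentModificationException mid-iteration.
    synchronized (jobIds) {
      for (String id : jobIds) {
        System.err.println("killing job with: " + id);
      }
    }
  }
}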
@@ -252,7 +245,7 @@ public class HadoopJobExecHelper {
          String logMapper;
          String logReducer;

-        TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
+        TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
          if (mappers == null) {
            logMapper = "no information for number of mappers; ";
          } else {
@@ -264,7 +257,7 @@ public class HadoopJobExecHelper {
            logMapper = "number of mappers: " + numMap + "; ";
          }

-        TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
+        TaskReport[] reducers = jc.getReduceTaskReports(rj.getID());
          if (reducers == null) {
            logReducer = "no information for number of reducers. ";
          } else {
@@ -281,13 +274,13 @@ public class HadoopJobExecHelper {
          initOutputPrinted = true;
        }

-      RunningJob newRj = jc.getJob(rj.getJobID());
+      RunningJob newRj = jc.getJob(rj.getID());
        if (newRj == null) {
          // under exceptional load, hadoop may not be able to look up status
          // of finished jobs (because it has purged them from memory). From
          // hive's perspective - it's equivalent to the job having failed.
          // So raise a meaningful exception
-        throw new IOException("Could not find status of job:" + rj.getJobID());
+        throw new IOException("Could not find status of job:" + rj.getID());
        } else {
          th.setRunningJob(newRj);
          rj = newRj;
@@ -428,12 +421,12 @@ public class HadoopJobExecHelper {
      } else {
        if (SessionState.get() != null) {
          SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
-            getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
+            getId(), Keys.TASK_HADOOP_ID, rj.getID().toString());
        }
-      console.printInfo(getJobStartMsg(rj.getJobID()) + ", Tracking URL = "
+      console.printInfo(getJobStartMsg(rj.getID()) + ", Tracking URL = "
            + rj.getTrackingURL());
        console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
- + " job -kill " + rj.getJobID());
+ + " job -kill " + rj.getID());
      }
    }

@@ -509,7 +502,7 @@ public class HadoopJobExecHelper {


    public int progress(RunningJob rj, JobClient jc) throws IOException {
-    jobId = rj.getJobID();
+    jobId = rj.getID();

      int returnVal = 0;

@@ -527,7 +520,7 @@ public class HadoopJobExecHelper {

      // add to list of running jobs to kill in case of abnormal shutdown

-    runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
+    runningJobs.add(rj);

      ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
      jobInfo(rj);
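
The runningJobs.add(rj) above pairs with the HadoopJobExecHelper.runningJobs.remove(rj) calls in ExecDriver, BlockMergeTask, PartialScanTask, and ColumnTruncateTask, so a handle stays registered only while its job is in flight. A hedged sketch of that bracketing (runAndTrack is a hypothetical helper, not Hive API; the real code polls progress rather than blocking):

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class TrackedSubmitSketch {

  // Register the handle while the job runs; deregister on completion so the
  // shutdown hook only kills jobs that are actually still running.
  static int runAndTrack(JobClient jc, JobConf conf) throws IOException {
    RunningJob rj = jc.submitJob(conf);
    HadoopJobExecHelper.runningJobs.add(rj);
    try {
      rj.waitForCompletion();
      return rj.isSuccessful() ? 0 : 2; // 2 mirrors the failure code in progress()
    } finally {
      HadoopJobExecHelper.runningJobs.remove(rj);
    }
  }
}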
@@ -548,7 +541,7 @@ public class HadoopJobExecHelper {

      boolean success = mapRedStats.isSuccess();

-    String statusMesg = getJobEndMsg(rj.getJobID());
+    String statusMesg = getJobEndMsg(rj.getID());
      if (!success) {
        statusMesg += " with errors";
        returnVal = 2;
@@ -592,8 +585,7 @@ public class HadoopJobExecHelper {
        }
      }
      // Compute the reducers run time statistics for the job
-    ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes,
-        new String(this.jobId));
+    ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes);
      // Adding the reducers run time statistics for the job in the QueryPlan
      this.task.getQueryPlan().getReducerTimeStatsPerJobList().add(reducerTimeStatsPerJob);
      return;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1577360&r1=1577359&r2=1577360&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Mar 13 22:46:57 2014
@@ -241,7 +241,7 @@ public class BlockMergeTask extends Task
            if (returnVal != 0) {
              rj.killJob();
            }
-          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobs.remove(rj);
            jobID = rj.getID().toString();
          }
          RCFileMergeMapper.jobClose(outputPath, success, job, console,
@@ -372,5 +372,5 @@ public class BlockMergeTask extends Task
    @Override
    public void logPlanProgress(SessionState ss) throws IOException {
      // no op
- }
+ }
  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1577360&r1=1577359&r2=1577360&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Thu Mar 13 22:46:57 2014
@@ -248,7 +248,7 @@ public class PartialScanTask extends Tas
            if (returnVal != 0) {
              rj.killJob();
            }
-          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobs.remove(rj);
            jobID = rj.getID().toString();
          }
        } catch (Exception e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java?rev=1577360&r1=1577359&r2=1577360&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java Thu Mar 13 22:46:57 2014
@@ -217,7 +217,7 @@ public class ColumnTruncateTask extends
            if (returnVal != 0) {
              rj.killJob();
            }
-          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobs.remove(rj);
            jobID = rj.getID().toString();
          }
          ColumnTruncateMapper.jobClose(outputPath, success, job, console,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java?rev=1577360&r1=1577359&r2=1577360&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java Thu Mar 13 22:46:57 2014
@@ -32,9 +32,6 @@ import java.util.List;
   */
  public class ReducerTimeStatsPerJob {

-  // stores the JobId of the job
-  private final String jobId;
-
    // Stores the temporal statistics in milliseconds for reducers
    // specific to a Job
    private final long minimumTime;
@@ -47,8 +44,7 @@ public class ReducerTimeStatsPerJob {
     * Computes the temporal run time statistics of the reducers
     * for a specific JobId.
     */
-  public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes, String jobId) {
-    this.jobId = jobId;
+  public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes) {

      // If no Run times present, then set -1, indicating no values
      if (!reducersRunTimes.isEmpty()) {
@@ -103,9 +99,4 @@ public class ReducerTimeStatsPerJob {
    public double getStandardDeviationTime() {
      return this.standardDeviationTime;
    }
-
-  public String getJobId() {
-    return this.jobId;
-  }
-
  }
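
With the jobId parameter dropped, constructing the per-job reducer statistics takes only the list of run times. A minimal usage sketch (the run-time values are illustrative; only the constructor and the getStandardDeviationTime() accessor shown in this diff are assumed):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;

public class ReducerStatsSketch {
  public static void main(String[] args) {
    // Illustrative reducer run times in milliseconds.
    List<Integer> runTimes = Arrays.asList(4200, 5100, 4800);
    ReducerTimeStatsPerJob stats = new ReducerTimeStatsPerJob(runTimes);
    System.out.println("stddev(ms) = " + stats.getStandardDeviationTime());
  }
}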
