Repository: hive
Updated Branches:
   refs/heads/master b9e4fe856 -> 66a021164


HIVE-13458 : Heartbeater doesn't fail query when heartbeat fails (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/66a02116
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/66a02116
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/66a02116

Branch: refs/heads/master
Commit: 66a02116453427601fd806fe999a753e3a201d49
Parents: b9e4fe8
Author: Wei Zheng <weiz@apache.org>
Authored: Wed May 11 16:14:02 2016 -0700
Committer: Wei Zheng <weiz@apache.org>
Committed: Wed May 11 16:14:02 2016 -0700

----------------------------------------------------------------------
  .../org/apache/hadoop/hive/conf/HiveConf.java | 1 +
  .../java/org/apache/hadoop/hive/ql/Context.java | 18 ++++++
  .../hadoop/hive/ql/exec/mr/ExecDriver.java | 3 +-
  .../hive/ql/exec/mr/HadoopJobExecHelper.java | 20 ++++--
  .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 7 ++-
  .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 2 +-
  .../hadoop/hive/ql/io/merge/MergeFileTask.java | 2 +-
  .../ql/io/rcfile/stats/PartialScanTask.java | 2 +-
  .../io/rcfile/truncate/ColumnTruncateTask.java | 2 +-
  .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 66 ++++++++++++++++----
  .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +++++++
  .../index_compact_entry_limit.q.out | 2 +-
  .../index_compact_size_limit.q.out | 2 +-
  13 files changed, 124 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
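In short, the fix routes heartbeat failures back to the thread that watches query progress: the Heartbeater records any LockException it hits (instead of letting the scheduled executor swallow it), the Context exposes the recorded exception, and the MR and Tez progress loops poll the Context each iteration and rethrow, failing the query. A minimal, self-contained sketch of that record-and-poll pattern (the standalone names here are hypothetical; the real classes are DbTxnManager.Heartbeater, Context, HadoopJobExecHelper and TezJobMonitor in the diff below):

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

class HeartbeatSketch {
  public static void main(String[] args) throws Exception {
    AtomicReference<Exception> failure = new AtomicReference<>();
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
    ses.scheduleAtFixedRate(() -> {
      try {
        throw new RuntimeException("heartbeat failed");  // stands in for txnMgr.heartbeat() failing
      } catch (Exception e) {
        failure.set(e);  // record instead of letting the executor swallow it
      }
    }, 0, 100, TimeUnit.MILLISECONDS);
    for (int i = 0; i < 10; i++) {  // stands in for the job-progress polling loop
      Exception e = failure.get();
      if (e != null) {
        ses.shutdownNow();
        throw e;  // fail the "query" on the foreground thread
      }
      Thread.sleep(50);
    }
    ses.shutdownNow();
  }
}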


http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f2273c0..541af57 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1138,6 +1138,7 @@ public class HiveConf extends Configuration {
      HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
      HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. Will mark every ACID transaction aborted", false),
      HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false),
+ HIVETESTMODEFAILHEARTBEATER("hive.test.fail.heartbeater", false, "For testing only. Will cause Heartbeater to fail.", false),

      HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
          "Merge small files at the end of a map-only job"),

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 6f18c82..92b4e5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -43,9 +43,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.exec.TaskRunner;
  import org.apache.hadoop.hive.ql.hooks.WriteEntity;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater;
  import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
  import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
  import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
  import org.apache.hadoop.hive.ql.metadata.Table;
  import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
  import org.apache.hadoop.hive.ql.session.SessionState;
@@ -121,6 +123,8 @@ public class Context {

    private final String stagingDir;

+ private Heartbeater heartbeater;
+
    public Context(Configuration conf) throws IOException {
      this(conf, generateExecutionId());
    }
@@ -760,4 +764,18 @@ public class Context {
    public CompilationOpContext getOpContext() {
      return opContext;
    }
+
+  public Heartbeater getHeartbeater() {
+    return heartbeater;
+  }
+
+  public void setHeartbeater(Heartbeater heartbeater) {
+    this.heartbeater = heartbeater;
+  }
+
+  public void checkHeartbeaterLockException() throws LockException {
+    if (getHeartbeater() != null && getHeartbeater().getLockException() != null) {
+      throw getHeartbeater().getLockException();
+    }
+  }
  }
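checkHeartbeaterLockException() rethrows the background failure on whichever thread calls it, so a progress monitor can fail fast with no extra plumbing. The loops added below in HadoopJobExecHelper and TezJobMonitor call it once per polling iteration; the shape, extracted as a standalone method (Job is a hypothetical stand-in for the engine's job handle, and Context/LockException are the Hive classes from this patch):

interface Job { boolean isComplete(); }

static void waitForCompletion(Context ctx, Job job, long pullInterval)
    throws LockException, InterruptedException {
  while (!job.isComplete()) {
    if (ctx != null) {
      ctx.checkHeartbeaterLockException();  // throws here if the Heartbeater recorded a failure
    }
    Thread.sleep(pullInterval);
  }
}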

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 926f6e8..8a6499b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -433,10 +433,11 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
        rj = jc.submitJob(job);
        this.jobID = rj.getJobID();
        updateStatusInQueryDisplay();
- returnVal = jobExecHelper.progress(rj, jc);
+ returnVal = jobExecHelper.progress(rj, jc, ctx);
        success = (returnVal == 0);
      } catch (Exception e) {
        e.printStackTrace();
+ setException(e);
        String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
        if (rj != null) {
          mesg = "Ended Job = " + rj.getJobID() + mesg;

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index c15316bb..5656f9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
  import org.apache.hadoop.hive.ql.MapRedStats;
  import org.apache.hadoop.hive.ql.QueryState;
  import org.apache.hadoop.hive.ql.exec.Operator;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
  import org.apache.hadoop.hive.ql.exec.TaskHandle;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
  import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
  import org.apache.hadoop.hive.ql.session.SessionState;
  import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -213,7 +215,7 @@ public class HadoopJobExecHelper {
      return this.callBackObj.checkFatalErrors(ctrs, errMsg);
    }

- private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
+ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockException {
      JobClient jc = th.getJobClient();
      RunningJob rj = th.getRunningJob();
      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
@@ -233,6 +235,10 @@ public class HadoopJobExecHelper {
      final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);

      while (!rj.isComplete()) {
+      if (th.getContext() != null) {
+        th.getContext().checkHeartbeaterLockException();
+      }
+
        try {
          Thread.sleep(pullInterval);
        } catch (InterruptedException e) {
@@ -452,6 +458,7 @@ public class HadoopJobExecHelper {
    private static class ExecDriverTaskHandle extends TaskHandle {
      JobClient jc;
      RunningJob rj;
+ Context ctx;

      JobClient getJobClient() {
        return jc;
@@ -461,9 +468,14 @@ public class HadoopJobExecHelper {
        return rj;
      }

-    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
+    Context getContext() {
+      return ctx;
+    }
+
+    public ExecDriverTaskHandle(JobClient jc, RunningJob rj, Context ctx) {
       this.jc = jc;
       this.rj = rj;
+      this.ctx = ctx;
      }

      public void setRunningJob(RunningJob job) {
@@ -517,7 +529,7 @@ public class HadoopJobExecHelper {
    }


- public int progress(RunningJob rj, JobClient jc) throws IOException {
+ public int progress(RunningJob rj, JobClient jc, Context ctx) throws IOException, LockException {
      jobId = rj.getID();

      int returnVal = 0;
@@ -538,7 +550,7 @@ public class HadoopJobExecHelper {

      runningJobs.add(rj);

- ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
+ ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, ctx);
      jobInfo(rj);
      MapRedStats mapRedStats = progress(th);


http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index b22991c..838f320 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -38,6 +38,7 @@ import java.util.TreeSet;

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.ql.Context;
  import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
  import org.apache.hadoop.hive.ql.exec.MapOperator;
  import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -217,7 +218,7 @@ public class TezJobMonitor {
     * @return int 0 - success, 1 - killed, 2 - failed
     */
    public int monitorExecution(final DAGClient dagClient, HiveConf conf,
- DAG dag) throws InterruptedException {
+ DAG dag, Context ctx) throws InterruptedException {
      long monitorStartTime = System.currentTimeMillis();
      DAGStatus status = null;
      completed = new HashSet<String>();
@@ -247,6 +248,10 @@ public class TezJobMonitor {
      while (true) {

        try {
+        if (ctx != null) {
+          ctx.checkHeartbeaterLockException();
+        }
+
          status = dagClient.getDAGStatus(opts, checkInterval);
          progressMap = status.getVertexProgress();
          DAGStatus.State state = status.getState();

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index f059aea..9e114c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -187,7 +187,7 @@ public class TezTask extends Task<TezWork> {

        // finally monitor will print progress until the job is done
        TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
- rc = monitor.monitorExecution(dagClient, conf, dag);
+ rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
        if (rc != 0) {
          this.setException(new HiveException(monitor.getDiagnostics()));
        }

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 6b0343b..0fedd48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -151,7 +151,7 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
        // Finally SUBMIT the JOB!
        rj = jc.submitJob(job);

- returnVal = jobExecHelper.progress(rj, jc);
+ returnVal = jobExecHelper.progress(rj, jc, ctx);
        success = (returnVal == 0);

      } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index d31510d..6771b3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -223,7 +223,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
        // Finally SUBMIT the JOB!
        rj = jc.submitJob(job);

- returnVal = jobExecHelper.progress(rj, jc);
+ returnVal = jobExecHelper.progress(rj, jc, ctx);
        success = (returnVal == 0);

      } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 8acd6e0..ffc6311 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -187,7 +187,7 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
        // Finally SUBMIT the JOB!
        rj = jc.submitJob(job);

- returnVal = jobExecHelper.progress(rj, jc);
+ returnVal = jobExecHelper.progress(rj, jc, ctx);
        success = (returnVal == 0);

      } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 8c3a1d2..4539e71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -77,6 +77,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     */
    private int statementId = -1;

+ // QueryId for the query in current transaction
+ private String queryId;
+
    // ExecutorService for sending heartbeat to metastore periodically.
    private static ScheduledExecutorService heartbeatExecutorService = null;
    private ScheduledFuture<?> heartbeatTask = null;
@@ -136,8 +139,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
    @Override
    public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
      try {
- acquireLocks(plan, ctx, username, true);
- startHeartbeat();
+ acquireLocksWithHeartbeatDelay(plan, ctx, username, 0);
      }
      catch(LockException e) {
        if(e.getCause() instanceof TxnAbortedException) {
@@ -307,7 +309,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
    @VisibleForTesting
    void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
      acquireLocks(plan, ctx, username, true);
- startHeartbeat(delay);
+ ctx.setHeartbeater(startHeartbeat(delay));
+ queryId = plan.getQueryId();
    }


@@ -414,28 +417,51 @@ public class DbTxnManager extends HiveTxnManagerImpl {
      }
    }

-  private void startHeartbeat() throws LockException {
-    startHeartbeat(0);
+  private Heartbeater startHeartbeat() throws LockException {
+    return startHeartbeat(0);
   }

   /**
    * This is for testing only. Normally a client should call {@link #startHeartbeat()}.
    * Makes the heartbeater start after an initial delay period.
    * @param delay time to delay before first execution, in milliseconds
+   * @return heartbeater
    */
-  void startHeartbeat(long delay) throws LockException {
+  Heartbeater startHeartbeat(long delay) throws LockException {
     long heartbeatInterval = getHeartbeatInterval(conf);
     assert heartbeatInterval > 0;
+    Heartbeater heartbeater = new Heartbeater(this, conf);
     heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
-        new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS);
-    LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " +
-        0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS);
+        heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS);
+    LOG.info("Started heartbeat with delay/interval = " + 0 + "/" + heartbeatInterval + " " +
+        TimeUnit.MILLISECONDS + " for query: " + queryId);
+    return heartbeater;
    }

-  private void stopHeartbeat() {
-    if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
-      heartbeatTask.cancel(false);
+  private void stopHeartbeat() throws LockException {
+    if (heartbeatTask != null) {
+      heartbeatTask.cancel(true);
+      long startTime = System.currentTimeMillis();
+      long sleepInterval = 100;
+      while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+        // We will wait for 30 seconds for the task to be cancelled.
+        // If it's still not cancelled (unlikely), we will just move on.
+        long now = System.currentTimeMillis();
+        if (now - startTime > 30000) {
+          LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId);
+          break;
+        }
+        try {
+          Thread.sleep(sleepInterval);
+        } catch (InterruptedException e) {
+        }
+        sleepInterval *= 2;
+      }
+      if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
+        LOG.info("Stopped heartbeat for query: " + queryId);
+      }
       heartbeatTask = null;
+      queryId = null;
      }
    }

@@ -553,13 +579,21 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     */
    public static class Heartbeater implements Runnable {
      private HiveTxnManager txnMgr;
+    private HiveConf conf;
+
+    LockException lockException;
+    public LockException getLockException() {
+      return lockException;
+    }

      /**
       *
       * @param txnMgr transaction manager for this operation
       */
-    public Heartbeater(HiveTxnManager txnMgr) {
+    public Heartbeater(HiveTxnManager txnMgr, HiveConf conf) {
       this.txnMgr = txnMgr;
+      this.conf = conf;
+      lockException = null;
      }

      /**
@@ -568,10 +602,16 @@ public class DbTxnManager extends HiveTxnManagerImpl {
      @Override
      public void run() {
        try {
+        // For negative testing purposes.
+        if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
+          throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true");
+        }
+
         LOG.debug("Heartbeating...");
         txnMgr.heartbeat();
       } catch (LockException e) {
         LOG.error("Failed trying to heartbeat " + e.getMessage());
+        lockException = e;
        }
      }
    }
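One subtlety motivates the lockException field: an exception that escapes a task scheduled with scheduleAtFixedRate is never rethrown anywhere visible. It silently cancels all future runs and only surfaces if something calls get() on the ScheduledFuture, which nothing in the query path does. Recording the failure and letting the monitoring loop poll for it avoids that trap. A small standalone demonstration of the swallowing behavior:

import java.util.concurrent.*;

public class SwallowedExceptionDemo {
  public static void main(String[] args) throws Exception {
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> f = ses.scheduleAtFixedRate(() -> {
      throw new RuntimeException("heartbeat failed");
    }, 0, 100, TimeUnit.MILLISECONDS);
    Thread.sleep(300);  // nothing is printed; the periodic task has silently stopped
    try {
      f.get();          // the only place the failure ever surfaces
    } catch (ExecutionException e) {
      System.out.println("surfaced: " + e.getCause().getMessage());
    }
    ses.shutdownNow();
  }
}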

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 472da0b..903337d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -51,6 +51,7 @@ import org.junit.Test;
  import org.junit.rules.TestName;

  import java.io.File;
+import java.io.IOException;
  import java.lang.reflect.Field;
  import java.util.ArrayList;
  import java.util.Arrays;
@@ -746,6 +747,27 @@ public class TestTxnCommands2 {
      Assert.assertEquals("", expected,
        runStatementOnDriver("select a,b from " + tblName + " order by a"));
    }
+
+  /**
+   * Simulates the scenario where a heartbeat fails due to client errors, such as no locks or no txns being found.
+   * When a heartbeat fails, the query should fail too.
+   * @throws Exception
+   */
+  @Test
+  public void testFailHeartbeater() throws Exception {
+    // Fail the heartbeater, so that we can get a RuntimeException from the query.
+    // More specifically, it's the original IOException thrown by either MR's or Tez's progress monitoring loop.
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true);
+    Exception exception = null;
+    try {
+      runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
+    } catch (RuntimeException e) {
+      exception = e;
+    }
+    Assert.assertNotNull(exception);
+    Assert.assertTrue(exception.getMessage().contains("HIVETESTMODEFAILHEARTBEATER=true"));
+  }
+
    /**
     * takes raw data and turns it into a string as if from Driver.getResults()
     * sorts rows in dictionary order
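The test drives a real insert through the driver and asserts that the injected heartbeat failure propagates into the query's RuntimeException. To run just this test, something like mvn test -Dtest=TestTxnCommands2#testFailHeartbeater from the ql module should work with the standard Surefire setup.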

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out b/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
index b65f94e..f844ee4 100644
--- a/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
+++ b/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
@@ -34,4 +34,4 @@ PREHOOK: type: QUERY
  PREHOOK: Input: default@src
  #### A masked pattern was here ####
  Job Submission failed with exception 'java.io.IOException(org.apache.hadoop.hive.ql.metadata.HiveException: Number of compact index entries loaded during the query exceeded the maximum of 5 set in hive.index.compact.query.max.entries)'
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. org.apache.hadoop.hive.ql.metadata.HiveException: Number of compact index entries loaded during the query exceeded the maximum of 5 set in hive.index.compact.query.max.entries

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/index_compact_size_limit.q.out b/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
index 299cc47..9ff8f8f 100644
--- a/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
+++ b/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
@@ -34,4 +34,4 @@ PREHOOK: type: QUERY
  PREHOOK: Input: default@src
  #### A masked pattern was here ####
  Job Submission failed with exception 'java.io.IOException(Size of data to read during a compact-index-based query exceeded the maximum of 1024 set in hive.index.compact.query.max.size)'
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. Size of data to read during a compact-index-based query exceeded the maximum of 1024 set in hive.index.compact.query.max.size


  • Weiz at May 11, 2016 at 11:54 pm
    Repository: hive
    Updated Branches:
       refs/heads/branch-1 6c160bc1c -> 70f352728


    HIVE-13458 : Heartbeater doesn't fail query when heartbeat fails (Wei Zheng, reviewed by Eugene Koifman)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/70f35272
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/70f35272
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/70f35272

    Branch: refs/heads/branch-1
    Commit: 70f3527288593a55c1ace66fc6e0c85753f8c27c
    Parents: 6c160bc
    Author: Wei Zheng <weiz@apache.org>
    Authored: Wed May 11 16:54:25 2016 -0700
    Committer: Wei Zheng <weiz@apache.org>
    Committed: Wed May 11 16:54:25 2016 -0700

    ----------------------------------------------------------------------
      .../org/apache/hadoop/hive/conf/HiveConf.java | 1 +
      .../java/org/apache/hadoop/hive/ql/Context.java | 15 +++++
      .../hadoop/hive/ql/exec/mr/ExecDriver.java | 3 +-
      .../hive/ql/exec/mr/HadoopJobExecHelper.java | 20 ++++--
      .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 7 ++-
      .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 2 +-
      .../hadoop/hive/ql/io/merge/MergeFileTask.java | 2 +-
      .../ql/io/rcfile/stats/PartialScanTask.java | 2 +-
      .../io/rcfile/truncate/ColumnTruncateTask.java | 2 +-
      .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 66 ++++++++++++++++----
      .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +++++++
      .../index_compact_entry_limit.q.out | 2 +-
      .../index_compact_size_limit.q.out | 2 +-
      13 files changed, 121 insertions(+), 25 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    ----------------------------------------------------------------------
    diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    index 1086595..4c6aa71 100644
    --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    @@ -953,6 +953,7 @@ public class HiveConf extends Configuration {
          HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
          HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. Will mark every ACID transaction aborted", false),
          HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false),
    + HIVETESTMODEFAILHEARTBEATER("hive.test.fail.heartbeater", false, "For testing only. Will cause Heartbeater to fail.", false),

          HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
              "Merge small files at the end of a map-only job"),

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    index a92331a..5fe08e7 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    @@ -44,9 +44,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.ql.exec.TaskRunner;
      import org.apache.hadoop.hive.ql.hooks.WriteEntity;
      import org.apache.hadoop.hive.ql.io.AcidUtils;
    +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater;
      import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
      import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
      import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    +import org.apache.hadoop.hive.ql.lockmgr.LockException;
      import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
      import org.apache.hadoop.hive.ql.session.SessionState;
      import org.apache.hadoop.hive.shims.ShimLoader;
    @@ -114,6 +116,8 @@ public class Context {

        private final String stagingDir;

    + private Heartbeater heartbeater;
    +
        public Context(Configuration conf) throws IOException {
          this(conf, generateExecutionId());
        }
    @@ -713,4 +717,15 @@ public class Context {
          this.cboSucceeded = cboSucceeded;
        }

    +  public Heartbeater getHeartbeater() {
    +    return heartbeater;
    +  }
    +  public void setHeartbeater(Heartbeater heartbeater) {
    +    this.heartbeater = heartbeater;
    +  }
    +  public void checkHeartbeaterLockException() throws LockException {
    +    if (getHeartbeater() != null && getHeartbeater().getLockException() != null) {
    +      throw getHeartbeater().getLockException();
    +    }
    +  }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    index 4160399..982ccc7 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    @@ -423,10 +423,11 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
            // Finally SUBMIT the JOB!
            rj = jc.submitJob(job);

    - returnVal = jobExecHelper.progress(rj, jc);
    + returnVal = jobExecHelper.progress(rj, jc, ctx);
            success = (returnVal == 0);
          } catch (Exception e) {
            e.printStackTrace();
    + setException(e);
            String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
            if (rj != null) {
              mesg = "Ended Job = " + rj.getJobID() + mesg;

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
    index f09d938..5bf49cd 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
    @@ -37,12 +37,14 @@ import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.common.JavaUtils;
      import org.apache.hadoop.hive.conf.HiveConf;
      import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
    +import org.apache.hadoop.hive.ql.Context;
      import org.apache.hadoop.hive.ql.MapRedStats;
      import org.apache.hadoop.hive.ql.exec.Operator;
      import org.apache.hadoop.hive.ql.exec.Task;
      import org.apache.hadoop.hive.ql.exec.TaskHandle;
      import org.apache.hadoop.hive.ql.exec.Utilities;
      import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
    +import org.apache.hadoop.hive.ql.lockmgr.LockException;
      import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
      import org.apache.hadoop.hive.ql.session.SessionState;
      import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
    @@ -212,7 +214,7 @@ public class HadoopJobExecHelper {
          return this.callBackObj.checkFatalErrors(ctrs, errMsg);
        }

    - private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
    + private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockException {
          JobClient jc = th.getJobClient();
          RunningJob rj = th.getRunningJob();
          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
    @@ -232,6 +234,10 @@ public class HadoopJobExecHelper {
          final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);

          while (!rj.isComplete()) {
    +      if (th.getContext() != null) {
    +        th.getContext().checkHeartbeaterLockException();
    +      }
    +
            try {
              Thread.sleep(pullInterval);
            } catch (InterruptedException e) {
    @@ -449,6 +455,7 @@ public class HadoopJobExecHelper {
        private static class ExecDriverTaskHandle extends TaskHandle {
          JobClient jc;
          RunningJob rj;
    + Context ctx;

          JobClient getJobClient() {
            return jc;
    @@ -458,9 +465,14 @@ public class HadoopJobExecHelper {
            return rj;
          }

    -    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
    +    Context getContext() {
    +      return ctx;
    +    }
    +
    +    public ExecDriverTaskHandle(JobClient jc, RunningJob rj, Context ctx) {
           this.jc = jc;
           this.rj = rj;
    +      this.ctx = ctx;
          }

          public void setRunningJob(RunningJob job) {
    @@ -513,7 +525,7 @@ public class HadoopJobExecHelper {
        }


    - public int progress(RunningJob rj, JobClient jc) throws IOException {
    + public int progress(RunningJob rj, JobClient jc, Context ctx) throws IOException, LockException {
          jobId = rj.getID();

          int returnVal = 0;
    @@ -534,7 +546,7 @@ public class HadoopJobExecHelper {

          runningJobs.add(rj);

    - ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
    + ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, ctx);
          jobInfo(rj);
          MapRedStats mapRedStats = progress(th);


    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    index 14c4397..840f80e 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    @@ -25,6 +25,7 @@ import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
      import static org.fusesource.jansi.internal.CLibrary.isatty;

      import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.ql.Context;
      import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
      import org.apache.hadoop.hive.ql.exec.MapOperator;
      import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
    @@ -248,7 +249,7 @@ public class TezJobMonitor {
         * @return int 0 - success, 1 - killed, 2 - failed
         */
        public int monitorExecution(final DAGClient dagClient, HiveConf conf,
    - DAG dag) throws InterruptedException {
    + DAG dag, Context ctx) throws InterruptedException {
          DAGStatus status = null;
          completed = new HashSet<String>();
          diagnostics = new StringBuffer();
    @@ -288,6 +289,10 @@ public class TezJobMonitor {
          while (true) {

            try {
    +        if (ctx != null) {
    +          ctx.checkHeartbeaterLockException();
    +        }
    +
              status = dagClient.getDAGStatus(opts);
              Map<String, Progress> progressMap = status.getVertexProgress();
              DAGStatus.State state = status.getState();

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    index b181de6..3c10169 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    @@ -167,7 +167,7 @@ public class TezTask extends Task<TezWork> {

            // finally monitor will print progress until the job is done
            TezJobMonitor monitor = new TezJobMonitor();
    - rc = monitor.monitorExecution(client, conf, dag);
    + rc = monitor.monitorExecution(client, conf, dag, ctx);
            if (rc != 0) {
              this.setException(new HiveException(monitor.getDiagnostics()));
            }

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
    index f1a8a06..7df44a7 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
    @@ -149,7 +149,7 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
            // Finally SUBMIT the JOB!
            rj = jc.submitJob(job);

    - returnVal = jobExecHelper.progress(rj, jc);
    + returnVal = jobExecHelper.progress(rj, jc, ctx);
            success = (returnVal == 0);

          } catch (Exception e) {

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
    index a8ed4d1..f0e31ce 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
    @@ -217,7 +217,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
            // Finally SUBMIT the JOB!
            rj = jc.submitJob(job);

    - returnVal = jobExecHelper.progress(rj, jc);
    + returnVal = jobExecHelper.progress(rj, jc, ctx);
            success = (returnVal == 0);

          } catch (Exception e) {

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
    index 57a62cd..c702fff 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
    @@ -185,7 +185,7 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
            // Finally SUBMIT the JOB!
            rj = jc.submitJob(job);

    - returnVal = jobExecHelper.progress(rj, jc);
    + returnVal = jobExecHelper.progress(rj, jc, ctx);
            success = (returnVal == 0);

          } catch (Exception e) {

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    index daa31a6..b0f1362 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    @@ -76,6 +76,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
         */
        private int statementId = -1;

    + // QueryId for the query in current transaction
    + private String queryId;
    +
        // ExecutorService for sending heartbeat to metastore periodically.
        private static ScheduledExecutorService heartbeatExecutorService = null;
        private ScheduledFuture<?> heartbeatTask = null;
    @@ -135,8 +138,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
        @Override
        public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
          try {
    - acquireLocks(plan, ctx, username, true);
    - startHeartbeat();
    + acquireLocksWithHeartbeatDelay(plan, ctx, username, 0);
          }
          catch(LockException e) {
            if(e.getCause() instanceof TxnAbortedException) {
    @@ -306,7 +308,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
        @VisibleForTesting
        void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
          acquireLocks(plan, ctx, username, true);
    - startHeartbeat(delay);
    + ctx.setHeartbeater(startHeartbeat(delay));
    + queryId = plan.getQueryId();
        }

        @Override
    @@ -412,28 +415,51 @@ public class DbTxnManager extends HiveTxnManagerImpl {
          }
        }

    -  private void startHeartbeat() throws LockException {
    -    startHeartbeat(0);
    +  private Heartbeater startHeartbeat() throws LockException {
    +    return startHeartbeat(0);
       }

       /**
        * This is for testing only. Normally a client should call {@link #startHeartbeat()}.
        * Makes the heartbeater start after an initial delay period.
        * @param delay time to delay before first execution, in milliseconds
    +   * @return heartbeater
        */
    -  void startHeartbeat(long delay) throws LockException {
    +  Heartbeater startHeartbeat(long delay) throws LockException {
         long heartbeatInterval = getHeartbeatInterval(conf);
         assert heartbeatInterval > 0;
    +    Heartbeater heartbeater = new Heartbeater(this, conf);
         heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
    -        new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS);
    -    LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " +
    -        0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS);
    +        heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS);
    +    LOG.info("Started heartbeat with delay/interval = " + 0 + "/" + heartbeatInterval + " " +
    +        TimeUnit.MILLISECONDS + " for query: " + queryId);
    +    return heartbeater;
        }

    -  private void stopHeartbeat() {
    -    if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
    -      heartbeatTask.cancel(false);
    +  private void stopHeartbeat() throws LockException {
    +    if (heartbeatTask != null) {
    +      heartbeatTask.cancel(true);
    +      long startTime = System.currentTimeMillis();
    +      long sleepInterval = 100;
    +      while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
    +        // We will wait for 30 seconds for the task to be cancelled.
    +        // If it's still not cancelled (unlikely), we will just move on.
    +        long now = System.currentTimeMillis();
    +        if (now - startTime > 30000) {
    +          LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId);
    +          break;
    +        }
    +        try {
    +          Thread.sleep(sleepInterval);
    +        } catch (InterruptedException e) {
    +        }
    +        sleepInterval *= 2;
    +      }
    +      if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
    +        LOG.info("Stopped heartbeat for query: " + queryId);
    +      }
           heartbeatTask = null;
    +      queryId = null;
          }
        }

    @@ -551,13 +577,21 @@ public class DbTxnManager extends HiveTxnManagerImpl {
         */
        public static class Heartbeater implements Runnable {
          private HiveTxnManager txnMgr;
    +    private HiveConf conf;
    +
    +    LockException lockException;
    +    public LockException getLockException() {
    +      return lockException;
    +    }

          /**
           *
           * @param txnMgr transaction manager for this operation
           */
    -    public Heartbeater(HiveTxnManager txnMgr) {
    +    public Heartbeater(HiveTxnManager txnMgr, HiveConf conf) {
           this.txnMgr = txnMgr;
    +      this.conf = conf;
    +      lockException = null;
          }

          /**
    @@ -566,10 +600,16 @@ public class DbTxnManager extends HiveTxnManagerImpl {
          @Override
          public void run() {
            try {
    +        // For negative testing purposes.
    +        if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
    +          throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true");
    +        }
    +
             LOG.debug("Heartbeating...");
             txnMgr.heartbeat();
           } catch (LockException e) {
             LOG.error("Failed trying to heartbeat " + e.getMessage());
    +        lockException = e;
            }
          }
        }

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    index e30dcbb..51e2e2f 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
    @@ -51,6 +51,7 @@ import org.junit.Test;
      import org.junit.rules.TestName;

      import java.io.File;
    +import java.io.IOException;
      import java.lang.reflect.Field;
      import java.util.ArrayList;
      import java.util.Arrays;
    @@ -745,6 +746,27 @@ public class TestTxnCommands2 {
          Assert.assertEquals("", expected,
            runStatementOnDriver("select a,b from " + tblName + " order by a"));
        }
    +
    +  /**
    +   * Simulates the scenario where a heartbeat fails due to client errors, such as no locks or no txns being found.
    +   * When a heartbeat fails, the query should fail too.
    +   * @throws Exception
    +   */
    +  @Test
    +  public void testFailHeartbeater() throws Exception {
    +    // Fail the heartbeater, so that we can get a RuntimeException from the query.
    +    // More specifically, it's the original IOException thrown by either MR's or Tez's progress monitoring loop.
    +    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true);
    +    Exception exception = null;
    +    try {
    +      runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
    +    } catch (RuntimeException e) {
    +      exception = e;
    +    }
    +    Assert.assertNotNull(exception);
    +    Assert.assertTrue(exception.getMessage().contains("HIVETESTMODEFAILHEARTBEATER=true"));
    +  }
    +
        /**
         * takes raw data and turns it into a string as if from Driver.getResults()
         * sorts rows in dictionary order

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out b/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
    index b65f94e..f844ee4 100644
    --- a/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
    +++ b/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
    @@ -34,4 +34,4 @@ PREHOOK: type: QUERY
      PREHOOK: Input: default@src
      #### A masked pattern was here ####
      Job Submission failed with exception 'java.io.IOException(org.apache.hadoop.hive.ql.metadata.HiveException: Number of compact index entries loaded during the query exceeded the maximum of 5 set in hive.index.compact.query.max.entries)'
    -FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
    +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. org.apache.hadoop.hive.ql.metadata.HiveException: Number of compact index entries loaded during the query exceeded the maximum of 5 set in hive.index.compact.query.max.entries

    http://git-wip-us.apache.org/repos/asf/hive/blob/70f35272/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/results/clientnegative/index_compact_size_limit.q.out b/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
    index 299cc47..9ff8f8f 100644
    --- a/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
    +++ b/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
    @@ -34,4 +34,4 @@ PREHOOK: type: QUERY
      PREHOOK: Input: default@src
      #### A masked pattern was here ####
      Job Submission failed with exception 'java.io.IOException(Size of data to read during a compact-index-based query exceeded the maximum of 1024 set in hive.index.compact.query.max.size)'
    -FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
    +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. Size of data to read during a compact-index-based query exceeded the maximum of 1024 set in hive.index.compact.query.max.size
