FAQ
Repository: hive
Updated Branches:
   refs/heads/llap b8cb9a1b9 -> c5dc87a8e


HIVE-10762. LLAP: Kill any fragments running in a daemon when a query completes. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c5dc87a8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c5dc87a8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c5dc87a8

Branch: refs/heads/llap
Commit: c5dc87a8e8efb025925c236898f425223f23712a
Parents: b8cb9a1
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jun 16 23:48:13 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jun 16 23:48:13 2015 -0700

----------------------------------------------------------------------
  .../llap/configuration/LlapConfiguration.java | 24 +++-
  .../hive/llap/daemon/KilledTaskHandler.java | 3 +-
  .../hive/llap/daemon/QueryFailedHandler.java | 20 +++
  .../hive/llap/daemon/impl/AMReporter.java | 122 ++++++++++++++++---
  .../llap/daemon/impl/ContainerRunnerImpl.java | 34 +++++-
  .../hive/llap/daemon/impl/LlapDaemon.java | 11 +-
  .../llap/daemon/impl/QueryFragmentInfo.java | 4 +
  .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 5 +
  .../hive/llap/daemon/impl/QueryTracker.java | 17 ++-
  .../llap/daemon/impl/TaskExecutorService.java | 10 +-
  .../llap/daemon/impl/TaskRunnerCallable.java | 6 +-
  .../protocol/LlapTaskUmbilicalProtocol.java | 4 +-
  .../llap/tezplugins/LlapTaskCommunicator.java | 5 +-
  13 files changed, 229 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index dd24661..f5aa2a6 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -51,8 +51,26 @@ public class LlapConfiguration extends Configuration {
    public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false;

    // This needs to be kept below the task timeout interval, but otherwise as high as possible to avoid unnecessary traffic.
- public static final String LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + "liveness.heartbeat.interval-ms";
- public static final long LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 10000l;
+ public static final String LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + "am.liveness.heartbeat.interval-ms";
+ public static final long LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 10000l;
+
+ /**
+ * Amount of time to wait on connection failures to the AM from an LLAP daemon before considering
+ * the AM to be dead
+ */
+ public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS =
+ LLAP_PREFIX + "am.liveness.connection.timeout-millis";
+ public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 10000l;
+
+ // Not used yet - since the Writable RPC engine does not support this policy.
+ /**
+ * Sleep duration while waiting to retry connection failures to the AM from the daemon for the
+ * general keep-alive thread
+ */
+ public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS =
+ LLAP_PREFIX + "am.liveness.connection.sleep-between-retries-millis";
+ public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT =
+ 2000l;


    // Section for configs used in AM and executors
@@ -137,6 +155,8 @@ public class LlapConfiguration extends Configuration {
        LLAP_PREFIX + "task.communicator.connection.sleep-between-retries-millis";
    public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = 2000l;

+
+
    public static final String LLAP_DAEMON_SERVICE_PORT = LLAP_DAEMON_PREFIX + "service.port";
    public static final int LLAP_DAEMON_SERVICE_PORT_DEFAULT = 15002;


http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
index 8b481c8..7cb433b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
@@ -24,5 +24,6 @@ public interface KilledTaskHandler {
    // inferred from this.
    // Passing in parameters until there's some dag information stored and tracked in the daemon.
    void taskKilled(String amLocation, int port, String user,
- Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId);
+ Token<JobTokenIdentifier> jobToken, String queryId, String dagName,
+ TezTaskAttemptID taskAttemptId);
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
new file mode 100644
index 0000000..4e62a68
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+public interface QueryFailedHandler {
+
+ public void queryFailed(String queryId, String dagName);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 1ba18fc..8ec9f22 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -14,6 +14,7 @@

  package org.apache.hadoop.hive.llap.daemon.impl;

+import javax.net.SocketFactory;
  import java.io.IOException;
  import java.net.InetSocketAddress;
  import java.security.PrivilegedExceptionAction;
@@ -39,8 +40,11 @@ import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.common.CallableWithNdc;
  import org.apache.hadoop.hive.llap.LlapNodeId;
  import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
  import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
  import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
  import org.apache.hadoop.ipc.RPC;
  import org.apache.hadoop.net.NetUtils;
  import org.apache.hadoop.security.SecurityUtil;
@@ -79,9 +83,13 @@ public class AMReporter extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class);

    private volatile LlapNodeId nodeId;
+ private final QueryFailedHandler queryFailedHandler;
    private final Configuration conf;
    private final ListeningExecutorService queueLookupExecutor;
    private final ListeningExecutorService executor;
+ private final RetryPolicy retryPolicy;
+ private final long retryTimeout;
+ private final SocketFactory socketFactory;
    private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new DelayQueue();
    private final AtomicReference<InetSocketAddress> localAddress;
    private final long heartbeatInterval;
@@ -91,9 +99,11 @@ public class AMReporter extends AbstractService {
    private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
    volatile ListenableFuture<Void> queueLookupFuture;

- public AMReporter(AtomicReference<InetSocketAddress> localAddress, Configuration conf) {
+ public AMReporter(AtomicReference<InetSocketAddress> localAddress,
+ QueryFailedHandler queryFailedHandler, Configuration conf) {
      super(AMReporter.class.getName());
      this.localAddress = localAddress;
+ this.queryFailedHandler = queryFailedHandler;
      this.conf = conf;
      ExecutorService rawExecutor = Executors.newCachedThreadPool(
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
@@ -102,9 +112,25 @@ public class AMReporter extends AbstractService {
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());
      this.queueLookupExecutor = MoreExecutors.listeningDecorator(rawExecutor2);
      this.heartbeatInterval =
- conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS,
- LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
-
+ conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS,
+ LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
+
+ this.retryTimeout =
+ conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS,
+ LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
+ long retrySleep = conf.getLong(
+ LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS,
+ LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT);
+ this.retryPolicy = RetryPolicies
+ .retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep,
+ TimeUnit.MILLISECONDS);
+
+ this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+
+ LOG.info("Setting up AMReporter with " +
+ "heartbeatInterval(ms)=" + heartbeatInterval +
+ ", retryTime(ms)=" + retryTimeout +
+ ", retrySleep(ms)=" + retrySleep);
    }

    @Override
@@ -143,23 +169,26 @@ public class AMReporter extends AbstractService {
      }
    }

-
- public void registerTask(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken) {
+ public void registerTask(String amLocation, int port, String user,
+ Token<JobTokenIdentifier> jobToken, String queryId, String dagName) {
      if (LOG.isTraceEnabled()) {
- LOG.trace("Registering for heartbeat: " + amLocation + ":" + port);
+ LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for dagName=" + dagName);
      }
      AMNodeInfo amNodeInfo;
      synchronized (knownAppMasters) {
        LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
        amNodeInfo = knownAppMasters.get(amNodeId);
        if (amNodeInfo == null) {
- amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+ amNodeInfo =
+ new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory,
+ conf);
          knownAppMasters.put(amNodeId, amNodeInfo);
          // Add to the queue only the first time this is registered, and on
          // subsequent instances when it's taken off the queue.
          amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
          pendingHeartbeatQueeu.add(amNodeInfo);
        }
+ amNodeInfo.setCurrentDagName(dagName);
        amNodeInfo.incrementAndGetTaskCount();
      }
    }
@@ -182,11 +211,13 @@ public class AMReporter extends AbstractService {
    }

    public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
- final TezTaskAttemptID taskAttemptId) {
+ final String queryId, final String dagName, final TezTaskAttemptID taskAttemptId) {
      // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
      // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
      LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
- AMNodeInfo amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+ AMNodeInfo amNodeInfo =
+ new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory,
+ conf);

      // Even if the service hasn't started up. It's OK to make this invocation since this will
      // only happen after the AtomicReference address has been populated. Not adding an additional check.
@@ -212,9 +243,15 @@ public class AMReporter extends AbstractService {
      protected Void callInternal() {
        while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
          try {
- AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
- if (amNodeInfo.getTaskCount() == 0) {
+ final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
+ if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) {
              synchronized (knownAppMasters) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Removing am {} with last associated dag{} from heartbeat with taskCount={}, amFailed={}",
+ amNodeInfo.amNodeId, amNodeInfo.getCurrentDagName(), amNodeInfo.getTaskCount(),
+ amNodeInfo.hasAmFailed(), amNodeInfo);
+ }
                knownAppMasters.remove(amNodeInfo.amNodeId);
              }
              amNodeInfo.stopUmbilical();
@@ -223,7 +260,22 @@ public class AMReporter extends AbstractService {
              long next = System.currentTimeMillis() + heartbeatInterval;
              amNodeInfo.setNextHeartbeatTime(next);
              pendingHeartbeatQueeu.add(amNodeInfo);
- executor.submit(new AMHeartbeatCallable(amNodeInfo));
+ ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ // Nothing to do.
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ String currentDagName = amNodeInfo.getCurrentDagName();
+ amNodeInfo.setAmFailed(true);
+ LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
+ amNodeInfo.amNodeId, currentDagName, t);
+ queryFailedHandler.queryFailed(null, currentDagName);
+ }
+ });
            }
          } catch (InterruptedException e) {
            if (isShutdown.get()) {
@@ -284,11 +336,14 @@ public class AMReporter extends AbstractService {
            amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
                nodeId.getPort());
          } catch (IOException e) {
- // TODO Ideally, this could be used to avoid running a task - AM down / unreachable, so there's no point running it.
- LOG.warn("Failed to communicate with AM. May retry later: " + amNodeInfo.amNodeId, e);
+ String currentDagName = amNodeInfo.getCurrentDagName();
+ amNodeInfo.setAmFailed(true);
+ LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
+ amNodeInfo.amNodeId, currentDagName, e);
+ queryFailedHandler.queryFailed(null, currentDagName);
          } catch (InterruptedException e) {
            if (!isShutdown.get()) {
- LOG.warn("Interrupted while trying to send heartbeat to AM: " + amNodeInfo.amNodeId, e);
+ LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
            }
          }
        } else {
@@ -308,15 +363,28 @@ public class AMReporter extends AbstractService {
      private final Token<JobTokenIdentifier> jobToken;
      private final Configuration conf;
      private final LlapNodeId amNodeId;
+ private final RetryPolicy retryPolicy;
+ private final long timeout;
+ private final SocketFactory socketFactory;
+ private final AtomicBoolean amFailed = new AtomicBoolean(false);
+ private String currentDagName;
      private LlapTaskUmbilicalProtocol umbilical;
      private long nextHeartbeatTime;


      public AMNodeInfo(LlapNodeId amNodeId, String user,
                        Token<JobTokenIdentifier> jobToken,
+ String currentDagName,
+ RetryPolicy retryPolicy,
+ long timeout,
+ SocketFactory socketFactory,
                        Configuration conf) {
        this.user = user;
        this.jobToken = jobToken;
+ this.currentDagName = currentDagName;
+ this.retryPolicy = retryPolicy;
+ this.timeout = timeout;
+ this.socketFactory = socketFactory;
        this.conf = conf;
        this.amNodeId = amNodeId;
      }
@@ -331,8 +399,10 @@ public class AMReporter extends AbstractService {
          umbilical = ugi.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
            @Override
            public LlapTaskUmbilicalProtocol run() throws Exception {
- return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
- LlapTaskUmbilicalProtocol.versionID, address, conf);
+ return RPC
+ .getProxy(LlapTaskUmbilicalProtocol.class, LlapTaskUmbilicalProtocol.versionID,
+ address, UserGroupInformation.getCurrentUser(), conf, socketFactory,
+ (int) timeout);
            }
          });
        }
@@ -354,10 +424,26 @@ public class AMReporter extends AbstractService {
        return taskCount.decrementAndGet();
      }

+ void setAmFailed(boolean val) {
+ amFailed.set(val);
+ }
+
+ boolean hasAmFailed() {
+ return amFailed.get();
+ }
+
      int getTaskCount() {
        return taskCount.get();
      }

+ public synchronized String getCurrentDagName() {
+ return currentDagName;
+ }
+
+ public synchronized void setCurrentDagName(String currentDagName) {
+ this.currentDagName = currentDagName;
+ }
+
      synchronized void setNextHeartbeatTime(long nextTime) {
        nextHeartbeatTime = nextTime;
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 10e192e..e26852a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
  import java.nio.ByteBuffer;
  import java.util.Arrays;
  import java.util.HashMap;
+import java.util.List;
  import java.util.Map;
  import java.util.Set;
  import java.util.concurrent.RejectedExecutionException;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
  import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
  import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
  import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
@@ -58,7 +60,7 @@ import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  // TODO Convert this to a CompositeService
-public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler {
+public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {

    // TODO Setup a set of threads to process incoming requests.
    // Make sure requests for a single dag/query are handled by the same thread
@@ -212,7 +214,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu

    @Override
    public void queryComplete(QueryCompleteRequestProto request) {
- queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay());
+ LOG.info("Processing queryComplete notification for {}", request.getDagName());
+ List<QueryFragmentInfo> knownFragments =
+ queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay());
+ LOG.info("DBG: Pending fragment count for completed query {} = {}", request.getDagName(),
+ knownFragments.size());
+ for (QueryFragmentInfo fragmentInfo : knownFragments) {
+ LOG.info("DBG: Issuing killFragment for completed query {} {}", request.getDagName(),
+ fragmentInfo.getFragmentIdentifierString());
+ executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+ }
    }

    @Override
@@ -288,12 +299,27 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
      queryTracker.fragmentComplete(fragmentInfo);
    }

+ @Override
+ public void queryFailed(String queryId, String dagName) {
+ LOG.info("Processing query failed notification for {}", dagName);
+ List<QueryFragmentInfo> knownFragments =
+ queryTracker.queryComplete(null, dagName, -1);
+ LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName,
+ knownFragments.size());
+ for (QueryFragmentInfo fragmentInfo : knownFragments) {
+ LOG.info("DBG: Issuing killFragment for failed query {} {}", dagName,
+ fragmentInfo.getFragmentIdentifierString());
+ executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+ }
+ }
+
    private class KilledTaskHandlerImpl implements KilledTaskHandler {

      @Override
      public void taskKilled(String amLocation, int port, String user,
- Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId) {
- amReporter.taskKilled(amLocation, port, user, jobToken, taskAttemptId);
+ Token<JobTokenIdentifier> jobToken, String queryId, String dagName,
+ TezTaskAttemptID taskAttemptId) {
+ amReporter.taskKilled(amLocation, port, user, jobToken, queryId, dagName, taskAttemptId);
      }
    }


http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 7959945..75d1995 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
  import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
+import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
  import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
@@ -157,7 +158,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
          " sessionId: " + sessionId);


- this.amReporter = new AMReporter(address, daemonConf);
+ this.amReporter = new AMReporter(address, new QueryFailedHandlerProxy(), daemonConf);


      this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort);
@@ -418,4 +419,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
      }
    }

+
+ private class QueryFailedHandlerProxy implements QueryFailedHandler {
+
+ @Override
+ public void queryFailed(String queryId, String dagName) {
+ containerRunner.queryFailed(queryId, dagName);
+ }
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
index 554864e..aa065a9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -71,6 +71,10 @@ public class QueryFragmentInfo {
      return attemptNumber;
    }

+ public String getFragmentIdentifierString() {
+ return fragmentSpec.getFragmentIdentifierString();
+ }
+
    /**
     * Check whether a task can run to completion or may end up blocking on it's sources.
     * This currently happens via looking up source state.

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 6aed60f..27f2d4c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;

  import com.google.common.base.Preconditions;
  import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
  import com.google.common.collect.Multimap;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
@@ -101,6 +102,10 @@ public class QueryInfo {
      knownFragments.remove(fragmentInfo);
    }

+ public List<QueryFragmentInfo> getRegisteredFragments() {
+ return Lists.newArrayList(knownFragments);
+ }
+
    private synchronized void createLocalDirs() throws IOException {
      if (localDirs == null) {
        localDirs = new String[localDirsBase.length];

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index d796b24..19147e3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
  import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
@@ -25,6 +26,7 @@ import org.slf4j.LoggerFactory;

  import java.io.IOException;
  import java.util.Collections;
+import java.util.List;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
@@ -47,6 +49,7 @@ public class QueryTracker extends CompositeService {

    private final String[] localDirsBase;
    private final FileSystem localFs;
+ private final long defaultDeleteDelaySeconds;

    // TODO At the moment there's no way of knowing whether a query is running or not.
    // A race is possible between dagComplete and registerFragment - where the registerFragment
@@ -75,6 +78,9 @@ public class QueryTracker extends CompositeService {
        throw new RuntimeException("Failed to setup local filesystem instance", e);
      }

+ this.defaultDeleteDelaySeconds = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS,
+ LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT);
+
      queryFileCleaner = new QueryFileCleaner(conf, localFs);
      addService(queryFileCleaner);
    }
@@ -142,7 +148,10 @@ public class QueryTracker extends CompositeService {
     * @param dagName
     * @param deleteDelay
     */
- void queryComplete(String queryId, String dagName, long deleteDelay) {
+ List<QueryFragmentInfo> queryComplete(String queryId, String dagName, long deleteDelay) {
+ if (deleteDelay == -1) {
+ deleteDelay = defaultDeleteDelaySeconds;
+ }
      ReadWriteLock dagLock = getDagLock(dagName);
      dagLock.writeLock().lock();
      try {
@@ -153,6 +162,7 @@ public class QueryTracker extends CompositeService {
        QueryInfo queryInfo = queryInfoMap.remove(dagName);
        if (queryInfo == null) {
          LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
+ return Collections.emptyList();
        }
        String[] localDirs = queryInfo.getLocalDirsNoCreate();
        if (localDirs != null) {
@@ -161,8 +171,13 @@ public class QueryTracker extends CompositeService {
            ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier());
          }
        }
+ // Clearing this before sending a kill is OK, since canFinish will change to false.
+ // Ideally this should be a state machine where kills are issued to the executor,
+ // and the structures are cleaned up once all tasks complete. New requests, however, should not
+ // be allowed after a query complete is received.
        sourceCompletionMap.remove(dagName);
        dagSpecificLocks.remove(dagName);
+ return queryInfo.getRegisteredFragments();
        // TODO HIVE-10762 Issue a kill message to all running fragments for this container.
        // TODO HIVE-10535 Cleanup map join cache
      } finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 4c0fb8e..0fd89de 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -263,11 +263,17 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
      }
    }

- private static class WaitQueueWorkerCallback implements FutureCallback {
+ private class WaitQueueWorkerCallback implements FutureCallback {

      @Override
      public void onSuccess(Object result) {
- LOG.error("Wait queue scheduler worker exited with success!");
+ if (isShutdown.get()) {
+ LOG.info("Wait queue scheduler worker exited with success!");
+ } else {
+ LOG.error("Wait queue scheduler worker exited with success!");
+ Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
+ new IllegalStateException("WaitQueue worked exited before shutdown"));
+ }
      }

      @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index cd6a0da..9b14fa3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
  import java.security.PrivilegedExceptionAction;
  import java.util.HashMap;
  import java.util.Map;
-import java.util.Set;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,7 +125,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
      // Register with the AMReporter when the callable is setup. Unregister once it starts running.
      if (jobToken != null) {
      this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
- request.getUser(), jobToken);
+ request.getUser(), jobToken, null, request.getFragmentSpec().getDagName());
      }
      this.metrics = metrics;
      this.requestId = getRequestId(request);
@@ -287,7 +286,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     */
    public void reportTaskKilled() {
      killedTaskHandler
- .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken,
+ .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, null,
+ taskSpec.getDAGName(),
              taskSpec.getTaskAttemptID());
    }


http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index 2f5e11d..fae7654 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -32,8 +32,8 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
        throws IOException, TezException;

- public void nodeHeartbeat(Text hostname, int port);
+ public void nodeHeartbeat(Text hostname, int port) throws IOException;

- public void taskKilled(TezTaskAttemptID taskAttemptId);
+ public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;

  }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 6a38d85..2305b8c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -31,6 +31,7 @@ import com.google.protobuf.ServiceException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.llap.LlapNodeId;
  import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
  import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
@@ -449,7 +450,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
      }

      @Override
- public void nodeHeartbeat(Text hostname, int port) {
+ public void nodeHeartbeat(Text hostname, int port) throws IOException {
        entityTracker.nodePinged(hostname.toString(), port);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
@@ -457,7 +458,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
      }

      @Override
- public void taskKilled(TezTaskAttemptID taskAttemptId) {
+ public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
        // TODO Unregister the task for state updates, which could in turn unregister the node.
        getTaskCommunicatorContext().taskKilled(taskAttemptId,
            TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");

Search Discussions

Related Discussions

Discussion Navigation
view thread | post
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Jun 17, '15 at 6:48a
active: Jun 17, '15 at 6:48a
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Sseth: 1 post

People

Translate

site design / logo © 2021 Grokbase