FAQ
Repository: hive
Updated Branches:
   refs/heads/branch-2.0 e8388ae67 -> 0ffef3f63


HIVE-12353 When Compactor fails it calls CompactionTxnHandler.markedCleaned(). it should not. (Eugene Koifman, reviewed by Alan Gates) (ADDENDUM)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ffef3f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ffef3f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ffef3f6

Branch: refs/heads/branch-2.0
Commit: 0ffef3f63aeadc3eabf6df4a1e3bf7c5ca1fc7ff
Parents: e8388ae
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Thu Jan 21 18:39:57 2016 -0800
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Thu Jan 21 18:39:57 2016 -0800

----------------------------------------------------------------------
  .../ql/txn/AcidCompactionHistoryService.java | 83 ++++++++++++++++++
  .../txn/compactor/HouseKeeperServiceBase.java | 92 ++++++++++++++++++++
  2 files changed, 175 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0ffef3f6/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
new file mode 100644
index 0000000..a91ca5c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Purges obsolete items from compaction history data
+ */
+public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AcidCompactionHistoryService.class);
+
+ @Override
+ protected long getStartDelayMs() {
+ return 0;
+ }
+ @Override
+ protected long getIntervalMs() {
+ return hiveConf.getTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ @Override
+ protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ return new ObsoleteEntryReaper(hiveConf, isAliveCounter);
+ }
+
+ @Override
+ public String getServiceDescription() {
+ return "Removes obsolete entries from Compaction History";
+ }
+
+ private static final class ObsoleteEntryReaper implements Runnable {
+ private final CompactionTxnHandler txnHandler;
+ private final AtomicInteger isAliveCounter;
+ private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ txnHandler = new CompactionTxnHandler(hiveConf);
+ this.isAliveCounter = isAliveCounter;
+ }
+
+ @Override
+ public void run() {
+ try {
+ long startTime = System.currentTimeMillis();
+ txnHandler.purgeCompactionHistory();
+ int count = isAliveCounter.incrementAndGet();
+ LOG.info("History reaper reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count);
+ }
+ catch(Throwable t) {
+ LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+ }
+ }
+ }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffef3f6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
new file mode 100644
index 0000000..947f17c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class HouseKeeperServiceBase implements HouseKeeperService {
+ private static final Logger LOG = LoggerFactory.getLogger(HouseKeeperServiceBase.class);
+ private ScheduledExecutorService pool = null;
+ protected final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);
+ protected HiveConf hiveConf;
+
+ @Override
+ public void start(HiveConf hiveConf) throws Exception {
+ this.hiveConf = hiveConf;
+ HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
+ if(!mgr.supportsAcid()) {
+ LOG.info(this.getClass().getName() + " not started since " +
+ mgr.getClass().getName() + " does not support Acid.");
+ return;//there are no transactions in this case
+ }
+ pool = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ private final AtomicInteger threadCounter = new AtomicInteger();
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, this.getClass().getName() + "-" + threadCounter.getAndIncrement());
+ }
+ });
+
+ TimeUnit tu = TimeUnit.MILLISECONDS;
+ pool.scheduleAtFixedRate(getScheduedAction(hiveConf, isAliveCounter), getStartDelayMs(),
+ getIntervalMs(), tu);
+ LOG.info("Started " + this.getClass().getName() + " with delay/interval = " + getStartDelayMs() + "/" +
+ getIntervalMs() + " " + tu);
+ }
+
+ @Override
+ public void stop() {
+ if(pool != null && !pool.isShutdown()) {
+ pool.shutdown();
+ }
+ pool = null;
+ }
+
+ /**
+ * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1.
+ * Starts with {@link java.lang.Integer#MIN_VALUE}
+ */
+ @Override
+ public int getIsAliveCounter() {
+ return isAliveCounter.get();
+ }
+
+ /**
+ * Delay in millis before first run of the task of this service.
+ */
+ protected abstract long getStartDelayMs();
+ /**
+ * Determines how fequently the service is running its task.
+ */
+ protected abstract long getIntervalMs();
+
+ /**
+ * The actual task implementation. Must increment the counter on each iteration.
+ */
+ protected abstract Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter);
+}

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedJan 22, '16 at 2:40a
activeJan 22, '16 at 2:40a
posts1
users1
websitehive.apache.org

1 user in discussion

Ekoifman: 1 post

People

Translate

site design / logo © 2021 Grokbase