FAQ
Author: omalley
Date: Thu Apr 10 22:33:46 2014
New Revision: 1586489

URL: http://svn.apache.org/r1586489
Log:
HIVE-6319. Add compactor for ACID tables. (Alan Gates via omalley)

Added:
     hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
       - copied unchanged from r1586488, hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
     hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/txn/
       - copied from r1586488, hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/
     hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/txn/
       - copied from r1586488, hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/
Modified:
     hive/branches/branch-0.13/ (props changed)
     hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
     hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
     hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
     hive/branches/branch-0.13/ql/pom.xml
     hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
     hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java

Propchange: hive/branches/branch-0.13/
------------------------------------------------------------------------------
   Merged /hive/trunk:r1586488

Modified: hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Thu Apr 10 22:33:46 2014
@@ -42,6 +42,10 @@ import java.util.Map.Entry;
  import java.util.Properties;
  import java.util.Set;
  import java.util.Timer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
  import java.util.regex.Pattern;

  import org.apache.commons.cli.OptionBuilder;
@@ -5077,7 +5081,12 @@ public class HiveMetaStore extends Thrif
          }
        });

- startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf);
+ Lock startLock = new ReentrantLock();
+ Condition startCondition = startLock.newCondition();
+ MetaStoreThread.BooleanPointer startedServing = new MetaStoreThread.BooleanPointer();
+ startMetaStoreThreads(conf, startLock, startCondition, startedServing);
+ startMetaStore(cli.port, ShimLoader.getHadoopThriftAuthBridge(), conf, startLock,
+ startCondition, startedServing);
      } catch (Throwable t) {
        // Catch the exception, log it and rethrow it.
        HMSHandler.LOG
@@ -5095,7 +5104,19 @@ public class HiveMetaStore extends Thrif
     */
    public static void startMetaStore(int port, HadoopThriftAuthBridge bridge)
        throws Throwable {
- startMetaStore(port, bridge, new HiveConf(HMSHandler.class));
+ startMetaStore(port, bridge, new HiveConf(HMSHandler.class), null, null, null);
+ }
+
+ /**
+ * Start the metastore store.
+ * @param port
+ * @param bridge
+ * @param conf
+ * @throws Throwable
+ */
+ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
+ HiveConf conf) throws Throwable {
+ startMetaStore(port, bridge, conf, null, null, null);
    }

    /**
@@ -5108,7 +5129,8 @@ public class HiveMetaStore extends Thrif
     * @throws Throwable
     */
    public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
- HiveConf conf) throws Throwable {
+ HiveConf conf, Lock startLock, Condition startCondition,
+ MetaStoreThread.BooleanPointer startedServing) throws Throwable {
      try {

        // Server will create new threads up to max as necessary. After an idle
@@ -5176,6 +5198,10 @@ public class HiveMetaStore extends Thrif
        HMSHandler.LOG.info("Options.maxWorkerThreads = "
            + maxWorkerThreads);
        HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive);
+
+ if (startLock != null) {
+ signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
+ }
        tServer.serve();
      } catch (Throwable x) {
        x.printStackTrace();
@@ -5183,4 +5209,119 @@ public class HiveMetaStore extends Thrif
        throw x;
      }
    }
+
+ private static void signalOtherThreadsToStart(final TServer server, final Lock startLock,
+ final Condition startCondition,
+ final MetaStoreThread.BooleanPointer startedServing) {
+ // A simple thread to wait until the server has started and then signal the other threads to
+ // begin
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ do {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.warn("Signalling thread was interuppted: " + e.getMessage());
+ }
+ } while (!server.isServing());
+ startLock.lock();
+ try {
+ startedServing.boolVal = true;
+ startCondition.signalAll();
+ } finally {
+ startLock.unlock();
+ }
+ }
+ };
+ t.start();
+ }
+
+ /**
+ * Start threads outside of the thrift service, such as the compactor threads.
+ * @param conf Hive configuration object
+ */
+ private static void startMetaStoreThreads(final HiveConf conf, final Lock startLock,
+ final Condition startCondition, final
+ MetaStoreThread.BooleanPointer startedServing) {
+ // A thread is spun up to start these other threads. That's because we can't start them
+ // until after the TServer has started, but once TServer.serve is called we aren't given back
+ // control.
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ // This is a massive hack. The compactor threads have to access packages in ql (such as
+ // AcidInputFormat). ql depends on metastore so we can't directly access those. To deal
+ // with this the compactor thread classes have been put in ql and they are instantiated here
+ // dyanmically. This is not ideal but it avoids a massive refactoring of Hive packages.
+ //
+ // Wrap the start of the threads in a catch Throwable loop so that any failures
+ // don't doom the rest of the metastore.
+ startLock.lock();
+ try {
+ // Per the javadocs on Condition, do not depend on the condition alone as a start gate
+ // since spurious wake ups are possible.
+ while (!startedServing.boolVal) startCondition.await();
+ startCompactorInitiator(conf);
+ startCompactorWorkers(conf);
+ startCompactorCleaner(conf);
+ } catch (Throwable e) {
+ LOG.error("Failure when starting the compactor, compactions may not happen, " +
+ StringUtils.stringifyException(e));
+ } finally {
+ startLock.unlock();
+ }
+ }
+ };
+
+ t.start();
+ }
+
+ private static void startCompactorInitiator(HiveConf conf) throws Exception {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
+ MetaStoreThread initiator =
+ instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
+ initializeAndStartThread(initiator, conf);
+ }
+ }
+
+ private static void startCompactorWorkers(HiveConf conf) throws Exception {
+ int numWorkers = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_THREADS);
+ for (int i = 0; i < numWorkers; i++) {
+ MetaStoreThread worker =
+ instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker");
+ initializeAndStartThread(worker, conf);
+ }
+ }
+
+ private static void startCompactorCleaner(HiveConf conf) throws Exception {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
+ MetaStoreThread cleaner =
+ instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
+ initializeAndStartThread(cleaner, conf);
+ }
+ }
+
+ private static MetaStoreThread instantiateThread(String classname) throws Exception {
+ Class c = Class.forName(classname);
+ Object o = c.newInstance();
+ if (MetaStoreThread.class.isAssignableFrom(o.getClass())) {
+ return (MetaStoreThread)o;
+ } else {
+ String s = classname + " is not an instance of MetaStoreThread.";
+ LOG.error(s);
+ throw new IOException(s);
+ }
+ }
+
+ private static int nextThreadId = 1000000;
+
+ private static void initializeAndStartThread(MetaStoreThread thread, HiveConf conf) throws
+ MetaException {
+ LOG.info("Starting metastore thread of type " + thread.getClass().getName());
+ thread.setHiveConf(conf);
+ thread.setThreadId(nextThreadId++);
+ thread.init(new MetaStoreThread.BooleanPointer());
+ thread.start();
+ }
  }

Modified: hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/branches/branch-0.13/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Thu Apr 10 22:33:46 2014
@@ -51,13 +51,14 @@ public class CompactionTxnHandler extend
     * @return list of CompactionInfo structs. These will not have id, type,
     * or runAs set since these are only potential compactions not actual ones.
     */
- public List<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+ public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
      Connection dbConn = getDbConn();
- List<CompactionInfo> response = new ArrayList<CompactionInfo>();
+ Set<CompactionInfo> response = new HashSet<CompactionInfo>();
      try {
        Statement stmt = dbConn.createStatement();
        // Check for completed transactions
- String s = "select ctc_database, ctc_table, ctc_partition from COMPLETED_TXN_COMPONENTS";
+ String s = "select distinct ctc_database, ctc_table, " +
+ "ctc_partition from COMPLETED_TXN_COMPONENTS";
        LOG.debug("Going to execute query <" + s + ">");
        ResultSet rs = stmt.executeQuery(s);
        while (rs.next()) {

Modified: hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java (original)
+++ hive/branches/branch-0.13/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java Thu Apr 10 22:33:46 2014
@@ -30,6 +30,7 @@ import org.junit.Test;

  import java.util.ArrayList;
  import java.util.List;
+import java.util.Set;

  import static junit.framework.Assert.*;

@@ -317,7 +318,7 @@ public class TestCompactionTxnHandler {
      txnHandler.commitTxn(new CommitTxnRequest(txnid));
      assertEquals(0, txnHandler.numLocksInLockTable());

- List<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
+ Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
      assertEquals(2, potentials.size());
      boolean sawMyTable = false, sawYourTable = false;
      for (CompactionInfo ci : potentials) {

Modified: hive/branches/branch-0.13/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/pom.xml?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/pom.xml (original)
+++ hive/branches/branch-0.13/ql/pom.xml Thu Apr 10 22:33:46 2014
@@ -352,6 +352,13 @@
          </dependency>
          <dependency>
            <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop-23.version}</version>
+ <optional>true</optional>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop-23.version}</version>
            <optional>true</optional>

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Thu Apr 10 22:33:46 2014
@@ -51,18 +51,29 @@ public class AcidUtils {
    public static final String DELTA_PREFIX = "delta_";
    public static final String BUCKET_PREFIX = "bucket_";

- private static final String BUCKET_DIGITS = "%05d";
- private static final String DELTA_DIGITS = "%07d";
+ public static final String BUCKET_DIGITS = "%05d";
+ public static final String DELTA_DIGITS = "%07d";

    private static final Pattern ORIGINAL_PATTERN =
        Pattern.compile("[0-9]+_[0-9]+");

+ public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
+ public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+
    public static final PathFilter hiddenFileFilter = new PathFilter(){
      public boolean accept(Path p){
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };
+
+ public static final PathFilter bucketFileFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(BUCKET_PREFIX);
+ }
+ };
+
    private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();

    /**

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java?rev=1586489&r1=1586488&r2=1586489&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java Thu Apr 10 22:33:46 2014
@@ -118,12 +118,16 @@ public class RecordIdentifier implements

    @Override
    public void write(DataOutput dataOutput) throws IOException {
- throw new UnsupportedOperationException("Can't write RecordIdentifier");
+ dataOutput.writeLong(transactionId);
+ dataOutput.writeInt(bucketId);
+ dataOutput.writeLong(rowId);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
- throw new UnsupportedOperationException("Can't read RecordIdentifier");
+ transactionId = dataInput.readLong();
+ bucketId = dataInput.readInt();
+ rowId = dataInput.readLong();
    }

    @Override

Search Discussions

Related Discussions

Discussion Navigation
view thread | post
Discussion Overview
group: commits @
categories: hive, hadoop
posted: Apr 10, '14 at 10:34p
active: Apr 10, '14 at 10:34p
posts: 1
users: 1
website: hive.apache.org

1 user in discussion

Omalley: 1 post

People

Translate

site design / logo © 2021 Grokbase