FAQ
Author: gates
Date: Fri Sep 5 18:12:51 2014
New Revision: 1622751

URL: http://svn.apache.org/r1622751
Log:
HIVE-7811 Compactions need to update table/partition stats (Eugene Koifman via Alan Gates)

Added:
     hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/
     hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/
     hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
Modified:
     hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
     hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
     hive/trunk/itests/hive-unit/pom.xml
     hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
     hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
     hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java

Modified: hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (original)
+++ hive/trunk/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Fri Sep 5 18:12:51 2014
@@ -343,7 +343,10 @@ public class HiveEndPoint {
        if (ep.partitionVals.isEmpty()) {
          return;
        }
- SessionState state = SessionState.start(new CliSessionState(conf));
+ SessionState localSession = null;
+ if(SessionState.get() == null) {
+ localSession = SessionState.start(new CliSessionState(conf));
+ }
        Driver driver = new Driver(conf);

        try {
@@ -372,7 +375,9 @@ public class HiveEndPoint {
        } finally {
          driver.close();
          try {
- state.close();
+ if(localSession != null) {
+ localSession.close();
+ }
          } catch (IOException e) {
            LOG.warn("Error closing SessionState used to run Hive DDL.");
          }
@@ -563,11 +568,14 @@ public class HiveEndPoint {

      /**
       * Get Id of currently open transaction
- * @return
+ * @return -1 if there is no open TX
       */
      @Override
      public Long getCurrentTxnId() {
- return txnIds.get(currentTxnIndex);
+ if(currentTxnIndex >= 0) {
+ return txnIds.get(currentTxnIndex);
+ }
+ return -1L;
      }

      /**

Modified: hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java (original)
+++ hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java Fri Sep 5 18:12:51 2014
@@ -54,6 +54,8 @@ import org.junit.Before;
  import org.junit.Rule;
  import org.junit.Test;
  import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

  import java.io.File;
  import java.io.FileNotFoundException;
@@ -67,6 +69,7 @@ import java.util.Map;


  public class TestStreaming {
+ private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);

    public static class RawFileSystem extends RawLocalFileSystem {
      private static final URI NAME;
@@ -636,18 +639,25 @@ public class TestStreaming {
      connection.close();
    }

- class WriterThd extends Thread {
+ private static class WriterThd extends Thread {

- private StreamingConnection conn;
- private HiveEndPoint ep;
- private DelimitedInputWriter writer;
- private String data;
+ private final StreamingConnection conn;
+ private final DelimitedInputWriter writer;
+ private final String data;
+ private Throwable error;

      WriterThd(HiveEndPoint ep, String data) throws Exception {
- this.ep = ep;
+ super("Writer_" + data);
        writer = new DelimitedInputWriter(fieldNames, ",", ep);
        conn = ep.newConnection(false);
        this.data = data;
+ setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread thread, Throwable throwable) {
+ error = throwable;
+ LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
+ }
+ });
      }

      @Override
@@ -668,14 +678,14 @@ public class TestStreaming {
            try {
              txnBatch.close();
            } catch (Exception e) {
+ LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
              conn.close();
- throw new RuntimeException(e);
            }
          }
          try {
            conn.close();
          } catch (Exception e) {
- throw new RuntimeException(e);
+ LOG.error("conn.close() failed: " + e.getMessage(), e);
          }

        }
@@ -685,18 +695,23 @@ public class TestStreaming {
    @Test
    public void testConcurrentTransactionBatchCommits() throws Exception {
      final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
- WriterThd t1 = new WriterThd(ep, "1,Matrix");
- WriterThd t2 = new WriterThd(ep, "2,Gandhi");
- WriterThd t3 = new WriterThd(ep, "3,Silence");
-
- t1.start();
- t2.start();
- t3.start();
-
- t1.join();
- t2.join();
- t3.join();
-
+ List<WriterThd> writers = new ArrayList<WriterThd>(3);
+ writers.add(new WriterThd(ep, "1,Matrix"));
+ writers.add(new WriterThd(ep, "2,Gandhi"));
+ writers.add(new WriterThd(ep, "3,Silence"));
+
+ for(WriterThd w : writers) {
+ w.start();
+ }
+ for(WriterThd w : writers) {
+ w.join();
+ }
+ for(WriterThd w : writers) {
+ if(w.error != null) {
+ Assert.assertFalse("Writer thread" + w.getName() + " died: " + w.error.getMessage() +
+ " See log file for stack trace", true);
+ }
+ }
    }

    // delete db and all tables in it

Modified: hive/trunk/itests/hive-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/pom.xml?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/pom.xml (original)
+++ hive/trunk/itests/hive-unit/pom.xml Fri Sep 5 18:12:51 2014
@@ -53,6 +53,16 @@
        <artifactId>hive-exec</artifactId>
        <version>${project.version}</version>
      </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-streaming</artifactId>
+ <version>${project.version}</version>
+ </dependency>

      <!-- dependencies are always listed in sorted order by groupId, artifectId -->
      <!-- test intra-project -->

Added: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1622751&view=auto
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (added)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Fri Sep 5 18:12:51 2014
@@ -0,0 +1,310 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class TestCompactor {
+ private static final Logger LOG = LoggerFactory.getLogger(TestCompactor.class);
+ private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
+ File.separator + TestCompactor.class.getCanonicalName() + "-" + System.currentTimeMillis());
+ private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
+ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+
+ @Rule
+ public TemporaryFolder stagingFolder = new TemporaryFolder();
+ private HiveConf conf;
+ IMetaStoreClient msClient;
+ private Driver driver;
+
+ @Before
+ public void setup() throws Exception {
+
+ File f = new File(TEST_WAREHOUSE_DIR);
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if(!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+ throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+ }
+
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+ //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
+
+ TxnDbUtil.setConfValues(hiveConf);
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+
+ conf = hiveConf;
+ msClient = new HiveMetaStoreClient(conf);
+ driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+
+
+ int LOOP_SIZE = 3;
+ String[] input = new String[LOOP_SIZE * LOOP_SIZE];
+ int k = 0;
+ for (int i = 1; i <= LOOP_SIZE; i++) {
+ String si = i + "";
+ for (int j = 1; j <= LOOP_SIZE; j++) {
+ String sj = "S" + j + "S";
+ input[k] = si + "\t" + sj;
+ k++;
+ }
+ }
+ createTestDataFile(BASIC_FILE_NAME, input);
+ }
+ @After
+ public void tearDown() {
+ conf = null;
+ if(msClient != null) {
+ msClient.close();
+ }
+ if(driver != null) {
+ driver.close();
+ }
+ }
+
+ /**
+ * After each major compaction, stats need to be updated on each column of the
+ * table/partition which previously had stats.
+ * 1. create a bucketed ORC backed table (Orc is currently required by ACID)
+ * 2. populate 2 partitions with data
+ * 3. compute stats
+ * 4. insert some data into the table using StreamingAPI
+ * 5. Trigger major compaction (which should update stats)
+ * 6. check that stats have been updated
+ * @throws Exception
+ * todo:
+ * 2. add non-partitioned test
+ * 4. add a test with sorted table?
+ */
+ @Test
+ public void testStatsAfterCompactionPartTbl() throws Exception {
+ //as of (8/27/2014) Hive 0.14, ACID/Orc requires HiveInputFormat
+ String tblName = "compaction_test";
+ String tblNameStg = tblName + "_stg";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("drop table if exists " + tblNameStg, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " PARTITIONED BY(bkt INT)" +
+ " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC", driver);
+ executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
+ " STORED AS TEXTFILE" +
+ " LOCATION '" + stagingFolder.newFolder() + "'", driver);
+
+ executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
+ "' overwrite into table " + tblNameStg, driver);
+ execSelectAndDumpData("select * from " + tblNameStg, driver, "Dumping data for " +
+ tblNameStg + " after load:");
+ executeStatementOnDriver("FROM " + tblNameStg +
+ " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=0) " +
+ "SELECT a, b where a < 2", driver);
+ executeStatementOnDriver("FROM " + tblNameStg +
+ " INSERT OVERWRITE TABLE " + tblName + " PARTITION(bkt=1) " +
+ "SELECT a, b where a >= 2", driver);
+ execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+ tblName + " after load:");
+
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
+ LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
+ Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
+ System.getProperty("user.name"));
+ su.gatherStats();//compute stats before compaction
+ LOG.debug("List of stats columns after analyze Part1: " + txnHandler.findColumnsWithStats(ci));
+
+ CompactionInfo ciPart2 = new CompactionInfo("default", tblName, "bkt=1", CompactionType.MAJOR);
+ LOG.debug("List of stats columns before analyze Part2: " + txnHandler.findColumnsWithStats(ci));
+ su = Worker.StatsUpdater.init(ciPart2, colNames, conf, System.getProperty("user.name"));
+ su.gatherStats();//compute stats before compaction
+ LOG.debug("List of stats columns after analyze Part2: " + txnHandler.findColumnsWithStats(ci));
+
+ //now make sure we get the stats we expect for partition we are going to add data to later
+ Map<String, List<ColumnStatisticsObj>> stats = msClient.getPartitionColumnStatistics(ci.dbname,
+ ci.tableName, Arrays.asList(ci.partName), colNames);
+ List<ColumnStatisticsObj> colStats = stats.get(ci.partName);
+ Assert.assertNotNull("No stats found for partition " + ci.partName, colStats);
+ Assert.assertEquals("Expected column 'a' at index 0", "a", colStats.get(0).getColName());
+ Assert.assertEquals("Expected column 'b' at index 1", "b", colStats.get(1).getColName());
+ LongColumnStatsData colAStats = colStats.get(0).getStatsData().getLongStats();
+ Assert.assertEquals("lowValue a", 1, colAStats.getLowValue());
+ Assert.assertEquals("highValue a", 1, colAStats.getHighValue());
+ Assert.assertEquals("numNulls a", 0, colAStats.getNumNulls());
+ Assert.assertEquals("numNdv a", 1, colAStats.getNumDVs());
+ StringColumnStatsData colBStats = colStats.get(1).getStatsData().getStringStats();
+ Assert.assertEquals("maxColLen b", 3, colBStats.getMaxColLen());
+ Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen());
+ Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls());
+ Assert.assertEquals("nunDVs", 2, colBStats.getNumDVs());
+
+ //now save stats for partition we won't modify
+ stats = msClient.getPartitionColumnStatistics(ciPart2.dbname,
+ ciPart2.tableName, Arrays.asList(ciPart2.partName), colNames);
+ colStats = stats.get(ciPart2.partName);
+ LongColumnStatsData colAStatsPart2 = colStats.get(0).getStatsData().getLongStats();
+ StringColumnStatsData colBStatsPart2 = colStats.get(1).getStatsData().getStringStats();
+
+
+ HiveEndPoint endPt = new HiveEndPoint(null, ci.dbname, ci.tableName, Arrays.asList("0"));
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+ /*next call will eventually end up in HiveEndPoint.createPartitionIfNotExists() which
+ makes an operation on Driver
+ * and starts it's own CliSessionState and then closes it, which removes it from ThreadLoacal;
+ * thus the session
+ * created in this class is gone after this; I fixed it in HiveEndPoint*/
+ StreamingConnection connection = endPt.newConnection(true);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ Assert.assertEquals(TransactionBatch.TxnState.OPEN, txnBatch.getCurrentTransactionState());
+ txnBatch.write("50,Kiev".getBytes());
+ txnBatch.write("51,St. Petersburg".getBytes());
+ txnBatch.write("44,Boston".getBytes());
+ txnBatch.commit();
+
+ txnBatch.beginNextTransaction();
+ txnBatch.write("52,Tel Aviv".getBytes());
+ txnBatch.write("53,Atlantis".getBytes());
+ txnBatch.write("53,Boston".getBytes());
+ txnBatch.commit();
+
+ txnBatch.close();
+ connection.close();
+ execSelectAndDumpData("select * from " + ci.getFullTableName(), driver, ci.getFullTableName());
+
+ //so now we have written some new data to bkt=0 and it shows up
+ CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, CompactionType.MAJOR);
+ rqst.setPartitionname(ci.partName);
+ txnHandler.compact(rqst);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+ stop.boolVal = true;
+ t.init(stop);
+ t.run();
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+ stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName,
+ Arrays.asList(ci.partName), colNames);
+ colStats = stats.get(ci.partName);
+ Assert.assertNotNull("No stats found for partition " + ci.partName, colStats);
+ Assert.assertEquals("Expected column 'a' at index 0", "a", colStats.get(0).getColName());
+ Assert.assertEquals("Expected column 'b' at index 1", "b", colStats.get(1).getColName());
+ colAStats = colStats.get(0).getStatsData().getLongStats();
+ Assert.assertEquals("lowValue a", 1, colAStats.getLowValue());
+ Assert.assertEquals("highValue a", 53, colAStats.getHighValue());
+ Assert.assertEquals("numNulls a", 0, colAStats.getNumNulls());
+ Assert.assertEquals("numNdv a", 6, colAStats.getNumDVs());
+ colBStats = colStats.get(1).getStatsData().getStringStats();
+ Assert.assertEquals("maxColLen b", 14, colBStats.getMaxColLen());
+ //cast it to long to get rid of periodic decimal
+ Assert.assertEquals("avgColLen b", (long)6.1111111111, (long)colBStats.getAvgColLen());
+ Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls());
+ Assert.assertEquals("nunDVs", 10, colBStats.getNumDVs());
+
+ //now check that stats for partition we didn't modify did not change
+ stats = msClient.getPartitionColumnStatistics(ciPart2.dbname, ciPart2.tableName,
+ Arrays.asList(ciPart2.partName), colNames);
+ colStats = stats.get(ciPart2.partName);
+ Assert.assertEquals("Expected stats for " + ciPart2.partName + " to stay the same",
+ colAStatsPart2, colStats.get(0).getStatsData().getLongStats());
+ Assert.assertEquals("Expected stats for " + ciPart2.partName + " to stay the same",
+ colBStatsPart2, colStats.get(1).getStatsData().getStringStats());
+ }
+
+ /**
+ * convenience method to execute a select stmt and dump results to log file
+ */
+ private static void execSelectAndDumpData(String selectStmt, Driver driver, String msg)
+ throws Exception {
+ executeStatementOnDriver(selectStmt, driver);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ int rowIdx = 0;
+ LOG.debug(msg);
+ for(String row : valuesReadFromHiveDriver) {
+ LOG.debug(" rowIdx=" + rowIdx++ + ":" + row);
+ }
+ }
+ /**
+ * Execute Hive CLI statement
+ * @param cmd arbitrary statement to execute
+ */
+ static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
+ LOG.debug("Executing: " + cmd);
+ CommandProcessorResponse cpr = driver.run(cmd);
+ if(cpr.getResponseCode() != 0) {
+ throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
+ }
+ }
+ static void createTestDataFile(String filename, String[] lines) throws IOException {
+ FileWriter writer = null;
+ try {
+ File file = new File(filename);
+ file.deleteOnExit();
+ writer = new FileWriter(file);
+ for (String line : lines) {
+ writer.write(line + "\n");
+ }
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ }
+}

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java Fri Sep 5 18:12:51 2014
@@ -34,9 +34,17 @@ public class CompactionInfo {
    private String fullPartitionName = null;
    private String fullTableName = null;

+ public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) {
+ this.dbname = dbname;
+ this.tableName = tableName;
+ this.partName = partName;
+ this.type = type;
+ }
+ CompactionInfo() {}
+
    public String getFullPartitionName() {
      if (fullPartitionName == null) {
- StringBuffer buf = new StringBuffer(dbname);
+ StringBuilder buf = new StringBuilder(dbname);
        buf.append('.');
        buf.append(tableName);
        if (partName != null) {
@@ -50,11 +58,14 @@ public class CompactionInfo {

    public String getFullTableName() {
      if (fullTableName == null) {
- StringBuffer buf = new StringBuffer(dbname);
+ StringBuilder buf = new StringBuilder(dbname);
        buf.append('.');
        buf.append(tableName);
        fullTableName = buf.toString();
      }
      return fullTableName;
    }
+ public boolean isMajorCompaction() {
+ return CompactionType.MAJOR == type;
+ }
  }

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Fri Sep 5 18:12:51 2014
@@ -535,6 +535,46 @@ public class CompactionTxnHandler extend
        deadlockCnt = 0;
      }
    }
+
+ /**
+ * Queries metastore DB directly to find columns in the table which have statistics information.
+ * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+ * table level stats are examined.
+ * @throws MetaException
+ */
+ public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ stmt = dbConn.createStatement();
+ String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")
+ + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+ + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");
+ LOG.debug("Going to execute <" + s + ">");
+ rs = stmt.executeQuery(s);
+ List<String> columns = new ArrayList<String>();
+ while(rs.next()) {
+ columns.add(rs.getString(1));
+ }
+ LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
+ (ci.partName == null ? "" : "/" + ci.partName));
+ dbConn.commit();
+ return columns;
+ } catch (SQLException e) {
+ try {
+ LOG.error("Failed to find columns to analyze stats on for " + ci.tableName +
+ (ci.partName == null ? "" : "/" + ci.partName), e);
+ dbConn.rollback();
+ } catch (SQLException e1) {
+ //nothing we can do here
+ }
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ }
  }



Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Fri Sep 5 18:12:51 2014
@@ -860,6 +860,29 @@ public class TxnHandler {
    }

    /**
+ * Close the ResultSet.
+ * @param rs may be {@code null}
+ */
+ void close(ResultSet rs) {
+ try {
+ if (rs != null && !rs.isClosed()) {
+ rs.close();
+ }
+ }
+ catch(SQLException ex) {
+ LOG.warn("Failed to close statement " + ex.getMessage());
+ }
+ }
+
+ /**
+ * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
+ */
+ void close(ResultSet rs, Statement stmt, Connection dbConn) {
+ close(rs);
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ /**
     * Determine if an exception was a deadlock. Unfortunately there is no standard way to do
     * this, so we have to inspect the error messages and catch the telltale signs for each
     * different database.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java Fri Sep 5 18:12:51 2014
@@ -76,4 +76,9 @@ public class CommandProcessorResponse {
    public String getSQLState() { return SQLState; }
    public Schema getSchema() { return resSchema; }
    public Throwable getException() { return exception; }
+ public String toString() {
+ return "(" + responseCode + "," + errorMessage + "," + SQLState +
+ (resSchema == null ? "" : ",") +
+ (exception == null ? "" : exception.getMessage()) + ")";
+ }
  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Fri Sep 5 18:12:51 2014
@@ -100,7 +100,7 @@ public class CompactorMR {
     * @throws java.io.IOException if the job fails
     */
    void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
- ValidTxnList txns, boolean isMajor) throws IOException {
+ ValidTxnList txns, boolean isMajor, Worker.StatsUpdater su) throws IOException {
      JobConf job = new JobConf(conf);
      job.setJobName(jobName);
      job.setOutputKeyClass(NullWritable.class);
@@ -182,6 +182,7 @@ public class CompactorMR {
      LOG.debug("Setting maximume transaction to " + maxTxn);

      JobClient.runJob(job).waitForCompletion();
+ su.gatherStats();
    }

    /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Fri Sep 5 18:12:51 2014
@@ -20,20 +20,28 @@ package org.apache.hadoop.hive.ql.txn.co
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
  import org.apache.hadoop.hive.metastore.api.MetaException;
  import org.apache.hadoop.hive.metastore.api.Partition;
  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
  import org.apache.hadoop.hive.metastore.api.Table;
  import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
  import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
  import org.apache.hadoop.security.UserGroupInformation;
  import org.apache.hadoop.util.StringUtils;

+import java.io.IOException;
  import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;

  /**
   * A class to do compactions. This will run in a separate thread. It will spin on the
@@ -110,7 +118,7 @@ public class Worker extends CompactorThr
            continue;
          }

- final boolean isMajor = (ci.type == CompactionType.MAJOR);
+ final boolean isMajor = ci.isMajorCompaction();
          final ValidTxnList txns =
              TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
          final StringBuffer jobName = new StringBuffer(name);
@@ -129,17 +137,19 @@ public class Worker extends CompactorThr
          LOG.info("Starting " + ci.type.toString() + " compaction for " +
              ci.getFullPartitionName());

+ final StatsUpdater su = StatsUpdater.init(ci, txnHandler.findColumnsWithStats(ci), conf,
+ runJobAsSelf(runAs) ? runAs : t.getOwner());
          final CompactorMR mr = new CompactorMR();
          try {
            if (runJobAsSelf(runAs)) {
- mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+ mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
            } else {
              UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
                UserGroupInformation.getLoginUser());
              ugi.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
- mr.run(conf, jobName.toString(), t, sd, txns, isMajor);
+ mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
                  return null;
                }
              });
@@ -161,11 +171,95 @@ public class Worker extends CompactorThr
    public void init(BooleanPointer stop) throws MetaException {
      super.init(stop);

- StringBuffer name = new StringBuffer(hostname());
+ StringBuilder name = new StringBuilder(hostname());
      name.append("-");
      name.append(getId());
      this.name = name.toString();
      setName(name.toString());
    }

+ static final class StatsUpdater {
+ static final private Log LOG = LogFactory.getLog(StatsUpdater.class);
+
+ public static StatsUpdater init(CompactionInfo ci, List<String> columnListForStats,
+ HiveConf conf, String userName) {
+ return new StatsUpdater(ci, columnListForStats, conf, userName);
+ }
+ /**
+ * list columns for which to compute stats. This maybe empty which means no stats gathering
+ * is needed.
+ */
+ private final List<String> columnList;
+ private final HiveConf conf;
+ private final String userName;
+ private final CompactionInfo ci;
+
+ private StatsUpdater(CompactionInfo ci, List<String> columnListForStats,
+ HiveConf conf, String userName) {
+ this.conf = conf;
+ this.userName = userName;
+ this.ci = ci;
+ if(!ci.isMajorCompaction() || columnListForStats == null || columnListForStats.isEmpty()) {
+ columnList = Collections.emptyList();
+ return;
+ }
+ columnList = columnListForStats;
+ }
+
+ /**
+ * todo: what should this do on failure? Should it rethrow? Invalidate stats?
+ */
+ void gatherStats() throws IOException {
+ if(!ci.isMajorCompaction()) {
+ return;
+ }
+ if(columnList.isEmpty()) {
+ LOG.debug("No existing stats for " + ci.dbname + "." + ci.tableName + " found. Will not run analyze.");
+ return;//nothing to do
+ }
+ //e.g. analyze table page_view partition(dt='10/15/2014',country=’US’)
+ // compute statistics for columns viewtime
+ StringBuilder sb = new StringBuilder("analyze table ").append(ci.dbname).append(".").append(ci.tableName);
+ if(ci.partName != null) {
+ try {
+ sb.append(" partition(");
+ Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
+ for(Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
+ sb.append(ent.getKey()).append("='").append(ent.getValue()).append("'");
+ }
+ sb.append(")");
+ }
+ catch(MetaException ex) {
+ throw new IOException(ex);
+ }
+ }
+ sb.append(" compute statistics for columns ");
+ for(String colName : columnList) {
+ sb.append(colName).append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing ,
+ LOG.debug("running '" + sb.toString() + "'");
+ Driver d = new Driver(conf, userName);
+ SessionState localSession = null;
+ if(SessionState.get() == null) {
+ localSession = SessionState.start(new SessionState(conf));
+ }
+ try {
+ CommandProcessorResponse cpr = d.run(sb.toString());
+ if (cpr.getResponseCode() != 0) {
+ throw new IOException("Could not update stats for table " + ci.getFullTableName() +
+ (ci.partName == null ? "" : "/" + ci.partName) + " due to: " + cpr);
+ }
+ }
+ catch(CommandNeedRetryException cnre) {
+ throw new IOException("Could not update stats for table " + ci.getFullTableName() +
+ (ci.partName == null ? "" : "/" + ci.partName) + " due to: " + cnre.getMessage());
+ }
+ finally {
+ if(localSession != null) {
+ localSession.close();
+ }
+ }
+ }
+ }
  }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1622751&r1=1622750&r2=1622751&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Fri Sep 5 18:12:51 2014
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.*;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.api.*;
  import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.junit.After;
  import org.junit.Before;
  import org.junit.Test;

@@ -280,7 +279,7 @@ public class TestWorker extends Compacto
      // There should still now be 5 directories in the location
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-for (int i = 0; i < stat.length; i++) System.out.println("HERE: " + stat[i].getPath().toString());
+ for (int i = 0; i < stat.length; i++) System.out.println("HERE: " + stat[i].getPath().toString());
      Assert.assertEquals(4, stat.length);

      // Find the new delta file and make sure it has the right contents
@@ -507,7 +506,7 @@ for (int i = 0; i < stat.length; i++) Sy
      Assert.assertEquals(1, compacts.size());
      Assert.assertEquals("ready for cleaning", compacts.get(0).getState());

- // There should still now be 5 directories in the location
+ // There should now be 3 directories in the location
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
      Assert.assertEquals(3, stat.length);

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedSep 5, '14 at 6:13p
activeSep 5, '14 at 6:13p
posts1
users1
websitehive.apache.org

1 user in discussion

Gates: 1 post

People

Translate

site design / logo © 2021 Grokbase