Repository: hive
Updated Branches:
   refs/heads/master 51efcb80e -> 4e9f95a1b


HIVE-10249 ACID: show locks should show who the lock is waiting for (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4e9f95a1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4e9f95a1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4e9f95a1

Branch: refs/heads/master
Commit: 4e9f95a1bad89ac4ea0cefc65eeba7a1e56a948d
Parents: 51efcb8
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Wed Mar 30 12:17:06 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Wed Mar 30 12:17:06 2016 -0700

----------------------------------------------------------------------
  .../hadoop/hive/metastore/txn/TxnDbUtil.java | 6 ++-
  .../hadoop/hive/metastore/txn/TxnHandler.java | 46 ++++++++++++++++----
  .../hive/metastore/txn/TestTxnHandler.java | 2 +-
  .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 ++++++-
  .../hive/ql/lockmgr/TestDbTxnManager2.java | 28 ++++++++++++
  .../clientpositive/dbtxnmgr_showlocks.q.out | 6 +--
  6 files changed, 89 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4e9f95a1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index df480ea..c82d23a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -103,7 +103,11 @@ public final class TxnDbUtil {
            " HL_ACQUIRED_AT bigint," +
            " HL_USER varchar(128) NOT NULL," +
            " HL_HOST varchar(128) NOT NULL," +
- " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+ " HL_HEARTBEAT_COUNT integer," +
+ " HL_AGENT_INFO varchar(128)," +
+ " HL_BLOCKEDBY_EXT_ID bigint," +
+ " HL_BLOCKEDBY_INT_ID bigint," +
+ " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
        stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");

        stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");

http://git-wip-us.apache.org/repos/asf/hive/blob/4e9f95a1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 21faff4..be3c6de 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -847,8 +847,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     */
    private static class LockInfoExt extends LockInfo {
      private final ShowLocksResponseElement e;
- LockInfoExt(ShowLocksResponseElement e, long intLockId) {
- super(e, intLockId);
+ LockInfoExt(ShowLocksResponseElement e) {
+ super(e);
        this.e = e;
      }
    }
@@ -864,7 +864,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          stmt = dbConn.createStatement();

          String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS";
+ "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
+ "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS";
          LOG.debug("Doing to execute query <" + s + ">");
          ResultSet rs = stmt.executeQuery(s);
          while (rs.next()) {
@@ -892,7 +893,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
            e.setUser(rs.getString(10));
            e.setHostname(rs.getString(11));
- sortedList.add(new LockInfoExt(e, rs.getLong(12)));
+ e.setLockIdInternal(rs.getLong(12));
+ long id = rs.getLong(13);
+ if(!rs.wasNull()) {
+ e.setBlockedByExtId(id);
+ }
+ id = rs.getLong(14);
+ if(!rs.wasNull()) {
+ e.setBlockedByIntId(id);
+ }
+ sortedList.add(new LockInfoExt(e));
          }
          LOG.debug("Going to rollback");
          dbConn.rollback();
@@ -1142,6 +1152,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    private static void shouldNeverHappen(long txnid) {
      throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid));
    }
+ private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) {
+ throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
+ + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
+ }
    public void addDynamicPartitions(AddDynamicPartitions rqst)
        throws NoSuchTxnException, TxnAbortedException, MetaException {
      Connection dbConn = null;
@@ -1673,15 +1687,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        }
        txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL
      }
- LockInfo(ShowLocksResponseElement e, long intLockId) {
+ LockInfo(ShowLocksResponseElement e) {
        extLockId = e.getLockid();
- this.intLockId = intLockId;
+ intLockId = e.getLockIdInternal();
+ txnId = e.getTxnid();
        db = e.getDbname();
        table = e.getTablename();
        partition = e.getPartname();
        state = e.getState();
        type = e.getType();
- txnId = e.getTxnid();
      }

      public boolean equals(Object other) {
@@ -1980,9 +1994,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {

      LOG.debug("Going to execute query <" + query.toString() + ">");
      Statement stmt = null;
+ ResultSet rs = null;
      try {
        stmt = dbConn.createStatement();
- ResultSet rs = stmt.executeQuery(query.toString());
+ rs = stmt.executeQuery(query.toString());
        SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
        while (rs.next()) {
          lockSet.add(new LockInfo(rs));
@@ -2053,7 +2068,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            switch (lockAction) {
              case WAIT:
                if(!ignoreConflict(info, locks[i])) {
+ /*we acquire all locks for a given query atomically; if 1 blocks, all go into (remain) in
+ * Waiting state. wait() will undo any 'acquire()' which may have happened as part of
+ * this (metastore db) transaction and then we record which lock blocked the lock
+ * we were testing ('info').*/
                  wait(dbConn, save);
+ String sqlText = "update HIVE_LOCKS" +
+ " set HL_BLOCKEDBY_EXT_ID=" + locks[i].extLockId +
+ ", HL_BLOCKEDBY_INT_ID=" + locks[i].intLockId +
+ " where HL_LOCK_EXT_ID=" + info.extLockId + " and HL_LOCK_INT_ID=" + info.intLockId;
+ LOG.debug("Executing sql: " + sqlText);
+ int updCnt = stmt.executeUpdate(sqlText);
+ if(updCnt != 1) {
+ shouldNeverHappen(info.txnId, info.extLockId, info.intLockId);
+ }
                  LOG.debug("Going to commit");
                  dbConn.commit();
                  response.setState(LockState.WAITING);
@@ -2082,7 +2110,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        dbConn.commit();
        response.setState(LockState.ACQUIRED);
      } finally {
- closeStmt(stmt);
+ close(rs, stmt, null);
      }
      return response;
    }
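
The UPDATE introduced above is what ties a WAITING lock to its blocker: it runs in the same metastore-database transaction as wait(), so the blocked-by columns become visible together with the WAITING state. For readers who prefer bind variables, a functionally equivalent sketch of just that statement as a PreparedStatement (the class and method names are illustrative; the Connection stands in for the dbConn used inside checkLock, and the table/column names come from the patch):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class BlockedByRecorder {
  /**
   * Record that lock (extLockId, intLockId) is blocked by (blockerExtId, blockerIntId).
   * Returns the update count; the patch treats anything other than 1 as an internal
   * error via shouldNeverHappen(txnid, extLockId, intLockId).
   */
  static int recordBlocker(Connection dbConn, long extLockId, long intLockId,
                           long blockerExtId, long blockerIntId) throws SQLException {
    String sql = "update HIVE_LOCKS set HL_BLOCKEDBY_EXT_ID = ?, HL_BLOCKEDBY_INT_ID = ?" +
        " where HL_LOCK_EXT_ID = ? and HL_LOCK_INT_ID = ?";
    try (PreparedStatement ps = dbConn.prepareStatement(sql)) {
      ps.setLong(1, blockerExtId);
      ps.setLong(2, blockerIntId);
      ps.setLong(3, extLockId);
      ps.setLong(4, intLockId);
      return ps.executeUpdate();
    }
  }
}

The patch itself concatenates the numeric ids into the SQL string, which is equivalent here since all four values are longs.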

http://git-wip-us.apache.org/repos/asf/hive/blob/4e9f95a1/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 26a660a..37eacde 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1153,7 +1153,7 @@ public class TestTxnHandler {
    }

    @Test
- @Ignore
+ @Ignore("Wedges Derby")
    public void deadlockDetected() throws Exception {
      LOG.debug("Starting deadlock test");
      if (txnHandler instanceof TxnHandler) {

http://git-wip-us.apache.org/repos/asf/hive/blob/4e9f95a1/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 56eecf6..b26f09d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2541,6 +2541,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
      os.write(separator);
      os.writeBytes("State");
      os.write(separator);
+ os.writeBytes("Blocked By");
+ os.write(separator);
      os.writeBytes("Type");
      os.write(separator);
      os.writeBytes("Transaction ID");
@@ -2557,7 +2559,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
      List<ShowLocksResponseElement> locks = rsp.getLocks();
      if (locks != null) {
        for (ShowLocksResponseElement lock : locks) {
- os.writeBytes(Long.toString(lock.getLockid()));
+ if(lock.isSetLockIdInternal()) {
+ os.writeBytes(Long.toString(lock.getLockid()) + "." + Long.toString(lock.getLockIdInternal()));
+ }
+ else {
+ os.writeBytes(Long.toString(lock.getLockid()));
+ }
          os.write(separator);
          os.writeBytes(lock.getDbname());
          os.write(separator);
@@ -2567,6 +2574,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
          os.write(separator);
          os.writeBytes(lock.getState().toString());
          os.write(separator);
+ if(lock.isSetBlockedByExtId()) {//both "blockedby" are either there or not
+ os.writeBytes(Long.toString(lock.getBlockedByExtId()) + "." + Long.toString(lock.getBlockedByIntId()));
+ }
+ else {
+ os.writeBytes(" ");//12 chars - try to keep cols aligned
+ }
+ os.write(separator);
          os.writeBytes(lock.getType().toString());
          os.write(separator);
          os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid()));
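
Client code that consumes the Thrift response can apply the same formatting rules as DDLTask above: print the lock id as <ext>.<int> when the internal id is set, and the blocker as <ext>.<int> when the blocked-by fields are set. A small sketch operating on a List<ShowLocksResponseElement> (the class name is illustrative and obtaining the list is out of scope; the getters are the ones used in the patch):

import java.util.List;

import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;

final class LockReportFormatter {
  /** Print one line per lock, mirroring the Lock ID / State / Blocked By / Type / Transaction ID columns. */
  static void print(List<ShowLocksResponseElement> locks) {
    for (ShowLocksResponseElement lock : locks) {
      String lockId = lock.isSetLockIdInternal()
          ? lock.getLockid() + "." + lock.getLockIdInternal()
          : Long.toString(lock.getLockid());
      // Both blocked-by fields are either set together or not at all.
      String blockedBy = lock.isSetBlockedByExtId()
          ? lock.getBlockedByExtId() + "." + lock.getBlockedByIntId()
          : "";
      System.out.println(lockId + '\t' + lock.getDbname() + '\t' + lock.getTablename()
          + '\t' + lock.getState() + '\t' + blockedBy + '\t' + lock.getType()
          + '\t' + (lock.getTxnid() == 0 ? "NULL" : Long.toString(lock.getTxnid())));
    }
  }
}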

http://git-wip-us.apache.org/repos/asf/hive/blob/4e9f95a1/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index d1b370e..836b507 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -240,6 +240,34 @@ public class TestDbTxnManager2 {
      otherTxnMgr.closeTxnManager();
    }

+ /**
+ * check that locks in Waiting state show what they are waiting on
+ * This test is somewhat abusive in that it make DbLockManager retain locks for 2
+ * different queries (which are not part of the same transaction) which can never
+ * happen in real use cases... but it makes testing convenient.
+ * @throws Exception
+ */
+ @Test
+ public void testLockBlockedBy() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.compileAndRespond("select * from TAB_BLOCKED");
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks.get(0));
+ cpr = driver.compileAndRespond("drop table TAB_BLOCKED");
+ checkCmdOnDriver(cpr);
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks.get(0));
+ checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "TAB_BLOCKED", null, locks.get(1));
+ Assert.assertEquals("BlockedByExtId doesn't match", locks.get(0).getLockid(), locks.get(1).getBlockedByExtId());
+ Assert.assertEquals("BlockedByIntId doesn't match", locks.get(0).getLockIdInternal(), locks.get(1).getBlockedByIntId());
+ }
+
    @Test
    public void testDummyTxnManagerOnAcidTable() throws Exception {
      // Create an ACID table with DbTxnManager

http://git-wip-us.apache.org/repos/asf/hive/blob/4e9f95a1/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index d9d2ed6..46d8ea1 100644
--- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -2,17 +2,17 @@ PREHOOK: query: show locks
  PREHOOK: type: SHOWLOCKS
  POSTHOOK: query: show locks
  POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
  PREHOOK: query: show locks extended
  PREHOOK: type: SHOWLOCKS
  POSTHOOK: query: show locks extended
  POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
  PREHOOK: query: show locks default
  PREHOOK: type: SHOWLOCKS
  POSTHOOK: query: show locks default
  POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Type Transaction ID Last Hearbeat Acquired At User Hostname
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname
  PREHOOK: query: show transactions
  PREHOOK: type: SHOW TRANSACTIONS
  POSTHOOK: query: show transactions


  • Ekoifman at Mar 30, 2016 at 7:20 pm
    Repository: hive
    Updated Branches:
       refs/heads/branch-1 a27595115 -> b6f6c4acb


    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
    deleted file mode 100644
    index 15ee24c..0000000
    --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
    +++ /dev/null
    @@ -1,1150 +0,0 @@
    -/**
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements. See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership. The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License. You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.hadoop.hive.ql.io.orc;
    -
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -import org.apache.hadoop.conf.Configuration;
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.fs.Path;
    -import org.apache.hadoop.hive.common.ValidTxnList;
    -import org.apache.hadoop.hive.common.ValidReadTxnList;
    -import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    -import org.apache.hadoop.hive.ql.io.AcidUtils;
    -import org.apache.hadoop.hive.ql.io.RecordIdentifier;
    -import org.apache.hadoop.hive.ql.io.RecordUpdater;
    -import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
    -import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
    -import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
    -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    -import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    -import org.apache.hadoop.io.IntWritable;
    -import org.apache.hadoop.io.LongWritable;
    -import org.apache.hadoop.io.NullWritable;
    -import org.apache.hadoop.io.Text;
    -import org.apache.hadoop.mapred.InputFormat;
    -import org.apache.hadoop.mapred.InputSplit;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.Reporter;
    -import org.junit.Test;
    -import org.mockito.MockSettings;
    -import org.mockito.Mockito;
    -
    -import java.io.File;
    -import java.io.IOException;
    -import java.nio.ByteBuffer;
    -import java.util.ArrayList;
    -import java.util.List;
    -
    -import static org.junit.Assert.assertEquals;
    -import static org.junit.Assert.assertTrue;
    -import static org.junit.Assert.assertFalse;
    -import static org.junit.Assert.assertNull;
    -
    -public class TestOrcRawRecordMerger {
    -
    - private static final Logger LOG = LoggerFactory.getLogger(TestOrcRawRecordMerger.class);
    -//todo: why is statementId -1?
    - @Test
    - public void testOrdering() throws Exception {
    - ReaderKey left = new ReaderKey(100, 200, 1200, 300);
    - ReaderKey right = new ReaderKey();
    - right.setValues(100, 200, 1000, 200,1);
    - assertTrue(right.compareTo(left) < 0);
    - assertTrue(left.compareTo(right) > 0);
    - assertEquals(false, left.equals(right));
    - left.set(right);
    - assertTrue(right.compareTo(left) == 0);
    - assertEquals(true, right.equals(left));
    - right.setRowId(2000);
    - assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 4,-1);
    - right.setValues(100, 2, 3, 4,-1);
    - assertTrue(left.compareTo(right) < 0);
    - assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 4,-1);
    - right.setValues(1, 100, 3, 4,-1);
    - assertTrue(left.compareTo(right) < 0);
    - assertTrue(right.compareTo(left) > 0);
    - left.setValues(1, 2, 3, 100,-1);
    - right.setValues(1, 2, 3, 4,-1);
    - assertTrue(left.compareTo(right) < 0);
    - assertTrue(right.compareTo(left) > 0);
    -
    - // ensure that we are consistent when comparing to the base class
    - RecordIdentifier ri = new RecordIdentifier(1, 2, 3);
    - assertEquals(1, ri.compareTo(left));
    - assertEquals(-1, left.compareTo(ri));
    - assertEquals(false, ri.equals(left));
    - assertEquals(false, left.equals(ri));
    - }
    -
    - private static void setRow(OrcStruct event,
    - int operation,
    - long originalTransaction,
    - int bucket,
    - long rowId,
    - long currentTransaction,
    - String value) {
    - event.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
    - event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
    - new LongWritable(originalTransaction));
    - event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucket));
    - event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(rowId));
    - event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
    - new LongWritable(currentTransaction));
    - OrcStruct row = new OrcStruct(1);
    - row.setFieldValue(0, new Text(value));
    - event.setFieldValue(OrcRecordUpdater.ROW, row);
    - }
    -
    - private static String value(OrcStruct event) {
    - return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
    - }
    -
    - private List<StripeInformation> createStripes(long... rowCounts) {
    - long offset = 0;
    - List<StripeInformation> result =
    - new ArrayList<StripeInformation>(rowCounts.length);
    - for(long count: rowCounts) {
    - OrcProto.StripeInformation.Builder stripe =
    - OrcProto.StripeInformation.newBuilder();
    - stripe.setDataLength(800).setIndexLength(100).setFooterLength(100)
    - .setNumberOfRows(count).setOffset(offset);
    - offset += 1000;
    - result.add(new ReaderImpl.StripeInformationImpl(stripe.build()));
    - }
    - return result;
    - }
    -
    - // can add .verboseLogging() to cause Mockito to log invocations
    - private final MockSettings settings = Mockito.withSettings();
    - private final Path tmpDir = new Path(System.getProperty("test.tmp.dir",
    - "target" + File.separator + "test" + File.separator + "tmp"));
    -
    - private Reader createMockReader() throws IOException {
    - Reader reader = Mockito.mock(Reader.class, settings);
    - RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
    - OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
    - OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
    - OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
    - OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
    - OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
    - Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
    - .thenReturn(recordReader);
    -
    - Mockito.when(recordReader.hasNext()).
    - thenReturn(true, true, true, true, true, false);
    -
    - Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
    -
    - Mockito.when(recordReader.next(null)).thenReturn(row1);
    - Mockito.when(recordReader.next(row1)).thenReturn(row2);
    - Mockito.when(recordReader.next(row2)).thenReturn(row3);
    - Mockito.when(recordReader.next(row3)).thenReturn(row4);
    - Mockito.when(recordReader.next(row4)).thenReturn(row5);
    -
    - return reader;
    - }
    -
    - @Test
    - public void testReaderPair() throws Exception {
    - ReaderKey key = new ReaderKey();
    - Reader reader = createMockReader();
    - RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
    - RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
    - ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
    - new Reader.Options(), 0);
    - RecordReader recordReader = pair.recordReader;
    - assertEquals(10, key.getTransactionId());
    - assertEquals(20, key.getBucketId());
    - assertEquals(40, key.getRowId());
    - assertEquals(120, key.getCurrentTransactionId());
    - assertEquals("third", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(40, key.getTransactionId());
    - assertEquals(50, key.getBucketId());
    - assertEquals(60, key.getRowId());
    - assertEquals(130, key.getCurrentTransactionId());
    - assertEquals("fourth", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(null, pair.nextRecord);
    - Mockito.verify(recordReader).close();
    - }
    -
    - @Test
    - public void testReaderPairNoMin() throws Exception {
    - ReaderKey key = new ReaderKey();
    - Reader reader = createMockReader();
    -
    - ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
    - new Reader.Options(), 0);
    - RecordReader recordReader = pair.recordReader;
    - assertEquals(10, key.getTransactionId());
    - assertEquals(20, key.getBucketId());
    - assertEquals(20, key.getRowId());
    - assertEquals(100, key.getCurrentTransactionId());
    - assertEquals("first", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(10, key.getTransactionId());
    - assertEquals(20, key.getBucketId());
    - assertEquals(30, key.getRowId());
    - assertEquals(110, key.getCurrentTransactionId());
    - assertEquals("second", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(10, key.getTransactionId());
    - assertEquals(20, key.getBucketId());
    - assertEquals(40, key.getRowId());
    - assertEquals(120, key.getCurrentTransactionId());
    - assertEquals("third", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(40, key.getTransactionId());
    - assertEquals(50, key.getBucketId());
    - assertEquals(60, key.getRowId());
    - assertEquals(130, key.getCurrentTransactionId());
    - assertEquals("fourth", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(40, key.getTransactionId());
    - assertEquals(50, key.getBucketId());
    - assertEquals(61, key.getRowId());
    - assertEquals(140, key.getCurrentTransactionId());
    - assertEquals("fifth", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(null, pair.nextRecord);
    - Mockito.verify(recordReader).close();
    - }
    -
    - private static OrcStruct createOriginalRow(String value) {
    - OrcStruct result = new OrcStruct(1);
    - result.setFieldValue(0, new Text(value));
    - return result;
    - }
    -
    - private Reader createMockOriginalReader() throws IOException {
    - Reader reader = Mockito.mock(Reader.class, settings);
    - RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
    - OrcStruct row1 = createOriginalRow("first");
    - OrcStruct row2 = createOriginalRow("second");
    - OrcStruct row3 = createOriginalRow("third");
    - OrcStruct row4 = createOriginalRow("fourth");
    - OrcStruct row5 = createOriginalRow("fifth");
    -
    - Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
    - .thenReturn(recordReader);
    - Mockito.when(recordReader.hasNext()).
    - thenReturn(true, true, true, true, true, false);
    - Mockito.when(recordReader.getRowNumber()).thenReturn(0L, 1L, 2L, 3L, 4L);
    - Mockito.when(recordReader.next(null)).thenReturn(row1);
    - Mockito.when(recordReader.next(row1)).thenReturn(row2);
    - Mockito.when(recordReader.next(row2)).thenReturn(row3);
    - Mockito.when(recordReader.next(row3)).thenReturn(row4);
    - Mockito.when(recordReader.next(row4)).thenReturn(row5);
    - return reader;
    - }
    -
    - @Test
    - public void testOriginalReaderPair() throws Exception {
    - ReaderKey key = new ReaderKey();
    - Reader reader = createMockOriginalReader();
    - RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
    - RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
    - boolean[] includes = new boolean[]{true, true};
    - ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
    - new Reader.Options().include(includes));
    - RecordReader recordReader = pair.recordReader;
    - assertEquals(0, key.getTransactionId());
    - assertEquals(10, key.getBucketId());
    - assertEquals(2, key.getRowId());
    - assertEquals(0, key.getCurrentTransactionId());
    - assertEquals("third", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(0, key.getTransactionId());
    - assertEquals(10, key.getBucketId());
    - assertEquals(3, key.getRowId());
    - assertEquals(0, key.getCurrentTransactionId());
    - assertEquals("fourth", value(pair.nextRecord));
    -
    - pair.next(pair.nextRecord);
    - assertEquals(null, pair.nextRecord);
    - Mockito.verify(recordReader).close();
    - }
    -
    - private static ValidTxnList createMaximalTxnList() {
    - return new ValidReadTxnList(Long.MAX_VALUE + ":");
    - }
    -
    - @Test
    - public void testOriginalReaderPairNoMin() throws Exception {
    - ReaderKey key = new ReaderKey();
    - Reader reader = createMockOriginalReader();
    - ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
    - new Reader.Options());
    - assertEquals("first", value(pair.nextRecord));
    - assertEquals(0, key.getTransactionId());
    - assertEquals(10, key.getBucketId());
    - assertEquals(0, key.getRowId());
    - assertEquals(0, key.getCurrentTransactionId());
    -
    - pair.next(pair.nextRecord);
    - assertEquals("second", value(pair.nextRecord));
    - assertEquals(0, key.getTransactionId());
    - assertEquals(10, key.getBucketId());
    - assertEquals(1, key.getRowId());
    - assertEquals(0, key.getCurrentTransactionId());
    -
    - pair.next(pair.nextRecord);
    - assertEquals("third", value(pair.nextRecord));
    - assertEquals(0, key.getTransactionId());
    - assertEquals(10, key.getBucketId());
    - assertEquals(2, key.getRowId());
    - assertEquals(0, key.getCurrentTransactionId());
    -
    - pair.next(pair.nextRecord);
    - assertEquals("fourth", value(pair.nextRecord));
    - assertEquals(0, key.getTransactionId());
    - assertEquals(10, key.getBucketId());
    - assertEquals(3, key.getRowId());
    - assertEquals(0, key.getCurrentTransactionId());
    -
    - pair.next(pair.nextRecord);
    - assertEquals("fifth", value(pair.nextRecord));
    - assertEquals(0, key.getTransactionId());
    - assertEquals(10, key.getBucketId());
    - assertEquals(4, key.getRowId());
    - assertEquals(0, key.getCurrentTransactionId());
    -
    - pair.next(pair.nextRecord);
    - assertEquals(null, pair.nextRecord);
    - Mockito.verify(pair.recordReader).close();
    - }
    -
    - @Test
    - public void testNewBase() throws Exception {
    - Configuration conf = new Configuration();
    - conf.set("columns", "col1");
    - conf.set("columns.types", "string");
    - Reader reader = Mockito.mock(Reader.class, settings);
    - RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
    -
    - List<OrcProto.Type> types = new ArrayList<OrcProto.Type>();
    - OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder();
    - typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
    - .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
    - .addSubtypes(6);
    - types.add(typeBuilder.build());
    - types.add(null);
    - types.add(null);
    - types.add(null);
    - types.add(null);
    - types.add(null);
    - typeBuilder.clearSubtypes();
    - typeBuilder.addSubtypes(7);
    - types.add(typeBuilder.build());
    -
    - Mockito.when(reader.getTypes()).thenReturn(types);
    - Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
    - .thenReturn(recordReader);
    -
    - OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
    - OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
    - OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
    - OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
    - OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
    - setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
    -
    - Mockito.when(recordReader.hasNext()).
    - thenReturn(true, true, true, true, true, false);
    -
    - Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
    -
    - Mockito.when(recordReader.next(null)).thenReturn(row1, row4);
    - Mockito.when(recordReader.next(row1)).thenReturn(row2);
    - Mockito.when(recordReader.next(row2)).thenReturn(row3);
    - Mockito.when(recordReader.next(row3)).thenReturn(row5);
    -
    - Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
    - .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
    - .getBytes("UTF-8")));
    - Mockito.when(reader.getStripes())
    - .thenReturn(createStripes(2, 2, 1));
    -
    - OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
    - false, 10, createMaximalTxnList(),
    - new Reader.Options().range(1000, 1000), null);
    - RecordReader rr = merger.getCurrentReader().recordReader;
    - assertEquals(0, merger.getOtherReaders().size());
    -
    - assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
    - assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey());
    - RecordIdentifier id = merger.createKey();
    - OrcStruct event = merger.createValue();
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(10, id.getTransactionId());
    - assertEquals(20, id.getBucketId());
    - assertEquals(40, id.getRowId());
    - assertEquals("third", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(40, id.getTransactionId());
    - assertEquals(50, id.getBucketId());
    - assertEquals(60, id.getRowId());
    - assertEquals("fourth", getValue(event));
    -
    - assertEquals(false, merger.next(id, event));
    - assertEquals(1.0, merger.getProgress(), 0.01);
    - merger.close();
    - Mockito.verify(rr).close();
    - Mockito.verify(rr).getProgress();
    -
    - StructObjectInspector eventObjectInspector =
    - (StructObjectInspector) merger.getObjectInspector();
    - List<? extends StructField> fields =
    - eventObjectInspector.getAllStructFieldRefs();
    - assertEquals(OrcRecordUpdater.FIELDS, fields.size());
    - assertEquals("operation",
    - fields.get(OrcRecordUpdater.OPERATION).getFieldName());
    - assertEquals("currentTransaction",
    - fields.get(OrcRecordUpdater.CURRENT_TRANSACTION).getFieldName());
    - assertEquals("originalTransaction",
    - fields.get(OrcRecordUpdater.ORIGINAL_TRANSACTION).getFieldName());
    - assertEquals("bucket",
    - fields.get(OrcRecordUpdater.BUCKET).getFieldName());
    - assertEquals("rowId",
    - fields.get(OrcRecordUpdater.ROW_ID).getFieldName());
    - StructObjectInspector rowObjectInspector =
    - (StructObjectInspector) fields.get(OrcRecordUpdater.ROW)
    - .getFieldObjectInspector();
    - assertEquals("col1",
    - rowObjectInspector.getAllStructFieldRefs().get(0).getFieldName());
    - }
    -
    - static class MyRow {
    - Text col1;
    - RecordIdentifier ROW__ID;
    -
    - MyRow(String val) {
    - col1 = new Text(val);
    - }
    -
    - MyRow(String val, long rowId, long origTxn, int bucket) {
    - col1 = new Text(val);
    - ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
    - }
    - }
    -
    - static String getValue(OrcStruct event) {
    - return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
    - }
    -
    - @Test
    - public void testEmpty() throws Exception {
    - final int BUCKET = 0;
    - Configuration conf = new Configuration();
    - OrcOutputFormat of = new OrcOutputFormat();
    - FileSystem fs = FileSystem.getLocal(conf);
    - Path root = new Path(tmpDir, "testEmpty").makeQualified(fs);
    - fs.delete(root, true);
    - ObjectInspector inspector;
    - synchronized (TestOrcFile.class) {
    - inspector = ObjectInspectorFactory.getReflectionObjectInspector
    - (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    - }
    -
    - // write the empty base
    - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
    - .inspector(inspector).bucket(BUCKET).writingBase(true)
    - .maximumTransactionId(100).finalDestination(root);
    - of.getRecordUpdater(root, options).close(false);
    -
    - ValidTxnList txnList = new ValidReadTxnList("200:");
    - AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
    -
    - Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
    - BUCKET);
    - Reader baseReader = OrcFile.createReader(basePath,
    - OrcFile.readerOptions(conf));
    - OrcRawRecordMerger merger =
    - new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
    - createMaximalTxnList(), new Reader.Options(),
    - AcidUtils.getPaths(directory.getCurrentDirectories()));
    - RecordIdentifier key = merger.createKey();
    - OrcStruct value = merger.createValue();
    - assertEquals(false, merger.next(key, value));
    - }
    -
    - /**
    - * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
    - * a base and a delta.
    - * @throws Exception
    - */
    - @Test
    - public void testNewBaseAndDelta() throws Exception {
    - testNewBaseAndDelta(false);
    - testNewBaseAndDelta(true);
    - }
    - private void testNewBaseAndDelta(boolean use130Format) throws Exception {
    - final int BUCKET = 10;
    - String[] values = new String[]{"first", "second", "third", "fourth",
    - "fifth", "sixth", "seventh", "eighth",
    - "ninth", "tenth"};
    - Configuration conf = new Configuration();
    - OrcOutputFormat of = new OrcOutputFormat();
    - FileSystem fs = FileSystem.getLocal(conf);
    - Path root = new Path(tmpDir, "testNewBaseAndDelta").makeQualified(fs);
    - fs.delete(root, true);
    - ObjectInspector inspector;
    - synchronized (TestOrcFile.class) {
    - inspector = ObjectInspectorFactory.getReflectionObjectInspector
    - (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    - }
    -
    - // write the base
    - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
    - .inspector(inspector).bucket(BUCKET).finalDestination(root);
    - if(!use130Format) {
    - options.statementId(-1);
    - }
    - RecordUpdater ru = of.getRecordUpdater(root,
    - options.writingBase(true).maximumTransactionId(100));
    - for(String v: values) {
    - ru.insert(0, new MyRow(v));
    - }
    - ru.close(false);
    -
    - // write a delta
    - ru = of.getRecordUpdater(root, options.writingBase(false)
    - .minimumTransactionId(200).maximumTransactionId(200).recordIdColumn(1));
    - ru.update(200, new MyRow("update 1", 0, 0, BUCKET));
    - ru.update(200, new MyRow("update 2", 2, 0, BUCKET));
    - ru.update(200, new MyRow("update 3", 3, 0, BUCKET));
    - ru.delete(200, new MyRow("", 7, 0, BUCKET));
    - ru.delete(200, new MyRow("", 8, 0, BUCKET));
    - ru.close(false);
    -
    - ValidTxnList txnList = new ValidReadTxnList("200:");
    - AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
    -
    - assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
    - assertEquals(new Path(root, use130Format ?
    - AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
    - directory.getCurrentDirectories().get(0).getPath());
    -
    - Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
    - BUCKET);
    - Reader baseReader = OrcFile.createReader(basePath,
    - OrcFile.readerOptions(conf));
    - OrcRawRecordMerger merger =
    - new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
    - createMaximalTxnList(), new Reader.Options(),
    - AcidUtils.getPaths(directory.getCurrentDirectories()));
    - assertEquals(null, merger.getMinKey());
    - assertEquals(null, merger.getMaxKey());
    - RecordIdentifier id = merger.createKey();
    - OrcStruct event = merger.createValue();
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
    - assertEquals("update 1", getValue(event));
    - assertFalse(merger.isDelete(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
    - assertEquals("second", getValue(event));
    - assertFalse(merger.isDelete(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
    - assertEquals("update 2", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
    - assertEquals("update 3", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
    - assertEquals("fifth", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
    - assertEquals("sixth", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
    - assertEquals("seventh", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.DELETE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
    - assertNull(OrcRecordUpdater.getRow(event));
    - assertTrue(merger.isDelete(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.DELETE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
    - assertNull(OrcRecordUpdater.getRow(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
    - assertEquals("tenth", getValue(event));
    -
    - assertEquals(false, merger.next(id, event));
    - merger.close();
    -
    - // make a merger that doesn't collapse events
    - merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
    - createMaximalTxnList(), new Reader.Options(),
    - AcidUtils.getPaths(directory.getCurrentDirectories()));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
    - assertEquals("update 1", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 0, 0), id);
    - assertEquals("first", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
    - assertEquals("second", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
    - assertEquals("update 2", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 2, 0), id);
    - assertEquals("third", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
    - assertEquals("update 3", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 3, 0), id);
    - assertEquals("fourth", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
    - assertEquals("fifth", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
    - assertEquals("sixth", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
    - assertEquals("seventh", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.DELETE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
    - assertNull(OrcRecordUpdater.getRow(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 7, 0), id);
    - assertEquals("eighth", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.DELETE_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
    - assertNull(OrcRecordUpdater.getRow(event));
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 8, 0), id);
    - assertEquals("ninth", getValue(event));
    -
    - assertEquals(true, merger.next(id, event));
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
    - assertEquals("tenth", getValue(event));
    -
    - assertEquals(false, merger.next(id, event));
    - merger.close();
    -
    - // try ignoring the 200 transaction and make sure it works still
    - ValidTxnList txns = new ValidReadTxnList("2000:200");
    - merger =
    - new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
    - txns, new Reader.Options(),
    - AcidUtils.getPaths(directory.getCurrentDirectories()));
    - for(int i=0; i < values.length; ++i) {
    - assertEquals(true, merger.next(id, event));
    - LOG.info("id = " + id + "event = " + event);
    - assertEquals(OrcRecordUpdater.INSERT_OPERATION,
    - OrcRecordUpdater.getOperation(event));
    - assertEquals(new ReaderKey(0, BUCKET, i, 0), id);
    - assertEquals(values[i], getValue(event));
    - }
    -
    - assertEquals(false, merger.next(id, event));
    - merger.close();
    - }
    -
    - static class BigRow {
    - int myint;
    - long mylong;
    - Text mytext;
    - float myfloat;
    - double mydouble;
    - RecordIdentifier ROW__ID;
    -
    - BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) {
    - this.myint = myint;
    - this.mylong = mylong;
    - this.mytext = new Text(mytext);
    - this.myfloat = myfloat;
    - this.mydouble = mydouble;
    - ROW__ID = null;
    - }
    -
    - BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble,
    - long rowId, long origTxn, int bucket) {
    - this.myint = myint;
    - this.mylong = mylong;
    - this.mytext = new Text(mytext);
    - this.myfloat = myfloat;
    - this.mydouble = mydouble;
    - ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
    - }
    -
    - BigRow(long rowId, long origTxn, int bucket) {
    - ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
    - }
    - }
    -
    - /**
    - * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
    - * a base and a delta.
    - * @throws Exception
    - */
    - @Test
    - public void testRecordReaderOldBaseAndDelta() throws Exception {
    - final int BUCKET = 10;
    - Configuration conf = new Configuration();
    - OrcOutputFormat of = new OrcOutputFormat();
    - FileSystem fs = FileSystem.getLocal(conf);
    - Path root = new Path(tmpDir, "testOldBaseAndDelta").makeQualified(fs);
    - fs.delete(root, true);
    - ObjectInspector inspector;
    - synchronized (TestOrcFile.class) {
    - inspector = ObjectInspectorFactory.getReflectionObjectInspector
    - (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    - }
    -
    - // write the base
    - MemoryManager mgr = new MemoryManager(conf){
    - int rowsAddedSinceCheck = 0;
    -
    - @Override
    - synchronized void addedRow(int rows) throws IOException {
    - rowsAddedSinceCheck += rows;
    - if (rowsAddedSinceCheck >= 2) {
    - notifyWriters();
    - rowsAddedSinceCheck = 0;
    - }
    - }
    - };
    - // make 5 stripes with 2 rows each
    - Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"),
    - OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs)
    - .blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE)
    - .stripeSize(1).memory(mgr).version(OrcFile.Version.V_0_11));
    - String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
    - "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
    - for(int i=0; i < values.length; ++i) {
    - writer.addRow(new BigRow(i, i, values[i], i, i));
    - }
    - writer.close();
    -
    - // write a delta
    - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
    - .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
    - .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
    - RecordUpdater ru = of.getRecordUpdater(root, options);
    - values = new String[]{"0.0", null, null, "1.1", null, null, null,
    - "ignore.7"};
    - for(int i=0; i < values.length; ++i) {
    - if (values[i] != null) {
    - ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
    - }
    - }
    - ru.delete(100, new BigRow(9, 0, BUCKET));
    - ru.close(false);
    -
    - // write a delta
    - options = options.minimumTransactionId(2).maximumTransactionId(2);
    - ru = of.getRecordUpdater(root, options);
    - values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
    - for(int i=0; i < values.length; ++i) {
    - if (values[i] != null) {
    - ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
    - }
    - }
    - ru.delete(100, new BigRow(8, 0, BUCKET));
    - ru.close(false);
    -
    - InputFormat inf = new OrcInputFormat();
    - JobConf job = new JobConf();
    - job.set("mapred.min.split.size", "1");
    - job.set("mapred.max.split.size", "2");
    - job.set("mapred.input.dir", root.toString());
    - InputSplit[] splits = inf.getSplits(job, 5);
    - assertEquals(5, splits.length);
    - org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
    -
    - // loop through the 5 splits and read each
    - for(int i=0; i < 4; ++i) {
    - System.out.println("starting split " + i);
    - rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
    - NullWritable key = rr.createKey();
    - OrcStruct value = rr.createValue();
    -
    - // there should be exactly two rows per a split
    - for(int j=0; j < 2; ++j) {
    - System.out.println("i = " + i + ", j = " + j);
    - assertEquals(true, rr.next(key, value));
    - System.out.println("record = " + value);
    - assertEquals(i + "." + j, value.getFieldValue(2).toString());
    - }
    - assertEquals(false, rr.next(key, value));
    - }
    - rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
    - assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
    - }
    -
    - /**
    - * Test the RecordReader when there is a new base and a delta.
    - * @throws Exception
    - */
    - @Test
    - public void testRecordReaderNewBaseAndDelta() throws Exception {
    - final int BUCKET = 11;
    - Configuration conf = new Configuration();
    - OrcOutputFormat of = new OrcOutputFormat();
    - FileSystem fs = FileSystem.getLocal(conf);
    - Path root = new Path(tmpDir, "testRecordReaderNewBaseAndDelta").makeQualified(fs);
    - fs.delete(root, true);
    - ObjectInspector inspector;
    - synchronized (TestOrcFile.class) {
    - inspector = ObjectInspectorFactory.getReflectionObjectInspector
    - (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    - }
    -
    - // write the base
    - MemoryManager mgr = new MemoryManager(conf){
    - int rowsAddedSinceCheck = 0;
    -
    - @Override
    - synchronized void addedRow(int rows) throws IOException {
    - rowsAddedSinceCheck += rows;
    - if (rowsAddedSinceCheck >= 2) {
    - notifyWriters();
    - rowsAddedSinceCheck = 0;
    - }
    - }
    - };
    -
    - // make 5 stripes with 2 rows each
    - OrcRecordUpdater.OrcOptions options = (OrcRecordUpdater.OrcOptions)
    - new OrcRecordUpdater.OrcOptions(conf)
    - .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
    - .bucket(BUCKET).inspector(inspector).filesystem(fs);
    - options.orcOptions(OrcFile.writerOptions(conf)
    - .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
    - .memory(mgr));
    - options.finalDestination(root);
    - RecordUpdater ru = of.getRecordUpdater(root, options);
    - String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
    - "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
    - for(int i=0; i < values.length; ++i) {
    - ru.insert(0, new BigRow(i, i, values[i], i, i));
    - }
    - ru.close(false);
    -
    - // write a delta
    - options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1).recordIdColumn(5);
    - ru = of.getRecordUpdater(root, options);
    - values = new String[]{"0.0", null, null, "1.1", null, null, null,
    - "ignore.7"};
    - for(int i=0; i < values.length; ++i) {
    - if (values[i] != null) {
    - ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
    - }
    - }
    - ru.delete(100, new BigRow(9, 0, BUCKET));
    - ru.close(false);
    -
    - // write a delta
    - options.minimumTransactionId(2).maximumTransactionId(2);
    - ru = of.getRecordUpdater(root, options);
    - values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
    - for(int i=0; i < values.length; ++i) {
    - if (values[i] != null) {
    - ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
    - }
    - }
    - ru.delete(100, new BigRow(8, 0, BUCKET));
    - ru.close(false);
    -
    - InputFormat inf = new OrcInputFormat();
    - JobConf job = new JobConf();
    - job.set("mapred.min.split.size", "1");
    - job.set("mapred.max.split.size", "2");
    - job.set("mapred.input.dir", root.toString());
    - InputSplit[] splits = inf.getSplits(job, 5);
    - assertEquals(5, splits.length);
    - org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
    -
    - // loop through the 5 splits and read each
    - for(int i=0; i < 4; ++i) {
    - System.out.println("starting split " + i);
    - rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
    - NullWritable key = rr.createKey();
    - OrcStruct value = rr.createValue();
    -
    - // there should be exactly two rows per a split
    - for(int j=0; j < 2; ++j) {
    - System.out.println("i = " + i + ", j = " + j);
    - assertEquals(true, rr.next(key, value));
    - System.out.println("record = " + value);
    - assertEquals(i + "." + j, value.getFieldValue(2).toString());
    - }
    - assertEquals(false, rr.next(key, value));
    - }
    - rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
    - assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
    - }
    -
    - /**
    - * Test the RecordReader when there is a new base and a delta.
    - * @throws Exception
    - */
    - @Test
    - public void testRecordReaderDelta() throws Exception {
    - final int BUCKET = 0;
    - Configuration conf = new Configuration();
    - OrcOutputFormat of = new OrcOutputFormat();
    - FileSystem fs = FileSystem.getLocal(conf);
    - Path root = new Path(tmpDir, "testRecordReaderDelta").makeQualified(fs);
    - fs.delete(root, true);
    - ObjectInspector inspector;
    - synchronized (TestOrcFile.class) {
    - inspector = ObjectInspectorFactory.getReflectionObjectInspector
    - (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    - }
    -
    - // write a delta
    - AcidOutputFormat.Options options =
    - new AcidOutputFormat.Options(conf)
    - .bucket(BUCKET).inspector(inspector).filesystem(fs)
    - .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
    - .finalDestination(root);
    - RecordUpdater ru = of.getRecordUpdater(root, options);
    - String[] values = new String[]{"a", "b", "c", "d", "e"};
    - for(int i=0; i < values.length; ++i) {
    - ru.insert(1, new MyRow(values[i]));
    - }
    - ru.close(false);
    -
    - // write a delta
    - options.minimumTransactionId(2).maximumTransactionId(2);
    - ru = of.getRecordUpdater(root, options);
    - values = new String[]{"f", "g", "h", "i", "j"};
    - for(int i=0; i < values.length; ++i) {
    - ru.insert(2, new MyRow(values[i]));
    - }
    - ru.close(false);
    -
    - InputFormat inf = new OrcInputFormat();
    - JobConf job = new JobConf();
    - job.set("mapred.min.split.size", "1");
    - job.set("mapred.max.split.size", "2");
    - job.set("mapred.input.dir", root.toString());
    - job.set("bucket_count", "1");
    - InputSplit[] splits = inf.getSplits(job, 5);
    - assertEquals(1, splits.length);
    - org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
    - rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
    - values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
    - OrcStruct row = rr.createValue();
    - for(int i = 0; i < values.length; ++i) {
    - System.out.println("Checking " + i);
    - assertEquals(true, rr.next(NullWritable.get(), row));
    - assertEquals(values[i], row.getFieldValue(0).toString());
    - }
    - assertEquals(false, rr.next(NullWritable.get(), row));
    - }
    -
    - /**
    - * Test the RecordReader when the delta has been flushed, but not closed.
    - * @throws Exception
    - */
    - @Test
    - public void testRecordReaderIncompleteDelta() throws Exception {
    - testRecordReaderIncompleteDelta(false);
    - testRecordReaderIncompleteDelta(true);
    - }
    - /**
    - *
    - * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_00001
    - */
    - private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception {
    - final int BUCKET = 1;
    - Configuration conf = new Configuration();
    - OrcOutputFormat of = new OrcOutputFormat();
    - FileSystem fs = FileSystem.getLocal(conf).getRaw();
    - Path root = new Path(tmpDir, "testRecordReaderIncompleteDelta").makeQualified(fs);
    - fs.delete(root, true);
    - ObjectInspector inspector;
    - synchronized (TestOrcFile.class) {
    - inspector = ObjectInspectorFactory.getReflectionObjectInspector
    - (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    - }
    -
    - // write a base
    - AcidOutputFormat.Options options =
    - new AcidOutputFormat.Options(conf)
    - .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
    - .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root);
    - if(!use130Format) {
    - options.statementId(-1);
    - }
    - RecordUpdater ru = of.getRecordUpdater(root, options);
    - String[] values= new String[]{"1", "2", "3", "4", "5"};
    - for(int i=0; i < values.length; ++i) {
    - ru.insert(0, new MyRow(values[i]));
    - }
    - ru.close(false);
    -
    - // write a delta
    - options.writingBase(false).minimumTransactionId(10)
    - .maximumTransactionId(19);
    - ru = of.getRecordUpdater(root, options);
    - values = new String[]{"6", "7", "8"};
    - for(int i=0; i < values.length; ++i) {
    - ru.insert(1, new MyRow(values[i]));
    - }
    - InputFormat inf = new OrcInputFormat();
    - JobConf job = new JobConf();
    - job.set("mapred.input.dir", root.toString());
    - job.set("bucket_count", "2");
    -
    - // read the keys before the delta is flushed
    - InputSplit[] splits = inf.getSplits(job, 1);
    - assertEquals(2, splits.length);
    - org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
    - inf.getRecordReader(splits[0], job, Reporter.NULL);
    - NullWritable key = rr.createKey();
    - OrcStruct value = rr.createValue();
    - System.out.println("Looking at split " + splits[0]);
    - for(int i=1; i < 6; ++i) {
    - System.out.println("Checking row " + i);
    - assertEquals(true, rr.next(key, value));
    - assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
    - }
    - assertEquals(false, rr.next(key, value));
    -
    - ru.flush();
    - ru.flush();
    - values = new String[]{"9", "10"};
    - for(int i=0; i < values.length; ++i) {
    - ru.insert(3, new MyRow(values[i]));
    - }
    - ru.flush();
    -
    - splits = inf.getSplits(job, 1);
    - assertEquals(2, splits.length);
    - rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
    - Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
    - AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
    - assertEquals(true, fs.exists(sideFile));
    - assertEquals(24, fs.getFileStatus(sideFile).getLen());
    -
    - for(int i=1; i < 11; ++i) {
    - assertEquals(true, rr.next(key, value));
    - assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
    - }
    - assertEquals(false, rr.next(key, value));
    - }
    -
    -}

    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    index 5b775f9..0a91348 100644
    --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
    @@ -241,6 +241,34 @@ public class TestDbTxnManager2 {
          otherTxnMgr.closeTxnManager();
        }

    + /**
    + * check that locks in Waiting state show what they are waiting on
    + * This test is somewhat abusive in that it make DbLockManager retain locks for 2
    + * different queries (which are not part of the same transaction) which can never
    + * happen in real use cases... but it makes testing convenient.
    + * @throws Exception
    + */
    + @Test
    + public void testLockBlockedBy() throws Exception {
    + CommandProcessorResponse cpr = driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    + checkCmdOnDriver(cpr);
    + cpr = driver.compileAndRespond("select * from TAB_BLOCKED");
    + checkCmdOnDriver(cpr);
    + txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM");
    + List<ShowLocksResponseElement> locks = getLocks(txnMgr);
    + Assert.assertEquals("Unexpected lock count", 1, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks.get(0));
    + cpr = driver.compileAndRespond("drop table TAB_BLOCKED");
    + checkCmdOnDriver(cpr);
    + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking
    + locks = getLocks(txnMgr);
    + Assert.assertEquals("Unexpected lock count", 2, locks.size());
    + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks.get(0));
    + checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "TAB_BLOCKED", null, locks.get(1));
    + Assert.assertEquals("BlockedByExtId doesn't match", locks.get(0).getLockid(), locks.get(1).getBlockedByExtId());
    + Assert.assertEquals("BlockedByIntId doesn't match", locks.get(0).getLockIdInternal(), locks.get(1).getBlockedByIntId());
    + }
    +
        @Test
        public void testDummyTxnManagerOnAcidTable() throws Exception {
          // Create an ACID table with DbTxnManager

    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
    ----------------------------------------------------------------------
    diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
    index d9d2ed6..46d8ea1 100644
    --- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
    +++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
    @@ -2,17 +2,17 @@ PREHOOK: query: show locks
      PREHOOK: type: SHOWLOCKS
      POSTHOOK: query: show locks
      POSTHOOK: type: SHOWLOCKS
    -Lock ID Database Table Partition State Type Transaction ID Last Hearbeat Acquired At User Hostname
    +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User
      PREHOOK: query: show locks extended
      PREHOOK: type: SHOWLOCKS
      POSTHOOK: query: show locks extended
      POSTHOOK: type: SHOWLOCKS
    -Lock ID Database Table Partition State Type Transaction ID Last Hearbeat Acquired At User Hostname
    +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User
      PREHOOK: query: show locks default
      PREHOOK: type: SHOWLOCKS
      POSTHOOK: query: show locks default
      POSTHOOK: type: SHOWLOCKS
    -Lock ID Database Table Partition State Type Transaction ID Last Hearbeat Acquired At User Hostname
    +Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User
      PREHOOK: query: show transactions
      PREHOOK: type: SHOW TRANSACTIONS
      POSTHOOK: query: show transactions
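
    Note: the new "Blocked By" column shown above is backed by plain thrift accessors on
    ShowLocksResponseElement (the same getters exercised by the TestDbTxnManager2 test in this
    patch), so a client can build its own waits-for report from ShowLocksResponse.getLocks().
    A minimal sketch, not part of this commit -- the class name and structure are illustrative
    only, and it assumes the caller already has the List<ShowLocksResponseElement>:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.LockState;
    import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;

    public class LockWaitReport {
      /** Print one line per WAITING lock, naming the lock it is blocked by. */
      public static void printWaitingLocks(List<ShowLocksResponseElement> locks) {
        for (ShowLocksResponseElement e : locks) {
          if (e.getState() != LockState.WAITING) {
            continue;
          }
          // Lock ids are rendered "extId.intId", mirroring the new DDLTask output format.
          String waiter = e.getLockid()
              + (e.isSetLockIdInternal() ? "." + e.getLockIdInternal() : "");
          // Both blocked-by fields are set together when the blocker is known.
          String blocker = e.isSetBlockedByExtId()
              ? e.getBlockedByExtId() + "." + e.getBlockedByIntId()
              : "<unknown>";
          System.out.println("lock " + waiter + " on " + e.getDbname() + "." + e.getTablename()
              + " is blocked by lock " + blocker);
        }
      }
    }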
    HIVE-10249 ACID: show locks should show who the lock is waiting for (Eugene Koifman, reviewed by Wei Zheng)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6f6c4ac
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6f6c4ac
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6f6c4ac

    Branch: refs/heads/branch-1
    Commit: b6f6c4acba3b863851d197a7e6e2e9f4b851da70
    Parents: a275951
    Author: Eugene Koifman <ekoifman@hortonworks.com>
    Authored: Wed Mar 30 12:18:20 2016 -0700
    Committer: Eugene Koifman <ekoifman@hortonworks.com>
    Committed: Wed Mar 30 12:18:20 2016 -0700

    ----------------------------------------------------------------------
      .../hadoop/hive/metastore/txn/TxnDbUtil.java | 6 +-
      .../hadoop/hive/metastore/txn/TxnHandler.java | 46 +-
      .../hive/metastore/txn/TestTxnHandler.java | 2 +-
      .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 +-
      .../exec/vector/VectorizedBatchUtil.java.orig | 707 -----------
      .../ql/parse/LoadSemanticAnalyzer.java.orig | 360 ------
      .../ql/io/orc/TestOrcRawRecordMerger.java.orig | 1150 ------------------
      .../hive/ql/lockmgr/TestDbTxnManager2.java | 28 +
      .../clientpositive/dbtxnmgr_showlocks.q.out | 6 +-
      9 files changed, 89 insertions(+), 2232 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    index 56c9ed8..2e24678 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    @@ -103,7 +103,11 @@ public final class TxnDbUtil {
                " HL_ACQUIRED_AT bigint," +
                " HL_USER varchar(128) NOT NULL," +
                " HL_HOST varchar(128) NOT NULL," +
    - " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
    + " HL_HEARTBEAT_COUNT integer," +
    + " HL_AGENT_INFO varchar(128)," +
    + " HL_BLOCKEDBY_EXT_ID bigint," +
    + " HL_BLOCKEDBY_INT_ID bigint," +
    + " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
            stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");

            stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");
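
    Note: the new HL_BLOCKEDBY_EXT_ID / HL_BLOCKEDBY_INT_ID columns can also be inspected
    directly in the metastore backing database when troubleshooting stuck locks. A minimal
    read-only JDBC sketch, assuming direct access to that database -- the connection settings
    are placeholders, and the table and column names are taken from the DDL above:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class BlockedLocksQuery {
      public static void main(String[] args) throws Exception {
        // Placeholder JDBC settings for the metastore backing database.
        String url = args[0];       // e.g. the metastore's javax.jdo.option.ConnectionURL
        String user = args[1];
        String password = args[2];

        // Self-join HIVE_LOCKS on the blocked-by columns: one row per waiting lock
        // together with the lock it is waiting for.
        String sql =
            "select w.HL_LOCK_EXT_ID, w.HL_LOCK_INT_ID, w.HL_DB, w.HL_TABLE, " +
            "       b.HL_LOCK_EXT_ID, b.HL_LOCK_INT_ID " +
            "from HIVE_LOCKS w join HIVE_LOCKS b " +
            "  on w.HL_BLOCKEDBY_EXT_ID = b.HL_LOCK_EXT_ID " +
            " and w.HL_BLOCKEDBY_INT_ID = b.HL_LOCK_INT_ID";

        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
          while (rs.next()) {
            System.out.println("lock " + rs.getLong(1) + "." + rs.getLong(2)
                + " on " + rs.getString(3) + "." + rs.getString(4)
                + " waits for lock " + rs.getLong(5) + "." + rs.getLong(6));
          }
        }
      }
    }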

    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    index a3b0751..ed4a3c2 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    @@ -869,8 +869,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         */
        private static class LockInfoExt extends LockInfo {
          private final ShowLocksResponseElement e;
    - LockInfoExt(ShowLocksResponseElement e, long intLockId) {
    - super(e, intLockId);
    + LockInfoExt(ShowLocksResponseElement e) {
    + super(e);
            this.e = e;
          }
        }
    @@ -886,7 +886,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
              stmt = dbConn.createStatement();

              String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
    - "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS";
    + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
    + "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS";
              LOG.debug("Doing to execute query <" + s + ">");
              ResultSet rs = stmt.executeQuery(s);
              while (rs.next()) {
    @@ -914,7 +915,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
                e.setUser(rs.getString(10));
                e.setHostname(rs.getString(11));
    - sortedList.add(new LockInfoExt(e, rs.getLong(12)));
    + e.setLockIdInternal(rs.getLong(12));
    + long id = rs.getLong(13);
    + if(!rs.wasNull()) {
    + e.setBlockedByExtId(id);
    + }
    + id = rs.getLong(14);
    + if(!rs.wasNull()) {
    + e.setBlockedByIntId(id);
    + }
    + sortedList.add(new LockInfoExt(e));
              }
              LOG.debug("Going to rollback");
              dbConn.rollback();
    @@ -1164,6 +1174,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        private static void shouldNeverHappen(long txnid) {
          throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid));
        }
    + private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) {
    + throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
    + + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
    + }
        public void addDynamicPartitions(AddDynamicPartitions rqst)
            throws NoSuchTxnException, TxnAbortedException, MetaException {
          Connection dbConn = null;
    @@ -1711,15 +1725,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            }
            txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL
          }
    - LockInfo(ShowLocksResponseElement e, long intLockId) {
    + LockInfo(ShowLocksResponseElement e) {
            extLockId = e.getLockid();
    - this.intLockId = intLockId;
    + intLockId = e.getLockIdInternal();
    + txnId = e.getTxnid();
            db = e.getDbname();
            table = e.getTablename();
            partition = e.getPartname();
            state = e.getState();
            type = e.getType();
    - txnId = e.getTxnid();
          }

          public boolean equals(Object other) {
    @@ -2018,9 +2032,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {

          LOG.debug("Going to execute query <" + query.toString() + ">");
          Statement stmt = null;
    + ResultSet rs = null;
          try {
            stmt = dbConn.createStatement();
    - ResultSet rs = stmt.executeQuery(query.toString());
    + rs = stmt.executeQuery(query.toString());
            SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
            while (rs.next()) {
              lockSet.add(new LockInfo(rs));
    @@ -2091,7 +2106,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                switch (lockAction) {
                  case WAIT:
                    if(!ignoreConflict(info, locks[i])) {
    + /*we acquire all locks for a given query atomically; if 1 blocks, all go into (remain) in
    + * Waiting state. wait() will undo any 'acquire()' which may have happened as part of
    + * this (metastore db) transaction and then we record which lock blocked the lock
    + * we were testing ('info').*/
                      wait(dbConn, save);
    + String sqlText = "update HIVE_LOCKS" +
    + " set HL_BLOCKEDBY_EXT_ID=" + locks[i].extLockId +
    + ", HL_BLOCKEDBY_INT_ID=" + locks[i].intLockId +
    + " where HL_LOCK_EXT_ID=" + info.extLockId + " and HL_LOCK_INT_ID=" + info.intLockId;
    + LOG.debug("Executing sql: " + sqlText);
    + int updCnt = stmt.executeUpdate(sqlText);
    + if(updCnt != 1) {
    + shouldNeverHappen(info.txnId, info.extLockId, info.intLockId);
    + }
                      LOG.debug("Going to commit");
                      dbConn.commit();
                      response.setState(LockState.WAITING);
    @@ -2120,7 +2148,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            dbConn.commit();
            response.setState(LockState.ACQUIRED);
          } finally {
    - closeStmt(stmt);
    + close(rs, stmt, null);
          }
          return response;
        }
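
    Note: the checkLock change above records the blocker inside the same metastore-db
    transaction that moves the lock to WAITING. A simplified, self-contained sketch of that
    decision -- the Lock type below is a hypothetical stand-in for TxnHandler.LockInfo, the
    conflict test is passed in because the real rules are the lock-compatibility logic in
    checkLock, and the real recording is the HIVE_LOCKS UPDATE shown in the hunk above:

    import java.util.List;
    import java.util.function.BiPredicate;

    public class WaitOnConflictSketch {
      /** Simplified stand-in for TxnHandler.LockInfo: only the fields needed here. */
      static class Lock {
        final long extLockId;
        final long intLockId;
        boolean waiting;
        Long blockedByExtId;   // null until a blocker is recorded
        Long blockedByIntId;
        Lock(long extLockId, long intLockId) {
          this.extLockId = extLockId;
          this.intLockId = intLockId;
        }
      }

      /**
       * Returns true if 'candidate' had to wait. On the first conflicting lock the candidate
       * is marked waiting and the blocker's external/internal ids are recorded on it -- the
       * in-memory analogue of setting HL_BLOCKEDBY_EXT_ID / HL_BLOCKEDBY_INT_ID on the
       * waiting row before committing the WAITING state.
       */
      static boolean waitIfBlocked(Lock candidate, List<Lock> existing,
                                   BiPredicate<Lock, Lock> conflicts) {
        for (Lock held : existing) {
          if (conflicts.test(candidate, held)) {
            candidate.waiting = true;
            candidate.blockedByExtId = held.extLockId;
            candidate.blockedByIntId = held.intLockId;
            return true;
          }
        }
        return false;
      }
    }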

    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    index 6033c15..4d3c3e1 100644
    --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    @@ -1114,7 +1114,7 @@ public class TestTxnHandler {
        }

        @Test
    - @Ignore
    + @Ignore("Wedges Derby")
        public void deadlockDetected() throws Exception {
          LOG.debug("Starting deadlock test");
          if (txnHandler instanceof TxnHandler) {

    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    index 414293c..816d8d4 100644
    --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    @@ -2512,6 +2512,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
          os.write(separator);
          os.writeBytes("State");
          os.write(separator);
    + os.writeBytes("Blocked By");
    + os.write(separator);
          os.writeBytes("Type");
          os.write(separator);
          os.writeBytes("Transaction ID");
    @@ -2528,7 +2530,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
          List<ShowLocksResponseElement> locks = rsp.getLocks();
          if (locks != null) {
            for (ShowLocksResponseElement lock : locks) {
    - os.writeBytes(Long.toString(lock.getLockid()));
    + if(lock.isSetLockIdInternal()) {
    + os.writeBytes(Long.toString(lock.getLockid()) + "." + Long.toString(lock.getLockIdInternal()));
    + }
    + else {
    + os.writeBytes(Long.toString(lock.getLockid()));
    + }
              os.write(separator);
              os.writeBytes(lock.getDbname());
              os.write(separator);
    @@ -2538,6 +2545,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
              os.write(separator);
              os.writeBytes(lock.getState().toString());
              os.write(separator);
    + if(lock.isSetBlockedByExtId()) {//both "blockedby" are either there or not
    + os.writeBytes(Long.toString(lock.getBlockedByExtId()) + "." + Long.toString(lock.getBlockedByIntId()));
    + }
    + else {
    + os.writeBytes(" ");//12 chars - try to keep cols aligned
    + }
    + os.write(separator);
              os.writeBytes(lock.getType().toString());
              os.write(separator);
              os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid()));

    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
    deleted file mode 100644
    index af43a07..0000000
    --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
    +++ /dev/null
    @@ -1,707 +0,0 @@
    -/**
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements. See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership. The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License. You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.hadoop.hive.ql.exec.vector;
    -
    -import java.io.IOException;
    -import java.sql.Timestamp;
    -import java.util.ArrayList;
    -import java.util.Arrays;
    -import java.util.LinkedList;
    -import java.util.List;
    -import java.util.Map;
    -
    -import org.apache.commons.logging.Log;
    -import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hive.common.ObjectPair;
    -import org.apache.hadoop.hive.common.type.HiveChar;
    -import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
    -import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
    -import org.apache.hadoop.hive.common.type.HiveVarchar;
    -import org.apache.hadoop.hive.ql.exec.Utilities;
    -import org.apache.hadoop.hive.ql.metadata.HiveException;
    -import org.apache.hadoop.hive.serde2.io.ByteWritable;
    -import org.apache.hadoop.hive.serde2.io.DateWritable;
    -import org.apache.hadoop.hive.serde2.io.DoubleWritable;
    -import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
    -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
    -import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
    -import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
    -import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
    -import org.apache.hadoop.hive.serde2.io.ShortWritable;
    -import org.apache.hadoop.hive.serde2.io.TimestampWritable;
    -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
    -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    -import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    -import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
    -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    -import org.apache.hadoop.io.BooleanWritable;
    -import org.apache.hadoop.io.BytesWritable;
    -import org.apache.hadoop.io.DataOutputBuffer;
    -import org.apache.hadoop.io.FloatWritable;
    -import org.apache.hadoop.io.IntWritable;
    -import org.apache.hadoop.io.LongWritable;
    -import org.apache.hadoop.io.Text;
    -import org.apache.hive.common.util.DateUtils;
    -
    -public class VectorizedBatchUtil {
    - private static final Log LOG = LogFactory.getLog(VectorizedBatchUtil.class);
    -
    - /**
    - * Sets the IsNull value for ColumnVector at specified index
    - * @param cv
    - * @param rowIndex
    - */
    - public static void setNullColIsNullValue(ColumnVector cv, int rowIndex) {
    - cv.isNull[rowIndex] = true;
    - if (cv.noNulls) {
    - cv.noNulls = false;
    - }
    - }
    -
    - /**
    - * Iterates thru all the column vectors and sets noNull to
    - * specified value.
    - *
    - * @param batch
    - * Batch on which noNull is set
    - */
    - public static void setNoNullFields(VectorizedRowBatch batch) {
    - for (int i = 0; i < batch.numCols; i++) {
    - batch.cols[i].noNulls = true;
    - }
    - }
    -
    - /**
    - * Iterates thru all the column vectors and sets repeating to
    - * specified column.
    - *
    - */
    - public static void setRepeatingColumn(VectorizedRowBatch batch, int column) {
    - ColumnVector cv = batch.cols[column];
    - cv.isRepeating = true;
    - }
    -
    - /**
    - * Reduce the batch size for a vectorized row batch
    - */
    - public static void setBatchSize(VectorizedRowBatch batch, int size) {
    - assert (size <= batch.getMaxSize());
    - batch.size = size;
    - }
    -
    - /**
    - * Walk through the object inspector and add column vectors
    - *
    - * @param oi
    - * @param cvList
    - * ColumnVectors are populated in this list
    - */
    - private static void allocateColumnVector(StructObjectInspector oi,
    - List<ColumnVector> cvList) throws HiveException {
    - if (cvList == null) {
    - throw new HiveException("Null columnvector list");
    - }
    - if (oi == null) {
    - return;
    - }
    - final List<? extends StructField> fields = oi.getAllStructFieldRefs();
    - for(StructField field : fields) {
    - ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
    - switch(fieldObjectInspector.getCategory()) {
    - case PRIMITIVE:
    - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldObjectInspector;
    - switch(poi.getPrimitiveCategory()) {
    - case BOOLEAN:
    - case BYTE:
    - case SHORT:
    - case INT:
    - case LONG:
    - case TIMESTAMP:
    - case DATE:
    - case INTERVAL_YEAR_MONTH:
    - case INTERVAL_DAY_TIME:
    - cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
    - break;
    - case FLOAT:
    - case DOUBLE:
    - cvList.add(new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
    - break;
    - case BINARY:
    - case STRING:
    - case CHAR:
    - case VARCHAR:
    - cvList.add(new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
    - break;
    - case DECIMAL:
    - DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
    - cvList.add(new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
    - tInfo.precision(), tInfo.scale()));
    - break;
    - default:
    - throw new HiveException("Vectorizaton is not supported for datatype:"
    - + poi.getPrimitiveCategory());
    - }
    - break;
    - case STRUCT:
    - throw new HiveException("Struct not supported");
    - default:
    - throw new HiveException("Flattening is not supported for datatype:"
    - + fieldObjectInspector.getCategory());
    - }
    - }
    - }
    -
    -
    - /**
    - * Create VectorizedRowBatch from ObjectInspector
    - *
    - * @param oi
    - * @return
    - * @throws HiveException
    - */
    - public static VectorizedRowBatch constructVectorizedRowBatch(
    - StructObjectInspector oi) throws HiveException {
    - final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
    - allocateColumnVector(oi, cvList);
    - final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
    - int i = 0;
    - for(ColumnVector cv : cvList) {
    - result.cols[i++] = cv;
    - }
    - return result;
    - }
    -
    - /**
    - * Create VectorizedRowBatch from key and value object inspectors
    - * The row object inspector used by ReduceWork needs to be a **standard**
    - * struct object inspector, not just any struct object inspector.
    - * @param keyInspector
    - * @param valueInspector
    - * @param vectorScratchColumnTypeMap
    - * @return VectorizedRowBatch, OI
    - * @throws HiveException
    - */
    - public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
    - StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
    - throws HiveException {
    -
    - ArrayList<String> colNames = new ArrayList<String>();
    - ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
    - List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
    - for (StructField field: fields) {
    - colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
    - ois.add(field.getFieldObjectInspector());
    - }
    - fields = valueInspector.getAllStructFieldRefs();
    - for (StructField field: fields) {
    - colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
    - ois.add(field.getFieldObjectInspector());
    - }
    - StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
    -
    - VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
    - batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
    - return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
    - }
    -
    - /**
    - * Iterates through all columns in a given row and populates the batch
    - *
    - * @param row
    - * @param oi
    - * @param rowIndex
    - * @param batch
    - * @param buffer
    - * @throws HiveException
    - */
    - public static void addRowToBatch(Object row, StructObjectInspector oi,
    - int rowIndex,
    - VectorizedRowBatch batch,
    - DataOutputBuffer buffer
    - ) throws HiveException {
    - addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
    - }
    -
    - /**
    - * Iterates thru all the columns in a given row and populates the batch
    - * from a given offset
    - *
    - * @param row Deserialized row object
    - * @param oi Object insepector for that row
    - * @param rowIndex index to which the row should be added to batch
    - * @param colOffset offset from where the column begins
    - * @param batch Vectorized batch to which the row is added at rowIndex
    - * @throws HiveException
    - */
    - public static void addRowToBatchFrom(Object row, StructObjectInspector oi,
    - int rowIndex,
    - int colOffset,
    - VectorizedRowBatch batch,
    - DataOutputBuffer buffer
    - ) throws HiveException {
    - List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
    - final int off = colOffset;
    - // Iterate thru the cols and load the batch
    - for (int i = 0; i < fieldRefs.size(); i++) {
    - setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, off);
    - }
    - }
    -
    - /**
    - * Add only the projected column of a regular row to the specified vectorized row batch
    - * @param row the regular row
    - * @param oi object inspector for the row
    - * @param rowIndex the offset to add in the batch
    - * @param batch vectorized row batch
    - * @param buffer data output buffer
    - * @throws HiveException
    - */
    - public static void addProjectedRowToBatchFrom(Object row, StructObjectInspector oi,
    - int rowIndex, VectorizedRowBatch batch, DataOutputBuffer buffer) throws HiveException {
    - List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
    - for (int i = 0; i < fieldRefs.size(); i++) {
    - int projectedOutputCol = batch.projectedColumns[i];
    - if (batch.cols[projectedOutputCol] == null) {
    - continue;
    - }
    - setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, projectedOutputCol, 0);
    - }
    - }
    - /**
    - * Iterates thru all the columns in a given row and populates the batch
    - * from a given offset
    - *
    - * @param row Deserialized row object
    - * @param oi Object insepector for that row
    - * @param rowIndex index to which the row should be added to batch
    - * @param batch Vectorized batch to which the row is added at rowIndex
    - * @param context context object for this vectorized batch
    - * @param buffer
    - * @throws HiveException
    - */
    - public static void acidAddRowToBatch(Object row,
    - StructObjectInspector oi,
    - int rowIndex,
    - VectorizedRowBatch batch,
    - VectorizedRowBatchCtx context,
    - DataOutputBuffer buffer) throws HiveException {
    - List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
    - // Iterate thru the cols and load the batch
    - for (int i = 0; i < fieldRefs.size(); i++) {
    - if (batch.cols[i] == null) {
    - // This means the column was not included in the projection from the underlying read
    - continue;
    - }
    - if (context.isPartitionCol(i)) {
    - // The value will have already been set before we're called, so don't overwrite it
    - continue;
    - }
    - setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, 0);
    - }
    - }
    -
    - private static void setVector(Object row,
    - StructObjectInspector oi,
    - StructField field,
    - VectorizedRowBatch batch,
    - DataOutputBuffer buffer,
    - int rowIndex,
    - int colIndex,
    - int offset) throws HiveException {
    -
    - Object fieldData = oi.getStructFieldData(row, field);
    - ObjectInspector foi = field.getFieldObjectInspector();
    -
    - // Vectorization only supports PRIMITIVE data types. Assert the same
    - assert (foi.getCategory() == Category.PRIMITIVE);
    -
    - // Get writable object
    - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
    - Object writableCol = poi.getPrimitiveWritableObject(fieldData);
    -
    - // NOTE: The default value for null fields in vectorization is 1 for int types, NaN for
    - // float/double. String types have no default value for null.
    - switch (poi.getPrimitiveCategory()) {
    - case BOOLEAN: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - lcv.vector[rowIndex] = ((BooleanWritable) writableCol).get() ? 1 : 0;
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case BYTE: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - lcv.vector[rowIndex] = ((ByteWritable) writableCol).get();
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case SHORT: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case INT: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case LONG: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case DATE: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - lcv.vector[rowIndex] = ((DateWritable) writableCol).getDays();
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case FLOAT: {
    - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
    - dcv.isNull[rowIndex] = false;
    - } else {
    - dcv.vector[rowIndex] = Double.NaN;
    - setNullColIsNullValue(dcv, rowIndex);
    - }
    - }
    - break;
    - case DOUBLE: {
    - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
    - dcv.isNull[rowIndex] = false;
    - } else {
    - dcv.vector[rowIndex] = Double.NaN;
    - setNullColIsNullValue(dcv, rowIndex);
    - }
    - }
    - break;
    - case TIMESTAMP: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - Timestamp t = ((TimestampWritable) writableCol).getTimestamp();
    - lcv.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case INTERVAL_YEAR_MONTH: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - HiveIntervalYearMonth i = ((HiveIntervalYearMonthWritable) writableCol).getHiveIntervalYearMonth();
    - lcv.vector[rowIndex] = i.getTotalMonths();
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case INTERVAL_DAY_TIME: {
    - LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - HiveIntervalDayTime i = ((HiveIntervalDayTimeWritable) writableCol).getHiveIntervalDayTime();
    - lcv.vector[rowIndex] = DateUtils.getIntervalDayTimeTotalNanos(i);
    - lcv.isNull[rowIndex] = false;
    - } else {
    - lcv.vector[rowIndex] = 1;
    - setNullColIsNullValue(lcv, rowIndex);
    - }
    - }
    - break;
    - case BINARY: {
    - BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - bcv.isNull[rowIndex] = false;
    - BytesWritable bw = (BytesWritable) writableCol;
    - byte[] bytes = bw.getBytes();
    - int start = buffer.getLength();
    - int length = bw.getLength();
    - try {
    - buffer.write(bytes, 0, length);
    - } catch (IOException ioe) {
    - throw new IllegalStateException("bad write", ioe);
    - }
    - bcv.setRef(rowIndex, buffer.getData(), start, length);
    - } else {
    - setNullColIsNullValue(bcv, rowIndex);
    - }
    - }
    - break;
    - case STRING: {
    - BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - bcv.isNull[rowIndex] = false;
    - Text colText = (Text) writableCol;
    - int start = buffer.getLength();
    - int length = colText.getLength();
    - try {
    - buffer.write(colText.getBytes(), 0, length);
    - } catch (IOException ioe) {
    - throw new IllegalStateException("bad write", ioe);
    - }
    - bcv.setRef(rowIndex, buffer.getData(), start, length);
    - } else {
    - setNullColIsNullValue(bcv, rowIndex);
    - }
    - }
    - break;
    - case CHAR: {
    - BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - bcv.isNull[rowIndex] = false;
    - HiveChar colHiveChar = ((HiveCharWritable) writableCol).getHiveChar();
    - byte[] bytes = colHiveChar.getStrippedValue().getBytes();
    -
    - // We assume the CHAR maximum length was enforced when the object was created.
    - int length = bytes.length;
    -
    - int start = buffer.getLength();
    - try {
    - // In vector mode, we store CHAR as unpadded.
    - buffer.write(bytes, 0, length);
    - } catch (IOException ioe) {
    - throw new IllegalStateException("bad write", ioe);
    - }
    - bcv.setRef(rowIndex, buffer.getData(), start, length);
    - } else {
    - setNullColIsNullValue(bcv, rowIndex);
    - }
    - }
    - break;
    - case VARCHAR: {
    - BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - bcv.isNull[rowIndex] = false;
    - HiveVarchar colHiveVarchar = ((HiveVarcharWritable) writableCol).getHiveVarchar();
    - byte[] bytes = colHiveVarchar.getValue().getBytes();
    -
    - // We assume the VARCHAR maximum length was enforced when the object was created.
    - int length = bytes.length;
    -
    - int start = buffer.getLength();
    - try {
    - buffer.write(bytes, 0, length);
    - } catch (IOException ioe) {
    - throw new IllegalStateException("bad write", ioe);
    - }
    - bcv.setRef(rowIndex, buffer.getData(), start, length);
    - } else {
    - setNullColIsNullValue(bcv, rowIndex);
    - }
    - }
    - break;
    - case DECIMAL:
    - DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[offset + colIndex];
    - if (writableCol != null) {
    - dcv.isNull[rowIndex] = false;
    - HiveDecimalWritable wobj = (HiveDecimalWritable) writableCol;
    - dcv.set(rowIndex, wobj);
    - } else {
    - setNullColIsNullValue(dcv, rowIndex);
    - }
    - break;
    - default:
    - throw new HiveException("Vectorizaton is not supported for datatype:" +
    - poi.getPrimitiveCategory());
    - }
    - }
    -
    - public static StandardStructObjectInspector convertToStandardStructObjectInspector(
    - StructObjectInspector structObjectInspector) throws HiveException {
    -
    - List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
    - List<ObjectInspector> oids = new ArrayList<ObjectInspector>();
    - ArrayList<String> columnNames = new ArrayList<String>();
    -
    - for(StructField field : fields) {
    - TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
    - field.getFieldObjectInspector().getTypeName());
    - ObjectInspector standardWritableObjectInspector =
    - TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
    - oids.add(standardWritableObjectInspector);
    - columnNames.add(field.getFieldName());
    - }
    - return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids);
    - }
    -
    - public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector(
    - StructObjectInspector structObjectInspector) throws HiveException {
    -
    - List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
    - PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()];
    -
    - int i = 0;
    - for(StructField field : fields) {
    - TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
    - field.getFieldObjectInspector().getTypeName());
    - result[i++] = (PrimitiveTypeInfo) typeInfo;
    - }
    - return result;
    - }
    -
    - public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames(
    - String[] typeNames) throws HiveException {
    -
    - PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length];
    -
    - for(int i = 0; i < typeNames.length; i++) {
    - TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
    - result[i] = (PrimitiveTypeInfo) typeInfo;
    - }
    - return result;
    - }
    -
    - /**
    - * Make a new (scratch) batch, which is exactly "like" the batch provided, except that it's empty
    - * @param batch the batch to imitate
    - * @return the new batch
    - * @throws HiveException
    - */
    - public static VectorizedRowBatch makeLike(VectorizedRowBatch batch) throws HiveException {
    - VectorizedRowBatch newBatch = new VectorizedRowBatch(batch.numCols);
    - for (int i = 0; i < batch.numCols; i++) {
    - ColumnVector colVector = batch.cols[i];
    - if (colVector != null) {
    - ColumnVector newColVector;
    - if (colVector instanceof LongColumnVector) {
    - newColVector = new LongColumnVector();
    - } else if (colVector instanceof DoubleColumnVector) {
    - newColVector = new DoubleColumnVector();
    - } else if (colVector instanceof BytesColumnVector) {
    - newColVector = new BytesColumnVector();
    - } else if (colVector instanceof DecimalColumnVector) {
    - DecimalColumnVector decColVector = (DecimalColumnVector) colVector;
    - newColVector = new DecimalColumnVector(decColVector.precision, decColVector.scale);
    - } else {
    - throw new HiveException("Column vector class " + colVector.getClass().getName() +
    - " is not supported!");
    - }
    - newBatch.cols[i] = newColVector;
    - newBatch.cols[i].init();
    - }
    - }
    - newBatch.projectedColumns = Arrays.copyOf(batch.projectedColumns, batch.projectedColumns.length);
    - newBatch.projectionSize = batch.projectionSize;
    - newBatch.reset();
    - return newBatch;
    - }
    -
    - public static String displayBytes(byte[] bytes, int start, int length) {
    - StringBuilder sb = new StringBuilder();
    - for (int i = start; i < start + length; i++) {
    - char ch = (char) bytes[i];
    - if (ch < ' ' || ch > '~') {
    - sb.append(String.format("\\%03d", bytes[i] & 0xff));
    - } else {
    - sb.append(ch);
    - }
    - }
    - return sb.toString();
    - }
    -
    - public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
    - StringBuilder sb = new StringBuilder();
    - sb.append(prefix + " row " + index + " ");
    - for (int column = 0; column < batch.cols.length; column++) {
    - ColumnVector colVector = batch.cols[column];
    - if (colVector == null) {
    - sb.append("(null colVector " + column + ")");
    - } else {
    - boolean isRepeating = colVector.isRepeating;
    - index = (isRepeating ? 0 : index);
    - if (colVector.noNulls || !colVector.isNull[index]) {
    - if (colVector instanceof LongColumnVector) {
    - sb.append(((LongColumnVector) colVector).vector[index]);
    - } else if (colVector instanceof DoubleColumnVector) {
    - sb.append(((DoubleColumnVector) colVector).vector[index]);
    - } else if (colVector instanceof BytesColumnVector) {
    - BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
    - byte[] bytes = bytesColumnVector.vector[index];
    - int start = bytesColumnVector.start[index];
    - int length = bytesColumnVector.length[index];
    - if (bytes == null) {
    - sb.append("(Unexpected null bytes with start " + start + " length " + length + ")");
    - } else {
    - sb.append("bytes: '" + displayBytes(bytes, start, length) + "'");
    - }
    - } else if (colVector instanceof DecimalColumnVector) {
    - sb.append(((DecimalColumnVector) colVector).vector[index].toString());
    - } else {
    - sb.append("Unknown");
    - }
    - } else {
    - sb.append("NULL");
    - }
    - }
    - sb.append(" ");
    - }
    - LOG.info(sb.toString());
    - }
    -
    - public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {
    - for (int i = 0; i < batch.size; i++) {
    - int index = (batch.selectedInUse ? batch.selected[i] : i);
    - debugDisplayOneRow(batch, index, prefix);
    - }
    - }
    -}

    http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
    ----------------------------------------------------------------------
    diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
    deleted file mode 100644
    index aacfa92..0000000
    --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
    +++ /dev/null
    @@ -1,360 +0,0 @@
    -/**
    - * Licensed to the Apache Software Foundation (ASF) under one
    - * or more contributor license agreements. See the NOTICE file
    - * distributed with this work for additional information
    - * regarding copyright ownership. The ASF licenses this file
    - * to you under the Apache License, Version 2.0 (the
    - * "License"); you may not use this file except in compliance
    - * with the License. You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.hadoop.hive.ql.parse;
    -
    -import java.io.IOException;
    -import java.io.Serializable;
    -import java.net.URI;
    -import java.net.URISyntaxException;
    -import java.util.LinkedHashMap;
    -import java.util.List;
    -import java.util.Map;
    -
    -import org.antlr.runtime.tree.Tree;
    -import org.apache.commons.httpclient.util.URIUtil;
    -import org.apache.commons.lang.StringUtils;
    -import org.apache.hadoop.fs.FileStatus;
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.fs.Path;
    -import org.apache.hadoop.fs.PathFilter;
    -import org.apache.hadoop.hive.conf.HiveConf;
    -import org.apache.hadoop.hive.metastore.TableType;
    -import org.apache.hadoop.hive.metastore.api.FieldSchema;
    -import org.apache.hadoop.hive.ql.ErrorMsg;
    -import org.apache.hadoop.hive.ql.exec.Task;
    -import org.apache.hadoop.hive.ql.exec.TaskFactory;
    -import org.apache.hadoop.hive.ql.exec.Utilities;
    -import org.apache.hadoop.hive.ql.hooks.WriteEntity;
    -import org.apache.hadoop.hive.ql.io.FileFormatException;
    -import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    -import org.apache.hadoop.hive.ql.metadata.Hive;
    -import org.apache.hadoop.hive.ql.metadata.HiveException;
    -import org.apache.hadoop.hive.ql.metadata.Partition;
    -import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
    -import org.apache.hadoop.hive.ql.plan.MoveWork;
    -import org.apache.hadoop.hive.ql.plan.StatsWork;
    -import org.apache.hadoop.mapred.InputFormat;
    -
    -/**
    - * LoadSemanticAnalyzer.
    - *
    - */
    -public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
    -
    - public LoadSemanticAnalyzer(HiveConf conf) throws SemanticException {
    - super(conf);
    - }
    -
    - public static FileStatus[] matchFilesOrDir(FileSystem fs, Path path)
    - throws IOException {
    - FileStatus[] srcs = fs.globStatus(path, new PathFilter() {
    - @Override
    - public boolean accept(Path p) {
    - String name = p.getName();
    - return name.equals("_metadata") ? true : !name.startsWith("_") && !name.startsWith(".");
    - }
    - });
    - if ((srcs != null) && srcs.length == 1) {
    - if (srcs[0].isDir()) {
    - srcs = fs.listStatus(srcs[0].getPath(), new PathFilter() {
    - @Override
    - public boolean accept(Path p) {
    - String name = p.getName();
    - return !name.startsWith("_") && !name.startsWith(".");
    - }
    - });
    - }
    - }
    - return (srcs);
    - }
    -
    - private URI initializeFromURI(String fromPath, boolean isLocal) throws IOException,
    - URISyntaxException {
    - URI fromURI = new Path(fromPath).toUri();
    -
    - String fromScheme = fromURI.getScheme();
    - String fromAuthority = fromURI.getAuthority();
    - String path = fromURI.getPath();
    -
    - // generate absolute path relative to current directory or hdfs home
    - // directory
    - if (!path.startsWith("/")) {
    - if (isLocal) {
    - path = URIUtil.decode(
    - new Path(System.getProperty("user.dir"), fromPath).toUri().toString());
    - } else {
    - path = new Path(new Path("/user/" + System.getProperty("user.name")),
    - path).toString();
    - }
    - }
    -
    - // set correct scheme and authority
    - if (StringUtils.isEmpty(fromScheme)) {
    - if (isLocal) {
    - // file for local
    - fromScheme = "file";
    - } else {
    - // use default values from fs.default.name
    - URI defaultURI = FileSystem.get(conf).getUri();
    - fromScheme = defaultURI.getScheme();
    - fromAuthority = defaultURI.getAuthority();
    - }
    - }
    -
    - // if scheme is specified but not authority then use the default authority
    - if ((!fromScheme.equals("file")) && StringUtils.isEmpty(fromAuthority)) {
    - URI defaultURI = FileSystem.get(conf).getUri();
    - fromAuthority = defaultURI.getAuthority();
    - }
    -
    - LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
    - return new URI(fromScheme, fromAuthority, path, null, null);
    - }
    -
    - private FileStatus[] applyConstraintsAndGetFiles(URI fromURI, URI toURI, Tree ast,
    - boolean isLocal) throws SemanticException {
    -
    - FileStatus[] srcs = null;
    -
    - // local mode implies that scheme should be "file"
    - // we can change this going forward
    - if (isLocal && !fromURI.getScheme().equals("file")) {
    - throw new SemanticException(ErrorMsg.ILLEGAL_PATH.getMsg(ast,
    - "Source file system should be \"file\" if \"local\" is specified"));
    - }
    -
    - try {
    - srcs = matchFilesOrDir(FileSystem.get(fromURI, conf), new Path(fromURI));
    - if (srcs == null || srcs.length == 0) {
    - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
    - "No files matching path " + fromURI));
    - }
    -
    - for (FileStatus oneSrc : srcs) {
    - if (oneSrc.isDir()) {
    - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
    - "source contains directory: " + oneSrc.getPath().toString()));
    - }
    - }
    - } catch (IOException e) {
    - // Has to use full name to make sure it does not conflict with
    - // org.apache.commons.lang.StringUtils
    - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e);
    - }
    -
    - return srcs;
    - }
    -
    - @Override
    - public void analyzeInternal(ASTNode ast) throws SemanticException {
    - boolean isLocal = false;
    - boolean isOverWrite = false;
    - Tree fromTree = ast.getChild(0);
    - Tree tableTree = ast.getChild(1);
    -
    - if (ast.getChildCount() == 4) {
    - isLocal = true;
    - isOverWrite = true;
    - }
    -
    - if (ast.getChildCount() == 3) {
    - if (ast.getChild(2).getText().toLowerCase().equals("local")) {
    - isLocal = true;
    - } else {
    - isOverWrite = true;
    - }
    - }
    -
    - // initialize load path
    - URI fromURI;
    - try {
    - String fromPath = stripQuotes(fromTree.getText());
    - fromURI = initializeFromURI(fromPath, isLocal);
    - } catch (IOException e) {
    - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
    - .getMessage()), e);
    - } catch (URISyntaxException e) {
    - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
    - .getMessage()), e);
    - }
    -
    - // initialize destination table/partition
    - TableSpec ts = new TableSpec(db, conf, (ASTNode) tableTree);
    -
    - if (ts.tableHandle.isOffline()){
    - throw new SemanticException(
    - ErrorMsg.OFFLINE_TABLE_OR_PARTITION.getMsg(":Table " + ts.tableName));
    - }
    -
    - if (ts.tableHandle.isView()) {
    - throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
    - }
    - if (ts.tableHandle.isNonNative()) {
    - throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
    - }
    -
    - if(ts.tableHandle.isStoredAsSubDirectories()) {
    - throw new SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg());
    - }
    -
    - URI toURI = ((ts.partHandle != null) ? ts.partHandle.getDataLocation()
    - : ts.tableHandle.getDataLocation()).toUri();
    -
    - List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
    - if ((parts != null && parts.size() > 0)
    - && (ts.partSpec == null || ts.partSpec.size() == 0)) {
    - throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
    - }
    -
    - // make sure the arguments make sense
    - FileStatus[] files = applyConstraintsAndGetFiles(fromURI, toURI, fromTree, isLocal);
    -
    - // for managed tables, make sure the file formats match
    - if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())) {
    - ensureFileFormatsMatch(ts, files);
    - }
    - inputs.add(toReadEntity(new Path(fromURI)));
    - Task<? extends Serializable> rTask = null;
    -
    - // create final load/move work
    -
    - boolean preservePartitionSpecs = false;
    -
    - Map<String, String> partSpec = ts.getPartSpec();
    - if (partSpec == null) {
    - partSpec = new LinkedHashMap<String, String>();
    - outputs.add(new WriteEntity(ts.tableHandle,
    - (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
    - WriteEntity.WriteType.INSERT)));
    - } else {
    - try{
    - Partition part = Hive.get().getPartition(ts.tableHandle, partSpec, false);
    - if (part != null) {
    - if (part.isOffline()) {
    - throw new SemanticException(ErrorMsg.OFFLINE_TABLE_OR_PARTITION.
    - getMsg(ts.tableName + ":" + part.getName()));
    - }
    - if (isOverWrite){
    - outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT_OVERWRITE));
    - } else {
    - outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
    - // If partition already exists and we aren't overwriting it, then respect
    - // its current location info rather than picking it from the parent TableDesc
    - preservePartitionSpecs = true;
    - }
    - } else {
    - outputs.add(new WriteEntity(ts.tableHandle,
    - (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
    - WriteEntity.WriteType.INSERT)));
    - }
    - } catch(HiveException e) {
    - throw new SemanticException(e);
    - }
    - }
    -
    -
    - LoadTableDesc loadTableWork;
    - loadTableWork = new LoadTableDesc(new Path(fromURI),
    - Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
    - if (preservePartitionSpecs){
    - // Note: preservePartitionSpecs=true implies inheritTableSpecs=false, but
    - // preservePartitionSpecs=false (the default) here is not sufficient
    - // information to set inheritTableSpecs=true
    - loadTableWork.setInheritTableSpecs(false);
    - }
    -
    - Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
    - getOutputs(), loadTableWork, null, true, isLocal), conf);
    - if (rTask != null) {
    - rTask.addDependentTask(childTask);
    - } else {
    - rTask = childTask;
    - }
    -
    - rootTasks.add(rTask);
    -
    - // The user asked for stats to be collected.
    - // Some stats like number of rows require a scan of the data
    - // However, some other stats, like number of files, do not require a complete scan
    - // Update the stats which do not require a complete scan.
    - Task<? extends Serializable> statTask = null;
    - if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
    - StatsWork statDesc = new StatsWork(loadTableWork);
    - statDesc.setNoStatsAggregator(true);
    - statDesc.setClearAggregatorStats(true);
    - statDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
    - statTask = TaskFactory.get(statDesc, conf);
    - }
    -
    - // HIVE-3334 has been filed for load file with index auto update
    - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
    - IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, getInputs(), conf);
    - try {
    - List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater.generateUpdateTasks();
    -
    - for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
    - //LOAD DATA will either have a copy & move or just a move,
    - // we always want the update to be dependent on the move
    - childTask.addDependentTask(updateTask);
    - if (statTask != null) {
    - updateTask.addDependentTask(statTask);
    - }
    - }
    - } catch (HiveException e) {
    - console.printInfo("WARNING: could not auto-update stale indexes, indexes are not out of sync");
    - }
    - }
    - else if (statTask != null) {
    - childTask.addDependentTask(statTask);
    - }
    - }
    -
    - private void ensureFileFormatsMatch(TableSpec ts, FileStatus[] fileStatuses) throws SemanticException {
    - final Class<? extends InputFormat> destInputFormat;
    - try {
    - if (ts.getPartSpec() == null || ts.getPartSpec().isEmpty()) {
    - destInputFormat = ts.tableHandle.getInputFormatClass();
    - } else {
    - destInputFormat = ts.partHandle.getInputFormatClass();
    - }
    - } catch (HiveException e) {
    - throw new SemanticException(e);
    - }
    -
    - // Other file formats should do similar check to make sure file formats match
    - // when doing LOAD DATA .. INTO TABLE
    - if (OrcInputFormat.class.equals(destInputFormat)) {
    - for (FileStatus fileStatus : fileStatuses) {
    - try {
    - Path filePath = fileStatus.getPath();
    - FileSystem fs = FileSystem.get(filePath.toUri(), conf);
    - // just creating the ORC reader performs sanity checks to make sure it's a valid ORC file
    - OrcFile.createReader(fs, filePath);
    - } catch (FileFormatException e) {
    - throw new SemanticException(ErrorMsg.INVALID_FILE_FORMAT_IN_LOAD.getMsg("Destination" +
    - " table is stored as ORC but the file being loaded is not a valid ORC file."));
    - } catch (IOException e) {
    - throw new SemanticException("Unable to load data to destination table." +
    - " Error: " + e.getMessage());
    - }
    - }
    - }
    - }
    -}
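
The removed initializeFromURI above normalizes the LOAD DATA source path before any files are matched: relative paths are resolved against the local working directory (for LOCAL loads) or the user's home directory on the default file system, and a missing scheme or authority falls back to the default file system's values. The following is a minimal, dependency-free sketch of that defaulting logic only. It is not the analyzer's code: the class name and parameters are hypothetical, the real method reads the defaults from FileSystem.get(conf) and the user name from system properties, and java.net.URI stands in here for Hadoop's Path.

import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical standalone sketch of LOAD DATA path normalization.
public class LoadPathResolver {

  static URI resolve(String fromPath, boolean isLocal,
                     String defaultScheme, String defaultAuthority,
                     String userName) throws URISyntaxException {
    URI fromURI = new URI(fromPath);
    String scheme = fromURI.getScheme();
    String authority = fromURI.getAuthority();
    String path = fromURI.getPath();

    // Relative paths resolve against the local working directory (LOCAL)
    // or the user's home directory on the default file system.
    if (!path.startsWith("/")) {
      path = isLocal
          ? System.getProperty("user.dir") + "/" + path
          : "/user/" + userName + "/" + path;
    }

    // No scheme given: "file" for LOCAL loads, otherwise the default
    // file system's scheme and authority.
    if (scheme == null || scheme.isEmpty()) {
      if (isLocal) {
        scheme = "file";
      } else {
        scheme = defaultScheme;
        authority = defaultAuthority;
      }
    }

    // Scheme given but no authority: borrow the default FS authority.
    if (!"file".equals(scheme) && (authority == null || authority.isEmpty())) {
      authority = defaultAuthority;
    }

    return new URI(scheme, authority, path, null, null);
  }

  public static void main(String[] args) throws URISyntaxException {
    // Prints hdfs://nn:8020/user/hive/data/part-00000
    System.out.println(resolve("data/part-00000", false, "hdfs", "nn:8020", "hive"));
  }
}

The real method additionally URL-decodes local paths and logs the resolved scheme, authority, and path at debug level, as the removed lines above show.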
