HIVE-10249 ACID: show locks should show who the lock is waiting for (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6f6c4ac
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6f6c4ac
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6f6c4ac

Branch: refs/heads/branch-1
Commit: b6f6c4acba3b863851d197a7e6e2e9f4b851da70
Parents: a275951
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Wed Mar 30 12:18:20 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Wed Mar 30 12:18:20 2016 -0700

----------------------------------------------------------------------
  .../hadoop/hive/metastore/txn/TxnDbUtil.java | 6 +-
  .../hadoop/hive/metastore/txn/TxnHandler.java | 46 +-
  .../hive/metastore/txn/TestTxnHandler.java | 2 +-
  .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 +-
  .../exec/vector/VectorizedBatchUtil.java.orig | 707 -----------
  .../ql/parse/LoadSemanticAnalyzer.java.orig | 360 ------
  .../ql/io/orc/TestOrcRawRecordMerger.java.orig | 1150 ------------------
  .../hive/ql/lockmgr/TestDbTxnManager2.java | 28 +
  .../clientpositive/dbtxnmgr_showlocks.q.out | 6 +-
  9 files changed, 89 insertions(+), 2232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 56c9ed8..2e24678 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -103,7 +103,11 @@ public final class TxnDbUtil {
            " HL_ACQUIRED_AT bigint," +
            " HL_USER varchar(128) NOT NULL," +
            " HL_HOST varchar(128) NOT NULL," +
- " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+ " HL_HEARTBEAT_COUNT integer," +
+ " HL_AGENT_INFO varchar(128)," +
+ " HL_BLOCKEDBY_EXT_ID bigint," +
+ " HL_BLOCKEDBY_INT_ID bigint," +
+ " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
        stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");

        stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index a3b0751..ed4a3c2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -869,8 +869,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     */
    private static class LockInfoExt extends LockInfo {
      private final ShowLocksResponseElement e;
-      LockInfoExt(ShowLocksResponseElement e, long intLockId) {
-        super(e, intLockId);
+      LockInfoExt(ShowLocksResponseElement e) {
+        super(e);
        this.e = e;
      }
    }
@@ -886,7 +886,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          stmt = dbConn.createStatement();

          String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS";
+ "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
+ "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS";
          LOG.debug("Doing to execute query <" + s + ">");
          ResultSet rs = stmt.executeQuery(s);
          while (rs.next()) {
@@ -914,7 +915,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
            e.setUser(rs.getString(10));
            e.setHostname(rs.getString(11));
-          sortedList.add(new LockInfoExt(e, rs.getLong(12)));
+          e.setLockIdInternal(rs.getLong(12));
+          long id = rs.getLong(13);
+          if(!rs.wasNull()) {
+            e.setBlockedByExtId(id);
+          }
+          id = rs.getLong(14);
+          if(!rs.wasNull()) {
+            e.setBlockedByIntId(id);
+          }
+          sortedList.add(new LockInfoExt(e));
          }
          LOG.debug("Going to rollback");
          dbConn.rollback();
@@ -1164,6 +1174,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    private static void shouldNeverHappen(long txnid) {
      throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid));
    }
+  private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) {
+    throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
+      + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
+  }
    public void addDynamicPartitions(AddDynamicPartitions rqst)
        throws NoSuchTxnException, TxnAbortedException, MetaException {
      Connection dbConn = null;
@@ -1711,15 +1725,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        }
        txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL
      }
-    LockInfo(ShowLocksResponseElement e, long intLockId) {
+    LockInfo(ShowLocksResponseElement e) {
        extLockId = e.getLockid();
-      this.intLockId = intLockId;
+      intLockId = e.getLockIdInternal();
+      txnId = e.getTxnid();
        db = e.getDbname();
        table = e.getTablename();
        partition = e.getPartname();
        state = e.getState();
        type = e.getType();
-      txnId = e.getTxnid();
      }

      public boolean equals(Object other) {
@@ -2018,9 +2032,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {

      LOG.debug("Going to execute query <" + query.toString() + ">");
      Statement stmt = null;
+    ResultSet rs = null;
      try {
        stmt = dbConn.createStatement();
-      ResultSet rs = stmt.executeQuery(query.toString());
+      rs = stmt.executeQuery(query.toString());
        SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
        while (rs.next()) {
          lockSet.add(new LockInfo(rs));
@@ -2091,7 +2106,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            switch (lockAction) {
              case WAIT:
                if(!ignoreConflict(info, locks[i])) {
+                  /*we acquire all locks for a given query atomically; if 1 blocks, all go into (remain in)
+                   * Waiting state. wait() will undo any 'acquire()' which may have happened as part of
+                   * this (metastore db) transaction and then we record which lock blocked the lock
+                   * we were testing ('info').*/
                  wait(dbConn, save);
+                  String sqlText = "update HIVE_LOCKS" +
+                    " set HL_BLOCKEDBY_EXT_ID=" + locks[i].extLockId +
+                    ", HL_BLOCKEDBY_INT_ID=" + locks[i].intLockId +
+                    " where HL_LOCK_EXT_ID=" + info.extLockId + " and HL_LOCK_INT_ID=" + info.intLockId;
+                  LOG.debug("Executing sql: " + sqlText);
+                  int updCnt = stmt.executeUpdate(sqlText);
+                  if(updCnt != 1) {
+                    shouldNeverHappen(info.txnId, info.extLockId, info.intLockId);
+                  }
                  LOG.debug("Going to commit");
                  dbConn.commit();
                  response.setState(LockState.WAITING);
@@ -2120,7 +2148,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        dbConn.commit();
        response.setState(LockState.ACQUIRED);
      } finally {
-      closeStmt(stmt);
+      close(rs, stmt, null);
      }
      return response;
    }
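
To make the bookkeeping above concrete, here is a hypothetical HIVE_LOCKS state after a blocked acquire attempt (all ids and names invented for illustration): lock 5.1 already holds a conflicting lock on default.t1, so checkLock() leaves lock 6.1 waiting and stamps the blocker's ids into its row via the update shown above.

  HL_LOCK_EXT_ID  HL_LOCK_INT_ID  HL_DB    HL_TABLE  HL_BLOCKEDBY_EXT_ID  HL_BLOCKEDBY_INT_ID
  5               1               default  t1        NULL                 NULL
  6               1               default  t1        5                    1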

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 6033c15..4d3c3e1 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1114,7 +1114,7 @@ public class TestTxnHandler {
    }

    @Test
-  @Ignore
+  @Ignore("Wedges Derby")
    public void deadlockDetected() throws Exception {
      LOG.debug("Starting deadlock test");
      if (txnHandler instanceof TxnHandler) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 414293c..816d8d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2512,6 +2512,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
      os.write(separator);
      os.writeBytes("State");
      os.write(separator);
+ os.writeBytes("Blocked By");
+ os.write(separator);
      os.writeBytes("Type");
      os.write(separator);
      os.writeBytes("Transaction ID");
@@ -2528,7 +2530,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
      List<ShowLocksResponseElement> locks = rsp.getLocks();
      if (locks != null) {
        for (ShowLocksResponseElement lock : locks) {
-        os.writeBytes(Long.toString(lock.getLockid()));
+        if(lock.isSetLockIdInternal()) {
+          os.writeBytes(Long.toString(lock.getLockid()) + "." + Long.toString(lock.getLockIdInternal()));
+        }
+        else {
+          os.writeBytes(Long.toString(lock.getLockid()));
+        }
          os.write(separator);
          os.writeBytes(lock.getDbname());
          os.write(separator);
@@ -2538,6 +2545,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
          os.write(separator);
          os.writeBytes(lock.getState().toString());
          os.write(separator);
+        if(lock.isSetBlockedByExtId()) {//both "blockedby" are either there or not
+          os.writeBytes(Long.toString(lock.getBlockedByExtId()) + "." + Long.toString(lock.getBlockedByIntId()));
+        }
+        else {
+          os.writeBytes("            ");//12 chars - try to keep cols aligned
+        }
+        os.write(separator);
          os.writeBytes(lock.getType().toString());
          os.write(separator);
          os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid()));
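
Putting it together, the SHOW LOCKS output written by DDLTask now prints the lock id in "ext.int" form and adds the Blocked By column right after State. A hypothetical excerpt (values invented; columns not touched by this hunk are elided as "..."):

  Lock ID  ...  State     Blocked By  Type         Transaction ID
  5.1      ...  ACQUIRED              EXCLUSIVE    41
  6.1      ...  WAITING   5.1         SHARED_READ  42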

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
deleted file mode 100644
index af43a07..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java.orig
+++ /dev/null
@@ -1,707 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
-import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hive.common.util.DateUtils;
-
-public class VectorizedBatchUtil {
- private static final Log LOG = LogFactory.getLog(VectorizedBatchUtil.class);
-
- /**
- * Sets the IsNull value for ColumnVector at specified index
- * @param cv
- * @param rowIndex
- */
- public static void setNullColIsNullValue(ColumnVector cv, int rowIndex) {
- cv.isNull[rowIndex] = true;
- if (cv.noNulls) {
- cv.noNulls = false;
- }
- }
-
- /**
- * Iterates thru all the column vectors and sets noNull to
- * specified value.
- *
- * @param batch
- * Batch on which noNull is set
- */
- public static void setNoNullFields(VectorizedRowBatch batch) {
- for (int i = 0; i < batch.numCols; i++) {
- batch.cols[i].noNulls = true;
- }
- }
-
- /**
- * Iterates thru all the column vectors and sets repeating to
- * specified column.
- *
- */
- public static void setRepeatingColumn(VectorizedRowBatch batch, int column) {
- ColumnVector cv = batch.cols[column];
- cv.isRepeating = true;
- }
-
- /**
- * Reduce the batch size for a vectorized row batch
- */
- public static void setBatchSize(VectorizedRowBatch batch, int size) {
- assert (size <= batch.getMaxSize());
- batch.size = size;
- }
-
- /**
- * Walk through the object inspector and add column vectors
- *
- * @param oi
- * @param cvList
- * ColumnVectors are populated in this list
- */
- private static void allocateColumnVector(StructObjectInspector oi,
- List<ColumnVector> cvList) throws HiveException {
- if (cvList == null) {
- throw new HiveException("Null columnvector list");
- }
- if (oi == null) {
- return;
- }
- final List<? extends StructField> fields = oi.getAllStructFieldRefs();
- for(StructField field : fields) {
- ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
- switch(fieldObjectInspector.getCategory()) {
- case PRIMITIVE:
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldObjectInspector;
- switch(poi.getPrimitiveCategory()) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case TIMESTAMP:
- case DATE:
- case INTERVAL_YEAR_MONTH:
- case INTERVAL_DAY_TIME:
- cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
- break;
- case FLOAT:
- case DOUBLE:
- cvList.add(new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
- break;
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- cvList.add(new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
- break;
- case DECIMAL:
- DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
- cvList.add(new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
- tInfo.precision(), tInfo.scale()));
- break;
- default:
- throw new HiveException("Vectorizaton is not supported for datatype:"
- + poi.getPrimitiveCategory());
- }
- break;
- case STRUCT:
- throw new HiveException("Struct not supported");
- default:
- throw new HiveException("Flattening is not supported for datatype:"
- + fieldObjectInspector.getCategory());
- }
- }
- }
-
-
- /**
- * Create VectorizedRowBatch from ObjectInspector
- *
- * @param oi
- * @return
- * @throws HiveException
- */
- public static VectorizedRowBatch constructVectorizedRowBatch(
- StructObjectInspector oi) throws HiveException {
- final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
- allocateColumnVector(oi, cvList);
- final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
- int i = 0;
- for(ColumnVector cv : cvList) {
- result.cols[i++] = cv;
- }
- return result;
- }
-
- /**
- * Create VectorizedRowBatch from key and value object inspectors
- * The row object inspector used by ReduceWork needs to be a **standard**
- * struct object inspector, not just any struct object inspector.
- * @param keyInspector
- * @param valueInspector
- * @param vectorScratchColumnTypeMap
- * @return VectorizedRowBatch, OI
- * @throws HiveException
- */
- public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
- StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
- throws HiveException {
-
- ArrayList<String> colNames = new ArrayList<String>();
- ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
- List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
- for (StructField field: fields) {
- colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
- }
- fields = valueInspector.getAllStructFieldRefs();
- for (StructField field: fields) {
- colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
- }
- StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
-
- VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
- batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
- return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
- }
-
- /**
- * Iterates through all columns in a given row and populates the batch
- *
- * @param row
- * @param oi
- * @param rowIndex
- * @param batch
- * @param buffer
- * @throws HiveException
- */
- public static void addRowToBatch(Object row, StructObjectInspector oi,
- int rowIndex,
- VectorizedRowBatch batch,
- DataOutputBuffer buffer
- ) throws HiveException {
- addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
- }
-
- /**
- * Iterates thru all the columns in a given row and populates the batch
- * from a given offset
- *
- * @param row Deserialized row object
- * @param oi Object insepector for that row
- * @param rowIndex index to which the row should be added to batch
- * @param colOffset offset from where the column begins
- * @param batch Vectorized batch to which the row is added at rowIndex
- * @throws HiveException
- */
- public static void addRowToBatchFrom(Object row, StructObjectInspector oi,
- int rowIndex,
- int colOffset,
- VectorizedRowBatch batch,
- DataOutputBuffer buffer
- ) throws HiveException {
- List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
- final int off = colOffset;
- // Iterate thru the cols and load the batch
- for (int i = 0; i < fieldRefs.size(); i++) {
- setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, off);
- }
- }
-
- /**
- * Add only the projected column of a regular row to the specified vectorized row batch
- * @param row the regular row
- * @param oi object inspector for the row
- * @param rowIndex the offset to add in the batch
- * @param batch vectorized row batch
- * @param buffer data output buffer
- * @throws HiveException
- */
- public static void addProjectedRowToBatchFrom(Object row, StructObjectInspector oi,
- int rowIndex, VectorizedRowBatch batch, DataOutputBuffer buffer) throws HiveException {
- List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
- for (int i = 0; i < fieldRefs.size(); i++) {
- int projectedOutputCol = batch.projectedColumns[i];
- if (batch.cols[projectedOutputCol] == null) {
- continue;
- }
- setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, projectedOutputCol, 0);
- }
- }
- /**
- * Iterates thru all the columns in a given row and populates the batch
- * from a given offset
- *
- * @param row Deserialized row object
- * @param oi Object insepector for that row
- * @param rowIndex index to which the row should be added to batch
- * @param batch Vectorized batch to which the row is added at rowIndex
- * @param context context object for this vectorized batch
- * @param buffer
- * @throws HiveException
- */
- public static void acidAddRowToBatch(Object row,
- StructObjectInspector oi,
- int rowIndex,
- VectorizedRowBatch batch,
- VectorizedRowBatchCtx context,
- DataOutputBuffer buffer) throws HiveException {
- List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
- // Iterate thru the cols and load the batch
- for (int i = 0; i < fieldRefs.size(); i++) {
- if (batch.cols[i] == null) {
- // This means the column was not included in the projection from the underlying read
- continue;
- }
- if (context.isPartitionCol(i)) {
- // The value will have already been set before we're called, so don't overwrite it
- continue;
- }
- setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, 0);
- }
- }
-
- private static void setVector(Object row,
- StructObjectInspector oi,
- StructField field,
- VectorizedRowBatch batch,
- DataOutputBuffer buffer,
- int rowIndex,
- int colIndex,
- int offset) throws HiveException {
-
- Object fieldData = oi.getStructFieldData(row, field);
- ObjectInspector foi = field.getFieldObjectInspector();
-
- // Vectorization only supports PRIMITIVE data types. Assert the same
- assert (foi.getCategory() == Category.PRIMITIVE);
-
- // Get writable object
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
- Object writableCol = poi.getPrimitiveWritableObject(fieldData);
-
- // NOTE: The default value for null fields in vectorization is 1 for int types, NaN for
- // float/double. String types have no default value for null.
- switch (poi.getPrimitiveCategory()) {
- case BOOLEAN: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((BooleanWritable) writableCol).get() ? 1 : 0;
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case BYTE: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((ByteWritable) writableCol).get();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case SHORT: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case INT: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case LONG: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case DATE: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((DateWritable) writableCol).getDays();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case FLOAT: {
- DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
- dcv.isNull[rowIndex] = false;
- } else {
- dcv.vector[rowIndex] = Double.NaN;
- setNullColIsNullValue(dcv, rowIndex);
- }
- }
- break;
- case DOUBLE: {
- DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
- dcv.isNull[rowIndex] = false;
- } else {
- dcv.vector[rowIndex] = Double.NaN;
- setNullColIsNullValue(dcv, rowIndex);
- }
- }
- break;
- case TIMESTAMP: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- Timestamp t = ((TimestampWritable) writableCol).getTimestamp();
- lcv.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case INTERVAL_YEAR_MONTH: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- HiveIntervalYearMonth i = ((HiveIntervalYearMonthWritable) writableCol).getHiveIntervalYearMonth();
- lcv.vector[rowIndex] = i.getTotalMonths();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case INTERVAL_DAY_TIME: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- HiveIntervalDayTime i = ((HiveIntervalDayTimeWritable) writableCol).getHiveIntervalDayTime();
- lcv.vector[rowIndex] = DateUtils.getIntervalDayTimeTotalNanos(i);
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- setNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case BINARY: {
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- bcv.isNull[rowIndex] = false;
- BytesWritable bw = (BytesWritable) writableCol;
- byte[] bytes = bw.getBytes();
- int start = buffer.getLength();
- int length = bw.getLength();
- try {
- buffer.write(bytes, 0, length);
- } catch (IOException ioe) {
- throw new IllegalStateException("bad write", ioe);
- }
- bcv.setRef(rowIndex, buffer.getData(), start, length);
- } else {
- setNullColIsNullValue(bcv, rowIndex);
- }
- }
- break;
- case STRING: {
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- bcv.isNull[rowIndex] = false;
- Text colText = (Text) writableCol;
- int start = buffer.getLength();
- int length = colText.getLength();
- try {
- buffer.write(colText.getBytes(), 0, length);
- } catch (IOException ioe) {
- throw new IllegalStateException("bad write", ioe);
- }
- bcv.setRef(rowIndex, buffer.getData(), start, length);
- } else {
- setNullColIsNullValue(bcv, rowIndex);
- }
- }
- break;
- case CHAR: {
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- bcv.isNull[rowIndex] = false;
- HiveChar colHiveChar = ((HiveCharWritable) writableCol).getHiveChar();
- byte[] bytes = colHiveChar.getStrippedValue().getBytes();
-
- // We assume the CHAR maximum length was enforced when the object was created.
- int length = bytes.length;
-
- int start = buffer.getLength();
- try {
- // In vector mode, we store CHAR as unpadded.
- buffer.write(bytes, 0, length);
- } catch (IOException ioe) {
- throw new IllegalStateException("bad write", ioe);
- }
- bcv.setRef(rowIndex, buffer.getData(), start, length);
- } else {
- setNullColIsNullValue(bcv, rowIndex);
- }
- }
- break;
- case VARCHAR: {
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- bcv.isNull[rowIndex] = false;
- HiveVarchar colHiveVarchar = ((HiveVarcharWritable) writableCol).getHiveVarchar();
- byte[] bytes = colHiveVarchar.getValue().getBytes();
-
- // We assume the VARCHAR maximum length was enforced when the object was created.
- int length = bytes.length;
-
- int start = buffer.getLength();
- try {
- buffer.write(bytes, 0, length);
- } catch (IOException ioe) {
- throw new IllegalStateException("bad write", ioe);
- }
- bcv.setRef(rowIndex, buffer.getData(), start, length);
- } else {
- setNullColIsNullValue(bcv, rowIndex);
- }
- }
- break;
- case DECIMAL:
- DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[offset + colIndex];
- if (writableCol != null) {
- dcv.isNull[rowIndex] = false;
- HiveDecimalWritable wobj = (HiveDecimalWritable) writableCol;
- dcv.set(rowIndex, wobj);
- } else {
- setNullColIsNullValue(dcv, rowIndex);
- }
- break;
- default:
- throw new HiveException("Vectorizaton is not supported for datatype:" +
- poi.getPrimitiveCategory());
- }
- }
-
- public static StandardStructObjectInspector convertToStandardStructObjectInspector(
- StructObjectInspector structObjectInspector) throws HiveException {
-
- List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
- List<ObjectInspector> oids = new ArrayList<ObjectInspector>();
- ArrayList<String> columnNames = new ArrayList<String>();
-
- for(StructField field : fields) {
- TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
- field.getFieldObjectInspector().getTypeName());
- ObjectInspector standardWritableObjectInspector =
- TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
- oids.add(standardWritableObjectInspector);
- columnNames.add(field.getFieldName());
- }
- return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids);
- }
-
- public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector(
- StructObjectInspector structObjectInspector) throws HiveException {
-
- List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
- PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()];
-
- int i = 0;
- for(StructField field : fields) {
- TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
- field.getFieldObjectInspector().getTypeName());
- result[i++] = (PrimitiveTypeInfo) typeInfo;
- }
- return result;
- }
-
- public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames(
- String[] typeNames) throws HiveException {
-
- PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length];
-
- for(int i = 0; i < typeNames.length; i++) {
- TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
- result[i] = (PrimitiveTypeInfo) typeInfo;
- }
- return result;
- }
-
- /**
- * Make a new (scratch) batch, which is exactly "like" the batch provided, except that it's empty
- * @param batch the batch to imitate
- * @return the new batch
- * @throws HiveException
- */
- public static VectorizedRowBatch makeLike(VectorizedRowBatch batch) throws HiveException {
- VectorizedRowBatch newBatch = new VectorizedRowBatch(batch.numCols);
- for (int i = 0; i < batch.numCols; i++) {
- ColumnVector colVector = batch.cols[i];
- if (colVector != null) {
- ColumnVector newColVector;
- if (colVector instanceof LongColumnVector) {
- newColVector = new LongColumnVector();
- } else if (colVector instanceof DoubleColumnVector) {
- newColVector = new DoubleColumnVector();
- } else if (colVector instanceof BytesColumnVector) {
- newColVector = new BytesColumnVector();
- } else if (colVector instanceof DecimalColumnVector) {
- DecimalColumnVector decColVector = (DecimalColumnVector) colVector;
- newColVector = new DecimalColumnVector(decColVector.precision, decColVector.scale);
- } else {
- throw new HiveException("Column vector class " + colVector.getClass().getName() +
- " is not supported!");
- }
- newBatch.cols[i] = newColVector;
- newBatch.cols[i].init();
- }
- }
- newBatch.projectedColumns = Arrays.copyOf(batch.projectedColumns, batch.projectedColumns.length);
- newBatch.projectionSize = batch.projectionSize;
- newBatch.reset();
- return newBatch;
- }
-
- public static String displayBytes(byte[] bytes, int start, int length) {
- StringBuilder sb = new StringBuilder();
- for (int i = start; i < start + length; i++) {
- char ch = (char) bytes[i];
- if (ch < ' ' || ch > '~') {
- sb.append(String.format("\\%03d", bytes[i] & 0xff));
- } else {
- sb.append(ch);
- }
- }
- return sb.toString();
- }
-
- public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
- StringBuilder sb = new StringBuilder();
- sb.append(prefix + " row " + index + " ");
- for (int column = 0; column < batch.cols.length; column++) {
- ColumnVector colVector = batch.cols[column];
- if (colVector == null) {
- sb.append("(null colVector " + column + ")");
- } else {
- boolean isRepeating = colVector.isRepeating;
- index = (isRepeating ? 0 : index);
- if (colVector.noNulls || !colVector.isNull[index]) {
- if (colVector instanceof LongColumnVector) {
- sb.append(((LongColumnVector) colVector).vector[index]);
- } else if (colVector instanceof DoubleColumnVector) {
- sb.append(((DoubleColumnVector) colVector).vector[index]);
- } else if (colVector instanceof BytesColumnVector) {
- BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
- byte[] bytes = bytesColumnVector.vector[index];
- int start = bytesColumnVector.start[index];
- int length = bytesColumnVector.length[index];
- if (bytes == null) {
- sb.append("(Unexpected null bytes with start " + start + " length " + length + ")");
- } else {
- sb.append("bytes: '" + displayBytes(bytes, start, length) + "'");
- }
- } else if (colVector instanceof DecimalColumnVector) {
- sb.append(((DecimalColumnVector) colVector).vector[index].toString());
- } else {
- sb.append("Unknown");
- }
- } else {
- sb.append("NULL");
- }
- }
- sb.append(" ");
- }
- LOG.info(sb.toString());
- }
-
- public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {
- for (int i = 0; i < batch.size; i++) {
- int index = (batch.selectedInUse ? batch.selected[i] : i);
- debugDisplayOneRow(batch, index, prefix);
- }
- }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/b6f6c4ac/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
deleted file mode 100644
index aacfa92..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java.orig
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.parse;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.antlr.runtime.tree.Tree;
-import org.apache.commons.httpclient.util.URIUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.FileFormatException;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.mapred.InputFormat;
-
-/**
- * LoadSemanticAnalyzer.
- *
- */
-public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
-
- public LoadSemanticAnalyzer(HiveConf conf) throws SemanticException {
- super(conf);
- }
-
- public static FileStatus[] matchFilesOrDir(FileSystem fs, Path path)
- throws IOException {
- FileStatus[] srcs = fs.globStatus(path, new PathFilter() {
- @Override
- public boolean accept(Path p) {
- String name = p.getName();
- return name.equals("_metadata") ? true : !name.startsWith("_") && !name.startsWith(".");
- }
- });
- if ((srcs != null) && srcs.length == 1) {
- if (srcs[0].isDir()) {
- srcs = fs.listStatus(srcs[0].getPath(), new PathFilter() {
- @Override
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- });
- }
- }
- return (srcs);
- }
-
- private URI initializeFromURI(String fromPath, boolean isLocal) throws IOException,
- URISyntaxException {
- URI fromURI = new Path(fromPath).toUri();
-
- String fromScheme = fromURI.getScheme();
- String fromAuthority = fromURI.getAuthority();
- String path = fromURI.getPath();
-
- // generate absolute path relative to current directory or hdfs home
- // directory
- if (!path.startsWith("/")) {
- if (isLocal) {
- path = URIUtil.decode(
- new Path(System.getProperty("user.dir"), fromPath).toUri().toString());
- } else {
- path = new Path(new Path("/user/" + System.getProperty("user.name")),
- path).toString();
- }
- }
-
- // set correct scheme and authority
- if (StringUtils.isEmpty(fromScheme)) {
- if (isLocal) {
- // file for local
- fromScheme = "file";
- } else {
- // use default values from fs.default.name
- URI defaultURI = FileSystem.get(conf).getUri();
- fromScheme = defaultURI.getScheme();
- fromAuthority = defaultURI.getAuthority();
- }
- }
-
- // if scheme is specified but not authority then use the default authority
- if ((!fromScheme.equals("file")) && StringUtils.isEmpty(fromAuthority)) {
- URI defaultURI = FileSystem.get(conf).getUri();
- fromAuthority = defaultURI.getAuthority();
- }
-
- LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
- return new URI(fromScheme, fromAuthority, path, null, null);
- }
-
- private FileStatus[] applyConstraintsAndGetFiles(URI fromURI, URI toURI, Tree ast,
- boolean isLocal) throws SemanticException {
-
- FileStatus[] srcs = null;
-
- // local mode implies that scheme should be "file"
- // we can change this going forward
- if (isLocal && !fromURI.getScheme().equals("file")) {
- throw new SemanticException(ErrorMsg.ILLEGAL_PATH.getMsg(ast,
- "Source file system should be \"file\" if \"local\" is specified"));
- }
-
- try {
- srcs = matchFilesOrDir(FileSystem.get(fromURI, conf), new Path(fromURI));
- if (srcs == null || srcs.length == 0) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
- "No files matching path " + fromURI));
- }
-
- for (FileStatus oneSrc : srcs) {
- if (oneSrc.isDir()) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
- "source contains directory: " + oneSrc.getPath().toString()));
- }
- }
- } catch (IOException e) {
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e);
- }
-
- return srcs;
- }
-
- @Override
- public void analyzeInternal(ASTNode ast) throws SemanticException {
- boolean isLocal = false;
- boolean isOverWrite = false;
- Tree fromTree = ast.getChild(0);
- Tree tableTree = ast.getChild(1);
-
- if (ast.getChildCount() == 4) {
- isLocal = true;
- isOverWrite = true;
- }
-
- if (ast.getChildCount() == 3) {
- if (ast.getChild(2).getText().toLowerCase().equals("local")) {
- isLocal = true;
- } else {
- isOverWrite = true;
- }
- }
-
- // initialize load path
- URI fromURI;
- try {
- String fromPath = stripQuotes(fromTree.getText());
- fromURI = initializeFromURI(fromPath, isLocal);
- } catch (IOException e) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
- .getMessage()), e);
- } catch (URISyntaxException e) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
- .getMessage()), e);
- }
-
- // initialize destination table/partition
- TableSpec ts = new TableSpec(db, conf, (ASTNode) tableTree);
-
- if (ts.tableHandle.isOffline()){
- throw new SemanticException(
- ErrorMsg.OFFLINE_TABLE_OR_PARTITION.getMsg(":Table " + ts.tableName));
- }
-
- if (ts.tableHandle.isView()) {
- throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
- }
- if (ts.tableHandle.isNonNative()) {
- throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
- }
-
- if(ts.tableHandle.isStoredAsSubDirectories()) {
- throw new SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg());
- }
-
- URI toURI = ((ts.partHandle != null) ? ts.partHandle.getDataLocation()
- : ts.tableHandle.getDataLocation()).toUri();
-
- List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
- if ((parts != null && parts.size() > 0)
- && (ts.partSpec == null || ts.partSpec.size() == 0)) {
- throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
- }
-
- // make sure the arguments make sense
- FileStatus[] files = applyConstraintsAndGetFiles(fromURI, toURI, fromTree, isLocal);
-
- // for managed tables, make sure the file formats match
- if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())) {
- ensureFileFormatsMatch(ts, files);
- }
- inputs.add(toReadEntity(new Path(fromURI)));
- Task<? extends Serializable> rTask = null;
-
- // create final load/move work
-
- boolean preservePartitionSpecs = false;
-
- Map<String, String> partSpec = ts.getPartSpec();
- if (partSpec == null) {
- partSpec = new LinkedHashMap<String, String>();
- outputs.add(new WriteEntity(ts.tableHandle,
- (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
- WriteEntity.WriteType.INSERT)));
- } else {
- try{
- Partition part = Hive.get().getPartition(ts.tableHandle, partSpec, false);
- if (part != null) {
- if (part.isOffline()) {
- throw new SemanticException(ErrorMsg.OFFLINE_TABLE_OR_PARTITION.
- getMsg(ts.tableName + ":" + part.getName()));
- }
- if (isOverWrite){
- outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT_OVERWRITE));
- } else {
- outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
- // If partition already exists and we aren't overwriting it, then respect
- // its current location info rather than picking it from the parent TableDesc
- preservePartitionSpecs = true;
- }
- } else {
- outputs.add(new WriteEntity(ts.tableHandle,
- (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
- WriteEntity.WriteType.INSERT)));
- }
- } catch(HiveException e) {
- throw new SemanticException(e);
- }
- }
-
-
- LoadTableDesc loadTableWork;
- loadTableWork = new LoadTableDesc(new Path(fromURI),
- Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
- if (preservePartitionSpecs){
- // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but
- // but preservePartitionSpecs=false(default) here is not sufficient enough
- // info to set inheritTableSpecs=true
- loadTableWork.setInheritTableSpecs(false);
- }
-
- Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
- getOutputs(), loadTableWork, null, true, isLocal), conf);
- if (rTask != null) {
- rTask.addDependentTask(childTask);
- } else {
- rTask = childTask;
- }
-
- rootTasks.add(rTask);
-
- // The user asked for stats to be collected.
- // Some stats like number of rows require a scan of the data
- // However, some other stats, like number of files, do not require a complete scan
- // Update the stats which do not require a complete scan.
- Task<? extends Serializable> statTask = null;
- if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
- StatsWork statDesc = new StatsWork(loadTableWork);
- statDesc.setNoStatsAggregator(true);
- statDesc.setClearAggregatorStats(true);
- statDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- statTask = TaskFactory.get(statDesc, conf);
- }
-
- // HIVE-3334 has been filed for load file with index auto update
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
- IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, getInputs(), conf);
- try {
- List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater.generateUpdateTasks();
-
- for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
- //LOAD DATA will either have a copy & move or just a move,
- // we always want the update to be dependent on the move
- childTask.addDependentTask(updateTask);
- if (statTask != null) {
- updateTask.addDependentTask(statTask);
- }
- }
- } catch (HiveException e) {
- console.printInfo("WARNING: could not auto-update stale indexes, indexes are not out of sync");
- }
- }
- else if (statTask != null) {
- childTask.addDependentTask(statTask);
- }
- }
-
- private void ensureFileFormatsMatch(TableSpec ts, FileStatus[] fileStatuses) throws SemanticException {
- final Class<? extends InputFormat> destInputFormat;
- try {
- if (ts.getPartSpec() == null || ts.getPartSpec().isEmpty()) {
- destInputFormat = ts.tableHandle.getInputFormatClass();
- } else {
- destInputFormat = ts.partHandle.getInputFormatClass();
- }
- } catch (HiveException e) {
- throw new SemanticException(e);
- }
-
- // Other file formats should do similar check to make sure file formats match
- // when doing LOAD DATA .. INTO TABLE
- if (OrcInputFormat.class.equals(destInputFormat)) {
- for (FileStatus fileStatus : fileStatuses) {
- try {
- Path filePath = fileStatus.getPath();
- FileSystem fs = FileSystem.get(filePath.toUri(), conf);
- // just creating orc reader is going to do sanity checks to make sure its valid ORC file
- OrcFile.createReader(fs, filePath);
- } catch (FileFormatException e) {
- throw new SemanticException(ErrorMsg.INVALID_FILE_FORMAT_IN_LOAD.getMsg("Destination" +
- " table is stored as ORC but the file being loaded is not a valid ORC file."));
- } catch (IOException e) {
- throw new SemanticException("Unable to load data to destination table." +
- " Error: " + e.getMessage());
- }
- }
- }
- }
-}
