Repository: hive
Updated Branches:
   refs/heads/master 522bb600b -> 24988f77f


HIVE-11972 : [Refactor] Improve determination of dynamic partitioning columns in FileSink Operator (Ashutosh Chauhan via Prasanth J)

Signed-off-by: Ashutosh Chauhan <hashutosh@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/24988f77
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/24988f77
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/24988f77

Branch: refs/heads/master
Commit: 24988f77f2898bbcd91f5665b865bcc251e3cade
Parents: 522bb60
Author: Ashutosh Chauhan <hashutosh@apache.org>
Authored: Sat Sep 26 12:19:00 2015 -0800
Committer: Ashutosh Chauhan <hashutosh@apache.org>
Committed: Thu Oct 1 11:41:53 2015 -0700

----------------------------------------------------------------------
  .../hadoop/hive/ql/exec/FileSinkOperator.java | 19 +-
  .../apache/hadoop/hive/ql/exec/Utilities.java | 17 +
  .../optimizer/ConstantPropagateProcFactory.java | 11 +-
  .../hive/ql/optimizer/GenMapRedUtils.java | 10 +-
  .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 30 +-
  .../hive/ql/plan/DynamicPartitionCtx.java | 27 --
  .../hive/ql/exec/TestFileSinkOperator.java | 384 ++++++++++++-------
  7 files changed, 284 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
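The core of this refactor, as the hunks below show: FileSinkOperator no longer walks the input ObjectInspector against DynamicPartitionCtx's input-to-DP-column name map to find where the dynamic partitioning columns begin; it computes a fixed offset from the ACID write type and the table schema, since the DP columns always trail the row. A minimal standalone Java sketch of that offset logic, with plain strings standing in for Hive's ObjectInspector machinery (the class and helper names here are illustrative, not from the patch):

  import java.util.Arrays;
  import java.util.List;

  public class DpOffsetSketch {
    enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }

    // Mirrors the logic of Utilities.getDPColOffset() added by this patch:
    // the index of the first dynamic-partition column is fully determined by
    // the write type and the number of table columns.
    static int dpColOffset(Operation op, List<String> tableCols) {
      switch (op) {
        case DELETE:
          return 1;                    // only ROW__ID precedes the DP columns
        case UPDATE:
          return tableCols.size() + 1; // ROW__ID plus all table columns
        default:
          return tableCols.size();     // insert / non-ACID: table columns only
      }
    }

    public static void main(String[] args) {
      List<String> tableCols = Arrays.asList("data");
      List<String> row = Arrays.asList("mary had a little lamb", "Monday");
      int dpStartCol = dpColOffset(Operation.INSERT, tableCols);
      // Everything from dpStartCol onward is a dynamic-partition value.
      System.out.println(row.subList(dpStartCol, row.size())); // prints [Monday]
    }
  }
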


http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2604d5d..39944a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -493,24 +493,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
        assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has "
            + inputObjInspectors.length;
        StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0];
- // remove the last dpMapping.size() columns from the OI
- List<? extends StructField> fieldOI = soi.getAllStructFieldRefs();
- ArrayList<ObjectInspector> newFieldsOI = new ArrayList<ObjectInspector>();
- ArrayList<String> newFieldsName = new ArrayList<String>();
- this.dpStartCol = 0;
- for (StructField sf : fieldOI) {
- String fn = sf.getFieldName();
- if (!dpCtx.getInputToDPCols().containsKey(fn)) {
- newFieldsOI.add(sf.getFieldObjectInspector());
- newFieldsName.add(sf.getFieldName());
- this.dpStartCol++;
- } else {
- // once we found the start column for partition column we are done
- break;
- }
- }
- assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty";
-
+ this.dpStartCol = Utilities.getDPColOffset(conf);
        this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol);
        this.dpVals = new ArrayList<String>(numDynParts);
        this.dpWritables = new ArrayList<Object>(numDynParts);

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index bcf85a4..5b21af9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -119,6 +119,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
  import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
  import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
  import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
  import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
  import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -3916,4 +3917,20 @@ public final class Utilities {
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD, "");
      }
    }
+
+ public static int getDPColOffset(FileSinkDesc conf) {
+
+ if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ // For deletes, there is only ROW__ID in non-partitioning, non-bucketing columns.
+ //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details.
+ return 1;
+ } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
+ // For updates, ROW__ID is an extra column at index 0.
+ //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details.
+ return getColumnNames(conf.getTableInfo().getProperties()).size() + 1;
+ } else {
+ return getColumnNames(conf.getTableInfo().getProperties()).size();
+ }
+
+ }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
index 5c6a6df..25156b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
@@ -843,7 +843,7 @@ public final class ConstantPropagateProcFactory {
            }
          }
          if (constant.getTypeInfo().getCategory() != Category.PRIMITIVE) {
- // nested complex types cannot be folded cleanly
+ // nested complex types cannot be folded cleanly
            return null;
          }
          Object value = constant.getValue();
@@ -1163,16 +1163,15 @@ public final class ConstantPropagateProcFactory {
        DynamicPartitionCtx dpCtx = fsdesc.getDynPartCtx();
        if (dpCtx != null) {

- // If all dynamic partitions are propagated as constant, remove DP.
- Set<String> inputs = dpCtx.getInputToDPCols().keySet();
-
          // Assume only 1 parent for FS operator
          Operator<? extends Serializable> parent = op.getParentOperators().get(0);
          Map<ColumnInfo, ExprNodeDesc> parentConstants = cppCtx.getPropagatedConstants(parent);
          RowSchema rs = parent.getSchema();
          boolean allConstant = true;
- for (String input : inputs) {
- ColumnInfo ci = rs.getColumnInfo(input);
+ int dpColStartIdx = Utilities.getDPColOffset(fsdesc);
+ List<ColumnInfo> colInfos = rs.getSignature();
+ for (int i = dpColStartIdx; i < colInfos.size(); i++) {
+ ColumnInfo ci = colInfos.get(i);
            if (parentConstants.get(ci) == null) {
              allConstant = false;
              break;

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 02fbdfe..c696fd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -497,9 +497,6 @@ public final class GenMapRedUtils {
          partsList = PartitionPruner.prune(tsOp, parseCtx, alias_id);
        } catch (SemanticException e) {
          throw e;
- } catch (HiveException e) {
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new SemanticException(e.getMessage(), e);
        }
      }

@@ -990,7 +987,7 @@ public final class GenMapRedUtils {
      fileSinkOp.setParentOperators(Utilities.makeList(parent));

      // Create a dummy TableScanOperator for the file generated through fileSinkOp
- TableScanOperator tableScanOp = (TableScanOperator) createTemporaryTableScanOperator(
+ TableScanOperator tableScanOp = createTemporaryTableScanOperator(
              parent.getSchema());

      // Connect this TableScanOperator to child.
@@ -1235,19 +1232,16 @@ public final class GenMapRedUtils {
        // adding DP ColumnInfo to the RowSchema signature
        ArrayList<ColumnInfo> signature = inputRS.getSignature();
        String tblAlias = fsInputDesc.getTableInfo().getTableName();
- LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
        for (String dpCol : dpCtx.getDPColNames()) {
          ColumnInfo colInfo = new ColumnInfo(dpCol,
              TypeInfoFactory.stringTypeInfo, // all partition column type should be string
              tblAlias, true); // partition column is virtual column
          signature.add(colInfo);
- colMap.put(dpCol, dpCol); // input and output have the same column name
        }
        inputRS.setSignature(signature);

        // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
        DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
- dpCtx2.setInputToDPCols(colMap);
        fsOutputDesc.setDynPartCtx(dpCtx2);

        // update the FileSinkOperator to include partition columns
@@ -1896,7 +1890,7 @@ public final class GenMapRedUtils {
          "Partition Names, " + Arrays.toString(partNames) + " don't match partition Types, "
          + Arrays.toString(partTypes));

- Map<String, String> typeMap = new HashMap();
+ Map<String, String> typeMap = new HashMap<>();
      for (int i = 0; i < partNames.length; i++) {
        String previousValue = typeMap.put(partNames[i], partTypes[i]);
        Preconditions.checkArgument(previousValue == null, "Partition columns configuration is inconsistent. "

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index dbc6d8f..4bec228 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -736,7 +736,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
      Path dataDir = null;
      if(!qb.getEncryptedTargetTablePaths().isEmpty()) {
        //currently only Insert into T values(...) is supported thus only 1 values clause
- //and only 1 target table are possible. If/when support for
+ //and only 1 target table are possible. If/when support for
        //select ... from values(...) is added an insert statement may have multiple
        //encrypted target tables.
        dataDir = ctx.getMRTmpPath(qb.getEncryptedTargetTablePaths().get(0).toUri());
@@ -1556,7 +1556,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {

        for (String alias : tabAliases) {
          String tab_name = qb.getTabNameForAlias(alias);
-
+
          // we first look for this alias from CTE, and then from catalog.
          /*
           * if this s a CTE reference: Add its AST as a SubQuery to this QB.
@@ -6830,30 +6830,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                .getColumnInfos()), input), rowResolver);
        input.setColumnExprMap(colExprMap);
      }
-
- rowFields = opParseCtx.get(input).getRowResolver()
- .getColumnInfos();
- if (deleting()) {
- // Figure out if we have partition columns in the list or not. If so,
- // add them into the mapping. Partition columns will be located after the row id.
- if (rowFields.size() > 1) {
- // This means we have partition columns to deal with, so set up the mapping from the
- // input to the partition columns.
- dpCtx.mapInputToDP(rowFields.subList(1, rowFields.size()));
- }
- } else if (updating()) {
- // In this case we expect the number of in fields to exceed the number of out fields by one
- // (for the ROW__ID virtual column). If there are more columns than this,
- // then the extras are for dynamic partitioning
- if (dynPart && dpCtx != null) {
- dpCtx.mapInputToDP(rowFields.subList(tableFields.size() + 1, rowFields.size()));
- }
- } else {
- if (dynPart && dpCtx != null) {
- // create the mapping from input ExprNode to dest table DP column
- dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
- }
- }
      return input;
    }

@@ -10105,7 +10081,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
          return;
        }
        for (Node child : node.getChildren()) {
- //each insert of multi insert looks like
+ //each insert of multi insert looks like
          //(TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME T1)))
          if (((ASTNode) child).getToken().getType() != HiveParser.TOK_INSERT) {
            continue;

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 24db7d0..95d5635 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,14 +19,11 @@ package org.apache.hadoop.hive.ql.plan;

  import java.io.Serializable;
  import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
  import java.util.List;
  import java.util.Map;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
  import org.apache.hadoop.hive.ql.metadata.Table;

  public class DynamicPartitionCtx implements Serializable {
@@ -43,8 +40,6 @@ public class DynamicPartitionCtx implements Serializable {
    private Path rootPath; // the root path DP columns paths start from
    private int numBuckets; // number of buckets in each partition

- private Map<String, String> inputToDPCols; // mapping from input column names to DP columns
-
    private List<String> spNames; // sp column names
    private List<String> dpNames; // dp column names
    private String defaultPartName; // default partition name in case of null or empty value
@@ -71,7 +66,6 @@ public class DynamicPartitionCtx implements Serializable {
      }
      this.numDPCols = dpNames.size();
      this.numSPCols = spNames.size();
- this.inputToDPCols = new HashMap<String, String>();
      if (this.numSPCols > 0) {
        this.spPath = Warehouse.makeDynamicPartName(partSpec);
      } else {
@@ -86,25 +80,12 @@ public class DynamicPartitionCtx implements Serializable {
      this.spPath = dp.spPath;
      this.rootPath = dp.rootPath;
      this.numBuckets = dp.numBuckets;
- this.inputToDPCols = dp.inputToDPCols;
      this.spNames = dp.spNames;
      this.dpNames = dp.dpNames;
      this.defaultPartName = dp.defaultPartName;
      this.maxPartsPerNode = dp.maxPartsPerNode;
    }

- public void mapInputToDP(List<ColumnInfo> fs) {
-
- assert fs.size() == this.numDPCols: "input DP column size != numDPCols";
-
- Iterator<ColumnInfo> itr1 = fs.iterator();
- Iterator<String> itr2 = dpNames.iterator();
-
- while (itr1.hasNext() && itr2.hasNext()) {
- inputToDPCols.put(itr1.next().getInternalName(), itr2.next());
- }
- }
-
    public int getMaxPartitionsPerNode() {
      return this.maxPartsPerNode;
    }
@@ -161,14 +142,6 @@ public class DynamicPartitionCtx implements Serializable {
      this.spNames = sp;
    }

- public Map<String, String> getInputToDPCols() {
- return this.inputToDPCols;
- }
-
- public void setInputToDPCols(Map<String, String> map) {
- this.inputToDPCols = map;
- }
-
    public void setNumDPCols(int dp) {
      this.numDPCols = dp;
    }

http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index c6ae030..9e89376 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.StatsSetupConst;
  import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
  import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -45,12 +44,11 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
  import org.apache.hadoop.hive.ql.stats.StatsAggregator;
  import org.apache.hadoop.hive.ql.stats.StatsPublisher;
  import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
  import org.apache.hadoop.hive.serde2.SerDeException;
  import org.apache.hadoop.hive.serde2.SerDeStats;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
  import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.Text;
@@ -77,7 +75,6 @@ import java.io.File;
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collections;
-import java.util.HashMap;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;
@@ -94,8 +91,7 @@ public class TestFileSinkOperator {
    private static TableDesc nonAcidTableDescriptor;
    private static TableDesc acidTableDescriptor;
    private static ObjectInspector inspector;
- private static List<TFSORow> rows;
- private static ValidTxnList txnList;
+ private static List<Row> rows;

    private Path basePath;
    private JobConf jc;
@@ -105,34 +101,33 @@ public class TestFileSinkOperator {
      Properties properties = new Properties();
      properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName());
      nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
+ properties.setProperty(serdeConstants.LIST_COLUMNS,"data");
      properties = new Properties(properties);
      properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1");
      acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
-
      tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") +
          "testFileSinkOperator");
      tmpdir.mkdir();
      tmpdir.deleteOnExit();
- txnList = new ValidReadTxnList(new long[]{}, 2);
    }

    @Test
    public void testNonAcidWrite() throws Exception {
      setBasePath("write");
- setupData(DataFormat.SIMPLE);
+ setupData(DataFormat.WITH_PARTITION_VALUE);
      FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, false, 0);
      processRows(op);
- confirmOutput();
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE);
    }

    @Test
    public void testInsert() throws Exception {
      setBasePath("insert");
- setupData(DataFormat.SIMPLE);
+ setupData(DataFormat.WITH_PARTITION_VALUE);
      FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, false, 1);
      processRows(op);
      Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE);
    }

    @Test
@@ -142,7 +137,7 @@ public class TestFileSinkOperator {
      FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, false, 2);
      processRows(op);
      Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_RECORD_ID);
    }

    @Test
@@ -152,7 +147,7 @@ public class TestFileSinkOperator {
      FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, false, 2);
      processRows(op);
      Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_RECORD_ID);
    }

    @Test
@@ -161,7 +156,7 @@ public class TestFileSinkOperator {
      setupData(DataFormat.WITH_PARTITION_VALUE);
      FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, true, 0);
      processRows(op);
- confirmOutput();
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE);
    }


@@ -174,7 +169,7 @@ public class TestFileSinkOperator {
      // We only expect 5 here because we'll get whichever of the partitions published its stats
      // last.
      Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE);
    }

    @Test
@@ -184,19 +179,19 @@ public class TestFileSinkOperator {
      FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, true, 2);
      processRows(op);
      Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
    }

    @Test
    public void testDeleteDynamicPartitioning() throws Exception {
      setBasePath("deleteDP");
- setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
+ setupData(DataFormat.WITH_RECORD_ID);
      FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, true, 2);
      processRows(op);
      // We only expect -5 here because we'll get whichever of the partitions published its stats
      // last.
      Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_RECORD_ID);
    }


@@ -217,64 +212,52 @@ public class TestFileSinkOperator {

    }

- private enum DataFormat {SIMPLE, WITH_RECORD_ID, WITH_PARTITION_VALUE,
- WITH_RECORD_ID_AND_PARTITION_VALUE};
+ private enum DataFormat {WITH_RECORD_ID, WITH_PARTITION_VALUE, WITH_RECORD_ID_AND_PARTITION_VALUE};

    private void setupData(DataFormat format) {

- // Build object inspector
- inspector = ObjectInspectorFactory.getReflectionObjectInspector
- (TFSORow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
- rows = new ArrayList<TFSORow>();
-
+ Class<?> rType;
      switch (format) {
- case SIMPLE:
- // Build rows
- for (int i = 0; i < 10; i++) {
- rows.add(
- new TFSORow(
- new Text("mary had a little lamb")
- )
- );
- }
+ case WITH_PARTITION_VALUE:
+ rType = RowWithPartVal.class;
          break;
-
        case WITH_RECORD_ID:
- for (int i = 0; i < 10; i++) {
- rows.add(
- new TFSORow(
- new Text("its fleect was white as snow"),
- new RecordIdentifier(1, 1, i)
- )
- );
- }
+ rType = RowWithRecID.class;
          break;
-
- case WITH_PARTITION_VALUE:
- for (int i = 0; i < 10; i++) {
- rows.add(
- new TFSORow(
- new Text("its fleect was white as snow"),
- (i < 5) ? new Text("Monday") : new Text("Tuesday")
- )
- );
- }
- break;
-
        case WITH_RECORD_ID_AND_PARTITION_VALUE:
- for (int i = 0; i < 10; i++) {
- rows.add(
- new TFSORow(
- new Text("its fleect was white as snow"),
- (i < 5) ? new Text("Monday") : new Text("Tuesday"),
- new RecordIdentifier(1, 1, i)
- )
- );
- }
+ rType = RowWithPartNRecID.class;
          break;
-
        default:
- throw new RuntimeException("Unknown option!");
+ throw new RuntimeException("Unknown type");
+ }
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (rType, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+ rows = new ArrayList<Row>();
+ Row r;
+ for (int i = 0; i < 10; i++) {
+ switch (format) {
+ case WITH_PARTITION_VALUE:
+ r =
+ new RowWithPartVal(
+ new Text("mary had a little lamb"),
+ (i < 5) ? new Text("Monday") : new Text("Tuesday"));
+ break;
+ case WITH_RECORD_ID:
+ r = new RowWithRecID(new RecordIdentifier(1, 1, i),
+ (i < 5) ? new Text("Monday") : new Text("Tuesday"));
+ break;
+ case WITH_RECORD_ID_AND_PARTITION_VALUE:
+ r = new RowWithPartNRecID(
+ new Text("its fleect was white as snow"),
+ (i < 5) ? new Text("Monday") : new Text("Tuesday"),
+ new RecordIdentifier(1, 1, i));
+ break;
+ default:
+ throw new RuntimeException("Unknown data format");
+ }
+ rows.add(r);
+
      }
    }

@@ -300,9 +283,6 @@ public class TestFileSinkOperator {
        Map<String, String> partColMap= new LinkedHashMap<String, String>(1);
        partColMap.put(PARTCOL_NAME, null);
        DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
- Map<String, String> partColNames = new HashMap<String, String>(1);
- partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
- dpCtx.setInputToDPCols(partColNames);
        //todo: does this need the finalDestination?
        desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
      } else {
@@ -320,27 +300,27 @@ public class TestFileSinkOperator {
    }

    private void processRows(FileSinkOperator op) throws HiveException {
- for (TFSORow r : rows) op.process(r, 0);
+ for (Object r : rows) op.process(r, 0);
      op.jobCloseOp(jc, true);
      op.close(false);
    }

- private void confirmOutput() throws IOException, SerDeException {
+ private void confirmOutput(DataFormat rType) throws IOException, SerDeException, CloneNotSupportedException {
      Path[] paths = findFilesInBasePath();
- TFSOInputFormat input = new TFSOInputFormat();
+ TFSOInputFormat input = new TFSOInputFormat(rType);
      FileInputFormat.setInputPaths(jc, paths);

      InputSplit[] splits = input.getSplits(jc, 1);
- RecordReader<NullWritable, TFSORow> reader = input.getRecordReader(splits[0], jc,
+ RecordReader<NullWritable, Row> reader = input.getRecordReader(splits[0], jc,
          Mockito.mock(Reporter.class));
      NullWritable key = reader.createKey();
- TFSORow value = reader.createValue();
- List<TFSORow> results = new ArrayList<TFSORow>(rows.size());
- List<TFSORow> sortedRows = new ArrayList<TFSORow>(rows.size());
+ Row value = reader.createValue();
+ List<Row> results = new ArrayList<Row>(rows.size());
+ List<Row> sortedRows = new ArrayList<Row>(rows.size());
      for (int i = 0; i < rows.size(); i++) {
        Assert.assertTrue(reader.next(key, value));
- results.add(new TFSORow(value));
- sortedRows.add(new TFSORow(rows.get(i)));
+ results.add(value.clone());
+ sortedRows.add(rows.get(i));
      }
      Assert.assertFalse(reader.next(key, value));
      Collections.sort(results);
@@ -370,36 +350,172 @@ public class TestFileSinkOperator {
      }
    }

- private static class TFSORow implements WritableComparable<TFSORow> {
+ public static interface Row extends WritableComparable<Row> {
+
+ Row clone() throws CloneNotSupportedException;
+ }
+
+ private static class RowWithRecID implements Row {
+
      private RecordIdentifier recId;
- private Text data;
      private Text partVal;

- TFSORow() {
- this(null, null, null);
+ public RowWithRecID() {
+ }
+ public RowWithRecID(RecordIdentifier recId, Text partVal) {
+ super();
+ this.recId = recId;
+ this.partVal = partVal;
      }

- TFSORow(Text t) {
- this(t, null, null);
+ @Override
+ public
+ Row clone() throws CloneNotSupportedException {
+ return new RowWithRecID(this.recId, this.partVal);
      }

- TFSORow(Text t, Text pv) {
- this(t, pv, null);
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ if (partVal == null) {
+ dataOutput.writeBoolean(false);
+ } else {
+ dataOutput.writeBoolean(true);
+ partVal.write(dataOutput);
+ }
+ if (recId == null) {
+ dataOutput.writeBoolean(false);
+ } else {
+ dataOutput.writeBoolean(true);
+ recId.write(dataOutput);
+ }
      }
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ boolean notNull = dataInput.readBoolean();
+ if (notNull) {
+ partVal = new Text();
+ partVal.readFields(dataInput);
+ }
+ notNull = dataInput.readBoolean();
+ if (notNull) {
+ recId = new RecordIdentifier();
+ recId.readFields(dataInput);
+ }

- TFSORow(Text t, RecordIdentifier ri) {
- this(t, null, ri);
      }
+ @Override
+ public int compareTo(Row row) {
+ RowWithRecID other = (RowWithRecID) row;
+ if (recId == null && other.recId == null) {
+ return comparePartVal(other);
+ } else if (recId == null) {
+ return -1;
+ } else {
+ int rc = recId.compareTo(other.recId);
+ if (rc == 0) return comparePartVal(other);
+ else return rc;
+ }
+ }
+ private int comparePartVal(RowWithRecID other) {

- TFSORow(Text t, Text pv, RecordIdentifier ri) {
+ return partVal.compareTo(other.partVal);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return compareTo((RowWithRecID)obj) == 0;
+ }
+ }
+ private static class RowWithPartVal implements Row {
+
+ public RowWithPartVal(Text data, Text partVal) {
+ super();
+ this.data = data;
+ this.partVal = partVal;
+ }
+
+ public RowWithPartVal() {
+ }
+
+ private Text data;
+ private Text partVal;
+
+ @Override
+ public Row clone() throws CloneNotSupportedException {
+ return new RowWithPartVal(this.data, this.partVal);
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ data.write(dataOutput);
+ if (partVal == null) {
+ dataOutput.writeBoolean(false);
+ } else {
+ dataOutput.writeBoolean(true);
+ partVal.write(dataOutput);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ data = new Text();
+ data.readFields(dataInput);
+ boolean notNull = dataInput.readBoolean();
+ if (notNull) {
+ partVal = new Text();
+ partVal.readFields(dataInput);
+ }
+ }
+
+ @Override
+ public int compareTo(Row row) {
+ RowWithPartVal other = (RowWithPartVal) row;
+ if (partVal == null && other.partVal == null) {
+ return compareData(other);
+ } else if (partVal == null) {
+ return -1;
+ } else {
+ int rc = partVal.compareTo(other.partVal);
+ if (rc == 0) return compareData(other);
+ else return rc;
+ }
+ }
+
+ private int compareData(RowWithPartVal other) {
+ if (data == null && other.data == null) return 0;
+ else if (data == null) return -1;
+ else return data.compareTo(other.data);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof RowWithPartVal) {
+ RowWithPartVal other = (RowWithPartVal) obj;
+ return compareTo(other) == 0;
+
+ } else {
+ return false;
+ }
+ }
+ }
+ private static class RowWithPartNRecID implements Row {
+ private RecordIdentifier recId;
+ private Text data;
+ private Text partVal;
+
+ RowWithPartNRecID() {
+ this(null, null, null);
+ }
+
+ RowWithPartNRecID(Text t, Text pv, RecordIdentifier ri) {
        data = t;
        partVal = pv;
        recId = ri;
-
      }

- TFSORow(TFSORow other) {
- this(other.data, other.partVal, other.recId);
+ @Override
+ public RowWithPartNRecID clone() throws CloneNotSupportedException {
+ return new RowWithPartNRecID(this.data, this.partVal, this.recId);
      }

      @Override
@@ -437,8 +553,8 @@ public class TestFileSinkOperator {

      @Override
      public boolean equals(Object obj) {
- if (obj instanceof TFSORow) {
- TFSORow other = (TFSORow) obj;
+ if (obj instanceof RowWithPartNRecID) {
+ RowWithPartNRecID other = (RowWithPartNRecID) obj;
          if (data == null && other.data == null) return checkPartVal(other);
          else if (data == null) return false;
          else if (data.equals(other.data)) return checkPartVal(other);
@@ -448,21 +564,22 @@ public class TestFileSinkOperator {
        }
      }

- private boolean checkPartVal(TFSORow other) {
+ private boolean checkPartVal(RowWithPartNRecID other) {
        if (partVal == null && other.partVal == null) return checkRecId(other);
        else if (partVal == null) return false;
        else if (partVal.equals(other.partVal)) return checkRecId(other);
        else return false;
      }

- private boolean checkRecId(TFSORow other) {
+ private boolean checkRecId(RowWithPartNRecID other) {
        if (recId == null && other.recId == null) return true;
        else if (recId == null) return false;
        else return recId.equals(other.recId);
      }

      @Override
- public int compareTo(TFSORow other) {
+ public int compareTo(Row row) {
+ RowWithPartNRecID other = (RowWithPartNRecID) row;
        if (recId == null && other.recId == null) {
          return comparePartVal(other);
        } else if (recId == null) {
@@ -474,7 +591,7 @@ public class TestFileSinkOperator {
        }
      }

- private int comparePartVal(TFSORow other) {
+ private int comparePartVal(RowWithPartNRecID other) {
        if (partVal == null && other.partVal == null) {
          return compareData(other);
        } else if (partVal == null) {
@@ -486,21 +603,26 @@ public class TestFileSinkOperator {
        }
      }

- private int compareData(TFSORow other) {
+ private int compareData(RowWithPartNRecID other) {
        if (data == null && other.data == null) return 0;
        else if (data == null) return -1;
        else return data.compareTo(other.data);
      }
    }

- private static class TFSOInputFormat extends FileInputFormat<NullWritable, TFSORow>
- implements AcidInputFormat<NullWritable, TFSORow> {
+ private static class TFSOInputFormat extends FileInputFormat<NullWritable, Row>
+ implements AcidInputFormat<NullWritable, Row> {

      FSDataInputStream in[] = null;
      int readingFrom = -1;
+ DataFormat rType;
+
+ public TFSOInputFormat(DataFormat rType) {
+ this.rType = rType;
+ }

      @Override
- public RecordReader<NullWritable, TFSORow> getRecordReader(
+ public RecordReader<NullWritable, Row> getRecordReader(
          InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
        if (in == null) {
          Path paths[] = FileInputFormat.getInputPaths(entries);
@@ -511,10 +633,10 @@ public class TestFileSinkOperator {
          }
          readingFrom = 0;
        }
- return new RecordReader<NullWritable, TFSORow>() {
+ return new RecordReader<NullWritable, Row>() {

          @Override
- public boolean next(NullWritable nullWritable, TFSORow tfsoRecord) throws
+ public boolean next(NullWritable nullWritable, Row tfsoRecord) throws
              IOException {
            try {
              tfsoRecord.readFields(in[readingFrom]);
@@ -532,8 +654,18 @@ public class TestFileSinkOperator {
          }

          @Override
- public TFSORow createValue() {
- return new TFSORow();
+ public Row createValue() {
+ switch (rType) {
+ case WITH_RECORD_ID_AND_PARTITION_VALUE:
+ return new RowWithPartNRecID();
+ case WITH_PARTITION_VALUE:
+ return new RowWithPartVal();
+ case WITH_RECORD_ID:
+ return new RowWithRecID();
+
+ default:
+ throw new RuntimeException("Unknown row Type");
+ }
          }

          @Override
@@ -554,14 +686,14 @@ public class TestFileSinkOperator {
      }

      @Override
- public RowReader<TFSORow> getReader(InputSplit split,
+ public RowReader<Row> getReader(InputSplit split,
                                             Options options) throws
          IOException {
        return null;
      }

      @Override
- public RawReader<TFSORow> getRawReader(Configuration conf,
+ public RawReader<Row> getRawReader(Configuration conf,
                                                boolean collapseEvents,
                                                int bucket,
                                                ValidTxnList validTxnList,
@@ -578,9 +710,9 @@ public class TestFileSinkOperator {
      }
    }

- public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, TFSORow>
- implements AcidOutputFormat<NullWritable, TFSORow> {
- List<TFSORow> records = new ArrayList<TFSORow>();
+ public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, Row>
+ implements AcidOutputFormat<NullWritable, Row> {
+ List<Row> records = new ArrayList<>();
      long numRecordsAdded = 0;
      FSDataOutputStream out = null;

@@ -588,7 +720,6 @@ public class TestFileSinkOperator {
      public RecordUpdater getRecordUpdater(final Path path, final Options options) throws
          IOException {

- final StructObjectInspector inspector = (StructObjectInspector)options.getInspector();
        return new RecordUpdater() {
          @Override
          public void insert(long currentTransaction, Object row) throws IOException {
@@ -608,9 +739,8 @@ public class TestFileSinkOperator {
          }

          private void addRow(Object row) {
- assert row instanceof TFSORow : "Expected TFSORow but got " +
- row.getClass().getName();
- records.add((TFSORow)row);
+ assert row instanceof Row : "Expected Row but got " + row.getClass().getName();
+ records.add((Row)row);
          }

          @Override
@@ -619,7 +749,7 @@ public class TestFileSinkOperator {
              FileSystem fs = path.getFileSystem(options.getConfiguration());
              out = fs.create(path);
            }
- for (TFSORow r : records) r.write(out);
+ for (Writable r : records) r.write(out);
            records.clear();
            out.flush();
          }
@@ -657,8 +787,8 @@ public class TestFileSinkOperator {
        return new FileSinkOperator.RecordWriter() {
          @Override
          public void write(Writable w) throws IOException {
- Assert.assertTrue(w instanceof TFSORow);
- records.add((TFSORow) w);
+ Assert.assertTrue(w instanceof Row);
+ records.add((Row)w);
          }

          @Override
@@ -667,7 +797,7 @@ public class TestFileSinkOperator {
              FileSystem fs = finalOutPath.getFileSystem(jc);
              out = fs.create(finalOutPath);
            }
- for (TFSORow r : records) r.write(out);
+ for (Writable r : records) r.write(out);
            records.clear();
            out.flush();
            out.close();
@@ -676,7 +806,7 @@ public class TestFileSinkOperator {
      }

      @Override
- public RecordWriter<NullWritable, TFSORow> getRecordWriter(
+ public RecordWriter<NullWritable, Row> getRecordWriter(
          FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws
          IOException {
        return null;
@@ -688,7 +818,7 @@ public class TestFileSinkOperator {
      }
    }

- public static class TFSOSerDe implements SerDe {
+ public static class TFSOSerDe extends AbstractSerDe {

      @Override
      public void initialize(Configuration conf, Properties tbl) throws SerDeException {
@@ -697,20 +827,18 @@ public class TestFileSinkOperator {

      @Override
      public Class<? extends Writable> getSerializedClass() {
- return TFSORow.class;
+ return RowWithPartNRecID.class;
      }

      @Override
      public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
- assert obj instanceof TFSORow : "Expected TFSORow or decendent, got "
- + obj.getClass().getName();
- return (TFSORow)obj;
+ assert obj instanceof Row : "Expected Row or decendent, got " + obj.getClass().getName();
+ return (Row)obj;
      }

      @Override
      public Object deserialize(Writable blob) throws SerDeException {
- assert blob instanceof TFSORow : "Expected TFSORow or decendent, got "
- + blob.getClass().getName();
+ assert blob instanceof Row : "Expected Row or decendent, got "+ blob.getClass().getName();
        return blob;
      }
