Author: hashutosh
Date: Sat Sep 28 04:26:55 2013
New Revision: 1527149

URL: http://svn.apache.org/r1527149
Log:
HIVE-5324 : Extend record writer and ORC reader/writer interfaces to provide statistics (Prasanth J via Ashutosh Chauhan)

Added:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java
Modified:
     hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
     hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
     hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java
     hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
     hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java

Modified: hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java (original)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -24,7 +24,7 @@ import java.util.Properties;

  import org.apache.commons.codec.binary.Base64;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.Text;
@@ -53,13 +53,13 @@ public class Base64TextOutputFormat<K ex
     * Base64RecordWriter.
     *
     */
- public static class Base64RecordWriter implements RecordWriter,
+ public static class Base64RecordWriter implements FSRecordWriter,
        JobConfigurable {

- RecordWriter writer;
+ FSRecordWriter writer;
      BytesWritable bytesWritable;

- public Base64RecordWriter(RecordWriter writer) {
+ public Base64RecordWriter(FSRecordWriter writer) {
        this.writer = writer;
        bytesWritable = new BytesWritable();
      }
@@ -119,7 +119,7 @@ public class Base64TextOutputFormat<K ex
    }

    @Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
        Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress) throws IOException {


Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
  import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
  import org.apache.hadoop.hive.shims.ShimLoader;
  import org.apache.hadoop.io.Text;
@@ -71,7 +71,7 @@ public class HiveHFileOutputFormat exten
    }

    @Override
- public RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
      final JobConf jc,
      final Path finalOutPath,
      Class<? extends Writable> valueClass,
@@ -120,7 +120,7 @@ public class HiveHFileOutputFormat exten
        ++i;
      }

- return new RecordWriter() {
+ return new FSRecordWriter() {

        @Override
        public void close(boolean abort) throws IOException {

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java Sat Sep 28 04:26:55 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.metastore.HiveMetaHook;
  import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
  import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -285,7 +286,7 @@ class DummyStorageHandler extends HCatSt
       * org.apache.hadoop.util.Progressable)
       */
      @Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
        JobConf jc, Path finalOutPath,
        Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress)

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -25,6 +25,7 @@ import java.util.Properties;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.io.WritableComparable;
@@ -40,7 +41,7 @@ public class HBaseBaseOutputFormat imple
    HiveOutputFormat<WritableComparable<?>, Put> {

    @Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
      JobConf jc, Path finalOutPath,
      Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProperties, Progressable progress)

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Sep 28 04:26:55 2013
@@ -35,11 +35,13 @@ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.FileUtils;
  import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
  import org.apache.hadoop.hive.ql.io.HiveKey;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
  import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
  import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -84,11 +86,13 @@ public class FileSinkOperator extends Te
    protected transient int dpStartCol; // start column # for DP columns
    protected transient List<String> dpVals; // array of values corresponding to DP columns
    protected transient List<Object> dpWritables;
- protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
+ protected transient FSRecordWriter[] rowOutWriters; // row specific RecordWriters
    protected transient int maxPartitions;
    protected transient ListBucketingCtx lbCtx;
    protected transient boolean isSkewedStoredAsSubDirectories;
    private transient boolean statsCollectRawDataSize;
+ private transient boolean[] statsFromRecordWriter;
+ private transient boolean isCollectRWStats;


    private static final transient String[] FATAL_ERR_MSG = {
@@ -96,22 +100,12 @@ public class FileSinkOperator extends Te
        "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions.pernode."
    };

- /**
- * RecordWriter.
- *
- */
- public static interface RecordWriter {
- void write(Writable w) throws IOException;
-
- void close(boolean abort) throws IOException;
- }
-
    public class FSPaths implements Cloneable {
      Path tmpPath;
      Path taskOutputTempPath;
      Path[] outPaths;
      Path[] finalPaths;
- RecordWriter[] outWriters;
+ FSRecordWriter[] outWriters;
      Stat stat;

      public FSPaths() {
@@ -122,7 +116,7 @@ public class FileSinkOperator extends Te
        taskOutputTempPath = Utilities.toTaskTempPath(specPath);
        outPaths = new Path[numFiles];
        finalPaths = new Path[numFiles];
- outWriters = new RecordWriter[numFiles];
+ outWriters = new FSRecordWriter[numFiles];
        stat = new Stat();
      }

@@ -166,11 +160,11 @@ public class FileSinkOperator extends Te
        }
      }

- public void setOutWriters(RecordWriter[] out) {
+ public void setOutWriters(FSRecordWriter[] out) {
        outWriters = out;
      }

- public RecordWriter[] getOutWriters() {
+ public FSRecordWriter[] getOutWriters() {
        return outWriters;
      }

@@ -324,6 +318,7 @@ public class FileSinkOperator extends Te
        isCompressed = conf.getCompressed();
        parent = Utilities.toTempPath(conf.getDirName());
        statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
+ statsFromRecordWriter = new boolean[numFiles];

        serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
        serializer.initialize(null, conf.getTableInfo().getProperties());
@@ -516,6 +511,8 @@ public class FileSinkOperator extends Te
          fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
              jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
              reporter);
+ // If the record writer provides stats, get them from there instead of from the serde
+ statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
          // increment the CREATED_FILES counter
          if (reporter != null) {
            reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
@@ -619,7 +616,11 @@ public class FileSinkOperator extends Te
        }

        rowOutWriters = fpaths.outWriters;
- if (conf.isGatherStats()) {
+ // check if all record writers implement statistics. if at least one RW
+ // doesn't implement the stats interface, we fall back to the conventional
+ // way of gathering stats
+ isCollectRWStats = areAllTrue(statsFromRecordWriter);
+ if (conf.isGatherStats() && !isCollectRWStats) {
          if (statsCollectRawDataSize) {
            SerDeStats stats = serializer.getSerDeStats();
            if (stats != null) {
@@ -630,12 +631,14 @@ public class FileSinkOperator extends Te
        }


+ FSRecordWriter rowOutWriter = null;
+
        if (row_count != null) {
          row_count.set(row_count.get() + 1);
        }

        if (!multiFileSpray) {
- rowOutWriters[0].write(recordValue);
+ rowOutWriter = rowOutWriters[0];
        } else {
          int keyHashCode = 0;
          for (int i = 0; i < partitionEval.length; i++) {
@@ -646,8 +649,9 @@ public class FileSinkOperator extends Te
          key.setHashCode(keyHashCode);
          int bucketNum = prtner.getBucket(key, null, totalFiles);
          int idx = bucketMap.get(bucketNum);
- rowOutWriters[idx].write(recordValue);
+ rowOutWriter = rowOutWriters[idx];
        }
+ rowOutWriter.write(recordValue);
      } catch (IOException e) {
        throw new HiveException(e);
      } catch (SerDeException e) {
@@ -655,6 +659,15 @@ public class FileSinkOperator extends Te
      }
    }

+ private boolean areAllTrue(boolean[] statsFromRW) {
+ for(boolean b : statsFromRW) {
+ if (!b) {
+ return false;
+ }
+ }
+ return true;
+ }
+
    /**
     * Lookup list bucketing path.
     * @param lbDirName
@@ -864,6 +877,27 @@ public class FileSinkOperator extends Te
      if (!abort) {
        for (FSPaths fsp : valToPaths.values()) {
          fsp.closeWriters(abort);
+
+ // before closing the operator, check if statistics gathering is requested
+ // and is provided by the record writer. This is different from the
+ // statistics gathering done in processOp(): there, serde statistics are
+ // gathered for each row added and accumulated in a hashmap, which adds
+ // overhead to the actual processing of each row. But if the record writer
+ // already gathers the statistics, it can simply return the accumulated
+ // statistics, which are aggregated in the case of spray writers
+ if (conf.isGatherStats() && isCollectRWStats) {
+ for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+ FSRecordWriter outWriter = fsp.outWriters[idx];
+ if (outWriter != null) {
+ SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+ if (stats != null) {
+ fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+ fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ }
+ }
+ }
+ }
+
          if (isNativeTable) {
            fsp.commit(fs);
          }
@@ -934,7 +968,7 @@ public class FileSinkOperator extends Te
                   hiveOutputFormat = ReflectionUtils.newInstance(conf.getTableInfo().getOutputFileFormatClass(),job);
             }
            else {
- hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
            }
          }
          else {

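The FileSinkOperator hunks above skip the per-row serde accounting whenever every open record writer implements the stats interface, and instead harvest the accumulated totals once at close time. A minimal standalone sketch of that harvesting pattern follows; the helper class and method names are hypothetical, while FSRecordWriter, StatsProvidingRecordWriter, and SerDeStats are the types this commit introduces or extends:

    import org.apache.hadoop.hive.ql.io.FSRecordWriter;
    import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
    import org.apache.hadoop.hive.serde2.SerDeStats;

    public class WriterStatsHarvester {
      // Mirrors the closeOp() logic above: pull the accumulated statistics
      // out of each stats-providing writer and sum them; the aggregation
      // across writers is what covers the multi-file "spray" case.
      public static SerDeStats harvest(FSRecordWriter[] writers) {
        long rows = 0;
        long rawSize = 0;
        for (FSRecordWriter writer : writers) {
          if (writer instanceof StatsProvidingRecordWriter) {
            SerDeStats stats = ((StatsProvidingRecordWriter) writer).getStats();
            if (stats != null) {
              rows += stats.getRowCount();
              rawSize += stats.getRawDataSize();
            }
          }
        }
        SerDeStats total = new SerDeStats();
        total.setRowCount(rows);
        total.setRawDataSize(rawSize);
        return total;
      }
    }
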
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Sep 28 04:26:55 2013
@@ -102,12 +102,12 @@ import org.apache.hadoop.hive.ql.Context
  import org.apache.hadoop.hive.ql.Driver;
  import org.apache.hadoop.hive.ql.ErrorMsg;
  import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
  import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
  import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
  import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
  import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
  import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
  import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -1694,7 +1694,7 @@ public final class Utilities {

      for (String p : paths) {
        Path path = new Path(p);
- RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+ FSRecordWriter writer = HiveFileFormatUtils.getRecordWriter(
            jc, hiveOutputFormat, outputClass, isCompressed,
            tableInfo.getProperties(), path, reporter);
        writer.close(false);
@@ -2853,7 +2853,7 @@ public final class Utilities {
      Path newFilePath = new Path(newFile);

      String onefile = newPath.toString();
- RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+ FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
          Text.class, false, props, null);
      if (dummyRow) {
        // empty files are omitted at CombineHiveInputFormat.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java Sat Sep 28 04:26:55 2013
@@ -28,8 +28,8 @@ import java.util.Properties;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
@@ -240,7 +240,7 @@ public class PTFRowContainer<Row extends
    }


- private static class PTFRecordWriter implements RecordWriter {
+ private static class PTFRecordWriter implements FSRecordWriter {
      BytesWritable EMPTY_KEY = new BytesWritable();

      SequenceFile.Writer outStream;
@@ -262,7 +262,7 @@ public class PTFRowContainer<Row extends
      extends HiveSequenceFileOutputFormat<K,V> {

      @Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
          Class<? extends Writable> valueClass, boolean isCompressed,
          Properties tableProperties, Progressable progress) throws IOException {


Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Sat Sep 28 04:26:55 2013
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.LocalFileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -105,7 +105,7 @@ public class RowContainer<ROW extends Li
    int acutalSplitNum = 0;
    int currentSplitPointer = 0;
    org.apache.hadoop.mapred.RecordReader rr = null; // record reader
- RecordWriter rw = null;
+ FSRecordWriter rw = null;
    InputFormat<WritableComparable, Writable> inputFormat = null;
    InputSplit[] inputSplits = null;
    private ROW dummyRow = null;
@@ -531,7 +531,7 @@ public class RowContainer<ROW extends Li

    }

- protected RecordWriter getRecordWriter() {
+ protected FSRecordWriter getRecordWriter() {
      return rw;
    }


Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java?rev=1527149&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Record writer used by file sink operator.
+ *
+ * FSRecordWriter.
+ *
+ */
+public interface FSRecordWriter {
+ void write(Writable w) throws IOException;
+
+ void close(boolean abort) throws IOException;
+
+ /**
+ * If a file format gathers statistics internally (like ORC) while writing,
+ * it can expose those statistics through this record writer interface.
+ * Writer-side statistics are useful for updating the metastore with
+ * table/partition level statistics.
+ * StatsProvidingRecordWriter.
+ *
+ */
+ public interface StatsProvidingRecordWriter extends FSRecordWriter {
+ /**
+ * Returns the statistics information
+ * @return SerDeStats
+ */
+ SerDeStats getStats();
+ }
+
+}
\ No newline at end of file

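To make the new contract concrete, here is a minimal sketch of a writer implementing the nested StatsProvidingRecordWriter interface. The class name and the crude byte accounting are illustrative assumptions; only the interface itself and the SerDeStats row-count setter (added further down in this commit) are real:

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
    import org.apache.hadoop.hive.serde2.SerDeStats;
    import org.apache.hadoop.io.Writable;

    public class CountingRecordWriter implements StatsProvidingRecordWriter {
      private long rowCount;
      private long rawDataSize;

      @Override
      public void write(Writable w) throws IOException {
        // ... append w to the underlying file (omitted in this sketch) ...
        rowCount++;
        // Crude size estimate for illustration only; a real format like ORC
        // tracks the deserialized size from its column writers.
        rawDataSize += w.toString().length();
      }

      @Override
      public void close(boolean abort) throws IOException {
        // ... flush and close the underlying stream (omitted) ...
      }

      @Override
      public SerDeStats getStats() {
        SerDeStats stats = new SerDeStats();
        stats.setRawDataSize(rawDataSize);
        stats.setRowCount(rowCount); // setter added to SerDeStats below
        return stats;
      }
    }
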
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -24,7 +24,6 @@ import java.util.Properties;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.Writable;
@@ -43,7 +42,7 @@ public class HiveBinaryOutputFormat<K ex
    /**
     * create the final out file, and output row by row. After one row is
     * appended, a configured row separator is appended
- *
+ *
     * @param jc
     * the job configuration file
     * @param outPath
@@ -59,14 +58,14 @@ public class HiveBinaryOutputFormat<K ex
     * @return the RecordWriter
     */
    @Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
        Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress) throws IOException {

      FileSystem fs = outPath.getFileSystem(jc);
      final OutputStream outStream = fs.create(outPath);

- return new RecordWriter() {
+ return new FSRecordWriter() {
        public void write(Writable r) throws IOException {
          if (r instanceof Text) {
            Text tr = (Text) r;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Sat Sep 28 04:26:55 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.hive.ql.exec.Operator;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -246,7 +245,7 @@ public final class HiveFileFormatUtils {
      return true;
    }

- public static RecordWriter getHiveRecordWriter(JobConf jc,
+ public static FSRecordWriter getHiveRecordWriter(JobConf jc,
        TableDesc tableInfo, Class<? extends Writable> outputClass,
        FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
      boolean storagehandlerofhivepassthru = false;
@@ -287,7 +286,7 @@ public final class HiveFileFormatUtils {
      }
    }

- public static RecordWriter getRecordWriter(JobConf jc,
+ public static FSRecordWriter getRecordWriter(JobConf jc,
        HiveOutputFormat<?, ?> hiveOutputFormat,
        final Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProp, Path outPath, Reporter reporter

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -25,7 +25,6 @@ import java.util.Properties;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.hive.serde.serdeConstants;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.Text;
@@ -39,7 +38,7 @@ import org.apache.hadoop.util.Progressab
  /**
   * HiveIgnoreKeyTextOutputFormat replaces key with null before feeding the <key,
   * value> to TextOutputFormat.RecordWriter.
- *
+ *
   */
  public class HiveIgnoreKeyTextOutputFormat<K extends WritableComparable, V extends Writable>
      extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {
@@ -47,7 +46,7 @@ public class HiveIgnoreKeyTextOutputForm
    /**
     * create the final out file, and output row by row. After one row is
     * appended, a configured row separator is appended
- *
+ *
     * @param jc
     * the job configuration file
     * @param outPath
@@ -63,7 +62,7 @@ public class HiveIgnoreKeyTextOutputForm
     * @return the RecordWriter
     */
    @Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
        Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress) throws IOException {
      int rowSeparator = 0;
@@ -79,7 +78,7 @@ public class HiveIgnoreKeyTextOutputForm
      FileSystem fs = outPath.getFileSystem(jc);
      final OutputStream outStream = Utilities.createCompressedStream(jc, fs
          .create(outPath), isCompressed);
- return new RecordWriter() {
+ return new FSRecordWriter() {
        public void write(Writable r) throws IOException {
          if (r instanceof Text) {
            Text tr = (Text) r;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -23,7 +23,6 @@ import java.util.Properties;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.NullWritable;
@@ -48,7 +47,7 @@ public class HiveNullValueSequenceFileOu
    private boolean keyIsText;

    @Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
        Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress) throws IOException {

@@ -58,7 +57,7 @@ public class HiveNullValueSequenceFileOu

      keyWritable = new HiveKey();
      keyIsText = valueClass.equals(Text.class);
- return new RecordWriter() {
+ return new FSRecordWriter() {
        public void write(Writable r) throws IOException {
          if (keyIsText) {
            Text text = (Text) r;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -22,7 +22,6 @@ import java.io.IOException;
  import java.util.Properties;

  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.OutputFormat;
@@ -58,7 +57,7 @@ public interface HiveOutputFormat<K, V>
     * progress used for status report
     * @return the RecordWriter for the output file
     */
- RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
        final Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress) throws IOException;


Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -49,7 +49,7 @@ public class HivePassThroughOutputFormat
                                    "org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat";

    public static final String HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY =
- "hive.passthrough.storagehandler.of";
+ "hive.passthrough.storagehandler.of";

    public HivePassThroughOutputFormat() {
      //construct this class through ReflectionUtils from FileSinkOperator
@@ -99,7 +99,7 @@ public class HivePassThroughOutputFormat
    }

    @Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
        JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress) throws IOException {
      if (this.initialized == false) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io;

  import java.io.IOException;

-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.io.WritableComparable;


  public class HivePassThroughRecordWriter <K extends WritableComparable<?>, V extends Writable>
-implements RecordWriter {
+implements FSRecordWriter {

    private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;


Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -23,7 +23,6 @@ import java.util.Properties;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.SequenceFile;
@@ -56,7 +55,7 @@ public class HiveSequenceFileOutputForma
     * @return the RecordWriter for the output file
     */
    @Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
        Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress) throws IOException {

@@ -64,7 +63,7 @@ public class HiveSequenceFileOutputForma
      final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
          fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);

- return new RecordWriter() {
+ return new FSRecordWriter() {
        public void write(Writable r) throws IOException {
          outStream.append(EMPTY_KEY, r);
        }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -118,7 +118,7 @@ public class RCFileOutputFormat extends
     * @throws IOException
     */
    @Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
        JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass,
        boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {

@@ -135,7 +135,7 @@ public class RCFileOutputFormat extends
        (jc, finalOutPath.getFileSystem(jc),
         finalOutPath, isCompressed);

- return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+ return new FSRecordWriter() {
        public void write(Writable r) throws IOException {
          outWriter.append(r);
        }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -17,6 +17,14 @@
   */
  package org.apache.hadoop.hive.ql.io.avro;

+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+
+import java.io.IOException;
+import java.util.Properties;
+
  import org.apache.avro.Schema;
  import org.apache.avro.file.CodecFactory;
  import org.apache.avro.file.DataFileWriter;
@@ -24,7 +32,7 @@ import org.apache.avro.generic.GenericDa
  import org.apache.avro.generic.GenericRecord;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
  import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
  import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
@@ -36,14 +44,6 @@ import org.apache.hadoop.mapred.RecordWr
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.util.Progressable;

-import java.io.IOException;
-import java.util.Properties;
-
-import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
-import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
-
  /**
   * Write to an Avro file from a Hive process.
   */
@@ -51,7 +51,7 @@ public class AvroContainerOutputFormat
          implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {

    @Override
- public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
+ public FSRecordWriter getHiveRecordWriter(JobConf jobConf,
           Path path, Class<? extends Writable> valueClass, boolean isCompressed,
           Properties properties, Progressable progressable) throws IOException {
      Schema schema;
@@ -62,7 +62,7 @@ public class AvroContainerOutputFormat
      }
      GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(schema);
      DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(gdw);
-
+
      if (isCompressed) {
        int level = jobConf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
        String codecName = jobConf.get(OUTPUT_CODEC, DEFLATE_CODEC);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -18,18 +18,18 @@
  package org.apache.hadoop.hive.ql.io.avro;


+import java.io.IOException;
+
  import org.apache.avro.file.DataFileWriter;
  import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
  import org.apache.hadoop.io.Writable;

-import java.io.IOException;
-
  /**
   * Write an Avro GenericRecord to an Avro data file.
   */
-public class AvroGenericRecordWriter implements FileSinkOperator.RecordWriter{
+public class AvroGenericRecordWriter implements FSRecordWriter{
    final private DataFileWriter<GenericRecord> dfw;

    public AvroGenericRecordWriter(DataFileWriter<GenericRecord> dfw) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -17,10 +17,9 @@
   */
  package org.apache.hadoop.hive.ql.io.orc;

-import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
  import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
  import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -45,7 +44,7 @@ public class OrcOutputFormat extends Fil

    private static class OrcRecordWriter
        implements RecordWriter<NullWritable, OrcSerdeRow>,
- FileSinkOperator.RecordWriter {
+ FSRecordWriter {
      private Writer writer = null;
      private final Path path;
      private final OrcFile.WriterOptions options;
@@ -105,7 +104,7 @@ public class OrcOutputFormat extends Fil
    }

    @Override
- public FileSinkOperator.RecordWriter
+ public FSRecordWriter
       getHiveRecordWriter(JobConf conf,
                           Path path,
                           Class<? extends Writable> valueClass,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Sat Sep 28 04:26:55 2013
@@ -39,6 +39,19 @@ public interface Reader {
    long getNumberOfRows();

    /**
+ * Get the deserialized data size of the file.
+ * @return raw data size
+ */
+ long getRawDataSize();
+
+ /**
+ * Get the deserialized data size of the specified columns.
+ * @param colNames the list of column names
+ * @return raw data size of the columns
+ */
+ long getRawDataSizeOfColumns(List<String> colNames);
+
+ /**
     * Get the user metadata keys.
     * @return the set of metadata keys
     */

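A hedged usage sketch for the two methods added above, assuming the existing OrcFile.createReader(fs, path) factory in this package; note that the ReaderImpl hunk below still stubs both methods to return 0, so real values only appear once a follow-up fills them in:

    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;

    public class OrcRawSizeProbe {
      public static void main(String[] args) throws Exception {
        Path path = new Path(args[0]);
        Configuration conf = new Configuration();
        FileSystem fs = path.getFileSystem(conf);
        Reader reader = OrcFile.createReader(fs, path);
        System.out.println("rows: " + reader.getNumberOfRows());
        System.out.println("raw size: " + reader.getRawDataSize());
        // "col1" is a placeholder column name for this sketch.
        System.out.println("raw size of col1: "
            + reader.getRawDataSizeOfColumns(Arrays.asList("col1")));
      }
    }
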
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Sat Sep 28 04:26:55 2013
@@ -343,4 +343,14 @@ final class ReaderImpl implements Reader
          include, footer.getRowIndexStride(), sarg, columnNames);
    }

+ @Override
+ public long getRawDataSize() {
+ return 0;
+ }
+
+ @Override
+ public long getRawDataSizeOfColumns(List<String> colNames) {
+ return 0;
+ }
+
  }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java Sat Sep 28 04:26:55 2013
@@ -47,4 +47,22 @@ public interface Writer {
     * @throws IOException
     */
    void close() throws IOException;
+
+ /**
+ * Return the deserialized data size. The raw data size is computed when
+ * writing the file footer, so it is available only after closing the
+ * writer.
+ *
+ * @return raw data size
+ */
+ long getRawDataSize();
+
+ /**
+ * Return the number of rows in the file. The row count is updated when
+ * flushing the stripes, so for an accurate count this method should be
+ * called after closing the writer.
+ *
+ * @return row count
+ */
+ long getNumberOfRows();
  }

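Per the javadoc added above, both counters are only final after close(), so callers should harvest them in that order. A small sketch of the intended call sequence (the helper class is hypothetical, and the WriterImpl hunk below still returns 0 for both methods):

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.orc.Writer;

    public class OrcWriterStatsProbe {
      // Close first, then read: the raw data size is computed with the file
      // footer and the row count is settled as stripes are flushed.
      public static void closeAndReport(Writer writer) throws IOException {
        writer.close();
        System.out.println("rows written: " + writer.getNumberOfRows());
        System.out.println("raw data size: " + writer.getRawDataSize());
      }
    }
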
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sat Sep 28 04:26:55 2013
@@ -1871,4 +1871,14 @@ class WriterImpl implements Writer, Memo
        rawWriter.close();
      }
    }
+
+ @Override
+ public long getRawDataSize() {
+ return 0;
+ }
+
+ @Override
+ public long getNumberOfRows() {
+ return 0;
+ }
  }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.permission.FsPermission;
  import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
  import org.apache.hadoop.hive.ql.io.InputFormatChecker;
  import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -521,7 +521,7 @@ public class TestInputOutputFormat {
      }
      SerDe serde = new OrcSerde();
      HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
- FileSinkOperator.RecordWriter writer =
+ FSRecordWriter writer =
          outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
              properties, Reporter.NULL);
      writer.write(serde.serialize(new MyRow(1,2), inspector));
@@ -686,7 +686,7 @@ public class TestInputOutputFormat {
      JobConf job = new JobConf(conf);
      Properties properties = new Properties();
      HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
- FileSinkOperator.RecordWriter writer =
+ FSRecordWriter writer =
          outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
              properties, Reporter.NULL);
      writer.close(true);
@@ -731,7 +731,7 @@ public class TestInputOutputFormat {
      }
      SerDe serde = new OrcSerde();
      HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
- FileSinkOperator.RecordWriter writer =
+ FSRecordWriter writer =
          outFormat.getHiveRecordWriter(conf, testFilePath, StringRow.class,
              true, properties, Reporter.NULL);
      writer.write(serde.serialize(new StringRow("owen"), inspector));

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java Sat Sep 28 04:26:55 2013
@@ -1,7 +1,10 @@
  package org.apache.hadoop.hive.ql.io.udf;

+import java.io.IOException;
+import java.util.Properties;
+
  import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
  import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.LongWritable;
@@ -11,27 +14,24 @@ import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.util.Progressable;

-import java.io.IOException;
-import java.util.Properties;
-
  public class Rot13OutputFormat
    extends HiveIgnoreKeyTextOutputFormat<LongWritable,Text> {

    @Override
- public RecordWriter
+ public FSRecordWriter
      getHiveRecordWriter(JobConf jc,
                          Path outPath,
                          Class<? extends Writable> valueClass,
                          boolean isCompressed,
                          Properties tableProperties,
                          Progressable progress) throws IOException {
- final RecordWriter result =
+ final FSRecordWriter result =
        super.getHiveRecordWriter(jc,outPath,valueClass,isCompressed,
          tableProperties,progress);
      final Reporter reporter = (Reporter) progress;
      reporter.setStatus("got here");
      System.out.println("Got a reporter " + reporter);
- return new RecordWriter() {
+ return new FSRecordWriter() {
        @Override
        public void write(Writable w) throws IOException {
          if (w instanceof Text) {

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java Sat Sep 28 04:26:55 2013
@@ -27,9 +27,11 @@ public class SerDeStats {

    // currently we support only raw data size stat
    private long rawDataSize;
+ private long rowCount;

    public SerDeStats() {
      rawDataSize = 0;
+ rowCount = 0;
    }

    /**
@@ -48,4 +50,20 @@ public class SerDeStats {
      rawDataSize = uSize;
    }

+ /**
+ * Return the row count
+ * @return row count
+ */
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ /**
+ * Set the row count
+ * @param rowCount - count of rows
+ */
+ public void setRowCount(long rowCount) {
+ this.rowCount = rowCount;
+ }
+
  }

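Finally, a tiny demo of the extended SerDeStats, whose raw-data-size/row-count pairing is exactly what StatsProvidingRecordWriter.getStats() hands back to FileSinkOperator; the values below are arbitrary:

    import org.apache.hadoop.hive.serde2.SerDeStats;

    public class SerDeStatsDemo {
      public static void main(String[] args) {
        SerDeStats stats = new SerDeStats();
        stats.setRawDataSize(4096L); // deserialized size, as before
        stats.setRowCount(128L);     // new in this commit
        System.out.println(stats.getRowCount() + " rows, "
            + stats.getRawDataSize() + " raw bytes");
      }
    }
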