FAQ
Author: gates
Date: Mon Oct 13 21:54:03 2014
New Revision: 1631569

URL: http://svn.apache.org/r1631569
Log:
HIVE-8368 compactor is improperly writing delete records in base file (Alan Gates, reviewed by Eugene Koifman)

Modified:
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
     hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
     hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java Mon Oct 13 21:54:03 2014
@@ -155,6 +155,8 @@ public interface AcidInputFormat<KEY ext
    public static interface RawReader<V>
        extends RecordReader<RecordIdentifier, V> {
      public ObjectInspector getObjectInspector();
+
+ public boolean isDelete(V value);
    }

    /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java Mon Oct 13 21:54:03 2014
@@ -664,6 +664,11 @@ public class OrcRawRecordMerger implemen
          (OrcStruct.createObjectInspector(rowType));
    }

+ @Override
+ public boolean isDelete(OrcStruct value) {
+ return OrcRecordUpdater.getOperation(value) == OrcRecordUpdater.DELETE_OPERATION;
+ }
+
    /**
     * Get the number of columns in the underlying rows.
     * @return 0 if there are no base and no deltas.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Mon Oct 13 21:54:03 2014
@@ -506,13 +506,15 @@ public class CompactorMR {
        ValidTxnList txnList =
            new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));

+ boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
        AcidInputFormat.RawReader<V> reader =
- aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false), split.getBucket(),
+ aif.getRawReader(jobConf, isMajor, split.getBucket(),
                txnList, split.getBaseDir(), split.getDeltaDirs());
        RecordIdentifier identifier = reader.createKey();
        V value = reader.createValue();
        getWriter(reporter, reader.getObjectInspector(), split.getBucket());
        while (reader.next(identifier, value)) {
+ if (isMajor && reader.isDelete(value)) continue;
          writer.write(value);
          reporter.progress();
        }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java Mon Oct 13 21:54:03 2014
@@ -56,6 +56,7 @@ import java.util.List;

  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertNull;

  public class TestOrcRawRecordMerger {
@@ -574,12 +575,14 @@ public class TestOrcRawRecordMerger {
          OrcRecordUpdater.getOperation(event));
      assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
      assertEquals("update 1", getValue(event));
+ assertFalse(merger.isDelete(event));

      assertEquals(true, merger.next(id, event));
      assertEquals(OrcRecordUpdater.INSERT_OPERATION,
          OrcRecordUpdater.getOperation(event));
      assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
      assertEquals("second", getValue(event));
+ assertFalse(merger.isDelete(event));

      assertEquals(true, merger.next(id, event));
      assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
@@ -616,6 +619,7 @@ public class TestOrcRawRecordMerger {
          OrcRecordUpdater.getOperation(event));
      assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
      assertNull(OrcRecordUpdater.getRow(event));
+ assertTrue(merger.isDelete(event));

      assertEquals(true, merger.next(id, event));
      assertEquals(OrcRecordUpdater.DELETE_OPERATION,

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java Mon Oct 13 21:54:03 2014
@@ -339,6 +339,7 @@ public abstract class CompactorTest {
      private final Configuration conf;
      private FSDataInputStream is = null;
      private final FileSystem fs;
+ private boolean lastWasDelete = true;

      MockRawReader(Configuration conf, List<Path> files) throws IOException {
        filesToRead = new Stack<Path>();
@@ -353,6 +354,15 @@ public abstract class CompactorTest {
      }

      @Override
+ public boolean isDelete(Text value) {
+ // Alternate between returning deleted and not. This is easier than actually
+ // tracking operations. We test that this is getting properly called by checking that only
+ // half the records show up in base files after major compactions.
+ lastWasDelete = !lastWasDelete;
+ return lastWasDelete;
+ }
+
+ @Override
      public boolean next(RecordIdentifier identifier, Text text) throws IOException {
        if (is == null) {
          // Open the next file

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Mon Oct 13 21:54:03 2014
@@ -418,8 +418,8 @@ public class TestWorker extends Compacto
          Assert.assertEquals(2, buckets.length);
          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
          Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(1248L, buckets[0].getLen());
- Assert.assertEquals(1248L, buckets[1].getLen());
+ Assert.assertEquals(624L, buckets[0].getLen());
+ Assert.assertEquals(624L, buckets[1].getLen());
        } else {
          LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
        }
@@ -464,8 +464,8 @@ public class TestWorker extends Compacto
          Assert.assertEquals(2, buckets.length);
          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
          Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(1248L, buckets[0].getLen());
- Assert.assertEquals(1248L, buckets[1].getLen());
+ Assert.assertEquals(624L, buckets[0].getLen());
+ Assert.assertEquals(624L, buckets[1].getLen());
        } else {
          LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
        }
@@ -507,8 +507,8 @@ public class TestWorker extends Compacto
          Assert.assertEquals(2, buckets.length);
          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
          Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(208L, buckets[0].getLen());
- Assert.assertEquals(208L, buckets[1].getLen());
+ Assert.assertEquals(104L, buckets[0].getLen());
+ Assert.assertEquals(104L, buckets[1].getLen());
        } else {
          LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
        }
@@ -551,8 +551,8 @@ public class TestWorker extends Compacto
          Assert.assertEquals(2, buckets.length);
          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
          Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(1248L, buckets[0].getLen());
- Assert.assertEquals(1248L, buckets[1].getLen());
+ Assert.assertEquals(624L, buckets[0].getLen());
+ Assert.assertEquals(624L, buckets[1].getLen());
        } else {
          LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
        }
@@ -606,9 +606,10 @@ public class TestWorker extends Compacto
      Table t = newTable("default", "mapwbmb", true);
      Partition p = newPartition(t, "today");

+
      addBaseFile(t, p, 20L, 20, 2, false);
      addDeltaFile(t, p, 21L, 22L, 2, 2, false);
- addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 23L, 26L, 4);

      burnThroughTransactions(25);

@@ -631,7 +632,7 @@ public class TestWorker extends Compacto
      // Find the new delta file and make sure it has the right contents
      boolean sawNewBase = false;
      for (int i = 0; i < stat.length; i++) {
- if (stat[i].getPath().getName().equals("base_0000024")) {
+ if (stat[i].getPath().getName().equals("base_0000026")) {
          sawNewBase = true;
          FileStatus[] buckets = fs.listStatus(stat[i].getPath());
          Assert.assertEquals(2, buckets.length);
@@ -640,10 +641,12 @@ public class TestWorker extends Compacto
          // Bucket 0 should be small and bucket 1 should be large, make sure that's the case
          Assert.assertTrue(
              ("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == buckets[0].getLen()
- && "bucket_00001".equals(buckets[1].getPath().getName()) && 1248L == buckets[1] .getLen())
+ && "bucket_00001".equals(buckets[1].getPath().getName()) && 676L == buckets[1]
+ .getLen())
              ("bucket_00000".equals(buckets[1].getPath().getName()) && 104L == buckets[1].getLen()
- && "bucket_00001".equals(buckets[0].getPath().getName()) && 1248L == buckets[0] .getLen())
+ && "bucket_00001".equals(buckets[0].getPath().getName()) && 676L == buckets[0]
+ .getLen())
          );
        } else {
          LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedOct 13, '14 at 9:54p
activeOct 13, '14 at 9:54p
posts1
users1
websitehive.apache.org

1 user in discussion

Gates: 1 post

People

Translate

site design / logo © 2021 Grokbase