Repository: hive
Updated Branches:
   refs/heads/master 3038b05ed -> 968620932


http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java
new file mode 100644
index 0000000..f4e55ed
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java
@@ -0,0 +1,717 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class TestHBaseAggregateStatsExtrapolation {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestHBaseAggregateStatsExtrapolation.class.getName());
+
+ @Mock
+ HTableInterface htable;
+ private HBaseStore store;
+ SortedMap<String, Cell> rows = new TreeMap<>();
+
+ // NDV will be 3 for the bitVectors
+ String bitVectors = "{0, 4, 5, 7}{0, 1}{0, 1, 2}{0, 1, 4}{0}{0, 2}{0, 3}{0, 2, 3, 4}{0, 1, 4}{0, 1}{0}{0, 1, 3, 8}{0, 2}{0, 2}{0, 9}{0, 1, 4}";
+
+ @Before
+ public void before() throws IOException {
+ MockitoAnnotations.initMocks(this);
+ HiveConf conf = new HiveConf();
+ conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true);
+ store = MockUtils.init(conf, htable, rows);
+ store.backdoor().getStatsCache().resetCounters();
+ }
+
+ private static interface Checker {
+ void checkStats(AggrStats aggrStats) throws Exception;
+ }
+
+ @Test
+ public void allPartitionsHaveBitVectorStatusLong() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(10, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1009, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void allPartitionsHaveBitVectorStatusDecimal() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1_decimal", "decimal", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1_decimal");
+ obj.setColType("decimal");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ DecimalColumnStatsData dcsd = new DecimalColumnStatsData();
+ dcsd.setHighValue(StatObjectConverter.createThriftDecimal("" + (1000 + i)));
+ dcsd.setLowValue(StatObjectConverter.createThriftDecimal("" + (-1000 - i)));
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors);
+ data.setDecimalStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(10, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1_decimal", cso.getColName());
+ Assert.assertEquals("decimal", cso.getColType());
+ DecimalColumnStatsData lcsd = cso.getStatsData().getDecimalStats();
+ Assert.assertEquals(1009, HBaseUtils.getDoubleValue(lcsd.getHighValue()), 0.01);
+ Assert.assertEquals(-1009, HBaseUtils.getDoubleValue(lcsd.getLowValue()), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col1_decimal"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void allPartitionsHaveBitVectorStatusDouble() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1_double", "double", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1_double");
+ obj.setColType("double");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ DoubleColumnStatsData dcsd = new DoubleColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors);
+ data.setDoubleStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(10, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1_double", cso.getColName());
+ Assert.assertEquals("double", cso.getColType());
+ DoubleColumnStatsData lcsd = cso.getStatsData().getDoubleStats();
+ Assert.assertEquals(1009, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col1_double"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void allPartitionsHaveBitVectorStatusString() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1_string", "string", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1_string");
+ obj.setColType("string");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ StringColumnStatsData dcsd = new StringColumnStatsData();
+ dcsd.setAvgColLen(i + 1);
+ dcsd.setMaxColLen(i + 10);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors);
+ data.setStringStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(10, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1_string", cso.getColName());
+ Assert.assertEquals("string", cso.getColType());
+ StringColumnStatsData lcsd = cso.getStatsData().getStringStats();
+ Assert.assertEquals(10, lcsd.getAvgColLen(), 0.01);
+ Assert.assertEquals(19, lcsd.getMaxColLen(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col1_string"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void noPartitionsHaveBitVectorStatus() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col2", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col2");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(10, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col2", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1009, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(90, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col2"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void TwoEndsOfPartitionsHaveBitVectorStatus() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col3", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i < 2 || i > 7) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col3");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i);
+ dcsd.setBitVectors(bitVectors);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(4, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col3", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col3"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void MiddleOfPartitionsHaveBitVectorStatus() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col4", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i > 2 && i < 7) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col4");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i);
+ dcsd.setBitVectors(bitVectors);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(4, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col4", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1006, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1006, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col4"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusLong() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col5", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col5");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i);
+ dcsd.setBitVectors(bitVectors);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(6, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col5", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(40, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col5"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDouble() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col5_double", "double", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col5_double");
+ obj.setColType("double");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ DoubleColumnStatsData dcsd = new DoubleColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i);
+ dcsd.setBitVectors(bitVectors);
+ data.setDoubleStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(6, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col5_double", cso.getColName());
+ Assert.assertEquals("double", cso.getColType());
+ DoubleColumnStatsData lcsd = cso.getStatsData().getDoubleStats();
+ Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(40, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col5_double"));
+ statChecker.checkStats(aggrStats);
+ }
+}
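
For reference, the pattern each of the tests above repeats can be condensed into a minimal sketch. This assumes the same MockUtils-backed HBaseStore harness that before() sets up; the class and method names below are illustrative only and are not part of the commit. The point the tests make is that when per-partition bit vectors are present, the aggregated NDV returned by get_aggr_stats_for comes from merging those sketches (3 for the sample bit-vector string), not from summing the per-partition numDVs values.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.hbase.HBaseStore;

public class AggrStatsPatternSketch {
  /**
   * Writes long-column stats (including a serialized bit-vector sketch) for one partition,
   * then asks the store for the aggregate over partNames. As the tests above assert, the
   * aggregated numDVs is derived from the merged bit vectors when they are available.
   */
  static AggrStats writeAndAggregate(HBaseStore store, String dbName, String tableName,
      List<String> partVal, String bitVectors, List<String> partNames) throws Exception {
    ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
    desc.setPartName("ds=" + partVal.get(0));

    LongColumnStatsData longStats = new LongColumnStatsData();
    longStats.setHighValue(1000);
    longStats.setLowValue(-1000);
    longStats.setNumNulls(0);
    longStats.setNumDVs(3);
    longStats.setBitVectors(bitVectors); // per-partition NDV sketch

    ColumnStatisticsData data = new ColumnStatisticsData();
    data.setLongStats(longStats);
    ColumnStatisticsObj obj = new ColumnStatisticsObj();
    obj.setColName("col1");
    obj.setColType("long");
    obj.setStatsData(data);

    ColumnStatistics cs = new ColumnStatistics();
    cs.setStatsDesc(desc);
    cs.addToStatsObj(obj);
    store.updatePartitionColumnStatistics(cs, partVal);

    return store.get_aggr_stats_for(dbName, tableName, partNames, Arrays.asList("col1"));
  }
}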

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
new file mode 100644
index 0000000..62918be
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class TestHBaseAggregateStatsNDVUniformDist {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestHBaseAggregateStatsNDVUniformDist.class.getName());
+
+ @Mock
+ HTableInterface htable;
+ private HBaseStore store;
+ SortedMap<String, Cell> rows = new TreeMap<>();
+
+ // NDV will be 3 for bitVectors[0] and 12 for bitVectors[1]
+ String bitVectors[] = {
+ "{0, 4, 5, 7}{0, 1}{0, 1, 2}{0, 1, 4}{0}{0, 2}{0, 3}{0, 2, 3, 4}{0, 1, 4}{0, 1}{0}{0, 1, 3, 8}{0, 2}{0, 2}{0, 9}{0, 1, 4}",
+ "{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}" };
+
+ @Before
+ public void before() throws IOException {
+ MockitoAnnotations.initMocks(this);
+ HiveConf conf = new HiveConf();
+ conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true);
+ conf.setBoolean(HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION.varname, true);
+ store = MockUtils.init(conf, htable, rows);
+ store.backdoor().getStatsCache().resetCounters();
+ }
+
+ private static interface Checker {
+ void checkStats(AggrStats aggrStats) throws Exception;
+ }
+
+ @Test
+ public void allPartitionsHaveBitVectorStatus() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors[0]);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(10, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1009, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void noPartitionsHaveBitVectorStatus() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col2", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col2");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(10, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col2", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1009, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(91, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col2"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void TwoEndsOfPartitionsHaveBitVectorStatus() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col3", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i < 2 || i > 7) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col3");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors[i / 5]);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(4, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col3", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(12, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col3"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void MiddleOfPartitionsHaveBitVectorStatus() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col4", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i > 2 && i < 7) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col4");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors[0]);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(4, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col4", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1006, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1006, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(45, lcsd.getNumNulls());
+ Assert.assertEquals(3, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col4"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusLong() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col5_long", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col5_long");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData dcsd = new LongColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors[i / 5]);
+ data.setLongStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(6, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col5_long", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(40, lcsd.getNumNulls());
+ Assert.assertEquals(12, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col5_long"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDecimal() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col5_decimal", "decimal", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col5_decimal");
+ obj.setColType("decimal");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ DecimalColumnStatsData dcsd = new DecimalColumnStatsData();
+ dcsd.setHighValue(StatObjectConverter.createThriftDecimal("" + (1000 + i)));
+ dcsd.setLowValue(StatObjectConverter.createThriftDecimal("" + (-1000 - i)));
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors[i / 5]);
+ data.setDecimalStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(6, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col5_decimal", cso.getColName());
+ Assert.assertEquals("decimal", cso.getColType());
+ DecimalColumnStatsData lcsd = cso.getStatsData().getDecimalStats();
+ Assert.assertEquals(1010, HBaseUtils.getDoubleValue(lcsd.getHighValue()), 0.01);
+ Assert.assertEquals(-1010, HBaseUtils.getDoubleValue(lcsd.getLowValue()), 0.01);
+ Assert.assertEquals(40, lcsd.getNumNulls());
+ Assert.assertEquals(12, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col5_decimal"));
+ statChecker.checkStats(aggrStats);
+ }
+
+ @Test
+ public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDouble() throws Exception {
+ String dbName = "default";
+ String tableName = "snp";
+ long now = System.currentTimeMillis();
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col5_double", "double", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String> emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String> emptyMap(), null, null, null);
+ store.createTable(table);
+
+ List<List<String>> partVals = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ List<String> partVal = Arrays.asList("" + i);
+ partVals.add(partVal);
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVal);
+ Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String> emptyMap());
+ store.addPartition(part);
+ if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVal);
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col5_double");
+ obj.setColType("double");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ DoubleColumnStatsData dcsd = new DoubleColumnStatsData();
+ dcsd.setHighValue(1000 + i);
+ dcsd.setLowValue(-1000 - i);
+ dcsd.setNumNulls(i);
+ dcsd.setNumDVs(10 * i + 1);
+ dcsd.setBitVectors(bitVectors[i / 5]);
+ data.setDoubleStats(dcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+ store.updatePartitionColumnStatistics(cs, partVal);
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(6, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col5_double", cso.getColName());
+ Assert.assertEquals("double", cso.getColType());
+ DoubleColumnStatsData lcsd = cso.getStatsData().getDoubleStats();
+ Assert.assertEquals(1010, lcsd.getHighValue(), 0.01);
+ Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01);
+ Assert.assertEquals(40, lcsd.getNumNulls());
+ Assert.assertEquals(12, lcsd.getNumDVs());
+ }
+ };
+ List<String> partNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ partNames.add("ds=" + i);
+ }
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames,
+ Arrays.asList("col5_double"));
+ statChecker.checkStats(aggrStats);
+ }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/ql/src/test/results/clientpositive/tez/explainuser_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/explainuser_1.q.out b/ql/src/test/results/clientpositive/tez/explainuser_1.q.out
index b501f97..0eb9132 100644
--- a/ql/src/test/results/clientpositive/tez/explainuser_1.q.out
+++ b/ql/src/test/results/clientpositive/tez/explainuser_1.q.out
@@ -426,9 +426,9 @@ Stage-0
                                            <-Map 8 [SIMPLE_EDGE]
                                              SHUFFLE [RS_15]
                                                PartitionCols:_col0, _col1, _col2
- Group By Operator [GBY_14] (rows=1 width=101)
+ Group By Operator [GBY_14] (rows=2 width=101)
                                                  Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
- Filter Operator [FIL_49] (rows=3 width=93)
+ Filter Operator [FIL_49] (rows=5 width=74)
                                                    predicate:((((c_int + 1) >= 0) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                                                    TableScan [TS_11] (rows=20 width=83)
                                                      default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -446,9 +446,9 @@ Stage-0
                                            <-Map 1 [SIMPLE_EDGE]
                                              SHUFFLE [RS_4]
                                                PartitionCols:_col0, _col1, _col2
- Group By Operator [GBY_3] (rows=1 width=101)
+ Group By Operator [GBY_3] (rows=2 width=101)
                                                  Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
- Filter Operator [FIL_48] (rows=3 width=93)
+ Filter Operator [FIL_48] (rows=5 width=74)
                                                    predicate:((((c_int + 1) >= 0) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                                                    TableScan [TS_0] (rows=20 width=83)
                                                      default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1201,11 +1201,11 @@ Stage-0
      Stage-1
        Reducer 3
        File Output Operator [FS_19]
- Select Operator [SEL_18] (rows=21 width=101)
+ Select Operator [SEL_18] (rows=36 width=101)
            Output:["_col0","_col1","_col2","_col3","_col4"]
- Filter Operator [FIL_17] (rows=21 width=101)
+ Filter Operator [FIL_17] (rows=36 width=101)
              predicate:((_col1 > 0) or (_col6 >= 0))
- Merge Join Operator [MERGEJOIN_28] (rows=21 width=101)
+ Merge Join Operator [MERGEJOIN_28] (rows=36 width=101)
                Conds:RS_14._col0=RS_15._col0(Inner),Output:["_col1","_col2","_col3","_col4","_col6"]
              <-Map 5 [SIMPLE_EDGE]
                SHUFFLE [RS_15]
@@ -1219,25 +1219,25 @@ Stage-0
              <-Reducer 2 [SIMPLE_EDGE]
                SHUFFLE [RS_14]
                  PartitionCols:_col0
- Filter Operator [FIL_9] (rows=6 width=182)
+ Filter Operator [FIL_9] (rows=10 width=182)
                    predicate:(((_col1 + _col4) = 2) and ((_col4 + 1) = 2))
- Merge Join Operator [MERGEJOIN_27] (rows=25 width=182)
+ Merge Join Operator [MERGEJOIN_27] (rows=40 width=182)
                      Conds:RS_6._col0=RS_7._col0(Left Outer),Output:["_col0","_col1","_col2","_col3","_col4"]
                    <-Map 1 [SIMPLE_EDGE]
                      SHUFFLE [RS_6]
                        PartitionCols:_col0
- Select Operator [SEL_2] (rows=5 width=74)
+ Select Operator [SEL_2] (rows=9 width=82)
                          Output:["_col0","_col1","_col2"]
- Filter Operator [FIL_24] (rows=5 width=74)
+ Filter Operator [FIL_24] (rows=9 width=82)
                            predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                            TableScan [TS_0] (rows=20 width=83)
                              default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
                    <-Map 4 [SIMPLE_EDGE]
                      SHUFFLE [RS_7]
                        PartitionCols:_col0
- Select Operator [SEL_5] (rows=5 width=71)
+ Select Operator [SEL_5] (rows=9 width=79)
                          Output:["_col0","_col1"]
- Filter Operator [FIL_25] (rows=5 width=74)
+ Filter Operator [FIL_25] (rows=9 width=82)
                            predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                            TableScan [TS_3] (rows=20 width=83)
                              default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1257,27 +1257,27 @@ Stage-0
      Stage-1
        Reducer 2
        File Output Operator [FS_14]
- Select Operator [SEL_13] (rows=24 width=101)
+ Select Operator [SEL_13] (rows=50 width=101)
            Output:["_col0","_col1","_col2","_col3","_col4"]
- Filter Operator [FIL_12] (rows=24 width=101)
+ Filter Operator [FIL_12] (rows=50 width=101)
              predicate:(((_col1 + _col4) = 2) and ((_col1 > 0) or (_col6 >= 0)) and ((_col4 + 1) = 2))
- Merge Join Operator [MERGEJOIN_19] (rows=72 width=101)
+ Merge Join Operator [MERGEJOIN_19] (rows=200 width=101)
                Conds:RS_8._col0=RS_9._col0(Right Outer),RS_8._col0=RS_10._col0(Right Outer),Output:["_col1","_col2","_col3","_col4","_col6"]
              <-Map 1 [SIMPLE_EDGE]
                SHUFFLE [RS_8]
                  PartitionCols:_col0
- Select Operator [SEL_2] (rows=6 width=77)
+ Select Operator [SEL_2] (rows=10 width=83)
                    Output:["_col0","_col1","_col2"]
- Filter Operator [FIL_17] (rows=6 width=77)
+ Filter Operator [FIL_17] (rows=10 width=83)
                      predicate:(((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0)))
                      TableScan [TS_0] (rows=20 width=83)
                        default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
              <-Map 3 [SIMPLE_EDGE]
                SHUFFLE [RS_9]
                  PartitionCols:_col0
- Select Operator [SEL_5] (rows=6 width=74)
+ Select Operator [SEL_5] (rows=10 width=80)
                    Output:["_col0","_col1"]
- Filter Operator [FIL_18] (rows=6 width=77)
+ Filter Operator [FIL_18] (rows=10 width=83)
                      predicate:(((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0)))
                      TableScan [TS_3] (rows=20 width=83)
                        default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1509,53 +1509,53 @@ Stage-0
                                  Output:["_col0","_col1","_col2"]
                                  Filter Operator [FIL_31] (rows=1 width=101)
                                    predicate:((_col1 + _col4) >= 0)
- Merge Join Operator [MERGEJOIN_60] (rows=1 width=101)
+ Merge Join Operator [MERGEJOIN_60] (rows=2 width=101)
                                      Conds:RS_28._col0=RS_29._col0(Inner),Output:["_col0","_col1","_col2","_col4"]
                                    <-Reducer 10 [SIMPLE_EDGE]
                                      SHUFFLE [RS_29]
                                        PartitionCols:_col0
- Filter Operator [FIL_26] (rows=1 width=105)
+ Filter Operator [FIL_26] (rows=2 width=62)
                                          predicate:_col0 is not null
- Limit [LIM_24] (rows=1 width=105)
+ Limit [LIM_24] (rows=3 width=76)
                                            Number of rows:5
- Select Operator [SEL_23] (rows=1 width=105)
+ Select Operator [SEL_23] (rows=3 width=76)
                                              Output:["_col0","_col1"]
                                            <-Reducer 9 [SIMPLE_EDGE]
                                              SHUFFLE [RS_22]
- Select Operator [SEL_20] (rows=1 width=105)
+ Select Operator [SEL_20] (rows=3 width=76)
                                                  Output:["_col0","_col1","_col2","_col3"]
- Group By Operator [GBY_19] (rows=1 width=101)
+ Group By Operator [GBY_19] (rows=3 width=70)
                                                    Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
                                                  <-Map 8 [SIMPLE_EDGE]
                                                    SHUFFLE [RS_18]
                                                      PartitionCols:_col0, _col1, _col2
- Group By Operator [GBY_17] (rows=1 width=101)
+ Group By Operator [GBY_17] (rows=3 width=70)
                                                        Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
- Filter Operator [FIL_58] (rows=4 width=93)
+ Filter Operator [FIL_58] (rows=6 width=77)
                                                          predicate:(((c_int + 1) >= 0) and ((c_int > 0) or (c_float >= 0.0)))
                                                          TableScan [TS_14] (rows=20 width=83)
                                                            default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
                                    <-Reducer 3 [SIMPLE_EDGE]
                                      SHUFFLE [RS_28]
                                        PartitionCols:_col0
- Filter Operator [FIL_12] (rows=1 width=97)
+ Filter Operator [FIL_12] (rows=2 width=54)
                                          predicate:_col0 is not null
- Limit [LIM_10] (rows=1 width=97)
+ Limit [LIM_10] (rows=3 width=68)
                                            Number of rows:5
- Select Operator [SEL_9] (rows=1 width=97)
+ Select Operator [SEL_9] (rows=3 width=68)
                                              Output:["_col0","_col1","_col2"]
                                            <-Reducer 2 [SIMPLE_EDGE]
                                              SHUFFLE [RS_8]
- Select Operator [SEL_6] (rows=1 width=97)
+ Select Operator [SEL_6] (rows=3 width=68)
                                                  Output:["_col0","_col1","_col2"]
- Group By Operator [GBY_5] (rows=1 width=101)
+ Group By Operator [GBY_5] (rows=3 width=70)
                                                    Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1, KEY._col2
                                                  <-Map 1 [SIMPLE_EDGE]
                                                    SHUFFLE [RS_4]
                                                      PartitionCols:_col0, _col1, _col2
- Group By Operator [GBY_3] (rows=1 width=101)
+ Group By Operator [GBY_3] (rows=3 width=70)
                                                        Output:["_col0","_col1","_col2","_col3"],aggregations:["sum(c_int)"],keys:key, c_int, c_float
- Filter Operator [FIL_56] (rows=4 width=93)
+ Filter Operator [FIL_56] (rows=6 width=77)
                                                          predicate:(((c_int + 1) >= 0) and ((c_int > 0) or (c_float >= 0.0)))
                                                          TableScan [TS_0] (rows=20 width=83)
                                                            default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1575,16 +1575,16 @@ Stage-0
      Stage-1
        Reducer 2
        File Output Operator [FS_12]
- Select Operator [SEL_11] (rows=6 width=4)
+ Select Operator [SEL_11] (rows=11 width=4)
            Output:["_col0"]
- Merge Join Operator [MERGEJOIN_17] (rows=6 width=4)
+ Merge Join Operator [MERGEJOIN_17] (rows=11 width=4)
              Conds:RS_8._col0=RS_9._col0(Left Semi),Output:["_col1"]
            <-Map 1 [SIMPLE_EDGE]
              SHUFFLE [RS_8]
                PartitionCols:_col0
- Select Operator [SEL_2] (rows=5 width=74)
+ Select Operator [SEL_2] (rows=9 width=82)
                  Output:["_col0","_col1"]
- Filter Operator [FIL_15] (rows=5 width=74)
+ Filter Operator [FIL_15] (rows=9 width=82)
                    predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                    TableScan [TS_0] (rows=20 width=83)
                      default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
@@ -1615,27 +1615,27 @@ Stage-0
      Stage-1
        Reducer 2
        File Output Operator [FS_18]
- Select Operator [SEL_17] (rows=12 width=93)
+ Select Operator [SEL_17] (rows=16 width=93)
            Output:["_col0","_col1","_col2"]
- Merge Join Operator [MERGEJOIN_28] (rows=12 width=93)
+ Merge Join Operator [MERGEJOIN_28] (rows=16 width=93)
              Conds:RS_13._col0=RS_14._col0(Left Semi),RS_13._col0=RS_15._col0(Left Semi),Output:["_col0","_col1","_col2"]
            <-Map 1 [SIMPLE_EDGE]
              SHUFFLE [RS_13]
                PartitionCols:_col0
- Select Operator [SEL_2] (rows=5 width=74)
+ Select Operator [SEL_2] (rows=9 width=82)
                  Output:["_col0","_col1","_col2"]
- Filter Operator [FIL_25] (rows=5 width=74)
+ Filter Operator [FIL_25] (rows=9 width=82)
                    predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                    TableScan [TS_0] (rows=20 width=83)
                      default@cbo_t1,cbo_t1,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]
            <-Map 3 [SIMPLE_EDGE]
              SHUFFLE [RS_14]
                PartitionCols:_col0
- Group By Operator [GBY_10] (rows=2 width=85)
+ Group By Operator [GBY_10] (rows=3 width=85)
                  Output:["_col0"],keys:_col0
- Select Operator [SEL_5] (rows=5 width=68)
+ Select Operator [SEL_5] (rows=9 width=75)
                    Output:["_col0"]
- Filter Operator [FIL_26] (rows=5 width=74)
+ Filter Operator [FIL_26] (rows=9 width=82)
                      predicate:((((c_int + 1) = 2) and ((c_int > 0) or (c_float >= 0.0))) and key is not null)
                      TableScan [TS_3] (rows=20 width=83)
                        default@cbo_t2,cbo_t2,Tbl:COMPLETE,Col:COMPLETE,Output:["key","c_int","c_float"]


  • Pxiong at Mar 27, 2016 at 7:14 pm
    HIVE-12960: Migrate Column Stats Extrapolation and UniformDistribution to HBaseStore (Pengcheng Xiong, reviewed by Ashutosh Chauhan)


    Project: http://git-wip-us.apache.org/repos/asf/hive/repo
    Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/96862093
    Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/96862093
    Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/96862093

    Branch: refs/heads/master
    Commit: 968620932301dc64cd435292726943a6c0a42551
    Parents: 3038b05
    Author: Pengcheng Xiong <pxiong@apache.org>
    Authored: Sun Mar 27 11:46:17 2016 -0700
    Committer: Pengcheng Xiong <pxiong@apache.org>
    Committed: Sun Mar 27 12:11:39 2016 -0700

    ----------------------------------------------------------------------
      .../hive/metastore/StatObjectConverter.java | 2 +-
      .../hadoop/hive/metastore/hbase/HBaseUtils.java | 8 +-
      .../hadoop/hive/metastore/hbase/StatsCache.java | 20 +-
      .../stats/BinaryColumnStatsAggregator.java | 43 +-
      .../stats/BooleanColumnStatsAggregator.java | 42 +-
      .../hbase/stats/ColumnStatsAggregator.java | 12 +-
      .../stats/ColumnStatsAggregatorFactory.java | 8 +-
      .../stats/DecimalColumnStatsAggregator.java | 340 ++++++++-
      .../stats/DoubleColumnStatsAggregator.java | 307 +++++++-
      .../hbase/stats/IExtrapolatePartStatus.java | 30 +
      .../hbase/stats/LongColumnStatsAggregator.java | 305 +++++++-
      .../stats/StringColumnStatsAggregator.java | 85 ++-
      ...stHBaseAggregateStatsCacheWithBitVector.java | 6 +-
      .../TestHBaseAggregateStatsExtrapolation.java | 717 +++++++++++++++++++
      .../TestHBaseAggregateStatsNDVUniformDist.java | 581 +++++++++++++++
      .../clientpositive/tez/explainuser_1.q.out | 92 +--
      16 files changed, 2454 insertions(+), 144 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
    index b3ceff1..e119dd8 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
    @@ -650,7 +650,7 @@ public class StatObjectConverter {
          }
        }

    - private static Decimal createThriftDecimal(String s) {
    + public static Decimal createThriftDecimal(String s) {
          BigDecimal d = new BigDecimal(s);
          return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), (short)d.scale());
        }

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
    index 9ec7cd5..e0b449b 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
    @@ -19,6 +19,8 @@
      package org.apache.hadoop.hive.metastore.hbase;

      import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.math.BigInteger;
      import java.nio.charset.Charset;
      import java.nio.charset.StandardCharsets;
      import java.security.MessageDigest;
    @@ -88,7 +90,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
      /**
       * Utility functions
       */
    -class HBaseUtils {
    +public class HBaseUtils {

        final static Charset ENCODING = StandardCharsets.UTF_8;
        final static char KEY_SEPARATOR = '\u0001';
    @@ -1421,4 +1423,8 @@ class HBaseUtils {
          b[7] = (byte)(v >>> 0);
          return b;
        }
    +
    + public static double getDoubleValue(Decimal decimal) {
    + return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue();
    + }
      }
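
    The new HBaseUtils.getDoubleValue helper is what lets the numeric aggregators below compare and subtract Thrift Decimal values. As a hedged, self-contained sketch (the class name and sample value are invented; only the conversion itself mirrors the patch and the createThriftDecimal change above), the round trip between BigDecimal and the Thrift Decimal representation looks like this:

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hive.metastore.api.Decimal;

    public class DecimalConversionSketch {
      public static void main(String[] args) {
        // Build a Thrift Decimal the way StatObjectConverter.createThriftDecimal does:
        // unscaled bytes plus a scale.
        BigDecimal original = new BigDecimal("1234.56");
        Decimal thriftDecimal = new Decimal(
            ByteBuffer.wrap(original.unscaledValue().toByteArray()), (short) original.scale());
        // Recover a double the way HBaseUtils.getDoubleValue does.
        double recovered = new BigDecimal(
            new BigInteger(thriftDecimal.getUnscaled()), thriftDecimal.getScale()).doubleValue();
        System.out.println(recovered); // prints 1234.56
      }
    }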

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
    index f1d2e50..18f8afc 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
    @@ -85,12 +85,12 @@ class StatsCache {
                @Override
                public AggrStats load(StatsCacheKey key) throws Exception {
                  int numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
    + boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
                  HBaseReadWrite hrw = HBaseReadWrite.getInstance();
                  AggrStats aggrStats = hrw.getAggregatedStats(key.hashed);
                  if (aggrStats == null) {
                    misses.incr();
                    ColumnStatsAggregator aggregator = null;
    - ColumnStatisticsObj statsObj = null;
                    aggrStats = new AggrStats();
                    LOG.debug("Unable to find aggregated stats for " + key.colName + ", aggregating");
                    List<ColumnStatistics> css = hrw.getPartitionStatistics(key.dbName, key.tableName,
    @@ -98,19 +98,13 @@ class StatsCache {
                        Collections.singletonList(key.colName));
                    if (css != null && css.size() > 0) {
                      aggrStats.setPartsFound(css.size());
    - for (ColumnStatistics cs : css) {
    - for (ColumnStatisticsObj cso : cs.getStatsObj()) {
    - if (statsObj == null) {
    - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(key.colName,
    - cso.getColType(), cso.getStatsData().getSetField());
    - }
    - if (aggregator == null) {
    - aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
    - cso.getStatsData().getSetField(), numBitVectors);
    - }
    - aggregator.aggregate(statsObj, cso);
    - }
    + if (aggregator == null) {
    + aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css.iterator()
    + .next().getStatsObj().iterator().next().getStatsData().getSetField(),
    + numBitVectors, useDensityFunctionForNDVEstimation);
                      }
    + ColumnStatisticsObj statsObj = aggregator
    + .aggregate(key.colName, key.partNames, css);
                      aggrStats.addToColStats(statsObj);
                      me.put(key, aggrStats);
                    }

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
    index 40340dd..d81d612 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
    @@ -19,17 +19,46 @@

      package org.apache.hadoop.hive.metastore.hbase.stats;

    +import java.util.List;
    +
      import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    +import org.apache.hadoop.hive.metastore.api.MetaException;

    -public class BinaryColumnStatsAggregator extends ColumnStatsAggregator{
    +public class BinaryColumnStatsAggregator extends ColumnStatsAggregator {

        @Override
    - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
    - BinaryColumnStatsData aggregateData = aggregateColStats.getStatsData().getBinaryStats();
    - BinaryColumnStatsData newData = newColStats.getStatsData().getBinaryStats();
    - aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
    - aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
    - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
    + List<ColumnStatistics> css) throws MetaException {
    + ColumnStatisticsObj statsObj = null;
    + BinaryColumnStatsData aggregateData = null;
    + String colType = null;
    + for (ColumnStatistics cs : css) {
    + if (cs.getStatsObjSize() != 1) {
    + throw new MetaException(
    + "The number of columns should be exactly one in aggrStats, but found "
    + + cs.getStatsObjSize());
    + }
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + if (statsObj == null) {
    + colType = cso.getColType();
    + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
    + .getStatsData().getSetField());
    + }
    + BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats();
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
    + aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + }
    + }
    + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    + columnStatisticsData.setBinaryStats(aggregateData);
    + statsObj.setStatsData(columnStatisticsData);
    + return statsObj;
        }
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
    index 735d965..e796df2 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
    @@ -19,17 +19,47 @@

      package org.apache.hadoop.hive.metastore.hbase.stats;

    +import java.util.List;
    +
      import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    +import org.apache.hadoop.hive.metastore.api.MetaException;

      public class BooleanColumnStatsAggregator extends ColumnStatsAggregator {

        @Override
    - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
    - BooleanColumnStatsData aggregateData = aggregateColStats.getStatsData().getBooleanStats();
    - BooleanColumnStatsData newData = newColStats.getStatsData().getBooleanStats();
    - aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
    - aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
    - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
    + List<ColumnStatistics> css) throws MetaException {
    + ColumnStatisticsObj statsObj = null;
    + BooleanColumnStatsData aggregateData = null;
    + String colType = null;
    + for (ColumnStatistics cs : css) {
    + if (cs.getStatsObjSize() != 1) {
    + throw new MetaException(
    + "The number of columns should be exactly one in aggrStats, but found "
    + + cs.getStatsObjSize());
    + }
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + if (statsObj == null) {
    + colType = cso.getColType();
    + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
    + .getStatsData().getSetField());
    + }
    + BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats();
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
    + aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + }
    + }
    + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    + columnStatisticsData.setBooleanStats(aggregateData);
    + statsObj.setStatsData(columnStatisticsData);
    + return statsObj;
        }
    +
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
    index 694e53b..31955b4 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
    @@ -19,10 +19,16 @@

      package org.apache.hadoop.hive.metastore.hbase.stats;

    -import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    +import org.apache.hadoop.hive.metastore.api.MetaException;

      public abstract class ColumnStatsAggregator {
    - NumDistinctValueEstimator ndvEstimator = null;
    - public abstract void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats);
    + public int numBitVectors;
    + public boolean useDensityFunctionForNDVEstimation;
    +
    + public abstract ColumnStatisticsObj aggregate(String colName, List<String> partNames,
    + List<ColumnStatistics> css) throws MetaException;
      }

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
    index 8eb127b..daf8569 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
    @@ -19,7 +19,6 @@

      package org.apache.hadoop.hive.metastore.hbase.stats;

    -import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
      import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
      import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
    @@ -35,7 +34,7 @@ public class ColumnStatsAggregatorFactory {
        private ColumnStatsAggregatorFactory() {
        }

    - public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors) {
    + public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors, boolean useDensityFunctionForNDVEstimation) {
          ColumnStatsAggregator agg;
          switch (type) {
          case BOOLEAN_STATS:
    @@ -59,9 +58,8 @@ public class ColumnStatsAggregatorFactory {
          default:
            throw new RuntimeException("Woh, bad. Unknown stats type " + type.toString());
          }
    - if (numBitVectors > 0) {
    - agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
    - }
    + agg.numBitVectors = numBitVectors;
    + agg.useDensityFunctionForNDVEstimation = useDensityFunctionForNDVEstimation;
          return agg;
        }


    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
    index 50f4325..36b2c9c 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
    @@ -19,33 +19,333 @@

      package org.apache.hadoop.hive.metastore.hbase.stats;

    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
      import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
    +import org.apache.hadoop.hive.metastore.StatObjectConverter;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    -import org.apache.hadoop.hive.metastore.api.Decimal;
      import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
    +import org.apache.hadoop.hive.metastore.api.MetaException;
    +import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;

    -public class DecimalColumnStatsAggregator extends ColumnStatsAggregator {
    +public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implements
    + IExtrapolatePartStatus {

        @Override
    - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
    - DecimalColumnStatsData aggregateData = aggregateColStats.getStatsData().getDecimalStats();
    - DecimalColumnStatsData newData = newColStats.getStatsData().getDecimalStats();
    - Decimal lowValue = aggregateData.getLowValue() != null
    - && (aggregateData.getLowValue().compareTo(newData.getLowValue()) > 0) ? aggregateData
    - .getLowValue() : newData.getLowValue();
    - aggregateData.setLowValue(lowValue);
    - Decimal highValue = aggregateData.getHighValue() != null
    - && (aggregateData.getHighValue().compareTo(newData.getHighValue()) > 0) ? aggregateData
    - .getHighValue() : newData.getHighValue();
    - aggregateData.setHighValue(highValue);
    - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    - if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
    - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
    + public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
    + List<ColumnStatistics> css) throws MetaException {
    + ColumnStatisticsObj statsObj = null;
    +
    + // check if all the ColumnStatisticsObjs contain stats and all the ndv are
    + // bitvectors
    + boolean doAllPartitionContainStats = partNames.size() == css.size();
    + boolean isNDVBitVectorSet = true;
    + String colType = null;
    + for (ColumnStatistics cs : css) {
    + if (cs.getStatsObjSize() != 1) {
    + throw new MetaException(
    + "The number of columns should be exactly one in aggrStats, but found "
    + + cs.getStatsObjSize());
    + }
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + if (statsObj == null) {
    + colType = cso.getColType();
    + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
    + .getStatsData().getSetField());
    + }
    + if (numBitVectors <= 0 || !cso.getStatsData().getDecimalStats().isSetBitVectors()
    + || cso.getStatsData().getDecimalStats().getBitVectors().length() == 0) {
    + isNDVBitVectorSet = false;
    + break;
    + }
    + }
    + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    + if (doAllPartitionContainStats || css.size() < 2) {
    + DecimalColumnStatsData aggregateData = null;
    + long lowerBound = 0;
    + long higherBound = 0;
    + double densityAvgSum = 0.0;
    + NumDistinctValueEstimator ndvEstimator = null;
    + if (isNDVBitVectorSet) {
    + ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
    + }
    + for (ColumnStatistics cs : css) {
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
    + if (useDensityFunctionForNDVEstimation) {
    + lowerBound = Math.max(lowerBound, newData.getNumDVs());
    + higherBound += newData.getNumDVs();
    + densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
    + .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
    + }
    + if (isNDVBitVectorSet) {
    + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    + ndvEstimator.getnumBitVectors()));
    + }
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + if (HBaseUtils.getDoubleValue(aggregateData.getLowValue()) < HBaseUtils
    + .getDoubleValue(newData.getLowValue())) {
    + aggregateData.setLowValue(aggregateData.getLowValue());
    + } else {
    + aggregateData.setLowValue(newData.getLowValue());
    + }
    + if (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) > HBaseUtils
    + .getDoubleValue(newData.getHighValue())) {
    + aggregateData.setHighValue(aggregateData.getHighValue());
    + } else {
    + aggregateData.setHighValue(newData.getHighValue());
    + }
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
    + }
    + }
    + if (isNDVBitVectorSet) {
    + // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
    + // use uniform distribution assumption because we can merge bitvectors
    + // to get a good estimation.
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + } else {
    + if (useDensityFunctionForNDVEstimation) {
    + // We have estimation, lowerbound and higherbound. We use estimation
    + // if it is between lowerbound and higherbound.
    + double densityAvg = densityAvgSum / partNames.size();
    + long estimation = (long) ((HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
    + .getDoubleValue(aggregateData.getLowValue())) / densityAvg);
    + if (estimation < lowerBound) {
    + aggregateData.setNumDVs(lowerBound);
    + } else if (estimation > higherBound) {
    + aggregateData.setNumDVs(higherBound);
    + } else {
    + aggregateData.setNumDVs(estimation);
    + }
    + } else {
    + // Without useDensityFunctionForNDVEstimation, we just use the
    + // default one, which is the max of all the partitions and it is
    + // already done.
    + }
    + }
    + columnStatisticsData.setDecimalStats(aggregateData);
    + } else {
    + // we need extrapolation
    + Map<String, Integer> indexMap = new HashMap<String, Integer>();
    + for (int index = 0; index < partNames.size(); index++) {
    + indexMap.put(partNames.get(index), index);
    + }
    + Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
    + Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
    + // while we scan the css, we also get the densityAvg, lowerbound and
    + // higherbound when useDensityFunctionForNDVEstimation is true.
    + double densityAvgSum = 0.0;
    + if (!isNDVBitVectorSet) {
    + // if not every partition uses bitvector for ndv, we just fall back to
    + // the traditional extrapolation methods.
    + for (ColumnStatistics cs : css) {
    + String partName = cs.getStatsDesc().getPartName();
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils
    + .getDoubleValue(newData.getLowValue())) / newData.getNumDVs();
    + }
    + adjustedIndexMap.put(partName, (double) indexMap.get(partName));
    + adjustedStatsMap.put(partName, cso.getStatsData());
    + }
    + } else {
    + // we first merge all the adjacent bitvectors that we could merge and
    + // derive new partition names and index.
    + NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
    + StringBuilder pseudoPartName = new StringBuilder();
    + double pseudoIndexSum = 0;
    + int length = 0;
    + int curIndex = -1;
    + DecimalColumnStatsData aggregateData = null;
    + for (ColumnStatistics cs : css) {
    + String partName = cs.getStatsDesc().getPartName();
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats();
    + // newData.isSetBitVectors() should be true for sure because we
    + // already checked it before.
    + if (indexMap.get(partName) != curIndex) {
    + // There is bitvector, but it is not adjacent to the previous ones.
    + if (length > 0) {
    + // we have to set ndv
    + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + ColumnStatisticsData csd = new ColumnStatisticsData();
    + csd.setDecimalStats(aggregateData);
    + adjustedStatsMap.put(pseudoPartName.toString(), csd);
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
    + .getDoubleValue(aggregateData.getLowValue())) / aggregateData.getNumDVs();
    + }
    + // reset everything
    + pseudoPartName = new StringBuilder();
    + pseudoIndexSum = 0;
    + length = 0;
    + }
    + aggregateData = null;
    + }
    + curIndex = indexMap.get(partName);
    + pseudoPartName.append(partName);
    + pseudoIndexSum += curIndex;
    + length++;
    + curIndex++;
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + if (HBaseUtils.getDoubleValue(aggregateData.getLowValue()) < HBaseUtils
    + .getDoubleValue(newData.getLowValue())) {
    + aggregateData.setLowValue(aggregateData.getLowValue());
    + } else {
    + aggregateData.setLowValue(newData.getLowValue());
    + }
    + if (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) > HBaseUtils
    + .getDoubleValue(newData.getHighValue())) {
    + aggregateData.setHighValue(aggregateData.getHighValue());
    + } else {
    + aggregateData.setHighValue(newData.getHighValue());
    + }
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + }
    + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    + ndvEstimator.getnumBitVectors()));
    + }
    + if (length > 0) {
    + // we have to set ndv
    + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + ColumnStatisticsData csd = new ColumnStatisticsData();
    + csd.setDecimalStats(aggregateData);
    + adjustedStatsMap.put(pseudoPartName.toString(), csd);
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils
    + .getDoubleValue(aggregateData.getLowValue())) / aggregateData.getNumDVs();
    + }
    + }
    + }
    + extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
    + adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
    + }
    + statsObj.setStatsData(columnStatisticsData);
    + return statsObj;
    + }
    +
    + @Override
    + public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
    + int numPartsWithStats, Map<String, Double> adjustedIndexMap,
    + Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
    + int rightBorderInd = numParts;
    + DecimalColumnStatsData extrapolateDecimalData = new DecimalColumnStatsData();
    + Map<String, DecimalColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
    + for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
    + extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDecimalStats());
    + }
    + List<Map.Entry<String, DecimalColumnStatsData>> list = new LinkedList<Map.Entry<String, DecimalColumnStatsData>>(
    + extractedAdjustedStatsMap.entrySet());
    + // get the lowValue
    + Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
    + public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
    + Map.Entry<String, DecimalColumnStatsData> o2) {
    + return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue());
    + }
    + });
    + double minInd = adjustedIndexMap.get(list.get(0).getKey());
    + double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + double lowValue = 0;
    + double min = HBaseUtils.getDoubleValue(list.get(0).getValue().getLowValue());
    + double max = HBaseUtils.getDoubleValue(list.get(list.size() - 1).getValue().getLowValue());
    + if (minInd == maxInd) {
    + lowValue = min;
    + } else if (minInd < maxInd) {
    + // left border is the min
    + lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
    + } else {
    + // right border is the min
    + lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
    + }
    +
    + // get the highValue
    + Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
    + public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
    + Map.Entry<String, DecimalColumnStatsData> o2) {
    + return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue());
    + }
    + });
    + minInd = adjustedIndexMap.get(list.get(0).getKey());
    + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + double highValue = 0;
    + min = HBaseUtils.getDoubleValue(list.get(0).getValue().getHighValue());
    + max = HBaseUtils.getDoubleValue(list.get(list.size() - 1).getValue().getHighValue());
    + if (minInd == maxInd) {
    + highValue = min;
    + } else if (minInd < maxInd) {
    + // right border is the max
    + highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
    + } else {
    + // left border is the max
    + highValue = (min + (max - min) * minInd / (minInd - maxInd));
    + }
    +
    + // get the #nulls
    + long numNulls = 0;
    + for (Map.Entry<String, DecimalColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
    + numNulls += entry.getValue().getNumNulls();
    + }
    + // we scale up sumNulls based on the number of partitions
    + numNulls = numNulls * numParts / numPartsWithStats;
    +
    + // get the ndv
    + long ndv = 0;
    + long ndvMin = 0;
    + long ndvMax = 0;
    + Collections.sort(list, new Comparator<Map.Entry<String, DecimalColumnStatsData>>() {
    + public int compare(Map.Entry<String, DecimalColumnStatsData> o1,
    + Map.Entry<String, DecimalColumnStatsData> o2) {
    + return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
    + }
    + });
    + long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
    + long higherBound = 0;
    + for (Map.Entry<String, DecimalColumnStatsData> entry : list) {
    + higherBound += entry.getValue().getNumDVs();
    + }
    + if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
    + ndv = (long) ((highValue - lowValue) / densityAvg);
    + if (ndv < lowerBound) {
    + ndv = lowerBound;
    + } else if (ndv > higherBound) {
    + ndv = higherBound;
    + }
          } else {
    - ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    - ndvEstimator.getnumBitVectors()));
    - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    - aggregateData.setBitVectors(ndvEstimator.serialize().toString());
    + minInd = adjustedIndexMap.get(list.get(0).getKey());
    + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + ndvMin = list.get(0).getValue().getNumDVs();
    + ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
    + if (minInd == maxInd) {
    + ndv = ndvMin;
    + } else if (minInd < maxInd) {
    + // right border is the max
    + ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd));
    + } else {
    + // left border is the max
    + ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
    + }
          }
    + extrapolateDecimalData.setLowValue(StatObjectConverter.createThriftDecimal(String
    + .valueOf(lowValue)));
    + extrapolateDecimalData.setHighValue(StatObjectConverter.createThriftDecimal(String
    + .valueOf(highValue)));
    + extrapolateDecimalData.setNumNulls(numNulls);
    + extrapolateDecimalData.setNumDVs(ndv);
    + extrapolateData.setDecimalStats(extrapolateDecimalData);
        }
      }
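
    The extrapolation path above boils down to three pieces of arithmetic: linear extrapolation of the low/high values out to the partition borders, scaling the null count up to all partitions, and a density-function NDV estimate clamped between the largest per-partition NDV and the sum of per-partition NDVs. A minimal, self-contained sketch with invented numbers (only the formulas come from the aggregator code above):

    public class ExtrapolationSketch {
      public static void main(String[] args) {
        int numParts = 10;          // partitions requested
        int numPartsWithStats = 6;  // partitions that actually have column stats
        int rightBorderInd = numParts;

        // highValue: say the partitions at adjusted indexes 0 and 5 carry highs of
        // 100 and 150; extrapolate linearly out to the right border (index 10).
        double minInd = 0, maxInd = 5, min = 100, max = 150;
        double highValue = min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd); // 200.0

        // numNulls: sum over the partitions with stats, scaled up to all partitions.
        long sumNulls = 24;
        long numNulls = sumNulls * numParts / numPartsWithStats; // 40

        // NDV via the density function: (high - low) / average density, clamped to
        // [largest per-partition NDV, sum of per-partition NDVs].
        double lowValue = 0, densityAvg = 2.5;
        long lowerBound = 60, higherBound = 120;
        long ndv = (long) ((highValue - lowValue) / densityAvg); // 80, inside the bounds
        ndv = Math.max(lowerBound, Math.min(higherBound, ndv));

        System.out.println(highValue + " " + numNulls + " " + ndv);
      }
    }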

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
    index d945ec2..a88ef84 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
    @@ -19,26 +19,307 @@

      package org.apache.hadoop.hive.metastore.hbase.stats;

    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
      import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
      import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
    +import org.apache.hadoop.hive.metastore.api.MetaException;

    -public class DoubleColumnStatsAggregator extends ColumnStatsAggregator {
    +public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implements
    + IExtrapolatePartStatus {

        @Override
    - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
    - DoubleColumnStatsData aggregateData = aggregateColStats.getStatsData().getDoubleStats();
    - DoubleColumnStatsData newData = newColStats.getStatsData().getDoubleStats();
    - aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
    - aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
    - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    - if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
    - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
    + public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
    + List<ColumnStatistics> css) throws MetaException {
    + ColumnStatisticsObj statsObj = null;
    +
    + // check if all the ColumnStatisticsObjs contain stats and all the ndv are
    + // bitvectors
    + boolean doAllPartitionContainStats = partNames.size() == css.size();
    + boolean isNDVBitVectorSet = true;
    + String colType = null;
    + for (ColumnStatistics cs : css) {
    + if (cs.getStatsObjSize() != 1) {
    + throw new MetaException(
    + "The number of columns should be exactly one in aggrStats, but found "
    + + cs.getStatsObjSize());
    + }
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + if (statsObj == null) {
    + colType = cso.getColType();
    + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
    + .getStatsData().getSetField());
    + }
    + if (numBitVectors <= 0 || !cso.getStatsData().getDoubleStats().isSetBitVectors()
    + || cso.getStatsData().getDoubleStats().getBitVectors().length() == 0) {
    + isNDVBitVectorSet = false;
    + break;
    + }
    + }
    + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    + if (doAllPartitionContainStats || css.size() < 2) {
    + DoubleColumnStatsData aggregateData = null;
    + long lowerBound = 0;
    + long higherBound = 0;
    + double densityAvgSum = 0.0;
    + NumDistinctValueEstimator ndvEstimator = null;
    + if (isNDVBitVectorSet) {
    + ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
    + }
    + for (ColumnStatistics cs : css) {
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
    + if (useDensityFunctionForNDVEstimation) {
    + lowerBound = Math.max(lowerBound, newData.getNumDVs());
    + higherBound += newData.getNumDVs();
    + densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
    + }
    + if (isNDVBitVectorSet) {
    + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    + ndvEstimator.getnumBitVectors()));
    + }
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
    + aggregateData
    + .setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
    + }
    + }
    + if (isNDVBitVectorSet) {
    + // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
    + // use uniform distribution assumption because we can merge bitvectors
    + // to get a good estimation.
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + } else {
    + if (useDensityFunctionForNDVEstimation) {
    + // We have estimation, lowerbound and higherbound. We use estimation
    + // if it is between lowerbound and higherbound.
    + double densityAvg = densityAvgSum / partNames.size();
    + long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
    + if (estimation < lowerBound) {
    + aggregateData.setNumDVs(lowerBound);
    + } else if (estimation > higherBound) {
    + aggregateData.setNumDVs(higherBound);
    + } else {
    + aggregateData.setNumDVs(estimation);
    + }
    + } else {
    + // Without useDensityFunctionForNDVEstimation, we just use the
    + // default one, which is the max of all the partitions and it is
    + // already done.
    + }
    + }
    + columnStatisticsData.setDoubleStats(aggregateData);
    + } else {
    + // we need extrapolation
    + Map<String, Integer> indexMap = new HashMap<String, Integer>();
    + for (int index = 0; index < partNames.size(); index++) {
    + indexMap.put(partNames.get(index), index);
    + }
    + Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
    + Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
    + // while we scan the css, we also get the densityAvg, lowerbound and
    + // higherbound when useDensityFunctionForNDVEstimation is true.
    + double densityAvgSum = 0.0;
    + if (!isNDVBitVectorSet) {
    + // if not every partition uses bitvector for ndv, we just fall back to
    + // the traditional extrapolation methods.
    + for (ColumnStatistics cs : css) {
    + String partName = cs.getStatsDesc().getPartName();
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
    + }
    + adjustedIndexMap.put(partName, (double) indexMap.get(partName));
    + adjustedStatsMap.put(partName, cso.getStatsData());
    + }
    + } else {
    + // we first merge all the adjacent bitvectors that we could merge and
    + // derive new partition names and index.
    + NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
    + StringBuilder pseudoPartName = new StringBuilder();
    + double pseudoIndexSum = 0;
    + int length = 0;
    + int curIndex = -1;
    + DoubleColumnStatsData aggregateData = null;
    + for (ColumnStatistics cs : css) {
    + String partName = cs.getStatsDesc().getPartName();
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats();
    + // newData.isSetBitVectors() should be true for sure because we
    + // already checked it before.
    + if (indexMap.get(partName) != curIndex) {
    + // There is bitvector, but it is not adjacent to the previous ones.
    + if (length > 0) {
    + // we have to set ndv
    + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + ColumnStatisticsData csd = new ColumnStatisticsData();
    + csd.setDoubleStats(aggregateData);
    + adjustedStatsMap.put(pseudoPartName.toString(), csd);
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
    + }
    + // reset everything
    + pseudoPartName = new StringBuilder();
    + pseudoIndexSum = 0;
    + length = 0;
    + }
    + aggregateData = null;
    + }
    + curIndex = indexMap.get(partName);
    + pseudoPartName.append(partName);
    + pseudoIndexSum += curIndex;
    + length++;
    + curIndex++;
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
    + aggregateData.setHighValue(Math.max(aggregateData.getHighValue(),
    + newData.getHighValue()));
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + }
    + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    + ndvEstimator.getnumBitVectors()));
    + }
    + if (length > 0) {
    + // we have to set ndv
    + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + ColumnStatisticsData csd = new ColumnStatisticsData();
    + csd.setDoubleStats(aggregateData);
    + adjustedStatsMap.put(pseudoPartName.toString(), csd);
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
    + }
    + }
    + }
    + extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
    + adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
    + }
    + statsObj.setStatsData(columnStatisticsData);
    + return statsObj;
    + }
    +
    + @Override
    + public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
    + int numPartsWithStats, Map<String, Double> adjustedIndexMap,
    + Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
    + int rightBorderInd = numParts;
    + DoubleColumnStatsData extrapolateDoubleData = new DoubleColumnStatsData();
    + Map<String, DoubleColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
    + for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
    + extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDoubleStats());
    + }
    + List<Map.Entry<String, DoubleColumnStatsData>> list = new LinkedList<Map.Entry<String, DoubleColumnStatsData>>(
    + extractedAdjustedStatsMap.entrySet());
    + // get the lowValue
    + Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
    + public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
    + Map.Entry<String, DoubleColumnStatsData> o2) {
    + return o1.getValue().getLowValue() < o2.getValue().getLowValue() ? -1 : 1;
    + }
    + });
    + double minInd = adjustedIndexMap.get(list.get(0).getKey());
    + double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + double lowValue = 0;
    + double min = list.get(0).getValue().getLowValue();
    + double max = list.get(list.size() - 1).getValue().getLowValue();
    + if (minInd == maxInd) {
    + lowValue = min;
    + } else if (minInd < maxInd) {
    + // left border is the min
    + lowValue = (max - (max - min) * maxInd / (maxInd - minInd));
    + } else {
    + // right border is the min
    + lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
    + }
    +
    + // get the highValue
    + Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
    + public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
    + Map.Entry<String, DoubleColumnStatsData> o2) {
    + return o1.getValue().getHighValue() < o2.getValue().getHighValue() ? -1 : 1;
    + }
    + });
    + minInd = adjustedIndexMap.get(list.get(0).getKey());
    + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + double highValue = 0;
    + min = list.get(0).getValue().getHighValue();
    + max = list.get(list.size() - 1).getValue().getHighValue();
    + if (minInd == maxInd) {
    + highValue = min;
    + } else if (minInd < maxInd) {
    + // right border is the max
    + highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
    + } else {
    + // left border is the max
    + highValue = (min + (max - min) * minInd / (minInd - maxInd));
    + }
    +
    + // get the #nulls
    + long numNulls = 0;
    + for (Map.Entry<String, DoubleColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
    + numNulls += entry.getValue().getNumNulls();
    + }
    + // we scale up sumNulls based on the number of partitions
    + numNulls = numNulls * numParts / numPartsWithStats;
    +
    + // get the ndv
    + long ndv = 0;
    + long ndvMin = 0;
    + long ndvMax = 0;
    + Collections.sort(list, new Comparator<Map.Entry<String, DoubleColumnStatsData>>() {
    + public int compare(Map.Entry<String, DoubleColumnStatsData> o1,
    + Map.Entry<String, DoubleColumnStatsData> o2) {
    + return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
    + }
    + });
    + long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
    + long higherBound = 0;
    + for (Map.Entry<String, DoubleColumnStatsData> entry : list) {
    + higherBound += entry.getValue().getNumDVs();
    + }
    + if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
    + ndv = (long) ((highValue - lowValue) / densityAvg);
    + if (ndv < lowerBound) {
    + ndv = lowerBound;
    + } else if (ndv > higherBound) {
    + ndv = higherBound;
    + }
          } else {
    - ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    - ndvEstimator.getnumBitVectors()));
    - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    - aggregateData.setBitVectors(ndvEstimator.serialize().toString());
    + minInd = adjustedIndexMap.get(list.get(0).getKey());
    + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + ndvMin = list.get(0).getValue().getNumDVs();
    + ndvMax = list.get(list.size() - 1).getValue().getNumDVs();
    + if (minInd == maxInd) {
    + ndv = ndvMin;
    + } else if (minInd < maxInd) {
    + // right border is the max
    + ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd));
    + } else {
    + // left border is the max
    + ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd));
    + }
          }
    + extrapolateDoubleData.setLowValue(lowValue);
    + extrapolateDoubleData.setHighValue(highValue);
    + extrapolateDoubleData.setNumNulls(numNulls);
    + extrapolateDoubleData.setNumDVs(ndv);
    + extrapolateData.setDoubleStats(extrapolateDoubleData);
        }
    +
      }
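
For readers following the new extrapolation path: when column stats are missing for some partitions but every partition that does have stats carries an NDV bitvector, the aggregator first merges runs of adjacent partitions into pseudo partitions, each keyed by the concatenated partition names and placed at the average of the merged indices. Below is a minimal, self-contained sketch of that grouping step (plain Java; the names Run, groupAdjacent and the sample partition names are illustrative and not part of the patch):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: groups consecutive partition indices into runs and
    // assigns each run the average index, mirroring how the aggregator builds
    // pseudo partitions for merged bitvectors.
    public class AdjacentRunGrouping {
      static final class Run {
        final String pseudoPartName;
        final double adjustedIndex;
        Run(String name, double idx) { this.pseudoPartName = name; this.adjustedIndex = idx; }
        @Override public String toString() { return pseudoPartName + " -> " + adjustedIndex; }
      }

      static List<Run> groupAdjacent(Map<String, Integer> partsWithStats) {
        List<Run> runs = new ArrayList<>();
        StringBuilder name = new StringBuilder();
        double indexSum = 0;
        int length = 0;
        int expectedIndex = -1;
        for (Map.Entry<String, Integer> e : partsWithStats.entrySet()) {
          if (e.getValue() != expectedIndex && length > 0) {
            // current partition is not adjacent to the previous run: flush the run
            runs.add(new Run(name.toString(), indexSum / length));
            name = new StringBuilder();
            indexSum = 0;
            length = 0;
          }
          expectedIndex = e.getValue();
          name.append(e.getKey());
          indexSum += expectedIndex;
          length++;
          expectedIndex++;
        }
        if (length > 0) {
          runs.add(new Run(name.toString(), indexSum / length));
        }
        return runs;
      }

      public static void main(String[] args) {
        // partitions 0, 1, 2 and 5, 6 have stats; 3 and 4 are missing
        Map<String, Integer> parts = new LinkedHashMap<>();
        parts.put("p0", 0); parts.put("p1", 1); parts.put("p2", 2);
        parts.put("p5", 5); parts.put("p6", 6);
        System.out.println(groupAdjacent(parts)); // [p0p1p2 -> 1.0, p5p6 -> 5.5]
      }
    }

Each pseudo partition then gets a single merged-bitvector NDV before the linear extrapolation runs over the reduced set of points.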

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
    new file mode 100644
    index 0000000..99af060
    --- /dev/null
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java
    @@ -0,0 +1,30 @@
    +package org.apache.hadoop.hive.metastore.hbase.stats;
    +
    +import java.util.Map;
    +
    +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
    +
    +public interface IExtrapolatePartStatus {
    + // The following function will extrapolate the stats when the column stats of
    + // some partitions are missing.
    + /**
    + * @param extrapolateData
    + * it will carry back the specific stats, e.g., DOUBLE_STATS or
    + * LONG_STATS
    + * @param numParts
    + * the total number of partitions
    + * @param numPartsWithStats
    + * the number of partitions that have stats
    + * @param adjustedIndexMap
    + * the partition name to index map
    + * @param adjustedStatsMap
    + * the partition name to its stats map
    + * @param densityAvg
    + * the average of ndv density, which is useful when
    + * useDensityFunctionForNDVEstimation is true.
    + */
    + public abstract void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
    + int numPartsWithStats, Map<String, Double> adjustedIndexMap,
    + Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg);
    +
    +}
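
The implementations in this patch realize extrapolate() by fitting a line through the two extreme observed values and evaluating it at the borders of the full partition range, index 0 on the left and numParts on the right. Below is a hedged sketch of that interpolation for the simple case where the smaller value sits at the smaller adjusted index (plain Java; extrapolateToBorder is an illustrative name, not part of the interface):

    // Illustrative only: evaluates the line through (minInd, minVal) and
    // (maxInd, maxVal) at a border index, which is how the aggregators in this
    // patch derive lowValue (border 0) and highValue (border numParts) when not
    // every partition has column stats.
    public class LinearBorderExtrapolation {
      static double extrapolateToBorder(double minInd, double minVal,
                                        double maxInd, double maxVal, double borderInd) {
        if (minInd == maxInd) {
          return minVal; // only one observed point: nothing to extrapolate
        }
        return minVal + (maxVal - minVal) * (borderInd - minInd) / (maxInd - minInd);
      }

      public static void main(String[] args) {
        // 4 partitions in total; only the partitions at indices 1 and 2 have stats,
        // with low values 10 and 20. The left border (index 0) extrapolates to 0,
        // the right border (index 4) to 40.
        System.out.println(extrapolateToBorder(1, 10, 2, 20, 0)); // 0.0
        System.out.println(extrapolateToBorder(1, 10, 2, 20, 4)); // 40.0
      }
    }

When the ordering is reversed (the larger value sits at the smaller index), the implementations extrapolate toward the opposite border instead, as the Double and Long aggregators above and below show.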

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
    index 068dd00..8ac6561 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
    @@ -19,26 +19,305 @@

      package org.apache.hadoop.hive.metastore.hbase.stats;

    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
      import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
      import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
    +import org.apache.hadoop.hive.metastore.api.MetaException;

    -public class LongColumnStatsAggregator extends ColumnStatsAggregator {
    +public class LongColumnStatsAggregator extends ColumnStatsAggregator implements
    + IExtrapolatePartStatus {

        @Override
    - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
    - LongColumnStatsData aggregateData = aggregateColStats.getStatsData().getLongStats();
    - LongColumnStatsData newData = newColStats.getStatsData().getLongStats();
    - aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
    - aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
    - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    - if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
    - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
    + public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
    + List<ColumnStatistics> css) throws MetaException {
    + ColumnStatisticsObj statsObj = null;
    +
    + // check if all the ColumnStatisticsObjs contain stats and all the ndv are
    + // bitvectors
    + boolean doAllPartitionContainStats = partNames.size() == css.size();
    + boolean isNDVBitVectorSet = true;
    + String colType = null;
    + for (ColumnStatistics cs : css) {
    + if (cs.getStatsObjSize() != 1) {
    + throw new MetaException(
    + "The number of columns should be exactly one in aggrStats, but found "
    + + cs.getStatsObjSize());
    + }
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + if (statsObj == null) {
    + colType = cso.getColType();
    + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
    + .getStatsData().getSetField());
    + }
    + if (numBitVectors <= 0 || !cso.getStatsData().getLongStats().isSetBitVectors()
    + || cso.getStatsData().getLongStats().getBitVectors().length() == 0) {
    + isNDVBitVectorSet = false;
    + break;
    + }
    + }
    + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    + if (doAllPartitionContainStats || css.size() < 2) {
    + LongColumnStatsData aggregateData = null;
    + long lowerBound = 0;
    + long higherBound = 0;
    + double densityAvgSum = 0.0;
    + NumDistinctValueEstimator ndvEstimator = null;
    + if (isNDVBitVectorSet) {
    + ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
    + }
    + for (ColumnStatistics cs : css) {
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + LongColumnStatsData newData = cso.getStatsData().getLongStats();
    + if (useDensityFunctionForNDVEstimation) {
    + lowerBound = Math.max(lowerBound, newData.getNumDVs());
    + higherBound += newData.getNumDVs();
    + densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
    + }
    + if (isNDVBitVectorSet) {
    + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    + ndvEstimator.getnumBitVectors()));
    + }
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
    + aggregateData
    + .setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
    + }
    + }
    + if (isNDVBitVectorSet) {
    + // if all the ColumnStatisticsObjs contain bitvectors, we do not need to
    + // use uniform distribution assumption because we can merge bitvectors
    + // to get a good estimation.
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + } else {
    + if (useDensityFunctionForNDVEstimation) {
    + // We have estimation, lowerbound and higherbound. We use estimation
    + // if it is between lowerbound and higherbound.
    + double densityAvg = densityAvgSum / partNames.size();
    + long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg);
    + if (estimation < lowerBound) {
    + aggregateData.setNumDVs(lowerBound);
    + } else if (estimation > higherBound) {
    + aggregateData.setNumDVs(higherBound);
    + } else {
    + aggregateData.setNumDVs(estimation);
    + }
    + } else {
    + // Without useDensityFunctionForNDVEstimation, we just use the
    + // default one, which is the max of all the partitions and it is
    + // already done.
    + }
    + }
    + columnStatisticsData.setLongStats(aggregateData);
          } else {
    - ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    - ndvEstimator.getnumBitVectors()));
    - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    - aggregateData.setBitVectors(ndvEstimator.serialize().toString());
    + // we need extrapolation
    + Map<String, Integer> indexMap = new HashMap<String, Integer>();
    + for (int index = 0; index < partNames.size(); index++) {
    + indexMap.put(partNames.get(index), index);
    + }
    + Map<String, Double> adjustedIndexMap = new HashMap<String, Double>();
    + Map<String, ColumnStatisticsData> adjustedStatsMap = new HashMap<String, ColumnStatisticsData>();
    + // while we scan the css, we also get the densityAvg, lowerbound and
    + // higherbound when useDensityFunctionForNDVEstimation is true.
    + double densityAvgSum = 0.0;
    + if (!isNDVBitVectorSet) {
    + // if not every partition uses bitvector for ndv, we just fall back to
    + // the traditional extrapolation methods.
    + for (ColumnStatistics cs : css) {
    + String partName = cs.getStatsDesc().getPartName();
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + LongColumnStatsData newData = cso.getStatsData().getLongStats();
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs();
    + }
    + adjustedIndexMap.put(partName, (double) indexMap.get(partName));
    + adjustedStatsMap.put(partName, cso.getStatsData());
    + }
    + } else {
    + // we first merge all the adjacent bitvectors that we could merge and
    + // derive new partition names and index.
    + NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
    + StringBuilder pseudoPartName = new StringBuilder();
    + double pseudoIndexSum = 0;
    + int length = 0;
    + int curIndex = -1;
    + LongColumnStatsData aggregateData = null;
    + for (ColumnStatistics cs : css) {
    + String partName = cs.getStatsDesc().getPartName();
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + LongColumnStatsData newData = cso.getStatsData().getLongStats();
    + // newData.isSetBitVectors() should be true for sure because we
    + // already checked it before.
    + if (indexMap.get(partName) != curIndex) {
    + // There is a bitvector, but it is not adjacent to the previous ones.
    + if (length > 0) {
    + // we have to set ndv
    + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + ColumnStatisticsData csd = new ColumnStatisticsData();
    + csd.setLongStats(aggregateData);
    + adjustedStatsMap.put(pseudoPartName.toString(), csd);
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
    + }
    + // reset everything
    + pseudoPartName = new StringBuilder();
    + pseudoIndexSum = 0;
    + length = 0;
    + }
    + aggregateData = null;
    + }
    + curIndex = indexMap.get(partName);
    + pseudoPartName.append(partName);
    + pseudoIndexSum += curIndex;
    + length++;
    + curIndex++;
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
    + aggregateData.setHighValue(Math.max(aggregateData.getHighValue(),
    + newData.getHighValue()));
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + }
    + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    + ndvEstimator.getnumBitVectors()));
    + }
    + if (length > 0) {
    + // we have to set ndv
    + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    + ColumnStatisticsData csd = new ColumnStatisticsData();
    + csd.setLongStats(aggregateData);
    + adjustedStatsMap.put(pseudoPartName.toString(), csd);
    + if (useDensityFunctionForNDVEstimation) {
    + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs();
    + }
    + }
    + }
    + extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap,
    + adjustedStatsMap, densityAvgSum / adjustedStatsMap.size());
          }
    + statsObj.setStatsData(columnStatisticsData);
    + return statsObj;
        }
    +
    + @Override
    + public void extrapolate(ColumnStatisticsData extrapolateData, int numParts,
    + int numPartsWithStats, Map<String, Double> adjustedIndexMap,
    + Map<String, ColumnStatisticsData> adjustedStatsMap, double densityAvg) {
    + int rightBorderInd = numParts;
    + LongColumnStatsData extrapolateLongData = new LongColumnStatsData();
    + Map<String, LongColumnStatsData> extractedAdjustedStatsMap = new HashMap<>();
    + for (Map.Entry<String, ColumnStatisticsData> entry : adjustedStatsMap.entrySet()) {
    + extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getLongStats());
    + }
    + List<Map.Entry<String, LongColumnStatsData>> list = new LinkedList<Map.Entry<String, LongColumnStatsData>>(
    + extractedAdjustedStatsMap.entrySet());
    + // get the lowValue
    + Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
    + public int compare(Map.Entry<String, LongColumnStatsData> o1,
    + Map.Entry<String, LongColumnStatsData> o2) {
    + return o1.getValue().getLowValue() < o2.getValue().getLowValue() ? -1 : 1;
    + }
    + });
    + double minInd = adjustedIndexMap.get(list.get(0).getKey());
    + double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + long lowValue = 0;
    + long min = list.get(0).getValue().getLowValue();
    + long max = list.get(list.size() - 1).getValue().getLowValue();
    + if (minInd == maxInd) {
    + lowValue = min;
    + } else if (minInd < maxInd) {
    + // left border is the min
    + lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd));
    + } else {
    + // right border is the min
    + lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd));
    + }
    +
    + // get the highValue
    + Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
    + public int compare(Map.Entry<String, LongColumnStatsData> o1,
    + Map.Entry<String, LongColumnStatsData> o2) {
    + return o1.getValue().getHighValue() < o2.getValue().getHighValue() ? -1 : 1;
    + }
    + });
    + minInd = adjustedIndexMap.get(list.get(0).getKey());
    + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + long highValue = 0;
    + min = list.get(0).getValue().getHighValue();
    + max = list.get(list.size() - 1).getValue().getHighValue();
    + if (minInd == maxInd) {
    + highValue = min;
    + } else if (minInd < maxInd) {
    + // right border is the max
    + highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
    + } else {
    + // left border is the max
    + highValue = (long) (min + (max - min) * minInd / (minInd - maxInd));
    + }
    +
    + // get the #nulls
    + long numNulls = 0;
    + for (Map.Entry<String, LongColumnStatsData> entry : extractedAdjustedStatsMap.entrySet()) {
    + numNulls += entry.getValue().getNumNulls();
    + }
    + // we scale up sumNulls based on the number of partitions
    + numNulls = numNulls * numParts / numPartsWithStats;
    +
    + // get the ndv
    + long ndv = 0;
    + Collections.sort(list, new Comparator<Map.Entry<String, LongColumnStatsData>>() {
    + public int compare(Map.Entry<String, LongColumnStatsData> o1,
    + Map.Entry<String, LongColumnStatsData> o2) {
    + return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1;
    + }
    + });
    + long lowerBound = list.get(list.size() - 1).getValue().getNumDVs();
    + long higherBound = 0;
    + for (Map.Entry<String, LongColumnStatsData> entry : list) {
    + higherBound += entry.getValue().getNumDVs();
    + }
    + if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) {
    + ndv = (long) ((highValue - lowValue) / densityAvg);
    + if (ndv < lowerBound) {
    + ndv = lowerBound;
    + } else if (ndv > higherBound) {
    + ndv = higherBound;
    + }
    + } else {
    + minInd = adjustedIndexMap.get(list.get(0).getKey());
    + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey());
    + min = list.get(0).getValue().getNumDVs();
    + max = list.get(list.size() - 1).getValue().getNumDVs();
    + if (minInd == maxInd) {
    + ndv = min;
    + } else if (minInd < maxInd) {
    + // right border is the max
    + ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
    + } else {
    + // left border is the max
    + ndv = (long) (min + (max - min) * minInd / (minInd - maxInd));
    + }
    + }
    + extrapolateLongData.setLowValue(lowValue);
    + extrapolateLongData.setHighValue(highValue);
    + extrapolateLongData.setNumNulls(numNulls);
    + extrapolateLongData.setNumDVs(ndv);
    + extrapolateData.setLongStats(extrapolateLongData);
    + }
    +
      }
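
When useDensityFunctionForNDVEstimation is enabled and bitvectors are not available, the aggregators above estimate the overall NDV as the aggregate value range divided by the average per-partition density, clamped between the largest single-partition NDV and the sum of all per-partition NDVs. Below is a standalone sketch of that calculation with worked numbers (illustrative names; it mirrors, rather than calls, the patch code):

    // Illustrative only: density-based NDV estimate. The estimate
    // (range / average density) is clamped between the largest per-partition NDV
    // and the sum of all per-partition NDVs.
    public class DensityNdvEstimate {
      static long estimateNdv(long aggLow, long aggHigh,
                              long[] partLows, long[] partHighs, long[] partNdvs) {
        double densitySum = 0.0;
        long lowerBound = 0;   // max of the per-partition NDVs
        long higherBound = 0;  // sum of the per-partition NDVs
        for (int i = 0; i < partNdvs.length; i++) {
          densitySum += (double) (partHighs[i] - partLows[i]) / partNdvs[i];
          lowerBound = Math.max(lowerBound, partNdvs[i]);
          higherBound += partNdvs[i];
        }
        double densityAvg = densitySum / partNdvs.length;
        long estimation = (long) ((aggHigh - aggLow) / densityAvg);
        return Math.max(lowerBound, Math.min(higherBound, estimation));
      }

      public static void main(String[] args) {
        // two partitions covering [0,100] and [100,200], each with 50 distinct values:
        // average density is 2.0 per value, so a combined range of 200 suggests
        // about 100 NDVs, which already lies inside the bounds [50, 100]
        long ndv = estimateNdv(0, 200,
            new long[] {0, 100}, new long[] {100, 200}, new long[] {50, 50});
        System.out.println(ndv); // 100
      }
    }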

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
    index aeb6c39..2aa4046 100644
    --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
    +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
    @@ -19,26 +19,87 @@

      package org.apache.hadoop.hive.metastore.hbase.stats;

    +import java.util.List;
    +
      import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
      import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    +import org.apache.hadoop.hive.metastore.api.MetaException;
      import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;

      public class StringColumnStatsAggregator extends ColumnStatsAggregator {

        @Override
    - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
    - StringColumnStatsData aggregateData = aggregateColStats.getStatsData().getStringStats();
    - StringColumnStatsData newData = newColStats.getStatsData().getStringStats();
    - aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
    - aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
    - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    - if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
    - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
    - } else {
    - ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    - ndvEstimator.getnumBitVectors()));
    + public ColumnStatisticsObj aggregate(String colName, List<String> partNames,
    + List<ColumnStatistics> css) throws MetaException {
    + ColumnStatisticsObj statsObj = null;
    +
    + // check if all the ColumnStatisticsObjs contain stats and all the ndv are
    + // bitvectors. Only when both of the conditions are true, we merge bit
    + // vectors. Otherwise, just use the maximum function.
    + boolean doAllPartitionContainStats = partNames.size() == css.size();
    + boolean isNDVBitVectorSet = true;
    + String colType = null;
    + for (ColumnStatistics cs : css) {
    + if (cs.getStatsObjSize() != 1) {
    + throw new MetaException(
    + "The number of columns should be exactly one in aggrStats, but found "
    + + cs.getStatsObjSize());
    + }
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + if (statsObj == null) {
    + colType = cso.getColType();
    + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso
    + .getStatsData().getSetField());
    + }
    + if (numBitVectors <= 0 || !cso.getStatsData().getStringStats().isSetBitVectors()
    + || cso.getStatsData().getStringStats().getBitVectors().length() == 0) {
    + isNDVBitVectorSet = false;
    + break;
    + }
    + }
    + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData();
    + if (doAllPartitionContainStats && isNDVBitVectorSet) {
    + StringColumnStatsData aggregateData = null;
    + NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
    + for (ColumnStatistics cs : css) {
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + StringColumnStatsData newData = cso.getStatsData().getStringStats();
    + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
    + ndvEstimator.getnumBitVectors()));
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + aggregateData
    + .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
    + aggregateData
    + .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + }
    + }
            aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues());
    - aggregateData.setBitVectors(ndvEstimator.serialize().toString());
    + columnStatisticsData.setStringStats(aggregateData);
    + } else {
    + StringColumnStatsData aggregateData = null;
    + for (ColumnStatistics cs : css) {
    + ColumnStatisticsObj cso = cs.getStatsObjIterator().next();
    + StringColumnStatsData newData = cso.getStatsData().getStringStats();
    + if (aggregateData == null) {
    + aggregateData = newData.deepCopy();
    + } else {
    + aggregateData
    + .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
    + aggregateData
    + .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
    + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
    + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
    + }
    + }
    + columnStatisticsData.setStringStats(aggregateData);
          }
    + statsObj.setStatsData(columnStatisticsData);
    + return statsObj;
        }
    +
      }
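
The string aggregator keeps the two paths visible in the diff above: merge the per-partition bitvector NDV estimators when every partition has stats and bitvectors, otherwise fall back to the maximum of the per-partition NDVs. The true distinct count of the union lies between that max and the sum of the per-partition NDVs, which is what merging estimators tries to approximate. Below is a toy illustration (plain Java, exact sets instead of sketches, sample values made up):

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative only: with overlapping partitions, max(ndv_i) undercounts and
    // sum(ndv_i) overcounts; the distinct count of the union sits in between.
    public class NdvUnionVsMax {
      public static void main(String[] args) {
        Set<Integer> partA = new HashSet<>();
        Set<Integer> partB = new HashSet<>();
        for (int i = 1; i <= 50; i++) partA.add(i);    // NDV 50
        for (int i = 26; i <= 75; i++) partB.add(i);   // NDV 50
        Set<Integer> union = new HashSet<>(partA);
        union.addAll(partB);
        System.out.println("max  = " + Math.max(partA.size(), partB.size())); // 50
        System.out.println("sum  = " + (partA.size() + partB.size()));        // 100
        System.out.println("true = " + union.size());                         // 75
      }
    }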

    http://git-wip-us.apache.org/repos/asf/hive/blob/96862093/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
    ----------------------------------------------------------------------
    diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
    index 36c7984..e0c4094 100644
    --- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
    +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCacheWithBitVector.java
    @@ -156,10 +156,8 @@ public class TestHBaseAggregateStatsCacheWithBitVector {
              Assert.assertEquals(-20.12, dcsd.getLowValue(), 0.01);
              Assert.assertEquals(60, dcsd.getNumNulls());
              Assert.assertEquals(5, dcsd.getNumDVs());
    - Assert
    - .assertEquals(
    - "{0, 1, 4, 5, 7}{0, 1}{0, 1, 2, 4}{0, 1, 2, 4}{0, 1, 2}{0, 2}{0, 1, 3, 4}{0, 1, 2, 3, 4}{0, 1, 4}{0, 1, 3, 4, 6}{0, 2}{0, 1, 3, 8}{0, 2, 3}{0, 2}{0, 1, 9}{0, 1, 4}",
    - dcsd.getBitVectors());
    + // we do not store the bitvector for the aggrStats.
    + // we can store that if it is necessary in the future.
            }
          };
