Author: vgumashta
Date: Thu Apr 2 20:59:42 2015
New Revision: 1670966

URL: http://svn.apache.org/r1670966
Log:
HIVE-9693: Introduce a stats cache for aggregate stats in HBase metastore (Vaibhav Gumashta reviewed by Alan Gates)

Added:
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggregateStatsCache.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BitVector.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BloomFilter.java
     hive/branches/hbase-metastore/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestAggregateStatsCache.java
     hive/branches/hbase-metastore/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/
     hive/branches/hbase-metastore/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/TestBitVector.java
     hive/branches/hbase-metastore/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/TestBloomFilter.java
Removed:
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java
     hive/branches/hbase-metastore/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestStatsCache.java
Modified:
     hive/branches/hbase-metastore/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
     hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java

Modified: hive/branches/hbase-metastore/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1670966&r1=1670965&r2=1670966&view=diff
==============================================================================
--- hive/branches/hbase-metastore/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/hbase-metastore/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Apr 2 20:59:42 2015
@@ -377,12 +377,27 @@ public class HiveConf extends Configurat
          "Used to avoid all of the proxies and object copies in the metastore. Note, if this is " +
              "set, you MUST use a local metastore (hive.metastore.uris must be empty) otherwise " +
              "undefined and most likely undesired behavior will result"),
- METASTORE_HBASE_CACHE_SIZE("hive.metastore.hbase.cache.size", 100000, "Maximum number of " +
- "objects we will place in the hbase metastore cache. The objects will be divided up by " +
+ METASTORE_HBASE_CATALOG_CACHE_SIZE("hive.metastore.hbase.catalog.cache.size", 50000, "Maximum number of " +
+ "objects we will place in the hbase metastore catalog cache. The objects will be divided up by " +
          "types that we need to cache."),
- METASTORE_HBASE_CACHE_TIME_TO_LIVE("hive.metastore.hbase.cache.ttl", "600s",
- new TimeValidator(TimeUnit.SECONDS),
- "Number of seconds for stats items to live in the cache"),
+ METASTORE_HBASE_AGGREGATE_STATS_CACHE_SIZE("hive.metastore.hbase.aggregate.stats.cache.size", 10000,
+ "Maximum number of aggregate stats nodes that we will place in the hbase metastore aggregate stats cache."),
+ METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_PARTITIONS("hive.metastore.hbase.aggregate.stats.max.partitions", 10000,
+ "Maximum number of partitions that are aggregated per cache node."),
+ METASTORE_HBASE_AGGREGATE_STATS_CACHE_FALSE_POSITIVE_PROBABILITY("hive.metastore.hbase.aggregate.stats.false.positive.probability",
+ (float) 0.01, "Maximum false positive probability for the Bloom Filter used in each aggregate stats cache node (default 1%)."),
+ METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_VARIANCE("hive.metastore.hbase.aggregate.stats.max.variance", (float) 0.1,
+ "Maximum tolerable variance in number of partitions between a cached node and our request (default 10%)."),
+ METASTORE_HBASE_CACHE_TIME_TO_LIVE("hive.metastore.hbase.cache.ttl", "600s", new TimeValidator(TimeUnit.SECONDS),
+ "Number of seconds for a cached node to be active in the cache before they become stale."),
+ METASTORE_HBASE_CACHE_MAX_WRITER_WAIT("hive.metastore.hbase.cache.max.writer.wait", "5000ms", new TimeValidator(TimeUnit.MILLISECONDS),
+ "Number of milliseconds a writer will wait to acquire the writelock before giving up."),
+ METASTORE_HBASE_CACHE_MAX_READER_WAIT("hive.metastore.hbase.cache.max.reader.wait", "1000ms", new TimeValidator(TimeUnit.MILLISECONDS),
+ "Number of milliseconds a reader will wait to acquire the readlock before giving up."),
+ METASTORE_HBASE_CACHE_MAX_FULL("hive.metastore.hbase.cache.max.full", (float) 0.9,
+ "Maximum cache full % after which the cache cleaner thread kicks in."),
+ METASTORE_HBASE_CACHE_CLEAN_UNTIL("hive.metastore.hbase.cache.clean.until", (float) 0.8,
+ "The cleaner thread cleans until cache reaches this % full size."),
      METASTORE_HBASE_CONNECTION_CLASS("hive.metastore.hbase.connection.class",
          "org.apache.hadoop.hive.metastore.hbase.VanillaHBaseConnection",
          "Class used to connection to HBase"),

Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1670966&r1=1670965&r2=1670966&view=diff
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Thu Apr 2 20:59:42 2015
@@ -4272,8 +4272,7 @@ public class HiveMetaStore extends Thrif
      @Override
      public boolean delete_table_column_statistics(String dbName, String tableName, String colName)
        throws NoSuchObjectException, MetaException, InvalidObjectException, TException,
- InvalidInputException
- {
+ InvalidInputException {
        dbName = dbName.toLowerCase();
        tableName = tableName.toLowerCase();

@@ -4290,7 +4289,33 @@ public class HiveMetaStore extends Thrif
          endFunction("delete_column_statistics_by_table: ", ret != false, null, tableName);
        }
        return ret;
- }
+ }
+
+ @Override
+ public AggrStats get_aggr_stats_for(PartitionsStatsRequest request)
+ throws NoSuchObjectException, MetaException, TException {
+ startFunction("get_aggr_stats_for: db=" + request.getDbName() + " table=" + request.getTblName());
+ AggrStats aggrStats = null;
+ try {
+ aggrStats = new AggrStats(getMS().get_aggr_stats_for(request.getDbName(),
+ request.getTblName(), request.getPartNames(), request.getColNames()));
+ return aggrStats;
+ } finally {
+ endFunction("get_partitions_statistics_req: ", aggrStats == null, null, request.getTblName());
+ }
+
+ }
+
+ @Override
+ public boolean set_aggr_stats_for(SetPartitionsStatsRequest request)
+ throws NoSuchObjectException, InvalidObjectException, MetaException, InvalidInputException,
+ TException {
+ boolean ret = true;
+ for (ColumnStatistics colStats : request.getColStats()) {
+ ret = ret && update_partition_column_statistics(colStats);
+ }
+ return ret;
+ }

      @Override
      public List<Partition> get_partitions_by_filter(final String dbName,
@@ -5419,32 +5444,6 @@ public class HiveMetaStore extends Thrif
      }

      @Override
- public AggrStats get_aggr_stats_for(PartitionsStatsRequest request)
- throws NoSuchObjectException, MetaException, TException {
- startFunction("get_aggr_stats_for: db=" + request.getDbName() + " table=" + request.getTblName());
- AggrStats aggrStats = null;
- try {
- aggrStats = new AggrStats(getMS().get_aggr_stats_for(request.getDbName(),
- request.getTblName(), request.getPartNames(), request.getColNames()));
- return aggrStats;
- } finally {
- endFunction("get_partitions_statistics_req: ", aggrStats == null, null, request.getTblName());
- }
-
- }
-
- @Override
- public boolean set_aggr_stats_for(SetPartitionsStatsRequest request)
- throws NoSuchObjectException, InvalidObjectException, MetaException, InvalidInputException,
- TException {
- boolean ret = true;
- for (ColumnStatistics colStats : request.getColStats()) {
- ret = ret && update_partition_column_statistics(colStats);
- }
- return ret;
- }
-
- @Override
      public NotificationEventResponse get_next_notification(NotificationEventRequest rqst)
          throws TException {
        RawStore ms = getMS();

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggregateStatsCache.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggregateStatsCache.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggregateStatsCache.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggregateStatsCache.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.hbase.utils.BloomFilter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+class AggregateStatsCache {
+
+ private static final Log LOG = LogFactory.getLog(AggregateStatsCache.class.getName());
+ private static AggregateStatsCache self = null;
+
+ // Backing store for this cache
+ private final ConcurrentHashMap<Key, AggrColStatsList> cacheStore;
+ // Cache size
+ private final int maxCacheNodes;
+ // Current nodes in the cache
+ // TODO: Make access threadsafe!!!
+ private int currentNodes = 0;
+ // Run the cleaner thread when the cache is maxFull% full
+ private final float maxFull;
+ // Run the cleaner thread until cache is cleanUntil% occupied
+ private final float cleanUntil;
+ // Nodes go stale after this
+ private final long timeToLive;
+ // Max time when waiting for write locks on node list
+ private final long maxWriterWaitTime;
+ // Max time when waiting for read locks on node list
+ private final long maxReaderWaitTime;
+ // Maximum number of partitions aggregated per cache node
+ private final int maxPartsPerCacheNode;
+ // Bloom filter false positive probability
+ private final float falsePositiveProbability;
+ // Max tolerable variance for matches
+ private final float maxVariance;
+ // Used to determine if cleaner thread is already running
+ private boolean isCleaning = false;
+
+ private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLive,
+ float falsePositiveProbability, float maxVariance, long maxWriterWaitTime,
+ long maxReaderWaitTime, float maxFull, float cleanUntil) {
+ this.maxCacheNodes = maxCacheNodes;
+ this.maxPartsPerCacheNode = maxPartsPerCacheNode;
+ this.timeToLive = timeToLive;
+ this.falsePositiveProbability = falsePositiveProbability;
+ this.maxVariance = maxVariance;
+ this.maxWriterWaitTime = maxWriterWaitTime;
+ this.maxReaderWaitTime = maxReaderWaitTime;
+ this.maxFull = maxFull;
+ this.cleanUntil = cleanUntil;
+ this.cacheStore = new ConcurrentHashMap<Key, AggrColStatsList>();
+ }
+
+ static synchronized AggregateStatsCache getInstance(Configuration conf) {
+ if (self == null) {
+ int maxCacheNodes =
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_SIZE);
+ // The number of partitions aggregated per cache node
+ // If the number of partitions requested is > this value, we'll fetch directly from Metastore
+ int maxPartitionsPerCacheNode =
+ HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_PARTITIONS);
+ long timeToLive =
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_TIME_TO_LIVE,
+ TimeUnit.SECONDS);
+ // False positive probability we are willing to tolerate for the underlying bloom filter
+ float falsePositiveProbability =
+ HiveConf.getFloatVar(conf,
+ HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_FALSE_POSITIVE_PROBABILITY);
+ // Maximum tolerable variance in number of partitions between cached node and our request
+ float maxVariance =
+ HiveConf.getFloatVar(conf,
+ HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_VARIANCE);
+ long maxWriterWaitTime =
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_MAX_WRITER_WAIT,
+ TimeUnit.MILLISECONDS);
+ long maxReaderWaitTime =
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_MAX_READER_WAIT,
+ TimeUnit.MILLISECONDS);
+ float maxFull = HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_MAX_FULL);
+ float cleanUntil =
+ HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_CLEAN_UNTIL);
+ self =
+ new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLive,
+ falsePositiveProbability, maxVariance, maxWriterWaitTime, maxReaderWaitTime, maxFull,
+ cleanUntil);
+ }
+ return self;
+ }
+
+ int getMaxCacheNodes() {
+ return maxCacheNodes;
+ }
+
+ int getCurrentNodes() {
+ return currentNodes;
+ }
+
+ int getMaxPartsPerCacheNode() {
+ return maxPartsPerCacheNode;
+ }
+
+ float getFalsePositiveProbability() {
+ return falsePositiveProbability;
+ }
+
+ /**
+ * Return aggregate stats for a column from the cache or null.
+ * While reading from the nodelist for a key, we wait maxReaderWaitTime to acquire the lock,
+ * failing which we return a cache miss (i.e. null)
+ *
+ * @param dbName
+ * @param tblName
+ * @param colName
+ * @param partNames
+ * @return
+ */
+ AggrColStatsCached get(String dbName, String tblName, String colName, List<String> partNames) {
+ // Cache key
+ Key key = new Key(dbName, tblName, colName);
+ AggrColStatsList candidateList = cacheStore.get(key);
+ // No key, or no nodes in candidate list
+ if ((candidateList == null) || (candidateList.nodes.size() == 0)) {
+ LOG.info("No aggregate stats cached for " + key.toString());
+ return null;
+ }
+ // Find the value object
+ // Update the timestamp of the key,value if value matches the criteria
+ // Return the value
+ AggrColStatsCached match = null;
+ boolean isLocked = false;
+ try {
+ // Try to readlock the candidateList; timeout after maxReaderWaitTime
+ isLocked = candidateList.readLock.tryLock(maxReaderWaitTime, TimeUnit.MILLISECONDS);
+ if (isLocked) {
+ match = findBestMatch(partNames, candidateList.nodes);
+ }
+ if (match != null) {
+ // Ok to not lock the list for this and use a volatile lastAccessTime instead
+ candidateList.updateLastAccessTime();
+ }
+ } catch (InterruptedException e) {
+ LOG.debug(e);
+ match = null;
+ } finally {
+ if (isLocked) {
+ candidateList.readLock.unlock();
+ }
+ }
+ return match;
+ }
+
+ /**
+ * Find the best match using the configurable error tolerance and time to live value
+ *
+ * @param partNames
+ * @param candidates
+ * @return best matched node or null
+ */
+ private AggrColStatsCached findBestMatch(List<String> partNames, List<AggrColStatsCached> candidates) {
+ // Hits, misses, shouldSkip for a node
+ MatchStats matchStats;
+ // MatchStats for each candidate
+ Map<AggrColStatsCached, MatchStats> candidateMatchStats = new HashMap<AggrColStatsCached, MatchStats>();
+ // The final match we intend to return
+ AggrColStatsCached bestMatch = null;
+ // To compare among potentially multiple matches
+ int bestMatchHits = 0;
+ int numPartsRequested = partNames.size();
+ // 1st pass at marking invalid candidates
+ // Checks based on variance and TTL
+ // Note: we're not creating a copy of the list for saving memory
+ for (AggrColStatsCached candidate : candidates) {
+ // Variance check: cast before dividing so the ratio is not truncated to 0
+ if (Math.abs((float) (candidate.getNumPartsCached() - numPartsRequested)
+ / numPartsRequested) > maxVariance) {
+ candidateMatchStats.put(candidate, new MatchStats(0, 0, true));
+ continue;
+ }
+ // TTL check
+ if (isExpired(candidate)) {
+ candidateMatchStats.put(candidate, new MatchStats(0, 0, true));
+ continue;
+ }
+ candidateMatchStats.put(candidate, new MatchStats(0, 0, false));
+ }
+ // We'll count misses as we iterate
+ int maxMisses = (int) (maxVariance * numPartsRequested);
+ for (String partName : partNames) {
+ for (AggrColStatsCached candidate : candidates) {
+ matchStats = candidateMatchStats.get(candidate);
+ if (matchStats.shouldSkip) {
+ continue;
+ }
+ if (candidate.getBloomFilter().contains(partName.getBytes())) {
+ ++matchStats.hits;
+ } else {
+ ++matchStats.misses;
+ }
+ // 2nd pass at marking invalid candidates
+ // If misses so far exceed max tolerable misses
+ if (matchStats.misses > maxMisses) {
+ matchStats.shouldSkip = true;
+ continue;
+ }
+ // Check if this is the best match so far
+ if (matchStats.hits > bestMatchHits) {
+ bestMatch = candidate;
+ bestMatchHits = matchStats.hits;
+ }
+ }
+ }
+ if (bestMatch != null) {
+ // Update the last access time for this node
+ bestMatch.updateLastAccessTime();
+ }
+ return bestMatch;
+ }
+
+ /**
+ * Add a new node to the cache; may trigger the cleaner thread if the cache is near full capacity.
+ * We will, however, add the node even if we temporarily exceed maxCacheNodes, because the cleaner
+ * will eventually create space from expired nodes or by removing LRU nodes.
+ *
+ * @param dbName
+ * @param tblName
+ * @param colName
+ * @param numPartsCached
+ * @param colStats
+ * @param bloomFilter
+ */
+ // TODO: make add asynchronous: add shouldn't block the higher level calls
+ void add(String dbName, String tblName, String colName, int numPartsCached,
+ ColumnStatisticsObj colStats, BloomFilter bloomFilter) {
+ // If we have no space in the cache, run cleaner thread
+ if ((float) currentNodes / maxCacheNodes > maxFull) {
+ clean();
+ }
+ // Cache key
+ Key key = new Key(dbName, tblName, colName);
+ // Add new node to the cache
+ AggrColStatsCached node = new AggrColStatsCached(numPartsCached, bloomFilter, colStats);
+ AggrColStatsList nodeList;
+ AggrColStatsList newNodeList = new AggrColStatsList();
+ newNodeList.nodes = new ArrayList<AggrColStatsCached>();
+ nodeList = cacheStore.putIfAbsent(key, newNodeList);
+ if (nodeList == null) {
+ nodeList = newNodeList;
+ }
+ boolean isLocked = false;
+ try {
+ isLocked = nodeList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
+ if (isLocked) {
+ nodeList.nodes.add(node);
+ node.updateLastAccessTime();
+ nodeList.updateLastAccessTime();
+ ++currentNodes;
+ }
+ } catch (InterruptedException e) {
+ LOG.debug(e);
+ } finally {
+ if (isLocked) {
+ nodeList.writeLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Removes expired nodes from the cache and, if that is not enough, evicts LRU nodes
+ * until the cache shrinks to cleanUntil% of capacity.
+ */
+ private void clean() {
+ // This spawns a separate thread to walk through the cache and removes expired nodes.
+ // Only one cleaner thread should be running at any point.
+ synchronized (this) {
+ if (isCleaning) {
+ return;
+ }
+ isCleaning = true;
+ }
+ Thread cleaner = new Thread() {
+ @Override
+ public void run() {
+ Iterator<Map.Entry<Key, AggrColStatsList>> mapIterator = cacheStore.entrySet().iterator();
+ while (mapIterator.hasNext()) {
+ Map.Entry<Key, AggrColStatsList> pair =
+ (Map.Entry<Key, AggrColStatsList>) mapIterator.next();
+ AggrColStatsCached node;
+ AggrColStatsList candidateList = (AggrColStatsList) pair.getValue();
+ List<AggrColStatsCached> nodes = candidateList.nodes;
+ if (nodes.size() == 0) {
+ mapIterator.remove();
+ continue;
+ }
+ boolean isLocked = false;
+ try {
+ isLocked = candidateList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
+ if (isLocked) {
+ for (Iterator<AggrColStatsCached> listIterator = nodes.iterator(); listIterator.hasNext();) {
+ node = listIterator.next();
+ // Remove the node if it has expired
+ if (isExpired(node)) {
+ listIterator.remove();
+ --currentNodes;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.debug(e);
+ } finally {
+ if (isLocked) {
+ candidateList.writeLock.unlock();
+ }
+ }
+ // We want to make sure this runs at a low priority in the background
+ Thread.yield();
+ }
+ // If the expired nodes did not result in cache being cleanUntil% in size,
+ // start removing LRU nodes
+ while ((float) currentNodes / maxCacheNodes > cleanUntil) {
+ evictOneNode();
+ }
+ }
+ };
+ cleaner.setPriority(Thread.MIN_PRIORITY);
+ cleaner.setDaemon(true);
+ cleaner.start();
+ }
+
+ /**
+ * Evict one node: an expired node if we encounter one first, otherwise the least recently used node
+ */
+ private void evictOneNode() {
+ // Get the LRU key, value
+ Key lruKey = null;
+ AggrColStatsList lruValue = null;
+ for (Map.Entry<Key, AggrColStatsList> entry : cacheStore.entrySet()) {
+ Key key = entry.getKey();
+ AggrColStatsList value = entry.getValue();
+ if (lruKey == null) {
+ lruKey = key;
+ lruValue = value;
+ continue;
+ }
+ if ((value.lastAccessTime < lruValue.lastAccessTime) && !(value.nodes.isEmpty())) {
+ lruKey = key;
+ lruValue = value;
+ }
+ }
+ // Now delete a node for this key's list
+ AggrColStatsList candidateList = cacheStore.get(lruKey);
+ boolean isLocked = false;
+ try {
+ isLocked = candidateList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
+ if (isLocked) {
+ AggrColStatsCached candidate;
+ AggrColStatsCached lruNode = null;
+ int currentIndex = 0;
+ int deleteIndex = 0;
+ for (Iterator<AggrColStatsCached> iterator = candidateList.nodes.iterator(); iterator.hasNext();) {
+ candidate = iterator.next();
+ // Since we have to create space for 1, if we find an expired node we will remove it &
+ // return
+ if (isExpired(candidate)) {
+ iterator.remove();
+ --currentNodes;
+ return;
+ }
+ // Sorry, too many ifs but this form looks optimal
+ // Update the LRU node from what we've seen so far
+ if (lruNode == null) {
+ lruNode = candidate;
+ ++currentIndex;
+ continue;
+ }
+ if (lruNode != null) {
+ if (candidate.lastAccessTime < lruNode.lastAccessTime) {
+ lruNode = candidate;
+ deleteIndex = currentIndex;
+ }
+ }
+ }
+ candidateList.nodes.remove(deleteIndex);
+ --currentNodes;
+ }
+ } catch (InterruptedException e) {
+ LOG.debug(e);
+ } finally {
+ if (isLocked) {
+ candidateList.writeLock.unlock();
+ }
+ }
+ }
+
+ // TODO: store and print metrics
+ void printMetrics() {
+
+ }
+
+ private boolean isExpired(AggrColStatsCached aggrColStats) {
+ return System.currentTimeMillis() - aggrColStats.lastAccessTime > timeToLive;
+ }
+
+ /**
+ * Key object for the stats cache hashtable
+ */
+ private static class Key {
+ private final String dbName;
+ private final String tblName;
+ private final String colName;
+
+ Key(String db, String table, String col) {
+ // Don't construct an illegal cache key
+ if ((db == null) || (table == null) || (col == null)) {
+ throw new IllegalArgumentException("dbName, tblName, colName can't be null");
+ }
+ dbName = db;
+ tblName = table;
+ colName = col;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if ((other == null) || !(other instanceof Key)) {
+ return false;
+ }
+ Key that = (Key) other;
+ return dbName.equals(that.dbName) && tblName.equals(that.tblName)
+ && colName.equals(that.colName);
+ }
+
+ @Override
+ public int hashCode() {
+ return dbName.hashCode() * 31 + tblName.hashCode() * 31 + colName.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "Database: " + dbName + ", Table: " + tblName + ", Column: " + colName;
+ }
+
+ }
+
+ static class AggrColStatsList {
+ // TODO: figure out a better data structure for node list(?)
+ private List<AggrColStatsCached> nodes = new ArrayList<AggrColStatsCached>();
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
+ // Read lock for get operation
+ private Lock readLock = lock.readLock();
+ // Write lock for add, evict and clean operation
+ private Lock writeLock = lock.writeLock();
+ // Using volatile instead of locking updates to this variable,
+ // since we can rely on approx lastAccessTime but don't want a performance hit
+ private volatile long lastAccessTime = 0;
+
+ List<AggrColStatsCached> getNodes() {
+ return nodes;
+ }
+
+ void updateLastAccessTime() {
+ this.lastAccessTime = System.currentTimeMillis();
+ }
+ }
+
+ static class AggrColStatsCached {
+ private final int numPartsCached;
+ private final BloomFilter bloomFilter;
+ private final ColumnStatisticsObj colStats;
+ private volatile long lastAccessTime;
+
+ AggrColStatsCached(int numPartsCached, BloomFilter bloomFilter,
+ ColumnStatisticsObj colStats) {
+ this.numPartsCached = numPartsCached;
+ this.bloomFilter = bloomFilter;
+ this.colStats = colStats;
+ this.lastAccessTime = System.currentTimeMillis();
+ }
+
+ int getNumPartsCached() {
+ return numPartsCached;
+ }
+
+ ColumnStatisticsObj getColStats() {
+ updateLastAccessTime();
+ return colStats;
+ }
+
+ BloomFilter getBloomFilter() {
+ return bloomFilter;
+ }
+
+ void updateLastAccessTime() {
+ this.lastAccessTime = System.currentTimeMillis();
+ }
+ }
+
+ /**
+ * TODO: capture some metrics for the cache
+ */
+ class Metrics {
+
+ }
+
+ /**
+ * Intermediate object, used to collect hits & misses for each cache node that is evaluated for
+ * an incoming request
+ */
+ private static class MatchStats {
+ private int hits = 0;
+ private int misses = 0;
+ private boolean shouldSkip = false;
+
+ MatchStats(int hits, int misses, boolean shouldSkip) {
+ this.hits = hits;
+ this.misses = misses;
+ this.shouldSkip = shouldSkip;
+ }
+ }
+
+ /**
+ * TODO: implement memory management for the cache
+ */
+ static class MemoryManager {
+
+ }
+}
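
As context for the class above, a minimal sketch of the intended read-through pattern (it mirrors what HBaseReadWrite.getAggrStats does further down in this commit). The conf, sizing variables, names, and recomputeFromStore helper are placeholders, not APIs from this patch:

    AggregateStatsCache cache = AggregateStatsCache.getInstance(conf);
    List<String> partNames = Arrays.asList("ds=2015-04-01", "ds=2015-04-02");
    AggregateStatsCache.AggrColStatsCached cached =
        cache.get("default", "sales", "amount", partNames);
    if (cached != null) {
      // Hit: reuse the previously aggregated stats object.
      ColumnStatisticsObj aggr = cached.getColStats();
    } else {
      // Miss: aggregate from the store, remember contributing partitions in a bloom filter,
      // then publish the node for future requests.
      BloomFilter bloomFilter = new BloomFilter(maxPartsPerNode, falsePositiveProbability);
      ColumnStatisticsObj aggr = recomputeFromStore(partNames, bloomFilter); // hypothetical helper
      cache.add("default", "sales", "amount", partNames.size(), aggr, bloomFilter);
    }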

Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java?rev=1670966&r1=1670965&r2=1670966&view=diff
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java (original)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java Thu Apr 2 20:59:42 2015
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.filter.Re
  import org.apache.hadoop.hbase.filter.RowFilter;
  import org.apache.hadoop.hive.common.ObjectPair;
  import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
  import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
  import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
  import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -51,6 +52,10 @@ import org.apache.hadoop.hive.metastore.
  import org.apache.hadoop.hive.metastore.api.Role;
  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
  import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.hbase.AggregateStatsCache.AggrColStatsCached;
+import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregator;
+import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregatorFactory;
+import org.apache.hadoop.hive.metastore.hbase.utils.BloomFilter;

  import java.io.IOException;
  import java.security.MessageDigest;
@@ -114,17 +119,17 @@ class HBaseReadWrite {
    private ObjectCache<ObjectPair<String, String>, Table> tableCache;
    private ObjectCache<ByteArrayWrapper, StorageDescriptor> sdCache;
    private PartitionCache partCache;
- private StatsCache statsCache;
- private final Counter tableHits;
- private final Counter tableMisses;
- private final Counter tableOverflows;
- private final Counter partHits;
- private final Counter partMisses;
- private final Counter partOverflows;
- private final Counter sdHits;
- private final Counter sdMisses;
- private final Counter sdOverflows;
- private final List<Counter> counters;
+ private AggregateStatsCache aggrStatsCache;
+ private Counter tableHits;
+ private Counter tableMisses;
+ private Counter tableOverflows;
+ private Counter partHits;
+ private Counter partMisses;
+ private Counter partOverflows;
+ private Counter sdHits;
+ private Counter sdMisses;
+ private Counter sdOverflows;
+ private List<Counter> counters;
    // roleCache doesn't use ObjectCache because I don't want to limit the size. I am assuming
    // that the number of roles will always be small (< 100) so caching the whole thing should not
    // be painful.
@@ -182,8 +187,8 @@ class HBaseReadWrite {
      } catch (NoSuchAlgorithmException e) {
        throw new RuntimeException(e);
      }
- int totalObjectsToCache =
- HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_SIZE);
+ int totalCatalogObjectsToCache =
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CATALOG_CACHE_SIZE);

      tableHits = new Counter("table cache hits");
      tableMisses = new Counter("table cache misses");
@@ -205,24 +210,21 @@ class HBaseReadWrite {
      counters.add(sdMisses);
      counters.add(sdOverflows);

- // Divide 50/50 between catalog and stats, then give 1% of catalog space to storage
- // descriptors (storage descriptors are shared, so 99% should be the same for a
- // given table).
- int sdsCacheSize = totalObjectsToCache / 100;
+ // Give 1% of catalog cache space to storage descriptors
+ // (storage descriptors are shared, so 99% should be the same for a given table)
+ int sdsCacheSize = totalCatalogObjectsToCache / 100;
      if (conf.getBoolean(NO_CACHE_CONF, false)) {
        tableCache = new BogusObjectCache<ObjectPair<String, String>, Table>();
        sdCache = new BogusObjectCache<ByteArrayWrapper, StorageDescriptor>();
        partCache = new BogusPartitionCache();
- statsCache = StatsCache.getBogusStatsCache();
      } else {
        tableCache = new ObjectCache<ObjectPair<String, String>, Table>(TABLES_TO_CACHE, tableHits,
            tableMisses, tableOverflows);
        sdCache = new ObjectCache<ByteArrayWrapper, StorageDescriptor>(sdsCacheSize, sdHits,
            sdMisses, sdOverflows);
- partCache = new PartitionCache(totalObjectsToCache / 2, partHits, partMisses, partOverflows);
- statsCache = StatsCache.getInstance(conf);
+ partCache = new PartitionCache(totalCatalogObjectsToCache, partHits, partMisses, partOverflows);
+ aggrStatsCache = AggregateStatsCache.getInstance(conf);
      }
-
      roleCache = new HashMap<String, HbaseMetastoreProto.RoleGrantInfoList>();
      entireRoleTableInCache = false;
    }
@@ -1416,195 +1418,242 @@ class HBaseReadWrite {

    /**
     * Update statistics for one or more columns for a table or a partition.
+ *
     * @param dbName database the table is in
     * @param tableName table to update statistics for
     * @param partName name of the partition, can be null if these are table level statistics.
- * @param partVals partition values that define partition to update statistics for. If this is
- * null, then these will be assumed to be table level statistics.
- * @param stats Stats object with stats for one or more columns.
+ * @param partVals partition values that define partition to update statistics for. If this is
+ * null, then these will be assumed to be table level statistics
+ * @param stats Stats object with stats for one or more columns
     * @throws IOException
     */
    void updateStatistics(String dbName, String tableName, String partName, List<String> partVals,
- ColumnStatistics stats) throws IOException {
+ ColumnStatistics stats) throws IOException {
      byte[] key = getStatisticsKey(dbName, tableName, partVals);
      String hbaseTable = getStatisticsTable(partVals);
-
      byte[][] colnames = new byte[stats.getStatsObjSize()][];
- byte[][] serializeds = new byte[stats.getStatsObjSize()][];
- for (int i = 0; i < stats.getStatsObjSize(); i++) {
- ColumnStatisticsObj obj = stats.getStatsObj().get(i);
- serializeds[i] = HBaseUtils.serializeStatsForOneColumn(stats, obj);
- String colname = obj.getColName();
- colnames[i] = HBaseUtils.buildKey(colname);
- statsCache.put(dbName, tableName, partName, colname, obj,
- stats.getStatsDesc().getLastAnalyzed());
- }
- store(hbaseTable, key, STATS_CF, colnames, serializeds);
+ byte[][] serialized = new byte[stats.getStatsObjSize()][];
+ for (int i = 0; i < stats.getStatsObjSize(); i++) {
+ ColumnStatisticsObj obj = stats.getStatsObj().get(i);
+ serialized[i] = HBaseUtils.serializeStatsForOneColumn(stats, obj);
+ colnames[i] = HBaseUtils.buildKey(obj.getColName());
+ }
+ store(hbaseTable, key, STATS_CF, colnames, serialized);
    }

    /**
- * Get Statistics for a table
+ * Get statistics for a table
+ *
     * @param dbName name of database table is in
     * @param tableName name of table
     * @param colNames list of column names to get statistics for
     * @return column statistics for indicated table
     * @throws IOException
     */
- ColumnStatistics getTableStatistics(String dbName, String tableName, List<String> colNames)
+ ColumnStatistics getTableStatistics(String dbName, String tblName, List<String> colNames)
        throws IOException {
- byte[] key = HBaseUtils.buildKey(dbName, tableName);
- ColumnStatistics stats = new ColumnStatistics();
- ColumnStatisticsDesc desc = new ColumnStatisticsDesc();
- desc.setIsTblLevel(true);
- desc.setDbName(dbName);
- desc.setTableName(tableName);
- stats.setStatsDesc(desc);
-
- // First we have to go through and see what's in the cache and fetch what we can from there.
- // Then we'll fetch the rest from HBase
- List<String> stillLookingFor = new ArrayList<String>();
- for (int i = 0; i < colNames.size(); i++) {
- StatsCache.StatsInfo info =
- statsCache.getTableStatistics(dbName, tableName, colNames.get(i));
- if (info == null) {
- stillLookingFor.add(colNames.get(i));
- } else {
- info.stats.setColName(colNames.get(i));
- stats.addToStatsObj(info.stats);
- stats.getStatsDesc().setLastAnalyzed(Math.max(stats.getStatsDesc().getLastAnalyzed(),
- info.lastAnalyzed));
- }
- }
- if (stillLookingFor.size() == 0) return stats;
-
- byte[][] colKeys = new byte[stillLookingFor.size()][];
+ byte[] tabKey = HBaseUtils.buildKey(dbName, tblName);
+ ColumnStatistics tableStats = new ColumnStatistics();
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+ statsDesc.setIsTblLevel(true);
+ statsDesc.setDbName(dbName);
+ statsDesc.setTableName(tblName);
+ tableStats.setStatsDesc(statsDesc);
+ byte[][] colKeys = new byte[colNames.size()][];
      for (int i = 0; i < colKeys.length; i++) {
- colKeys[i] = HBaseUtils.buildKey(stillLookingFor.get(i));
+ colKeys[i] = HBaseUtils.buildKey(colNames.get(i));
      }
- Result res = read(TABLE_TABLE, key, STATS_CF, colKeys);
+ Result result = read(TABLE_TABLE, tabKey, STATS_CF, colKeys);
      for (int i = 0; i < colKeys.length; i++) {
- byte[] serialized = res.getValue(STATS_CF, colKeys[i]);
- if (serialized == null) {
+ byte[] serializedColStats = result.getValue(STATS_CF, colKeys[i]);
+ if (serializedColStats == null) {
          // There were no stats for this column, so skip it
          continue;
        }
- ColumnStatisticsObj obj = HBaseUtils.deserializeStatsForOneColumn(stats, serialized);
- statsCache.put(dbName, tableName, null, stillLookingFor.get(i), obj,
- stats.getStatsDesc().getLastAnalyzed());
- obj.setColName(stillLookingFor.get(i));
- stats.addToStatsObj(obj);
+ ColumnStatisticsObj obj =
+ HBaseUtils.deserializeStatsForOneColumn(tableStats, serializedColStats);
+ obj.setColName(colNames.get(i));
+ tableStats.addToStatsObj(obj);
      }
- return stats;
+ return tableStats;
    }

    /**
     * Get statistics for a set of partitions
+ *
     * @param dbName name of database table is in
     * @param tableName table partitions are in
     * @param partNames names of the partitions, used only to set values inside the return stats
- * objects.
- * @param partVals partition values for each partition, needed because this class doesn't know
- * how to translate from partName to partVals
- * @param colNames column names to fetch stats for. These columns will be fetched for all
- * requested partitions.
- * @return list of ColumnStats, one for each partition. The values will be in the same order
- * as the partNames list that was passed in.
+ * objects
+ * @param partVals partition values for each partition, needed because this class doesn't know how
+ * to translate from partName to partVals
+ * @param colNames column names to fetch stats for. These columns will be fetched for all
+ * requested partitions
+ * @return list of ColumnStats, one for each partition. The values will be in the same order as
+ * the partNames list that was passed in
     * @throws IOException
     */
- List<ColumnStatistics> getPartitionStatistics(String dbName, String tableName,
- List<String> partNames,
- List<List<String>> partVals,
- List<String> colNames) throws IOException {
- // Go through the cache first, see what we can fetch from there. This is complicated because
- // we may have different columns for different partitions
+ List<ColumnStatistics> getPartitionStatistics(String dbName, String tblName,
+ List<String> partNames, List<List<String>> partVals, List<String> colNames)
+ throws IOException {
      List<ColumnStatistics> statsList = new ArrayList<ColumnStatistics>(partNames.size());
- List<PartStatsInfo> stillLookingFor = new ArrayList<PartStatsInfo>();
- for (int pOff = 0; pOff < partVals.size(); pOff++) {
- // Add an entry for this partition in the list
- ColumnStatistics stats = new ColumnStatistics();
- ColumnStatisticsDesc desc = new ColumnStatisticsDesc();
- desc.setIsTblLevel(false);
- desc.setDbName(dbName);
- desc.setTableName(tableName);
- desc.setPartName(partNames.get(pOff));
- stats.setStatsDesc(desc);
- statsList.add(stats);
- PartStatsInfo missing = null;
-
- for (int cOff = 0; cOff < colNames.size(); cOff++) {
- StatsCache.StatsInfo info = statsCache.getPartitionStatistics(dbName, tableName,
- partNames.get(pOff), colNames.get(cOff));
- if (info == null) {
- if (missing == null) {
- // We haven't started an entry for this one yet
- missing = new PartStatsInfo(stats, partVals.get(pOff), partNames.get(pOff));
- stillLookingFor.add(missing);
- }
- missing.colNames.add(colNames.get(cOff));
- } else {
- info.stats.setColName(colNames.get(cOff));
- stats.addToStatsObj(info.stats);
- stats.getStatsDesc().setLastAnalyzed(Math.max(stats.getStatsDesc().getLastAnalyzed(),
- info.lastAnalyzed));
- }
- }
- }
- if (stillLookingFor.size() == 0) return statsList;
-
- // Build the list of gets. It may be different for each partition now depending on what we
- // found in the cache.
+ ColumnStatistics partitionStats;
+ ColumnStatisticsDesc statsDesc;
+ byte[][] colKeys = new byte[colNames.size()][];
      List<Get> gets = new ArrayList<Get>();
- for (PartStatsInfo pi : stillLookingFor) {
- byte[][] colKeys = new byte[pi.colNames.size()][];
+ // Initialize the list and build the Gets
+ for (int pOff = 0; pOff < partNames.size(); pOff++) {
+ // Add an entry for this partition in the stats list
+ partitionStats = new ColumnStatistics();
+ statsDesc = new ColumnStatisticsDesc();
+ statsDesc.setIsTblLevel(false);
+ statsDesc.setDbName(dbName);
+ statsDesc.setTableName(tblName);
+ statsDesc.setPartName(partNames.get(pOff));
+ partitionStats.setStatsDesc(statsDesc);
+ statsList.add(partitionStats);
+ // Build the list of Gets
        for (int i = 0; i < colKeys.length; i++) {
- colKeys[i] = HBaseUtils.buildKey(pi.colNames.get(i));
+ colKeys[i] = HBaseUtils.buildKey(colNames.get(i));
        }
- pi.colKeys = colKeys;
-
- byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, pi.partVals);
- Get g = new Get(key);
- for (byte[] colName : colKeys) g.addColumn(STATS_CF, colName);
- gets.add(g);
+ byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(pOff));
+ Get get = new Get(partKey);
+ for (byte[] colName : colKeys) {
+ get.addColumn(STATS_CF, colName);
+ }
+ gets.add(get);
      }
+
      HTableInterface htab = conn.getHBaseTable(PART_TABLE);
+ // Get results from HBase
      Result[] results = htab.get(gets);
-
+ // Deserialize the stats objects and add to stats list
      for (int pOff = 0; pOff < results.length; pOff++) {
- PartStatsInfo pi = stillLookingFor.get(pOff);
- for (int cOff = 0; cOff < pi.colNames.size(); cOff++) {
- byte[] serialized = results[pOff].getValue(STATS_CF, pi.colKeys[cOff]);
- if (serialized == null) {
+ for (int cOff = 0; cOff < colNames.size(); cOff++) {
+ byte[] serializedColStats = results[pOff].getValue(STATS_CF, colKeys[cOff]);
+ if (serializedColStats == null) {
            // There were no stats for this column, so skip it
            continue;
          }
- ColumnStatisticsObj obj = HBaseUtils.deserializeStatsForOneColumn(pi.stats, serialized);
- statsCache.put(dbName, tableName, pi.partName, pi.colNames.get(cOff), obj,
- pi.stats.getStatsDesc().getLastAnalyzed());
- obj.setColName(pi.colNames.get(cOff));
- pi.stats.addToStatsObj(obj);
+ partitionStats = statsList.get(pOff);
+ ColumnStatisticsObj colStats =
+ HBaseUtils.deserializeStatsForOneColumn(partitionStats, serializedColStats);
+ colStats.setColName(colNames.get(cOff));
+ partitionStats.addToStatsObj(colStats);
        }
      }
      return statsList;
    }

- private static class PartStatsInfo {
- ColumnStatistics stats;
- String partName;
- List<String> colNames;
- List<String> partVals;
- byte[][] colKeys;
-
- PartStatsInfo(ColumnStatistics s, List<String> pv, String pn) {
- stats = s; partVals = pv; partName = pn;
- colNames = new ArrayList<String>();
- colKeys = null;
+ /**
+ * Get aggregate stats for the requested columns, serving each column from the aggregate stats
+ * cache when possible and reading from HBase (and populating the cache) on a miss
+ * @param dbName
+ * @param tblName
+ * @param partNames
+ * @param partVals
+ * @param colNames
+ * @return
+ * @throws IOException
+ */
+ AggrStats getAggrStats(String dbName, String tblName, List<String> partNames,
+ List<List<String>> partVals, List<String> colNames) throws IOException {
+ // One ColumnStatisticsObj per column
+ List<ColumnStatisticsObj> colStatsList = new ArrayList<ColumnStatisticsObj>();
+ AggrColStatsCached colStatsAggrCached;
+ ColumnStatisticsObj colStatsAggr;
+ int maxPartitionsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
+ float falsePositiveProbability = aggrStatsCache.getFalsePositiveProbability();
+ int partitionsRequested = partNames.size();
+ // TODO: Steal extrapolation logic from current MetaStoreDirectSql code
+ // Right now doing nothing and keeping partitionsFound == partitionsRequested
+ int partitionsFound = partitionsRequested;
+ for (String colName : colNames) {
+ if (partitionsRequested > maxPartitionsPerCacheNode) {
+ // Read from HBase but don't add to cache since it doesn't qualify the criteria
+ colStatsAggr = getAggrStatsFromDB(dbName, tblName, colName, partNames, partVals, null);
+ colStatsList.add(colStatsAggr);
+ } else {
+ // Check the cache first
+ colStatsAggrCached = aggrStatsCache.get(dbName, tblName, colName, partNames);
+ if (colStatsAggrCached != null) {
+ colStatsList.add(colStatsAggrCached.getColStats());
+ } else {
+ // Bloom filter for the new node that we will eventually add to the cache
+ BloomFilter bloomFilter =
+ new BloomFilter(maxPartitionsPerCacheNode, falsePositiveProbability);
+ colStatsAggr =
+ getAggrStatsFromDB(dbName, tblName, colName, partNames, partVals, bloomFilter);
+ colStatsList.add(colStatsAggr);
+ // Update the cache to add this new aggregate node
+ aggrStatsCache.add(dbName, tblName, colName, partitionsFound, colStatsAggr, bloomFilter);
+ }
+ }
+ }
+ return new AggrStats(colStatsList, partitionsFound);
+ }
+
+ /**
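+ * Aggregate stats for one column over the given partitions by reading each partition's
+ * serialized stats from HBase; each contributing partition is added to the bloom filter
+ * when one is supplied.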
+ *
+ * @param dbName
+ * @param tblName
+ * @param partNames
+ * @param partVals
+ * @param colName
+ * @param bloomFilter
+ * @return
+ */
+ private ColumnStatisticsObj getAggrStatsFromDB(String dbName, String tblName, String colName,
+ List<String> partNames, List<List<String>> partVals, BloomFilter bloomFilter)
+ throws IOException {
+ ColumnStatisticsObj colStatsAggr = new ColumnStatisticsObj();
+ boolean colStatsAggrInited = false;
+ ColumnStatsAggregator colStatsAggregator = null;
+ List<Get> gets = new ArrayList<Get>();
+ byte[] colKey = HBaseUtils.buildKey(colName);
+ // Build a list of Gets, one per partition
+ for (int pOff = 0; pOff < partNames.size(); pOff++) {
+ byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(pOff));
+ Get get = new Get(partKey);
+ get.addColumn(STATS_CF, colKey);
+ gets.add(get);
+ }
+ HTableInterface htab = conn.getHBaseTable(PART_TABLE);
+ // Get results from HBase
+ Result[] results = htab.get(gets);
+ // Iterate through the results
+ // The results size and order is the same as the number and order of the Gets
+ // If the column is not present in a partition, the Result object will be empty
+ for (int pOff = 0; pOff < partNames.size(); pOff++) {
+ if (results[pOff].isEmpty()) {
+ // There were no stats for this column, so skip it
+ continue;
+ }
+ byte[] serializedColStats = results[pOff].getValue(STATS_CF, colKey);
+ if (serializedColStats == null) {
+ // There were no stats for this column, so skip it
+ continue;
+ }
+ ColumnStatisticsObj colStats =
+ HBaseUtils.deserializeStatsForOneColumn(null, serializedColStats);
+ if (!colStatsAggrInited) {
+ // This is the 1st column stats object we got
+ colStatsAggr.setColName(colName);
+ colStatsAggr.setColType(colStats.getColType());
+ colStatsAggr.setStatsData(colStats.getStatsData());
+ colStatsAggregator =
+ ColumnStatsAggregatorFactory.getColumnStatsAggregator(colStats.getStatsData()
+ .getSetField());
+ colStatsAggrInited = true;
+ } else {
+ // Perform aggregation with whatever we've already aggregated
+ colStatsAggregator.aggregate(colStatsAggr, colStats);
+ }
+ // Add partition to the bloom filter if it's requested
+ if (bloomFilter != null) {
+ bloomFilter.addToFilter(partNames.get(pOff).getBytes());
+ }
      }
+ return colStatsAggr;
    }

    private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) {
- return partVals == null ?
- HBaseUtils.buildKey(dbName, tableName) :
- HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
+ return partVals == null ? HBaseUtils.buildKey(dbName, tableName) : HBaseUtils
+ .buildPartitionKey(dbName, tableName, partVals);
    }

    private String getStatisticsTable(List<String> partVals) {
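
The per-partition bookkeeping above relies on just three BloomFilter calls, all visible in this diff (the constructor, addToFilter, and contains). A small illustrative sketch with made-up partition names and sizing:

    // Size the filter for the largest node we are willing to cache.
    BloomFilter bloomFilter = new BloomFilter(10000 /* max partitions */, (float) 0.01 /* fpp */);
    // While aggregating stats partition by partition, record which partitions contributed.
    bloomFilter.addToFilter("ds=2015-04-01".getBytes());
    bloomFilter.addToFilter("ds=2015-04-02".getBytes());
    // Later, AggregateStatsCache.findBestMatch counts hits/misses per requested partition name.
    boolean covered = bloomFilter.contains("ds=2015-04-01".getBytes());     // true
    boolean probablyNot = bloomFilter.contains("ds=2015-04-03".getBytes()); // false, barring a false positive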

Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java?rev=1670966&r1=1670965&r2=1670966&view=diff
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java (original)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java Thu Apr 2 20:59:42 2015
@@ -62,6 +62,7 @@ import org.apache.hadoop.hive.metastore.
  import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult;
  import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan;
  import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
+import org.apache.hadoop.hive.metastore.hbase.AggregateStatsCache.AggrColStatsCached;
  import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
  import org.apache.thrift.TException;

@@ -1228,8 +1229,8 @@ public class HBaseStore implements RawSt
    }

    @Override
- public boolean updateTableColumnStatistics(ColumnStatistics colStats) throws
- NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+ public boolean updateTableColumnStatistics(ColumnStatistics colStats)
+ throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
      try {
        getHBase().updateStatistics(colStats.getStatsDesc().getDbName(),
            colStats.getStatsDesc().getTableName(), null, null, colStats);
@@ -1241,13 +1242,12 @@ public class HBaseStore implements RawSt
    }

    @Override
- public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj,
- List<String> partVals) throws
- NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+ public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, List<String> partVals)
+ throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
      try {
        getHBase().updateStatistics(statsObj.getStatsDesc().getDbName(),
- statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName(),
- partVals, statsObj);
+ statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName(), partVals,
+ statsObj);
        return true;
      } catch (IOException e) {
        LOG.error("Unable to update column statistics", e);
@@ -1257,8 +1257,7 @@ public class HBaseStore implements RawSt

    @Override
    public ColumnStatistics getTableColumnStatistics(String dbName, String tableName,
- List<String> colName) throws MetaException,
- NoSuchObjectException {
+ List<String> colName) throws MetaException, NoSuchObjectException {
      try {
        return getHBase().getTableStatistics(dbName, tableName, colName);
      } catch (IOException e) {
@@ -1269,11 +1268,11 @@ public class HBaseStore implements RawSt

    @Override
    public List<ColumnStatistics> getPartitionColumnStatistics(String dbName, String tblName,
- List<String> partNames,
- List<String> colNames) throws
- MetaException, NoSuchObjectException {
+ List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
      List<List<String>> partVals = new ArrayList<List<String>>(partNames.size());
- for (String partName : partNames) partVals.add(partNameToVals(partName));
+ for (String partName : partNames) {
+ partVals.add(partNameToVals(partName));
+ }
      try {
        return getHBase().getPartitionStatistics(dbName, tblName, partNames, partVals, colNames);
      } catch (IOException e) {
@@ -1284,8 +1283,8 @@ public class HBaseStore implements RawSt

    @Override
    public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName,
- List<String> partVals, String colName) throws
- NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+ List<String> partVals, String colName) throws NoSuchObjectException, MetaException,
+ InvalidObjectException, InvalidInputException {
      // NOP, stats will be deleted along with the partition when it is dropped.
      return true;
    }
@@ -1297,6 +1296,26 @@ public class HBaseStore implements RawSt
      return true;
    }

+ /**
+ * Return aggregate statistics for each column in colNames, computed over the partitions
+ * named in partNames
+ *
+ */
+ @Override
+ public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames,
+ List<String> colNames) throws MetaException, NoSuchObjectException {
+ List<List<String>> partVals = new ArrayList<List<String>>(partNames.size());
+ for (String partName : partNames) {
+ partVals.add(partNameToVals(partName));
+ }
+ try {
+ return getHBase().getAggrStats(dbName, tblName, partNames, partVals, colNames);
+ } catch (IOException e) {
+ LOG.error("Unable to fetch aggregate column statistics", e);
+ throw new MetaException("Failed fetching aggregate column statistics, " + e.getMessage());
+ }
+ }
+
    @Override
    public long cleanupEvents() {
      throw new UnsupportedOperationException();
@@ -1629,13 +1648,6 @@ public class HBaseStore implements RawSt
    }

    @Override
- public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames,
- List<String> colNames) throws MetaException,
- NoSuchObjectException {
- throw new UnsupportedOperationException();
- }
-
- @Override
    public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
      throw new UnsupportedOperationException();
    }

Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java?rev=1670966&r1=1670965&r2=1670966&view=diff
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java (original)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java Thu Apr 2 20:59:42 2015
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.
  import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
  import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
  import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
  import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
  import org.apache.hadoop.hive.metastore.api.Database;
  import org.apache.hadoop.hive.metastore.api.Decimal;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hive.metastore.
  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
  import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
  import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TFieldIdEnum;

  import java.io.IOException;
  import java.nio.charset.Charset;
@@ -900,15 +902,15 @@ class HBaseUtils {
      return sdParts;
    }

- static byte[] serializeStatsForOneColumn(ColumnStatistics stats, ColumnStatisticsObj obj)
+ static byte[] serializeStatsForOneColumn(ColumnStatistics partitionColumnStats, ColumnStatisticsObj colStats)
        throws IOException {
      HbaseMetastoreProto.ColumnStats.Builder builder = HbaseMetastoreProto.ColumnStats.newBuilder();
- builder.setLastAnalyzed(stats.getStatsDesc().getLastAnalyzed());
- if (obj.getColType() == null) {
+ builder.setLastAnalyzed(partitionColumnStats.getStatsDesc().getLastAnalyzed());
+ if (colStats.getColType() == null) {
        throw new RuntimeException("Column type must be set");
      }
- builder.setColumnType(obj.getColType());
- ColumnStatisticsData colData = obj.getStatsData();
+ builder.setColumnType(colStats.getColType());
+ ColumnStatisticsData colData = colStats.getStatsData();
      switch (colData.getSetField()) {
        case BOOLEAN_STATS:
          BooleanColumnStatsData boolData = colData.getBooleanStats();
@@ -988,14 +990,16 @@ class HBaseUtils {
      return builder.build().toByteArray();
    }

- static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics stats,
- byte[] bytes) throws IOException {
+ static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics partitionColumnStats,
+ byte[] bytes) throws IOException {
      HbaseMetastoreProto.ColumnStats proto = HbaseMetastoreProto.ColumnStats.parseFrom(bytes);
- ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ ColumnStatisticsObj colStats = new ColumnStatisticsObj();
      long lastAnalyzed = proto.getLastAnalyzed();
- stats.getStatsDesc().setLastAnalyzed(
- Math.max(lastAnalyzed, stats.getStatsDesc().getLastAnalyzed()));
- obj.setColType(proto.getColumnType());
+ if (partitionColumnStats != null) {
+ partitionColumnStats.getStatsDesc().setLastAnalyzed(
+ Math.max(lastAnalyzed, partitionColumnStats.getStatsDesc().getLastAnalyzed()));
+ }
+ colStats.setColType(proto.getColumnType());

      ColumnStatisticsData colData = new ColumnStatisticsData();
      if (proto.hasBoolStats()) {
@@ -1059,9 +1063,8 @@ class HBaseUtils {
      } else {
        throw new RuntimeException("Woh, bad. Unknown stats type!");
      }
-
- obj.setStatsData(colData);
- return obj;
+ colStats.setStatsData(colData);
+ return colStats;
    }

    /**
@@ -1078,5 +1081,4 @@ class HBaseUtils {
      keyEnd[keyEnd.length - 1]++;
      return keyEnd;
    }
-
  }

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+
+public class BinaryColumnStatsAggregator implements ColumnStatsAggregator{
+
+ @Override
+ public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ BinaryColumnStatsData aggregateData = aggregateColStats.getStatsData().getBinaryStats();
+ BinaryColumnStatsData newData = newColStats.getStatsData().getBinaryStats();
+ aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+ aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ }
+}

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+
+public class BooleanColumnStatsAggregator implements ColumnStatsAggregator {
+
+ @Override
+ public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ BooleanColumnStatsData aggregateData = aggregateColStats.getStatsData().getBooleanStats();
+ BooleanColumnStatsData newData = newColStats.getStatsData().getBooleanStats();
+ aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues());
+ aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses());
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ }
+}

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+
+public interface ColumnStatsAggregator {
+ public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats);
+}

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
+
+public class ColumnStatsAggregatorFactory {
+
+ private ColumnStatsAggregatorFactory() {
+ }
+
+ public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type) {
+ switch (type) {
+ case BOOLEAN_STATS:
+ return new BooleanColumnStatsAggregator();
+ case LONG_STATS:
+ return new LongColumnStatsAggregator();
+ case DOUBLE_STATS:
+ return new DoubleColumnStatsAggregator();
+ case STRING_STATS:
+ return new StringColumnStatsAggregator();
+ case BINARY_STATS:
+ return new BinaryColumnStatsAggregator();
+ case DECIMAL_STATS:
+ return new DecimalColumnStatsAggregator();
+ default:
+ throw new RuntimeException("Woh, bad. Unknown stats type!");
+ }
+ }
+
+}

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+
+public class DecimalColumnStatsAggregator implements ColumnStatsAggregator {
+
+ @Override
+ public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ DecimalColumnStatsData aggregateData = aggregateColStats.getStatsData().getDecimalStats();
+ DecimalColumnStatsData newData = newColStats.getStatsData().getDecimalStats();
+ Decimal lowValue =
+ (aggregateData.getLowValue().compareTo(newData.getLowValue()) < 0) ? aggregateData
+ .getLowValue() : newData.getLowValue();
+ aggregateData.setLowValue(lowValue);
+ Decimal highValue =
+ (aggregateData.getHighValue().compareTo(newData.getHighValue()) > 0) ? aggregateData
+ .getHighValue() : newData.getHighValue();
+ aggregateData.setHighValue(highValue);
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ }
+}

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+
+public class DoubleColumnStatsAggregator implements ColumnStatsAggregator {
+
+ @Override
+ public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ DoubleColumnStatsData aggregateData = aggregateColStats.getStatsData().getDoubleStats();
+ DoubleColumnStatsData newData = newColStats.getStatsData().getDoubleStats();
+ aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+ aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ }
+}

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+
+public class LongColumnStatsAggregator implements ColumnStatsAggregator {
+
+ @Override
+ public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ LongColumnStatsData aggregateData = aggregateColStats.getStatsData().getLongStats();
+ LongColumnStatsData newData = newColStats.getStatsData().getLongStats();
+ aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue()));
+ aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ }
+}

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+
+public class StringColumnStatsAggregator implements ColumnStatsAggregator {
+
+ @Override
+ public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ StringColumnStatsData aggregateData = aggregateColStats.getStatsData().getStringStats();
+ StringColumnStatsData newData = newColStats.getStatsData().getStringStats();
+ aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen()));
+ aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen()));
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ }
+}
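
For orientation, here is a minimal sketch (not part of this commit) of how the factory and the type-specific aggregators above might be wired together to fold per-partition column statistics into one running aggregate. The class name, the helper method, and the sample long-stats values are illustrative assumptions; only ColumnStatsAggregatorFactory.getColumnStatsAggregator and ColumnStatsAggregator.aggregate come from this patch.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregator;
import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregatorFactory;

public class ColumnStatsAggregatorSketch {

  // Builds a ColumnStatisticsObj carrying long stats for one partition (values illustrative).
  static ColumnStatisticsObj longStats(long low, long high, long numNulls, long numDVs) {
    LongColumnStatsData data = new LongColumnStatsData();
    data.setLowValue(low);
    data.setHighValue(high);
    data.setNumNulls(numNulls);
    data.setNumDVs(numDVs);
    ColumnStatisticsData wrapper = new ColumnStatisticsData();
    wrapper.setLongStats(data);
    ColumnStatisticsObj obj = new ColumnStatisticsObj();
    obj.setColName("id");
    obj.setColType("bigint");
    obj.setStatsData(wrapper);
    return obj;
  }

  public static void main(String[] args) {
    // Two partitions' worth of stats for the same column.
    List<ColumnStatisticsObj> perPartition =
        Arrays.asList(longStats(0, 10, 1, 5), longStats(3, 42, 0, 9));

    // Pick the aggregator by the union field that is set, then fold the remaining partitions in.
    ColumnStatisticsObj aggregate = perPartition.get(0);
    ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory
        .getColumnStatsAggregator(aggregate.getStatsData().getSetField());
    for (ColumnStatisticsObj next : perPartition.subList(1, perPartition.size())) {
      aggregator.aggregate(aggregate, next);
    }

    // Expected merge: lowValue = 0, highValue = 42, numNulls = 1, numDVs = 9.
    System.out.println(aggregate.getStatsData().getLongStats());
  }
}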

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BitVector.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BitVector.java?rev=1670966&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BitVector.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BitVector.java Thu Apr 2 20:59:42 2015
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.utils;
+
+import java.util.Arrays;
+
+/**
+ * Barebones fixed length bit vector using a byte array
+ */
+public class BitVector {
+ // We'll use this as the bit vector container
+ private byte data[];
+ public static int ELEMENT_SIZE = Byte.SIZE;
+
+ public BitVector(int size) {
+ data = new byte[size/ELEMENT_SIZE];
+ }
+
+ /**
+ * Total bits -> num elements * size of each element
+ *
+ */
+ public long getSize() {
+ return data.length * ELEMENT_SIZE;
+ }
+
+ /**
+ * Set the bit at the given index to 1
+ *
+ * @param bitIndex
+ */
+ public void setBit(int bitIndex) {
+ validateBitIndex(bitIndex);
+ int dataIndex = bitIndex / ELEMENT_SIZE;
+ int elementIndex = ELEMENT_SIZE - bitIndex % ELEMENT_SIZE - 1;
+ // Set the elementIndex'th bit of data[dataIndex]'th element
+ data[dataIndex] = (byte) (data[dataIndex] | (1 << elementIndex));
+ }
+
+ /**
+ * Set the bit at the given index to 0
+ *
+ * @param bitIndex
+ */
+ public void unSetBit(int bitIndex) {
+ validateBitIndex(bitIndex);
+ int dataIndex = bitIndex / ELEMENT_SIZE;
+ int elementIndex = ELEMENT_SIZE - bitIndex % ELEMENT_SIZE - 1;
+ // Unset the elementIndex'th bit of data[dataIndex]'th element
+ data[dataIndex] = (byte) (data[dataIndex] & ~(1 << elementIndex));
+ }
+
+ /**
+ * Check if a bit at the given index is 1
+ * @param bitIndex
+ */
+ public boolean isBitSet(int bitIndex) {
+ validateBitIndex(bitIndex);
+ int dataIndex = bitIndex / ELEMENT_SIZE;
+ int elementIndex = ELEMENT_SIZE - bitIndex % ELEMENT_SIZE - 1;
+ if ((data[dataIndex] & (1 << elementIndex)) > 0) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Set all bits to 0
+ *
+ */
+ public void clearAll() {
+ Arrays.fill(data, (byte) 0x00);
+ }
+
+ /**
+ * Set all bits to 1
+ *
+ */
+ public void setAll() {
+ Arrays.fill(data, (byte) 0xFF);
+ }
+
+ /**
+ * Prints the bit vector as a string of bit values (e.g. 01010111)
+ */
+ @Override
+ public String toString() {
+ StringBuilder str = new StringBuilder();
+ for(byte b : data) {
+ str.append(Integer.toBinaryString((b & 0xFF) + 0x100).substring(1));
+ }
+ return str.toString();
+ }
+
+ /**
+ * Check if queried bitIndex is in valid range
+ * @param bitIndex
+ */
+ private void validateBitIndex(int bitIndex) {
+ if ((bitIndex >= getSize()) || (bitIndex < 0)) {
+ throw new IllegalArgumentException("Bit index out of range: " + bitIndex);
+ }
+ }
+
+}
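
A short, hypothetical usage sketch of the bit vector above; the 16-bit size and the printed values are only illustrative.

import org.apache.hadoop.hive.metastore.hbase.utils.BitVector;

public class BitVectorSketch {
  public static void main(String[] args) {
    BitVector bits = new BitVector(16);     // 16 bits backed by a two-byte array
    bits.setBit(0);
    bits.setBit(9);
    System.out.println(bits);               // 1000000001000000
    System.out.println(bits.isBitSet(9));   // true
    bits.unSetBit(9);
    System.out.println(bits.isBitSet(9));   // false
    bits.setAll();                          // every bit becomes 1
    bits.clearAll();                        // back to all zeros
    System.out.println(bits.getSize());     // 16
  }
}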
