Repository: hive
Updated Branches:
   refs/heads/master 601a48122 -> d5e8544e7


HIVE-12250 Zookeeper connection leaks in Hive's HBaseHandler (Naveen, reviewed by Aihua and Szehon)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5e8544e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5e8544e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5e8544e

Branch: refs/heads/master
Commit: d5e8544e7106ba0879b176c3524e369833bd844b
Parents: 601a481
Author: Szehon Ho <szehon@cloudera.com>
Authored: Tue Oct 27 11:09:07 2015 -0700
Committer: Szehon Ho <szehon@cloudera.com>
Committed: Tue Oct 27 11:09:07 2015 -0700

----------------------------------------------------------------------
  .../hive/hbase/HiveHBaseTableInputFormat.java | 105 ++++++++++---------
  .../hive/hbase/HiveHBaseTableOutputFormat.java | 9 ++
  2 files changed, 64 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d5e8544e/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 8e72759..5f4a1e4 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -107,6 +107,7 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     try {
       recordReader.initialize(tableSplit, tac);
     } catch (InterruptedException e) {
+      closeTable(); // Free up the HTable connections
       throw new IOException("Failed to initialize RecordReader", e);
     }

@@ -445,65 +446,69 @@ public class HiveHBaseTableInputFormat extends TableInputFormatBase
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
     boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);

-    if (hbaseColumnsMapping == null) {
-      throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
-    }
-
-    ColumnMappings columnMappings = null;
     try {
-      columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
-    } catch (SerDeException e) {
-      throw new IOException(e);
-    }
+      if (hbaseColumnsMapping == null) {
+        throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
+      }

-    int iKey = columnMappings.getKeyIndex();
-    int iTimestamp = columnMappings.getTimestampIndex();
-    ColumnMapping keyMapping = columnMappings.getKeyMapping();
-
-    // Take filter pushdown into account while calculating splits; this
-    // allows us to prune off regions immediately. Note that although
-    // the Javadoc for the superclass getSplits says that it returns one
-    // split per region, the implementation actually takes the scan
-    // definition into account and excludes regions which don't satisfy
-    // the start/stop row conditions (HBASE-1829).
-    Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
-        HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
-            jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
-
-    // The list of families that have been added to the scan
-    List<String> addedFamilies = new ArrayList<String>();
-
-    // REVIEW: are we supposed to be applying the getReadColumnIDs
-    // same as in getRecordReader?
-    for (ColumnMapping colMap : columnMappings) {
-      if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
-        continue;
+      ColumnMappings columnMappings = null;
+      try {
+        columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
+      } catch (SerDeException e) {
+        throw new IOException(e);
       }

-      if (colMap.qualifierName == null) {
-        scan.addFamily(colMap.familyNameBytes);
-        addedFamilies.add(colMap.familyName);
-      } else {
-        if(!addedFamilies.contains(colMap.familyName)){
-          // add the column only if the family has not already been added
-          scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+      int iKey = columnMappings.getKeyIndex();
+      int iTimestamp = columnMappings.getTimestampIndex();
+      ColumnMapping keyMapping = columnMappings.getKeyMapping();
+
+      // Take filter pushdown into account while calculating splits; this
+      // allows us to prune off regions immediately. Note that although
+      // the Javadoc for the superclass getSplits says that it returns one
+      // split per region, the implementation actually takes the scan
+      // definition into account and excludes regions which don't satisfy
+      // the start/stop row conditions (HBASE-1829).
+      Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
+          HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
+              jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
+
+      // The list of families that have been added to the scan
+      List<String> addedFamilies = new ArrayList<String>();
+
+      // REVIEW: are we supposed to be applying the getReadColumnIDs
+      // same as in getRecordReader?
+      for (ColumnMapping colMap : columnMappings) {
+        if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
+          continue;
+        }
+
+        if (colMap.qualifierName == null) {
+          scan.addFamily(colMap.familyNameBytes);
+          addedFamilies.add(colMap.familyName);
+        } else {
+          if(!addedFamilies.contains(colMap.familyName)){
+            // add the column only if the family has not already been added
+            scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
+          }
         }
       }
-    }
-    setScan(scan);
+      setScan(scan);

-    Job job = new Job(jobConf);
-    JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
-    Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
+      Job job = new Job(jobConf);
+      JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+      Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);

-    List<org.apache.hadoop.mapreduce.InputSplit> splits =
-      super.getSplits(jobContext);
-    InputSplit [] results = new InputSplit[splits.size()];
+      List<org.apache.hadoop.mapreduce.InputSplit> splits =
+        super.getSplits(jobContext);
+      InputSplit [] results = new InputSplit[splits.size()];

-    for (int i = 0; i < splits.size(); i++) {
-      results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
-    }
+      for (int i = 0; i < splits.size(); i++) {
+        results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
+      }

-    return results;
+      return results;
+    } finally {
+      closeTable();
+    }
   }
 }

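Note on the hunk above: the change amounts to wrapping the body of getSplits in a
try/finally so that the table handle opened for split calculation is always released,
even when split computation throws. A minimal, self-contained sketch of that cleanup
pattern (the class and method names below are illustrative, not Hive's):

  import java.io.Closeable;
  import java.io.IOException;

  // Illustrative sketch only: tableHandle stands in for the HTable that
  // getSplits opens. The finally block guarantees it is closed on both the
  // success path and every exception path, which is what plugs the
  // ZooKeeper connection leak.
  class SplitCleanupSketch {
    static String[] computeSplits(Closeable tableHandle) throws IOException {
      try {
        // ... validate the column mapping, build the Scan, compute the
        // splits (elided) ...
        return new String[] { "split-0", "split-1" };
      } finally {
        tableHandle.close(); // always runs, mirroring closeTable() in the diff
      }
    }
  }
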
http://git-wip-us.apache.org/repos/asf/hive/blob/d5e8544e/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
index 3100885..0715a51 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
@@ -145,5 +145,14 @@ public class HiveHBaseTableOutputFormat extends
       }
       m_table.put(put);
     }
+
+    @Override
+    protected void finalize() throws Throwable {
+      try {
+        m_table.close();
+      } finally {
+        super.finalize();
+      }
+    }
   }
 }

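Note on the hunk above: the output-format side takes a different approach and overrides
finalize(), so that a writer which is garbage-collected without ever being closed still
releases its table handle. A minimal sketch of that last-resort pattern (class and field
names below are illustrative, not Hive's); explicit close remains the primary cleanup path:

  import java.io.Closeable;

  // Illustrative sketch only: closing the resource in finalize() is a
  // safety net against leaked writers, not a replacement for close().
  class FinalizeGuardSketch {
    private final Closeable table;

    FinalizeGuardSketch(Closeable table) {
      this.table = table;
    }

    @Override
    protected void finalize() throws Throwable {
      try {
        table.close(); // release the handle if it was never closed explicitly
      } finally {
        super.finalize(); // always chain to the superclass finalizer
      }
    }
  }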