Repository: hive
Updated Branches:
   refs/heads/spark 1a87bcc0f -> e4b8cf43c


HIVE-12515: Clean up the SparkCounters-related code after removing counter-based stats collection [Spark Branch] (Rui, reviewed by Chengxiang & Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e4b8cf43
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e4b8cf43
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e4b8cf43

Branch: refs/heads/spark
Commit: e4b8cf43c385049fcfabacfec592d2ed2038ab66
Parents: 1a87bcc
Author: Rui Li <rui.li@intel.com>
Authored: Thu Dec 3 16:37:05 2015 +0800
Committer: Rui Li <rui.li@intel.com>
Committed: Thu Dec 3 16:37:30 2015 +0800

----------------------------------------------------------------------
  .../test/resources/testconfiguration.properties | 4 -
  .../hadoop/hive/ql/exec/spark/SparkTask.java | 146 +------------------
  2 files changed, 1 insertion(+), 149 deletions(-)
----------------------------------------------------------------------
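Context for the SparkTask.java change below: with counter-based stats collection removed, SparkTask no longer derives extra stats counter prefixes (ROW_COUNT / RAW_DATA_SIZE per table or partition) for a downstream StatsTask. It now passes only the operator counter groups to SparkWork, via the existing getOperatorCounters(). The following is a minimal, self-contained sketch of the shape of that map; the group and counter names used here are placeholders, not Hive constants.

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;

  // Illustrative sketch only: shows the shape of the counter map that
  // SparkTask still builds via getOperatorCounters() after this change.
  // Group/counter names here are placeholders, not Hive constants.
  public class OperatorCounterSketch {

    // Mirrors the Map<String, List<String>> handed to
    // SparkWork.setRequiredCounterPrefix(): one entry per counter group,
    // listing the counter names to collect for that group.
    static Map<String, List<String>> operatorCounters(String groupName,
                                                      List<String> operatorIds) {
      Map<String, List<String>> counters = new HashMap<String, List<String>>();
      List<String> names = new LinkedList<String>();
      for (String opId : operatorIds) {
        // e.g. a per-operator "records out" style counter (name assumed)
        names.add(opId + "_RECORDS_OUT");
      }
      counters.put(groupName, names);
      return counters;
    }

    public static void main(String[] args) {
      // Before HIVE-12515, extra prefixes carrying ROW_COUNT / RAW_DATA_SIZE
      // were merged into this map when hive.stats.dbclass was "counter";
      // that code path is what this commit deletes.
      Map<String, List<String>> counters =
          operatorCounters("HIVE", Arrays.asList("MAP_1", "RS_2", "FS_3"));
      System.out.println(counters);
    }
  }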


http://git-wip-us.apache.org/repos/asf/hive/blob/e4b8cf43/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 0af7644..c605488 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1076,8 +1076,6 @@ spark.query.files=add_part_multiple.q, \
    stats7.q, \
    stats8.q, \
    stats9.q, \
-  stats_counter.q, \
-  stats_counter_partitioned.q, \
    stats_noscan_1.q, \
    stats_noscan_2.q, \
    stats_only_null.q, \
@@ -1282,8 +1280,6 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
    schemeAuthority2.q,\
    scriptfile1.q,\
    scriptfile1_win.q,\
-  stats_counter.q,\
-  stats_counter_partitioned.q,\
    temp_table_external.q,\
    truncate_column_buckets.q,\
    uber_reduce.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/e4b8cf43/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index eaeffee..6e13f7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,8 +19,6 @@
  package org.apache.hadoop.hive.ql.exec.spark;

  import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.Iterator;
@@ -30,10 +28,7 @@ import java.util.Map;

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.StatsSetupConst;
  import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
  import org.apache.hadoop.hive.ql.DriverContext;
  import org.apache.hadoop.hive.ql.QueryPlan;
  import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -42,7 +37,6 @@ import org.apache.hadoop.hive.ql.exec.MapOperator;
  import org.apache.hadoop.hive.ql.exec.Operator;
  import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
  import org.apache.hadoop.hive.ql.exec.ScriptOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
  import org.apache.hadoop.hive.ql.exec.Task;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
@@ -56,25 +50,15 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
  import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
  import org.apache.hadoop.hive.ql.log.PerfLogger;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
  import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
  import org.apache.hadoop.hive.ql.plan.MapWork;
  import org.apache.hadoop.hive.ql.plan.OperatorDesc;
  import org.apache.hadoop.hive.ql.plan.ReduceWork;
  import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
  import org.apache.hadoop.hive.ql.plan.api.StageType;
  import org.apache.hadoop.hive.ql.session.SessionState;
  import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
  import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.spark.counter.SparkCounters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

  import com.google.common.collect.Lists;

@@ -84,7 +68,6 @@ public class SparkTask extends Task<SparkWork> {
    private static final LogHelper console = new LogHelper(LOG);
    private final PerfLogger perfLogger = SessionState.getPerfLogger();
    private static final long serialVersionUID = 1L;
-  private SparkCounters sparkCounters;

    @Override
    public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
@@ -103,7 +86,7 @@ public class SparkTask extends Task<SparkWork> {
        sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);

        SparkWork sparkWork = getWork();
-      sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
+      sparkWork.setRequiredCounterPrefix(getOperatorCounters());

        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
        SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
@@ -113,8 +96,6 @@ public class SparkTask extends Task<SparkWork> {
        rc = jobRef.monitorJob();
        SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
        if (rc == 0) {
-        sparkCounters = sparkJobStatus.getCounter();
-        // for RSC, we should get the counters after job has finished
          SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
          if (LOG.isInfoEnabled() && sparkStatistics != null) {
            LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
@@ -230,10 +211,6 @@ public class SparkTask extends Task<SparkWork> {
      return ((ReduceWork) children.get(0)).getReducer();
    }

-  public SparkCounters getSparkCounters() {
-    return sparkCounters;
-  }
-
    /**
     * Set the number of reducers for the spark work.
     */
@@ -247,127 +224,6 @@ public class SparkTask extends Task<SparkWork> {
      console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
    }

-  private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
-    Map<String, List<String>> counters = getOperatorCounters();
-    StatsTask statsTask = getStatsTaskInChildTasks(this);
-    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    // fetch table prefix if SparkTask try to gather table statistics based on counter.
-    if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
-      List<String> prefixes = getRequiredCounterPrefix(statsTask);
-      for (String prefix : prefixes) {
-        List<String> counterGroup = counters.get(prefix);
-        if (counterGroup == null) {
-          counterGroup = new LinkedList<String>();
-          counters.put(prefix, counterGroup);
-        }
-        counterGroup.add(StatsSetupConst.ROW_COUNT);
-        counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
-      }
-    }
-    return counters;
-  }
-
-  private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
-    List<String> prefixs = new LinkedList<String>();
-    StatsWork statsWork = statsTask.getWork();
-    String tablePrefix = getTablePrefix(statsWork);
-    List<Map<String, String>> partitionSpecs = getPartitionSpecs(statsWork);
-    int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);
-
-    if (partitionSpecs == null) {
-      prefixs.add(Utilities.getHashedStatsPrefix(tablePrefix, maxPrefixLength));
-    } else {
-      for (Map<String, String> partitionSpec : partitionSpecs) {
-        String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partitionSpec));
-        prefixs.add(Utilities.getHashedStatsPrefix(prefixWithPartition, maxPrefixLength));
-      }
-    }
-
-    return prefixs;
-  }
-
-  private String getTablePrefix(StatsWork work) throws HiveException {
-    String tableName;
-    if (work.getLoadTableDesc() != null) {
-      tableName = work.getLoadTableDesc().getTable().getTableName();
-    } else if (work.getTableSpecs() != null) {
-      tableName = work.getTableSpecs().tableName;
-    } else {
-      tableName = work.getLoadFileDesc().getDestinationCreateTable();
-    }
-    Table table;
-    try {
-      table = db.getTable(tableName);
-    } catch (HiveException e) {
-      LOG.warn("Failed to get table:" + tableName);
-      // For CTAS query, table does not exist in this period, just use table name as prefix.
-      return tableName.toLowerCase();
-    }
-    return table.getDbName() + "." + table.getTableName();
-  }
-
-  private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
-
-    List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
-    if (childTasks == null) {
-      return null;
-    }
-    for (Task<? extends Serializable> task : childTasks) {
-      if (task instanceof StatsTask) {
-        return (StatsTask) task;
-      } else {
-        Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
-        if (childTask instanceof StatsTask) {
-          return (StatsTask) childTask;
-        } else {
-          continue;
-        }
-      }
-    }
-
-    return null;
-  }
-
-  private List<Map<String, String>> getPartitionSpecs(StatsWork work) throws HiveException {
-    if (work.getLoadFileDesc() != null) {
-      return null; //we are in CTAS, so we know there are no partitions
-    }
-    Table table;
-    List<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
-
-    if (work.getTableSpecs() != null) {
-
-      // ANALYZE command
-      TableSpec tblSpec = work.getTableSpecs();
-      table = tblSpec.tableHandle;
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      // get all partitions that matches with the partition spec
-      List<Partition> partitions = tblSpec.partitions;
-      if (partitions != null) {
-        for (Partition partition : partitions) {
-          partitionSpecs.add(partition.getSpec());
-        }
-      }
-    } else if (work.getLoadTableDesc() != null) {
-
-      // INSERT OVERWRITE command
-      LoadTableDesc tbd = work.getLoadTableDesc();
-      table = db.getTable(tbd.getTable().getTableName());
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
-      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
-        // we could not get dynamic partition information before SparkTask execution.
-      } else { // static partition
-        partitionSpecs.add(tbd.getPartitionSpec());
-      }
-    }
-    return partitionSpecs;
-  }
-
    private Map<String, List<String>> getOperatorCounters() {
      String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
      Map<String, List<String>> counters = new HashMap<String, List<String>>();

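For reference, the removed getRequiredCounterPrefix()/getTablePrefix() path built one stats prefix per target: "<db>.<table>" for unpartitioned targets, or that prefix joined with each partition's path, capped at a maximum length. The sketch below only illustrates that scheme; partPath() and hashedPrefix() are simplified stand-ins for Warehouse.makePartPath() and Utilities.getHashedStatsPrefix(), not their real implementations.

  import java.util.LinkedHashMap;
  import java.util.Map;

  // Sketch of the prefix scheme the removed code implemented for counter-based
  // table statistics. Helper methods are simplified stand-ins, not Hive code.
  public class StatsPrefixSketch {

    // Stand-in for Warehouse.makePartPath(): joins "col=val" pairs with '/'.
    static String partPath(Map<String, String> partSpec) {
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<String, String> e : partSpec.entrySet()) {
        if (sb.length() > 0) {
          sb.append('/');
        }
        sb.append(e.getKey()).append('=').append(e.getValue());
      }
      return sb.toString();
    }

    // Stand-in for Utilities.getHashedStatsPrefix(prefix, maxLength): the real
    // method hashes over-long prefixes; here we simply truncate for illustration.
    static String hashedPrefix(String prefix, int maxLength) {
      return prefix.length() <= maxLength ? prefix : prefix.substring(0, maxLength);
    }

    public static void main(String[] args) {
      Map<String, String> partSpec = new LinkedHashMap<String, String>();
      partSpec.put("ds", "2015-12-03");
      String tablePrefix = "default.stats_counter_partitioned";
      String prefixWithPartition = tablePrefix + "/" + partPath(partSpec);
      // Prints the kind of prefix under which ROW_COUNT / RAW_DATA_SIZE
      // counters were previously requested per partition.
      System.out.println(hashedPrefix(prefixWithPartition, 150));
    }
  }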