Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/ExternalInputsHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/ExternalInputsHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/ExternalInputsHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/ExternalInputsHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Implementation of a compile-time hook that updates the inputs to include the managed objects
+ * whose data the external inputs point to.
+ */
+public class ExternalInputsHook extends AbstractSemanticAnalyzerHook {
+
+ private static final int SQL_NUM_RETRIES = 3;
+ private static final int RETRY_MAX_INTERVAL_SEC = 60;
+
+ // Does nothing
+ @Override
+ public ASTNode preAnalyze(
+ HiveSemanticAnalyzerHookContext context,
+ ASTNode ast) throws SemanticException {
+ //do nothing
+ return ast;
+ }
+
+ // Updates the inputs to include managed tables/partitions whose data is pointed to by external
+ // inputs
+ @Override
+ public void postAnalyze(
+ HiveSemanticAnalyzerHookContext context,
+ List<Task<? extends Serializable>> rootTasks) throws SemanticException {
+
+ HiveSemanticAnalyzerHookContextImpl ctx = (HiveSemanticAnalyzerHookContextImpl)context;
+ HiveConf conf = (HiveConf)ctx.getConf();
+
+ Set<String> externalLocations = new HashSet<String>();
+
+ for (ReadEntity input : ctx.getInputs()) {
+
+ // If this input is either an external table or a partition in an external table, add its
+ // location to the set of locations
+ if (input.getTable().getTableType() == TableType.EXTERNAL_TABLE) {
+ String location = null;
+ try {
+ location = input.getLocation().toString();
+ } catch (Exception e) {
+ throw new SemanticException("GetLocation failed", e);
+ }
+
+ // We assume all managed tables live under /user/facebook/warehouse/
+ // This avoids having to check whether any managed tables point to the data referenced by
+ // scrape and scribe staging tables, which point to directories like
+ // /user/facebook/scrape_staging (staging tables), /user/facebook/scribe_staging
+ // (current tables), and /tmp (signal tables).
+ // We also exclude inputs which are partitioned tables (without their partitions).
+ // If the input is a partitioned table, it can only be a metadata change, and hence only
+ // needs the external table, not the underlying managed table. If any data was queried,
+ // the queried partition will also be in the inputs and we can get the managed
+ // table/partition from that.
+ if (location.contains("/user/facebook/warehouse/") &&
+ (!input.getTable().isPartitioned() || input.getType() != ReadEntity.Type.TABLE)) {
+ externalLocations.add(location);
+ }
+ }
+ }
+
+ // If there were some external inputs, get the managed tables/partitions whose data they
+ // point to
+ if (!externalLocations.isEmpty()) {
+ // The 2 cases in the select are as follows:
+ // d1.name, t1.tbl_name, p1.part_name
+ // 1) The external entity's location is such that there are one or more partitions whose
+ // location is a subdirectory; this includes the case where the external entity's location is
+ // the same as the location of a partitioned table, in which case all partitions whose
+ // location has the table's location as a prefix will be returned, not the table. (If the
+ // location of the table was ever changed, this means only the subset of partitions created
+ // after the location was changed will be included.)
+ // d2.name, t2.tbl_name, NULL
+ // 2) The external entity's location is such that there is an unpartitioned table whose
+ // location is a prefix. In this case the table is returned.
+
+ String sql = "SELECT IF(p1.part_name IS NOT NULL, d1.name, d2.name), " +
+ " IF(p1.part_name IS NOT NULL, t1.tbl_name, t2.tbl_name), " +
+ " p1.part_name " +
+ "FROM SDS s LEFT JOIN PARTITIONS p1 ON s.sd_id = p1.sd_id " +
+ "LEFT JOIN TBLS t1 ON t1.tbl_id = p1.tbl_id " +
+ "LEFT JOIN DBS d1 ON t1.db_id = d1.db_id " +
+ "LEFT JOIN TBLS t2 ON t2.sd_id = s.sd_id " +
+ "LEFT JOIN DBS d2 ON d2.db_id = t2.db_id " +
+ "LEFT JOIN PARTITION_KEYS k on t2.tbl_id = k.tbl_id " +
+ "WHERE ((p1.part_name IS NOT NULL AND t1.tbl_type = 'MANAGED_TABLE') OR " +
+ " (p1.part_name IS NULL AND t2.tbl_type = 'MANAGED_TABLE' AND" +
+ " k.tbl_id IS NULL)) AND (";
+
+ List<Object> sqlParams = new ArrayList<Object>();
+
+ boolean firstLocation = true;
+ for (String location : externalLocations) {
+ if (!firstLocation) {
+ sql += "OR ";
+ } else {
+ firstLocation = false;
+ }
+
+ sql += "s.location LIKE ? ";
+ sql += "OR s.location = ? ";
+ // Adding the / ensures that we will only get locations which are subdirectories of the
+ // external entity's location, rather than just having it as a prefix
+ sqlParams.add(location + "/%");
+ // Also check if it is equal, in which case the final / will not be in the location or it
+ // will be captured by the LIKE
+ sqlParams.add(location);
+ }
+
+ sql += ");";
+ ConnectionUrlFactory metastoreDbUrlFactory =
+ HookUtils.getUrlFactory(
+ conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.METASTORE_CONNECTION_FACTORY,
+ FBHiveConf.METASTORE_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.METASTORE_HOST_DATABASE_VAR_NAME);
+ List<List<Object>> results = null;
+ try {
+ results = HookUtils.runInsertSelect(conf,
+ metastoreDbUrlFactory, sql, sqlParams, false, SQL_NUM_RETRIES,
+ RETRY_MAX_INTERVAL_SEC, false);
+ } catch (Exception e) {
+ throw new SemanticException("SQL query to retrieve names of managed tables/partitions " +
+ "pointed to by externals failed", e);
+ }
+
+ // Construct a mapping to pass to updateInputs; the structure of the mapping is described in
+ // updateInputs's method documentation
+ Map<String[], List<String>> tableToPartitions = new HashMap<String[], List<String>>();
+
+ for (List<Object> result : results) {
+
+ String[] dbTable = {(String)result.get(0), (String)result.get(1)};
+ if (!tableToPartitions.containsKey(dbTable)) {
+ tableToPartitions.put(dbTable, new ArrayList<String>());
+ }
+
+ String partitionName = (String)result.get(2);
+ if (partitionName != null) {
+ tableToPartitions.get(dbTable).add(partitionName);
+ }
+ }
+
+ try {
+ updateInputs(ctx.getInputs(), tableToPartitions, ctx.getHive());
+ } catch (HiveException e) {
+ throw new SemanticException("Failed to retrieve managed Table(s)/Partition(s) mapped to " +
+ "by externals from the metastore.", e);
+ }
+ }
+ }
+
+ /**
+ * Given a set of inputs, a map from db/table name to a list of partition names, and an
+ * instance of Hive, updates the inputs to include, for each db/table name, the listed
+ * partitions, or the table itself if its list of partitions is empty.
+ * @param inputs A set of ReadEntities
+ * @param tableToPartitions A map whose keys are string arrays of length 2, where the first
+ * index is the db name and the second the table name; the values are
+ * lists of Strings representing partition names, and if a list is
+ * empty the table is assumed to be unpartitioned
+ * @param db An instance of Hive, used to connect to the metastore.
+ * @throws HiveException
+ */
+ private void updateInputs(Set<ReadEntity> inputs,
+ Map<String[], List<String>> tableToPartitions, Hive db)
+ throws HiveException {
+ for (Entry<String[], List<String>> entry : tableToPartitions.entrySet()) {
+
+ Table table = db.getTable(entry.getKey()[0], entry.getKey()[1]);
+
+ if (entry.getValue().isEmpty()) {
+ inputs.add(new ReadEntity(table));
+ } else {
+ List<Partition> partitions = db.getPartitionsByNames(table, entry.getValue());
+ for (Partition partition : partitions) {
+ inputs.add(new ReadEntity(partition));
+ }
+ }
+ }
+ }
+}

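For context, ExternalInputsHook is an AbstractSemanticAnalyzerHook, so it only takes effect when it is registered as Hive's semantic-analyzer hook. A minimal sketch of that registration, assuming the contrib jar is on the client classpath; the property name hive.semantic.analyzer.hook is standard Hive, everything else below is illustrative and not part of this commit.

    // Illustrative sketch, not part of the committed files.
    import org.apache.hadoop.hive.conf.HiveConf;

    public class RegisterExternalInputsHook { // hypothetical helper class
      public static HiveConf configure() {
        HiveConf conf = new HiveConf();
        // Hive invokes preAnalyze/postAnalyze of the registered class around query compilation.
        conf.set("hive.semantic.analyzer.hook",
            "org.apache.hadoop.hive.ql.hooks.ExternalInputsHook");
        return conf;
      }
    }
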
Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/FbUpdateInputAccessTimeHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/FbUpdateInputAccessTimeHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/FbUpdateInputAccessTimeHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/FbUpdateInputAccessTimeHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.HashMap;
+import java.util.Set;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+/**
+ * Implementation of a pre AND post execute hook that updates the access times
+ * for all the inputs.
+ *
+ * It is required that this hook is registered as the last pre-hook and the first
+ * post-hook. Invoked as a pre-hook, it starts a background thread that updates the
+ * access time of all partitions and tables in the input set. Invoked as a
+ * post-hook, it waits for the background thread to finish and fails the query
+ * if the background thread failed.
+ */
+public class FbUpdateInputAccessTimeHook implements ExecuteWithHookContext {
+ static final private Log LOG = LogFactory
+ .getLog("hive.ql.hooks.FbUpdateInputAccessTimeHook");
+
+ private Hive db;
+ static private Object staticLock = new Object();
+ static private HookThread hookThread = null;
+ static private HookContext lastHookContext = null;
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ assert (hookContext != null);
+
+ // if there is no input, there is no need to start the background thread.
+ if (hookContext.getInputs() == null || hookContext.getInputs().isEmpty()) {
+ return;
+ }
+
+ // This race condition should never happen. But since we use static
+ // members to keep global state, we lock here so that if it does happen
+ // because of a bug, we won't produce unpredictable results.
+ synchronized (staticLock) {
+ // There is no flag to determine whether this is the pre-hook or the
+ // post-hook invocation. We simply assume that if the same hook context
+ // is passed again, it is the post-hook.
+ if (lastHookContext == hookContext) {
+ lastHookContext = null;
+ runPosthook(hookContext);
+ } else if (lastHookContext != null || hookThread != null) {
+ // Assuming the hook was not left out of the post-execution hooks,
+ // most likely the previous query failed so that the post-hook never
+ // had a chance to be executed.
+ //
+ // Ideally this error message should go to SessionState's error
+ // stream if one is assigned. However, HookContext does not expose it,
+ // so we write to standard error for now.
+ System.err.println(
+ "WARNING: FbUpdateInputAccessTimeHook doesn't start with a clear " +
+ "state. Ignore this message if the previous query failed. If " +
+ "previous task succeeded, check whether " +
+ "FbUpdateInputAccessTimeHook is among the post-execution hooks");
+
+ if (hookThread != null) {
+ System.err.println("Waiting for pending background thread of " +
+ "FbUpdateInputAccessTimeHook to finish...");
+ hookThread.join();
+ System.err.println("Background thread of FbUpdateInputAccessTimeHook" +
+ " finished.");
+ hookThread = null;
+ }
+ lastHookContext = hookContext;
+ runPrehook(hookContext);
+ } else {
+ if (!hookContext.getCompleteTaskList().isEmpty()) {
+ throw new HiveException(
+ "FbUpdateInputAccessTimeHook is not part of "
+ + "the pre-execution hooks?");
+ }
+ lastHookContext = hookContext;
+ runPrehook(hookContext);
+ }
+ }
+ }
+
+ private void runPrehook(HookContext hookContext) {
+ LOG.info("run as pre-execution hook");
+ hookThread = new HookThread(hookContext.getConf(), hookContext.getInputs(),
+ hookContext.getOutputs());
+ hookThread.start();
+ }
+
+ private void runPosthook(HookContext hookContext) throws HiveException {
+ LOG.info("run as post-execution hook");
+ if (hookThread != null) {
+ HookThread pendingThread = hookThread;
+ try {
+ pendingThread.join();
+ } catch (InterruptedException e) {
+ throw new HiveException(
+ "Background thread in FbUpdateInputAccessTimeHook failed", e);
+ } finally {
+ hookThread = null;
+ }
+
+ if (!pendingThread.isSuccessful()) {
+ if (pendingThread.getHiveException() != null) {
+ throw new HiveException("FbUpdateInputAccessTimeHook failed",
+ pendingThread.getHiveException());
+ } else if (pendingThread.getInvalidOperationException() != null) {
+ throw new HiveException("FbUpdateInputAccessTimeHook failed",
+ pendingThread.getInvalidOperationException());
+ } else {
+ throw new HiveException("FbUpdateInputAccessTimeHook failed with "
+ + "Unhandled Exception.");
+ }
+ }
+ } else {
+ throw new HiveException(
+ "FbUpdateInputAccessTimeHook is not one of the pre-execution hooks, "
+ + "but it is one of the post-execution hooks.");
+ }
+ }
+
+ /**
+ * Class implementing the background thread that updates access times.
+ *
+ * @author sdong
+ *
+ */
+ class HookThread extends Thread {
+ Set<ReadEntity> inputs;
+ Set<WriteEntity> outputs;
+ HiveConf hiveConf;
+ boolean success;
+
+ HiveException hiveException;
+ InvalidOperationException invalidOperationException;
+
+ HookThread(HiveConf hiveConf, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs) {
+ this.hiveConf = hiveConf;
+ this.inputs = inputs;
+ this.outputs = outputs;
+ success = false;
+ }
+
+ public boolean isSuccessful() {
+ return success;
+ }
+
+ public HiveException getHiveException() {
+ return hiveException;
+ }
+
+ public InvalidOperationException getInvalidOperationException() {
+ return invalidOperationException;
+ }
+
+ private void updateTableAccessTime(HashMap<String, Table> tableMap,
+ Table table, int lastAccessTime) throws HiveException,
+ InvalidOperationException {
+ if (!tableMap.containsKey(table.getTableName())) {
+ Table t = db.getTable(table.getTableName());
+ t.setLastAccessTime(lastAccessTime);
+ db.alterTable(t.getTableName(), t);
+ tableMap.put(table.getTableName(), t);
+ }
+ }
+
+ public void run() {
+ try {
+ if (db == null) {
+ try {
+ db = Hive.get(hiveConf);
+ } catch (HiveException e) {
+ // ignore
+ db = null;
+ return;
+ }
+ }
+
+ int lastAccessTime = (int) (System.currentTimeMillis() / 1000);
+
+ HashMap<String, Table> tableMap = new HashMap<String, Table> ();
+
+ for (ReadEntity re : inputs) {
+ // Set the last query time
+ ReadEntity.Type typ = re.getType();
+ switch (typ) {
+ // It is possible that read and write entities contain an old
+ // version of the object, before it was modified by StatsTask.
+ // Get the latest version of the object.
+ case TABLE: {
+ updateTableAccessTime(tableMap, re.getTable(),
+ lastAccessTime);
+ break;
+ }
+ case PARTITION: {
+ Partition p = re.getPartition();
+ updateTableAccessTime(tableMap, p.getTable(), lastAccessTime);
+ // table already in the map after updating tables' access time
+ Table t = tableMap.get(p.getTable().getTableName());
+ p = db.getPartition(t, p.getSpec(), false);
+ p.setLastAccessTime(lastAccessTime);
+ db.alterPartition(t.getTableName(), p);
+ break;
+ }
+ default:
+ // ignore dummy inputs
+ break;
+ }
+ }
+ success = true;
+ } catch (HiveException e) {
+ hiveException = e;
+ } catch (InvalidOperationException e) {
+ invalidOperationException = e;
+ }
+ }
+ }
+}

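FbUpdateInputAccessTimeHook relies on hook ordering: seeing the same HookContext instance a second time is what tells it the post-execution phase has started, so it must be the last pre-execution hook and the first post-execution hook. A minimal sketch of that registration, assuming the standard hive.exec.pre.hooks / hive.exec.post.hooks properties; the com.example hook names are placeholders, not real classes.

    // Illustrative sketch, not part of the committed files.
    import org.apache.hadoop.hive.conf.HiveConf;

    public class RegisterAccessTimeHook { // hypothetical helper class
      public static HiveConf configure() {
        HiveConf conf = new HiveConf();
        // Last in the pre-execution list ...
        conf.set("hive.exec.pre.hooks",
            "com.example.SomeOtherPreHook," + // placeholder
            "org.apache.hadoop.hive.ql.hooks.FbUpdateInputAccessTimeHook");
        // ... and first in the post-execution list.
        conf.set("hive.exec.post.hooks",
            "org.apache.hadoop.hive.ql.hooks.FbUpdateInputAccessTimeHook," +
            "com.example.SomeOtherPostHook"); // placeholder
        return conf;
      }
    }
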
Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/FifoPoolHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/FifoPoolHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/FifoPoolHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/FifoPoolHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.HookUtils.InputInfo;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * Implementation of a hook that, based on the input size of the query,
+ * submits big jobs into a FIFO pool.
+ */
+public class FifoPoolHook {
+
+ static final private Log LOG = LogFactory.getLog(FifoPoolHook.class.getName());
+ static private boolean fifoed = false;
+
+ static final private String failure = "FifoHook failure: ";
+
+ public static class PreExec implements ExecuteWithHookContext {
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ assert(hookContext.getHookType() == HookContext.HookType.PRE_EXEC_HOOK);
+ SessionState sess = SessionState.get();
+ Set<ReadEntity> inputs = hookContext.getInputs();
+ Map<String, ContentSummary> inputToCS = hookContext.getInputPathToContentSummary();
+
+ QueryPlan queryPlan = hookContext.getQueryPlan();
+ List<Task<? extends Serializable>> rootTasks = queryPlan.getRootTasks();
+
+ // If it is a pure DDL task, there is nothing to submit.
+ if (rootTasks == null) {
+ return;
+ }
+ if (rootTasks.size() == 1) {
+ Task<? extends Serializable> tsk = rootTasks.get(0);
+ if (tsk instanceof DDLTask) {
+ return;
+ }
+ }
+
+ HiveConf conf = sess.getConf();
+
+ // In case the post-hook of the previous query was not triggered,
+ // reset the pool setting to a clean state first.
+ if (fifoed) {
+ conf.set("mapred.fairscheduler.pool", "");
+ fifoed = false;
+ }
+ // if the pool is already specified - bail out
+ String poolValue = conf.get("mapred.fairscheduler.pool", null);
+ if ((poolValue != null) && !poolValue.isEmpty()){
+ return;
+ }
+
+ // if we are set on local mode execution (via user or auto) bail
+ if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) {
+ return;
+ }
+
+ // check if we need to run at all
+ if (!conf.getBoolean("fbhive.fifopool.auto", false)) {
+ return;
+ }
+
+ long maxGigaBytes = conf.getLong("fbhive.fifopool.GigaBytes", 0L);
+ if (maxGigaBytes == 0) {
+ LOG.info (failure + "fifopool.GigaBytes = 0");
+ return;
+ }
+
+ long maxBytes = maxGigaBytes * 1024 * 1024 * 1024L;
+
+ if (maxGigaBytes < 0) {
+ LOG.warn (failure + "fifopool.GigaBytes value of " + maxGigaBytes +
+ " is invalid");
+ return;
+ }
+
+ // Get the size of the input
+ Map<String, Double> pathToTopPercentage = new HashMap<String, Double>();
+ Set<ReadEntity> nonSampledInputs = new HashSet<ReadEntity>();
+ boolean isThereSampling = HookUtils.checkForSamplingTasks(
+ hookContext.getQueryPlan().getRootTasks(),
+ pathToTopPercentage, nonSampledInputs);
+
+ InputInfo info = HookUtils.getInputInfo(inputs, inputToCS, conf,
+ isThereSampling, pathToTopPercentage, nonSampledInputs,
+ Long.MAX_VALUE, maxBytes);
+
+ if (info.getSize() > maxBytes) {
+ LOG.info ("Submitting to the fifo pool since the input length of " +
+ info.getSize() + " is more than " + maxBytes);
+ } else {
+ LOG.info("Not submitting to the fifo pool since the input length " +
+ info.getSize() + " is less than " + maxBytes);
+ return;
+ }
+
+ // The job meets at least one of the requirements to be submitted into the
+ // fifo pool
+ String fifoPool = conf.get("fbhive.fifopool.name", "fifo");
+ fifoed = true;
+ conf.set("mapred.fairscheduler.pool", fifoPool);
+ }
+ }
+
+ public static class PostExec implements ExecuteWithHookContext {
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ assert(hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK);
+ SessionState ss = SessionState.get();
+ this.run(ss);
+ }
+
+ public void run(SessionState sess) throws Exception {
+ HiveConf conf = sess.getConf();
+
+ if (fifoed) {
+ conf.set("mapred.fairscheduler.pool", "");
+ fifoed = false;
+ }
+ }
+ }
+}
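FifoPoolHook.PreExec is driven entirely by configuration: it reads fbhive.fifopool.auto, fbhive.fifopool.GigaBytes and fbhive.fifopool.name, and redirects large jobs by setting mapred.fairscheduler.pool. A sketch of the settings it consults, using the keys read in the code above; the 500 GB threshold is an arbitrary example value.

    // Illustrative sketch, not part of the committed files.
    import org.apache.hadoop.hive.conf.HiveConf;

    public class ConfigureFifoPoolHook { // hypothetical helper class
      public static HiveConf configure() {
        HiveConf conf = new HiveConf();
        conf.setBoolean("fbhive.fifopool.auto", true);   // turn the input-size check on
        conf.setLong("fbhive.fifopool.GigaBytes", 500L); // threshold in GB (example value)
        conf.set("fbhive.fifopool.name", "fifo");        // pool that big jobs are moved into
        // PreExec/PostExec themselves would be listed in hive.exec.pre.hooks /
        // hive.exec.post.hooks, e.g. org.apache.hadoop.hive.ql.hooks.FifoPoolHook$PreExec.
        return conf;
      }
    }
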

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/HiveConfigLoggingHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/HiveConfigLoggingHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/HiveConfigLoggingHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/HiveConfigLoggingHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A post-execute hook that will log the overridden Hive Config
+ * for this query to the audit_log database.
+ */
+public class HiveConfigLoggingHook implements ExecuteWithHookContext {
+
+ protected ConnectionUrlFactory urlFactory = null;
+
+ public static ConnectionUrlFactory getQueryConfigUrlFactory(HiveConf conf) {
+ // Writes to the same database as BaseReplicationHook.
+ return HookUtils.getUrlFactory(conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.REPLICATION_CONNECTION_FACTORY,
+ FBHiveConf.REPLICATION_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.REPLICATION_HOST_DATABASE_VAR_NAME);
+ }
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ SessionState ss = SessionState.get();
+ if (ss == null) {
+ // No SessionState, so no query ID. Nothing to do.
+ return;
+ }
+ HiveConf conf = ss.getConf();
+ String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
+ HiveOperation op = ss.getHiveOperation();
+
+ if ((op == null) ||
+ (!op.equals(HiveOperation.CREATETABLE_AS_SELECT) &&
+ !op.equals(HiveOperation.LOAD) &&
+ !op.equals(HiveOperation.QUERY))) {
+ return;
+ }
+
+ Map<String, String> overriddenConfig = ss.getOverriddenConfigurations();
+ String sql = "insert into query_config_log set queryId = ?, config_key = ?, config_value = ?";
+ List<Object> sqlParams = new ArrayList<Object>();
+ urlFactory = getQueryConfigUrlFactory(conf);
+ if (urlFactory == null) {
+ throw new RuntimeException("DB parameters not set!");
+ }
+
+ for (Map.Entry<String, String> e : overriddenConfig.entrySet()) {
+ String key = e.getKey();
+ String val = conf.get(key);
+ if (val != null) {
+ sqlParams.clear();
+ sqlParams.add(StringEscapeUtils.escapeJava(queryId));
+ sqlParams.add(StringEscapeUtils.escapeJava(key));
+ sqlParams.add(StringEscapeUtils.escapeJava(val));
+ HookUtils.runInsert(conf, urlFactory, sql, sqlParams, HookUtils
+ .getSqlNumRetry(conf));
+ }
+ }
+ }
+}

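HiveConfigLoggingHook writes one row per overridden setting using the parameterized statement "insert into query_config_log set queryId = ?, config_key = ?, config_value = ?". The target schema is not part of this commit; a plausible layout, with column types that are purely assumptions inferred from that statement, is sketched below.

    // Illustrative only; the real query_config_log schema is not in this commit,
    // so the column types below are assumptions.
    public class QueryConfigLogSchema { // hypothetical holder class
      static final String ASSUMED_DDL =
          "CREATE TABLE query_config_log (" +
          "  queryId      VARCHAR(255), " +
          "  config_key   VARCHAR(255), " +
          "  config_value TEXT" +
          ")";
    }
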
Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java Tue May 1 22:52:38 2012
@@ -0,0 +1,767 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+
+/**
+ * Utilities for writing hooks.
+ */
+public class HookUtils {
+ static final private Log LOG = LogFactory.getLog(HookUtils.class.getName());
+
+ public static final String TABLE_CREATION_CLUSTER = "creation_cluster";
+
+ static final String POST_HOOK_DB_MAX_RETRY_VAR_NAME =
+ "fbhive.posthook.mysql.max_retries";
+
+ // The default is to retry 20 times with a maximum retry interval of
+ // 60 seconds. The expected total wait is about 22 minutes; the interval
+ // reaches 60 seconds after 7 retries.
+ static final int DEFAULT_SQL_NUM_RETRIES = 20;
+ static final int DEFAULT_RETRY_MAX_INTERVAL_SEC = 60;
+
+ private static Connection getConnection(HiveConf conf, String url)
+ throws SQLException {
+ return DriverManager.getConnection(
+ url,
+ conf.get(FBHiveConf.FBHIVE_DB_USERNAME),
+ conf.get(FBHiveConf.FBHIVE_DB_PASSWORD));
+ }
+
+ public static int getSqlNumRetry(HiveConf conf) {
+ return conf.getInt(POST_HOOK_DB_MAX_RETRY_VAR_NAME, 30);
+ }
+
+ public static void runInsert(HiveConf conf,
+ ConnectionUrlFactory urlFactory,
+ String sql,
+ List<Object> sqlParams) throws Exception {
+ runInsert(conf, urlFactory, sql, sqlParams, DEFAULT_SQL_NUM_RETRIES);
+ }
+
+ public static List<List<Object>> runInsertSelect(HiveConf conf,
+ ConnectionUrlFactory urlFactory,
+ String sql,
+ List<Object> sqlParams)
+ throws Exception {
+ return runInsertSelect(conf, urlFactory, sql, sqlParams, true);
+
+ }
+
+ public static List<List<Object>> runInsertSelect(HiveConf conf,
+ ConnectionUrlFactory urlFactory,
+ String sql,
+ List<Object> sqlParams,
+ boolean isWrite)
+ throws Exception {
+ return runInsertSelect(conf, urlFactory, sql, sqlParams, isWrite,
+ DEFAULT_SQL_NUM_RETRIES,
+ DEFAULT_RETRY_MAX_INTERVAL_SEC, false);
+ }
+
+ public static void runInsert(HiveConf conf,
+ ConnectionUrlFactory urlFactory,
+ String sql,
+ List<Object> sqlParams,
+ int numRetries)
+ throws Exception {
+ runInsertSelect(conf, urlFactory, sql, sqlParams, true, numRetries,
+ DEFAULT_RETRY_MAX_INTERVAL_SEC, true);
+ }
+
+ /*
+ * @param conf -
+ * @param parentTierName - conf key naming the factory class to create (fallback)
+ * @param childTierName - conf key naming the factory class to create (overrides the parent)
+ * @param tierParam1Name - the name of the first parameter
+ * @param tierParam2Name - the name of the second parameter
+ */
+ public static ConnectionUrlFactory getUrlFactory(
+ HiveConf conf,
+ String parentTierName,
+ String childTierName,
+ String tierParam1Name,
+ String tierParam2Name) {
+ return getUrlFactory(conf, parentTierName, childTierName, tierParam1Name, tierParam2Name, null);
+ }
+
+ /*
+ * @param conf -
+ * @param parentTierName - conf key naming the factory class to create (fallback)
+ * @param childTierName - conf key naming the factory class to create (overrides the parent)
+ * @param tierParam1Name - the name of the first parameter
+ * @param tierParam2Name - the name of the second parameter
+ * @param commonParam - the name of an optional common parameter passed to the factory's init
+ */
+ public static ConnectionUrlFactory getUrlFactory(
+ Configuration conf,
+ String parentTierName,
+ String childTierName,
+ String tierParam1Name,
+ String tierParam2Name,
+ String commonParam) {
+
+ String parentTierValue =
+ parentTierName == null ? null : conf.get(parentTierName);
+ String childTierValue =
+ childTierName == null ? null : conf.get(childTierName);
+
+ String tierValue =
+ childTierValue != null && !childTierValue.isEmpty() ? childTierValue :
+ (parentTierValue != null && !parentTierValue.isEmpty() ? parentTierValue :
+ null);
+
+ if (tierValue == null) {
+ return null;
+ }
+
+ ConnectionUrlFactory conn =
+ (ConnectionUrlFactory)getObject(conf, tierValue);
+
+ String tierParamValue =
+ tierParam1Name == null ? null : conf.get(tierParam1Name);
+
+ if ((tierParamValue == null) || tierParamValue.isEmpty()) {
+ tierParamValue = tierParam2Name == null ? null : conf.get(tierParam2Name);
+ }
+
+ String commonParamValue =
+ commonParam == null ? null : conf.get(commonParam);
+
+ conn.init(tierParamValue, commonParamValue);
+ return conn;
+ }
+
+ // In the case of a select returns a list of lists, where each inner list represents a row
+ // returned by the query. In the case of an insert, returns null.
+ public static List<List<Object>> runInsertSelect(
+ HiveConf conf,
+ ConnectionUrlFactory urlFactory, String sql,
+ List<Object> sqlParams, boolean isWrite, int numRetries,
+ int retryMaxInternalSec, boolean insert)
+ throws Exception {
+
+ // After numRetries failed attempts, give up by rethrowing the exception.
+ int waitMS = 300; // wait for at least 300 msec before next retry.
+ Random rand = new Random();
+ for (int i = 0; i < numRetries; ++i) {
+ try {
+ String url = urlFactory.getUrl(isWrite);
+ LOG.info("Attempting connection with URL " + url);
+ Connection conn = getConnection(conf, url);
+ PreparedStatement pstmt = conn.prepareStatement(sql);
+ int pos = 1;
+ for (Object param : sqlParams) {
+ if (param instanceof Integer) {
+ pstmt.setInt(pos++, ((Integer) param).intValue());
+ } else {
+ pstmt.setString(pos++, (String) param);
+ }
+ }
+ if (insert) {
+ int recordsUpdated = pstmt.executeUpdate();
+ LOG.info("rows inserted: " + recordsUpdated + " sql: " + sql);
+ pstmt.close();
+ return null;
+ }
+ else {
+ ResultSet result = pstmt.executeQuery();
+ List<List<Object>> results = new ArrayList<List<Object>>();
+ int numColumns = result.getMetaData().getColumnCount();
+ while (result.next()) {
+ List<Object> row = new ArrayList<Object>();
+ results.add(row);
+ for (int index = 1; index <= numColumns; index++) {
+ row.add(result.getObject(index));
+ }
+ }
+ pstmt.clearBatch();
+ pstmt.close();
+
+ LOG.info("rows selected: " + results.size() + " sql: " + sql);
+ return results;
+ }
+ } catch (Exception e) {
+ // We should catch a better exception than Exception, but since
+ // ConnectionUrlFactory.getUrl() defines throws Exception, it's hard
+ // for us to figure out the complete set it can throw. We follow
+ // ConnectionUrlFactory.getUrl()'s definition to catch Exception.
+ // It shouldn't be a big problem as after numRetries, we anyway exit.
+ LOG.info("Exception " + e + ". Will retry " + (numRetries - i)
+ + " times.");
+ // Introducing a random factor to the wait time before another retry.
+ // The wait time is dependent on # of failures and a random factor.
+ // At the first time of getting a SQLException, the wait time
+ // is a random number between [0,300] msec. If the first retry
+ // still fails, we will wait 300 msec grace period before the 2nd retry.
+ // Also at the second retry, the waiting window is expanded to 600 msec
+ // alleviating the request rate from the server. Similarly the 3rd retry
+ // will wait 600 msec grace period before retry and the waiting window
+ // is
+ // expanded to 1200 msec.
+
+ waitMS += waitMS;
+ if (waitMS > retryMaxInternalSec * 1000) {
+ waitMS = retryMaxInternalSec * 1000;
+ }
+ double waitTime = waitMS + waitMS * rand.nextDouble();
+ Thread.sleep((long) waitTime);
+ if (i + 1 == numRetries) {
+ LOG.error("Still got Exception after " + numRetries + " retries.",
+ e);
+ throw e;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns the total size of the inputs and populates inputToCS with a mapping from the input
+ * paths to their respective ContentSummary objects. If an input is in a subdirectory of
+ * another input's location, or shares the same location, it is not counted toward the total
+ * size; such skipped inputs are not added to the mapping if they are not already present.
+ *
+ * @param inputs
+ * @param inputToCS
+ * @param conf
+ * @throws IOException
+ * @throws Exception
+ */
+ public static long getInputSize(Set<ReadEntity> inputs,
+ Map<String, ContentSummary> inputToCS, HiveConf conf)
+ throws IOException, Exception {
+
+ URI defaultPathUri = new URI(conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE));
+ String defaultPath = defaultPathUri.getPath();
+ String defaultPrefix =
+ defaultPathUri.toString().substring(0, defaultPathUri.toString().lastIndexOf(defaultPath));
+
+ // A mapping from the location, formatted as a String for sorting, to the original
+ // Path of the object
+ Map<String, Path> locationToPath = new HashMap<String, Path>();
+
+ for (ReadEntity re : inputs) {
+ Path p = null;
+ switch (re.getType()) {
+ case TABLE:
+ Table table = re.getTable();
+
+ if (table.isPartitioned()) {
+ // If the input is a partitioned table, do not include its content summary, as data will
+ // never be read from a partitioned table, only its partitions, so it must be a metadata
+ // change to the table.
+ continue;
+ }
+ if (table.isView()) {
+ // If the input is a view, it does not have a content summary as it is only a logical
+ // construct.
+ continue;
+ }
+
+ p = table.getPath();
+ break;
+ case PARTITION:
+ Partition partition = re.getPartition();
+
+ if (partition.getTable().isView()) {
+ // If the input is a partition of a view, it does not have a content summary as it is
+ // only a logical construct.
+ continue;
+ }
+
+ p = partition.getPartitionPath();
+ break;
+ default:
+ continue;
+ }
+
+ String location = re.getLocation().toString();
+
+ // If the location is something like /user/facebook/warehouse/ we want it to start with
+ // hdfs://... to ensure that, using prefixes, we can identify subdirectories
+ if (location.equals(defaultPath) ||
+ location.startsWith(defaultPath.endsWith("/") ? defaultPath : defaultPath + "/")) {
+ location = defaultPrefix + location;
+ }
+
+ // If the location does not end with /, add it; this ensures /a/b/cd is not considered a
+ // subdirectory of /a/b/c
+ if (!location.endsWith("/")) {
+ location += "/";
+ }
+
+ locationToPath.put(location, p);
+ }
+
+ String[] locations = new String[locationToPath.size()];
+ locations = locationToPath.keySet().toArray(locations);
+ Arrays.sort(locations);
+
+ String lastLocation = null;
+ long totalInputSize = 0;
+ for (String formattedLocation : locations) {
+
+ // Since the locations have been sorted, if this location is a subdirectory of another, that
+ // directory must either be immediately before this location, or every location in between is
+ // also a subdirectory
+ if (lastLocation != null && formattedLocation.startsWith(lastLocation)) {
+ continue;
+ }
+
+ Path p = locationToPath.get(formattedLocation);
+ lastLocation = formattedLocation;
+
+ String pathStr = p.toString();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Finding from cache Content Summary for " + pathStr);
+ }
+
+ ContentSummary cs = (inputToCS == null) ? null : inputToCS.get(pathStr);
+ if (cs == null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Fetch Content Summary for " + pathStr);
+ }
+ FileSystem fs = p.getFileSystem(conf);
+ cs = fs.getContentSummary(p);
+ inputToCS.put(pathStr, cs);
+ }
+
+ totalInputSize += cs.getLength();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Length for file: " + pathStr + " = " + cs.getLength());
+ }
+ }
+
+ return totalInputSize;
+ }
+
+ /**
+ * Goes through the list of tasks, and populates a map from each path used
+ * by a mapRedTask to the highest percentage to which it is sampled, or 100
+ * if it is ever not sampled.
+ *
+ * Also, if a task is not a map reduce task or has a null or empty
+ * NameToSplitSample map, it adds all of its inputs to a
+ * set so they can be treated as unsampled.
+ *
+ * Calls itself recursively on each task's list of dependent tasks
+ *
+ * @return whether or not there is any sampling performed in the query
+ */
+ static public boolean checkForSamplingTasks(
+ List<Task<? extends Serializable>> tasks,
+ Map<String, Double> topPercentages,
+ Set<ReadEntity> nonSampledInputs) {
+ boolean isThereSampling = false;
+
+ for (Task<? extends Serializable> task : tasks) {
+ MapredWork work;
+
+ // Only look for sampled inputs in MapRedTasks with non-null, non-empty
+ // NameToSplitSample maps
+ if (task.getWork() instanceof MapredWork &&
+ (work = (MapredWork)task.getWork()).getNameToSplitSample() != null &&
+ !work.getNameToSplitSample().isEmpty()) {
+
+ isThereSampling = true;
+
+ // If the task is a map reduce task, go through each of the paths
+ // used by its work, if it is sampled check if it is the highest
+ // sampling percentage yet seen for that path. If it is not
+ // sampled, set the highest percentage to 100.
+ for (Map.Entry<String, ArrayList<String>> entry : work.getPathToAliases().entrySet()) {
+ double percentage = 0;
+
+ for (String alias : entry.getValue()) {
+ if (work.getNameToSplitSample().containsKey(alias)) {
+ if (work.getNameToSplitSample().get(alias).getPercent() > percentage) {
+ percentage = work.getNameToSplitSample().get(alias).getPercent();
+ }
+ } else {
+ percentage = 100;
+ break;
+ }
+ }
+
+ String path = entry.getKey();
+ if (!topPercentages.containsKey(path) || percentage > topPercentages.get(path)) {
+ topPercentages.put(path, percentage);
+ }
+ }
+ } else if (task.getQueryPlan() != null) {
+ nonSampledInputs.addAll(task.getQueryPlan().getInputs());
+ }
+
+ if (task.getDependentTasks() != null) {
+ isThereSampling |= checkForSamplingTasks(task.getDependentTasks(),
+ topPercentages,
+ nonSampledInputs);
+ }
+ }
+
+ return isThereSampling;
+ }
+
+ /**
+ * Helper class used to pass results from getObjectSize back to the caller.
+ * This contains the total size of the objects passed in, as well as
+ * the type, size and number of files for each object. For example, if a query
+ * references 2 partitions T1@p1 and T1@p2 of size 10 and 20, and 2 and 5
+ * files respectively, the totalSize will be 30, and the object map will be
+ * like:
+ * T1@p1 -> <MANAGED_TABLE, 10, 2>
+ * T1@p2 -> <MANAGED_TABLE, 20, 5>
+ * Currently, this is logged in the job_stats_log table, and used for
+ * downstream analysis.
+ */
+ public static class ObjectSize {
+ long totalSize;
+ Map<String, Triple<String, String, String>> objectTypeLengths;
+
+ ObjectSize() {
+ }
+
+ ObjectSize(long totalSize,
+ Map<String, Triple<String, String, String>> objectTypeLengths) {
+ this.totalSize = totalSize;
+ this.objectTypeLengths = objectTypeLengths;
+ }
+
+ long getTotalSize() {
+ return totalSize;
+ }
+
+ void setTotalSize(long totalSize) {
+ this.totalSize = totalSize;
+ }
+
+ Map<String, Triple<String, String, String>> getObjectTypeLengths() {
+ return objectTypeLengths;
+ }
+
+ void setObjectTypeLengths(Map<String, Triple<String, String, String>> objectTypeLengths) {
+ this.objectTypeLengths = objectTypeLengths;
+ }
+ }
+
+
+ static public HookUtils.ObjectSize getObjectSize(HiveConf conf,
+ Set<Entity> objects,
+ boolean loadObjects)
+ throws Exception {
+ // The objects may need to be loaded again since StatsTask is executed after
+ // the move task, and the object in the write entity may not have the size
+ long totalSize = 0;
+ Map<String, Triple<String, String, String>> objectLengths =
+ new HashMap<String, Triple<String, String, String>>();
+ Hive db = null;
+ if (loadObjects) {
+ db = Hive.get();
+ }
+
+ for (Entity object: objects) {
+ // We are computing sizes only for tables and partitions
+ Entity.Type objectType = object.getTyp();
+ Table table = null;
+ String size = null;
+ String numFiles = null;
+ Path path = null;
+
+ switch (objectType) {
+ case TABLE:
+ table = object.getTable();
+
+ if (table.isPartitioned() && !table.isView()) {
+ // If the input is a partitioned table, do not include its content summary, as data will
+ // never be read from a partitioned table, only its partitions, so it must be a metadata
+ // change to the table.
+ //
+ // However, if the table is a view, a view's partitions are not included in the inputs,
+ // so do not skip it so that we have some record of it.
+ continue;
+ }
+
+ if (loadObjects) {
+ table = db.getTable(table.getTableName());
+ }
+
+ if (table.isView()) {
+ // Views are logical, so they have no size or files
+ path = null;
+ size = "0";
+ numFiles = "0";
+ } else {
+ path = table.getPath();
+ size = table.getProperty("totalSize");
+ numFiles = table.getProperty("numFiles");
+ }
+ break;
+ case PARTITION:
+ Partition partition = object.getPartition();
+
+ if (loadObjects) {
+ partition =
+ db.getPartition(partition.getTable(), partition.getSpec(), false);
+ }
+ table = partition.getTable();
+
+ if (table.isView()) {
+ // Views are logical, so they have no size or files
+ // Currently view partitions are not included in the inputs, but this is included so
+ // that if that changes in open source, it will not cause an NPE. It should not cause
+ // any double counting as the size of the view and its partitions are both 0.
+ path = null;
+ size = "0";
+ numFiles = "0";
+ } else {
+ path = partition.getPartitionPath();
+ size = partition.getParameters().get("totalSize");
+ numFiles = partition.getParameters().get("numFiles");
+ }
+ break;
+ default:
+ // nothing to do
+ break;
+ }
+
+ // Counting needed
+ if (table != null) {
+ if (size == null) {
+ // If the size is not present in the metastore (old
+ // legacy tables), get it from hdfs
+ FileSystem fs = path.getFileSystem(conf);
+ size = String.valueOf(fs.getContentSummary(path).getLength());
+ }
+
+ if (numFiles == null) {
+ numFiles = String.valueOf(0);
+ }
+
+ Triple<String, String, String> triple =
+ new Triple<String, String, String>(
+ table.getTableType().toString(), size, numFiles);
+
+ objectLengths.put(object.getName(), triple);
+
+ // If the input/output is an external table or a view, don't add it to
+ // the total size. The managed tables/partitions for those locations
+ // should also be part of the object list passed in. That is true for
+ // inputs, whereas outputs should not be external tables or views in a
+ // query. So, the totalSize may be less than the sum of all individual
+ // sizes.
+ if ((table.getTableType() != TableType.EXTERNAL_TABLE) &&
+ (table.getTableType() != TableType.VIRTUAL_VIEW)) {
+ totalSize += Long.valueOf(size);
+ }
+ }
+ }
+
+ ObjectSize objectSize = new ObjectSize(totalSize, objectLengths);
+ return objectSize;
+ }
+
+ /**
+ * A helper class used to pass info from getInputInfo back to the caller.
+ */
+ public static class InputInfo {
+ long size;
+ long fileCount;
+ long directoryCount;
+ double estimatedNumSplits;
+
+ InputInfo(long size, long fileCount, long directoryCount,
+ double estimatedNumSplits) {
+ this.size = size;
+ this.fileCount = fileCount;
+ this.directoryCount = directoryCount;
+ this.estimatedNumSplits = estimatedNumSplits;
+ }
+
+ long getSize() {
+ return size;
+ }
+
+ long getFileCount() {
+ return fileCount;
+ }
+
+ long getDirectoryCount() {
+ return directoryCount;
+ }
+
+ double getEstimatedNumSplits() {
+ return estimatedNumSplits;
+ }
+ }
+
+ /**
+ * Returns the sizes of the inputs while taking sampling into account.
+ *
+ * @param inputs - entities used for the query input
+ * @param inputToCS - already known mappings from paths to content summaries.
+ * If a path is not in this mapping, it will be looked up
+ * @param conf - hadoop conf for constructing filesystem
+ * @param isThereSampling - whether the query includes sampling
+ * @param pathToTopPercentage - a mapping from the path to the highest
+ * sampled percentage. If not in the map, defaults to 100%
+ * @param nonSampledInputs - entities that are not sampled
+ * @param maxSplits - if the number of splits exceeds this number as the
+ * splits are incrementally summed, return early
+ * @param maxSize - if the size exceeds this number as the sizes are being
+ * incrementally summed, return early
+ * @return an InputInfo object about the net input
+ * @throws IOException
+ */
+
+ static public InputInfo getInputInfo(Collection<ReadEntity> inputs,
+ Map<String, ContentSummary> inputToCS, Configuration conf,
+ boolean isThereSampling, Map<String, Double> pathToTopPercentage,
+ Set<ReadEntity> nonSampledInputs,
+ long maxSplits, long maxSize) throws IOException {
+
+ double estimatedNumSplits = 0;
+ long size = 0;
+ long fileCount = 0;
+ long directoryCount = 0;
+
+ // Go over the input paths and calculate size
+ for(ReadEntity re: inputs) {
+ Path p = null;
+ switch(re.getType()) {
+ case TABLE:
+ p = re.getTable().getPath();
+ break;
+ case PARTITION:
+ p = re.getPartition().getPartitionPath();
+ break;
+ default:
+ break;
+ }
+
+ if (p != null) {
+ String pathStr = p.toString();
+ LOG.debug("Finding from cache Content Summary for " + pathStr);
+ ContentSummary cs = (inputToCS == null) ? null : inputToCS
+ .get(pathStr);
+ if (cs == null) {
+ LOG.debug("Fetch Content Summary for " + pathStr);
+ FileSystem fs = p.getFileSystem(conf);
+ cs = fs.getContentSummary(p);
+ inputToCS.put(pathStr, cs);
+ }
+
+ if (isThereSampling) {
+ // If the input is used in a map reduce task get the highest
+ // percentage to which it is sampled, otherwise, set the
+ // sampling percentage to 100
+ double samplePercentage = 100;
+ if (pathToTopPercentage.containsKey(pathStr) &&
+ !nonSampledInputs.contains(re)) {
+ samplePercentage = pathToTopPercentage.get(pathStr);
+ }
+ size += (long)(cs.getLength() * samplePercentage / 100D);
+ estimatedNumSplits += samplePercentage / 100;
+
+ if (estimatedNumSplits > maxSplits) {
+ break;
+ }
+ } else {
+ size += cs.getLength();
+ fileCount += cs.getFileCount();
+ directoryCount += cs.getDirectoryCount();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug ("Length for file: " + p.toString() + " = " + cs.getLength());
+ }
+ }
+
+ if (size > maxSize){
+ break;
+ }
+ }
+
+ return new InputInfo(size, fileCount, directoryCount,
+ estimatedNumSplits);
+ }
+
+ // Returns true approximately (percentage * 100)% of the time
+ public static boolean rollDice(float percentage) throws Exception {
+
+ Random randGen = new Random();
+ float randVal = randGen.nextFloat();
+
+ if (percentage < 0 || percentage > 1) {
+ throw new Exception("Percentages must be >=0% and <= 100%. Got " + percentage);
+ }
+
+ if (randVal < percentage) {
+ return true;
+ }
+
+ return false;
+ }
+
+ public static <T> T getObject(Configuration conf, String className) {
+ if ((className == null) || (className.isEmpty())) {
+ return null;
+ }
+
+ T clazz = null;
+ try {
+ clazz = (T) ReflectionUtils.newInstance(conf.getClassByName(className), conf);
+ } catch (ClassNotFoundException e) {
+ return null;
+ }
+ return clazz;
+ }
+}

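HookUtils.runInsertSelect retries failed statements with the exponential backoff described in its inline comments: the base wait starts at 300 msec, doubles before every sleep, is capped by the max-interval argument, and each actual sleep adds a random fraction of the base wait. A standalone sketch of that schedule; the class, method and parameter names here are illustrative, not part of the patch.

    // Illustrative sketch mirroring the backoff loop in HookUtils.runInsertSelect.
    import java.util.Random;

    public class BackoffSketch { // hypothetical helper class
      // Returns the sleep (in msec) taken before each of the numRetries retries.
      static long[] backoffScheduleMs(int numRetries, int retryMaxIntervalSec, long seed) {
        Random rand = new Random(seed);
        long[] sleeps = new long[numRetries];
        int waitMS = 300;
        for (int i = 0; i < numRetries; i++) {
          waitMS += waitMS;                           // double the base wait
          if (waitMS > retryMaxIntervalSec * 1000) {
            waitMS = retryMaxIntervalSec * 1000;      // cap at the configured maximum
          }
          sleeps[i] = (long) (waitMS + waitMS * rand.nextDouble());
        }
        return sleeps;
      }
    }
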
Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/JobStatsHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/JobStatsHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/JobStatsHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/JobStatsHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.HiveStatsMetricsPublisher;
+import org.apache.hadoop.hive.ql.stats.HiveStatsMetricsPublisher.QueryTag;
+import org.apache.hadoop.mapred.Counters;
+import org.json.JSONObject;
+
+/**
+ * A hook which populates the job_stats_log MySQL table with
+ * stats for each job which has run for this query, the query ID,
+ * and whether or not the query succeeded.
+ *
+ * It also sets the query attributes in HiveStatsMetricsPublisher and logs
+ * the stats through it as well.
+ */
+public class JobStatsHook implements ExecuteWithHookContext {
+
+ public static final String HIVE_QUERY_SOURCE = "hive.query.source";
+
+ public static ConnectionUrlFactory getJobStatsUrlFactory(HiveConf conf){
+ return HookUtils.getUrlFactory(conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.JOBSTATS_CONNECTION_FACTORY,
+ FBHiveConf.JOBSTATS_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.JOBSTATS_HOST_DATABASE_VAR_NAME);
+ }
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+
+ assert(hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK ||
+ hookContext.getHookType() == HookContext.HookType.ON_FAILURE_HOOK);
+
+ String queryId = "";
+ String querySrc = "";
+ String queryTagsStr = "";
+ String statsString = "";
+ SessionState sess = SessionState.get();
+ String queryFailed = hookContext.getHookType() == HookContext.HookType.ON_FAILURE_HOOK ? "1"
+ : "0";
+ HiveConf conf = sess.getConf();
+ HiveStatsMetricsPublisher metricsPublisher =
+ (HiveStatsMetricsPublisher)HookUtils.getObject(conf,
+ conf.get(FBHiveConf.HIVE_METRICS_PUBLISHER));
+ if (metricsPublisher == null) {
+ return;
+ }
+
+ metricsPublisher.extractAndOverwriteQueryAttributes(hookContext);
+
+ JSONObject jobStats = new JSONObject();
+
+ ConnectionUrlFactory urlFactory = getJobStatsUrlFactory(conf);
+ if (urlFactory == null) {
+ throw new RuntimeException("DB parameters for audit_log not set!");
+ }
+
+ if (sess != null) {
+ queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
+ querySrc = conf.get(HIVE_QUERY_SOURCE, "");
+
+ List<TaskRunner> completedTasks = hookContext.getCompleteTaskList();
+ Map<String, String> jobToStageMap = new HashMap<String, String>();
+
+ if (completedTasks != null) {
+ for (TaskRunner taskRunner : completedTasks) {
+ Task<? extends Serializable> task = taskRunner.getTask();
+ // If the Job ID is null, this indicates the task is not a map
+ // reduce task, or it was run locally
+ if (task.getJobID() != null) {
+ String jobID = StringEscapeUtils.escapeJava(task.getJobID());
+ String stageID = StringEscapeUtils.escapeJava(task.getId());
+ jobToStageMap.put(jobID, stageID);
+ }
+ }
+ }
+
+ List<MapRedStats> listStats = sess.getLastMapRedStatsList();
+ if (listStats != null && listStats.size() > 0) {
+ Map[] perJobStats = new Map[listStats.size()];
+ for (int i = 0; i < listStats.size(); i++) {
+ MapRedStats mps = listStats.get(i);
+ Counters ctrs = mps.getCounters();
+ Map<String, String> counterList = new HashMap<String, String>();
+ Map<String, Double> metrics = new HashMap<String,Double>();
+
+ counterList.put("job_ID", mps.getJobId());
+
+ if (jobToStageMap.containsKey(mps.getJobId())) {
+ counterList.put("stage", jobToStageMap.get(mps.getJobId()));
+ }
+
+ addJobStat(counterList, metrics, "cpu_msec", "cpu_sec", mps.getCpuMSec(), 1000);
+ addJobStat(counterList, metrics, "map", mps.getNumMap());
+ addJobStat(counterList, metrics, "reduce", mps.getNumReduce());
+ if (ctrs != null) {
+ conditionalAddJobStat(counterList, metrics, "hdfs_read_bytes", "hdfs_read_mbytes",
+ ctrs.findCounter("FileSystemCounters", "HDFS_BYTES_READ"), 1000000);
+ conditionalAddJobStat(counterList, metrics, "hdfs_write_bytes", "hdfs_write_mbytes",
+ ctrs.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN"), 1000000);
+ conditionalAddJobStat(counterList, metrics, "hdfs_local_read_bytes",
+ "hdfs_read_local_mbytes", ctrs.findCounter("FileSystemCounters",
+ "HDFS_BYTES_READ_LOCAL"), 1000000);
+ conditionalAddJobStat(counterList, metrics, "hdfs_rack_read_bytes",
+ "hdfs_rack_read_mbytes", ctrs.findCounter("FileSystemCounters",
+ "HDFS_BYTES_READ_RACK"), 1000000);
+ conditionalAddJobStat(counterList, metrics, "hdfs_read_exceptions",
+ ctrs.findCounter("FileSystemCounters", "HDFS_READ_EXCEPTIONS"));
+ conditionalAddJobStat(counterList, metrics,
+ "hdfs_write_exceptions",
+ ctrs.findCounter("FileSystemCounters", "HDFS_WRITE_EXCEPTIONS"));
+ conditionalAddJobStat(counterList, metrics, "map_input_records",
+ "map_input_million_records",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS"),
+ 1000000);
+ conditionalAddJobStat(counterList, metrics, "map_output_records",
+ "map_output_million_records",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS"),
+ 1000000);
+ conditionalAddJobStat(counterList, metrics, "reduce_input_records",
+ "reduce_input_million_records",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_INPUT_RECORDS"),
+ 1000000);
+ conditionalAddJobStat(counterList, metrics, "reduce_output_records",
+ "reduce_output_million_records",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_OUTPUT_RECORDS"),
+ 1000000);
+ conditionalAddJobStat(counterList, metrics, "shuffle_bytes", "shuffle_mbytes",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_SHUFFLE_BYTES"),
+ 1000000);
+ conditionalAddJobStat(counterList, metrics, "map_input_bytes", "map_input_mbytes",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_BYTES"),
+ 1000000);
+ conditionalAddJobStat(counterList, metrics, "map_spill_cpu_msecs",
+ "map_spill_cpu_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_SPILL_CPU"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "map_spill_wallclock_msecs",
+ "map_spill_walllclock_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_SPILL_WALLCLOCK"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "map_spill_number", "map_spill_number",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_SPILL_NUMBER"), 1);
+ conditionalAddJobStat(counterList, metrics, "map_spill_bytes", "map_spill_mbytes",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_SPILL_BYTES"),
+ 1000000);
+ conditionalAddJobStat(counterList, metrics, "map_mem_sort_cpu_msecs",
+ "map_mem_sort_cpu_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_MEM_SORT_CPU"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "map_mem_sort_wallclock_msecs",
+ "map_mem_sort_wallclock_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ "MAP_MEM_SORT_WALLCLOCK"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "map_merge_cpu_msecs",
+ "map_merge_cpu_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_MERGE_CPU"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "map_merge_wallclock_msecs",
+ "map_merge_wallclock_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_MERGE_WALLCLOCK"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "reduce_copy_cpu_msecs",
+ "reduce_copy_cpu_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_COPY_CPU"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "reduce_copy_wallclock_msecs",
+ "reduce_copy_wallclock_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_COPY_WALLCLOCK"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "reduce_sort_cpu_msecs",
+ "reduce_sort_cpu_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_SORT_CPU"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "redcue_sort_wallclock_msecs",
+ "reduce_sort_wallclock_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_SORT_WALLCLOCK"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "map_task_wallclock_msecs",
+ "map_task_wallclock_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_TASK_WALLCLOCK"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "reduce_task_wallclock_msecs",
+ "reduce_task_wallclock_secs",
+ ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", "REDUCE_TASK_WALLCLOCK"),
+ 1000);
+ conditionalAddJobStat(counterList, metrics, "slots_millis_maps", "slots_secs_maps",
+ ctrs.findCounter("org.apache.hadoop.mapred.JobInProgress$Counter",
+ "SLOTS_MILLIS_MAPS"), 1000);
+ conditionalAddJobStat(counterList, metrics, "slots_millis_reduces",
+ "slots_secs_reduces", ctrs.findCounter(
+ "org.apache.hadoop.mapred.JobInProgress$Counter", "SLOTS_MILLIS_REDUCES"),
+ 1000);
+ }
+ addJobStat(counterList, metrics, "success", mps.isSuccess() ? 1 : 0);
+ perJobStats[i] = counterList;
+
+ metricsPublisher.publishMetricsWithQueryTags(metrics);
+ }
+ jobStats.put("per_job_stats", perJobStats);
+ }
+ }
+
+ HiveOperation op = sess.getHiveOperation();
+
+ // If input was read, log the input and output size
+ if ((op != null) &&
+ ((op.equals(HiveOperation.CREATETABLE_AS_SELECT)) ||
+ (op.equals(HiveOperation.LOAD)) ||
+ (op.equals(HiveOperation.QUERY)))) {
+
+ // We are depending on the stats being present in the metastore.
+ // If they are not, we might end up calling getContentSummary for
+ // all the inputs and outputs, which may create a problem for HDFS.
+ // Allow the user to manually turn autogathering off.
+ if (!conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
+ if (SessionState.get().getOverriddenConfigurations().containsKey(
+ HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname)) {
+ SessionState.getConsole().printInfo("WARNING: hive.stats.autogather is set to false." +
+ " Stats were not populated for any outputs of this query. If any tables or " +
+ "partitions were overwritten as part of this query, their stats may be incorrect");
+ } else {
+ throw new RuntimeException("hive.stats.autogather is set to false");
+ }
+ }
+
+ // Log the total size and the individual sizes for each input and output
+ HookUtils.ObjectSize inputSizes =
+ HookUtils.getObjectSize(conf,
+ new HashSet<Entity>(hookContext.getInputs()),
+ false);
+ jobStats.put("input_size", String.valueOf(inputSizes.getTotalSize()));
+ if (!inputSizes.getObjectTypeLengths().isEmpty()) {
+ jobStats.put("inputs", inputSizes.getObjectTypeLengths());
+ }
+
+ // Log the pool specified in the conf. Note that the JobTracker may override
+ // it if the feature to disallow non-standard pools is enabled there.
+ String specifiedPool = conf.get("mapred.fairscheduler.pool", "");
+ if (specifiedPool.length() > 0) {
+ jobStats.put("pool", conf.get("mapred.fairscheduler.pool"));
+ }
+
+ if (hookContext.getHookType() != HookContext.HookType.ON_FAILURE_HOOK) {
+ // The objects for the outputs were created before their statistics were
+ // populated, so reload the outputs from the metastore to get their sizes.
+ HookUtils.ObjectSize outputSizes =
+ HookUtils.getObjectSize(conf,
+ new HashSet<Entity>(hookContext.getOutputs()),
+ true);
+ jobStats.put("output_size", String.valueOf(outputSizes.getTotalSize()));
+ if (!outputSizes.getObjectTypeLengths().isEmpty()) {
+ jobStats.put("outputs", outputSizes.getObjectTypeLengths());
+ }
+ }
+ }
+
+ statsString = jobStats.toString();
+
+ Set<QueryTag> queryTags = metricsPublisher.getQueryAttributes();
+ queryTagsStr = StringUtils.join(queryTags, ',');
+
+ List<Object> sqlParams = new ArrayList<Object>();
+ sqlParams.add(StringEscapeUtils.escapeJava(queryId));
+ sqlParams.add(StringEscapeUtils.escapeJava(querySrc));
+ sqlParams.add(queryFailed);
+ sqlParams.add(queryTagsStr);
+ sqlParams.add(statsString);
+
+ // Parameterized insert; the values are bound in order by HookUtils.runInsert below
+ String sql = "insert into job_stats_log set queryId = ?, query_src = ?, query_failed = ?, " +
+ "query_tags = ?, job_stats = ?";
+
+ HookUtils.runInsert(conf, urlFactory, sql, sqlParams, HookUtils
+ .getSqlNumRetry(conf));
+ }
+
+ private void conditionalAddJobStat(Map<String, String> counterList, Map<String, Double> metrics,
+ String key, Counters.Counter cntr) {
+ conditionalAddJobStat(counterList, metrics, key, key, cntr, 1);
+ }
+
+ private void conditionalAddJobStat(Map<String, String> counterList, Map<String, Double> metrics,
+ String exactKey, String approximateKey, Counters.Counter cntr, int divisor) {
+ if (cntr != null) {
+ conditionalAddJobStat(counterList, metrics, exactKey,
+ approximateKey, cntr.getValue(), divisor);
+ }
+ }
+
+ private void conditionalAddJobStat(Map<String, String> counterList, Map<String, Double> metrics,
+ String exactKey, String approximateKey, long cntrValue, int divisor) {
+ if (cntrValue >= 0) {
+ addJobStat(counterList, metrics, exactKey, approximateKey, cntrValue, divisor);
+ }
+ }
+
+ private void addJobStat(Map<String, String> counterList, Map<String, Double> metrics,
+ String key, long value) {
+ addJobStat(counterList, metrics, key, key, value, 1);
+ }
+
+ // Adds the raw value to the counter map under the exact key, and the value scaled by the
+ // divisor to the metrics map under the approximated key
+ private void addJobStat(Map<String, String> counterList, Map<String, Double> metrics,
+ String exactKey,
+ String approximatedKey, long value, int divisor) {
+ counterList.put(exactKey, String.valueOf(value));
+ metrics.put(approximatedKey, (double)value/divisor);
+ }
+}
