Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/JobTrackerHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/JobTrackerHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/JobTrackerHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/JobTrackerHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.HookUtils.InputInfo;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Implementation of a pre execute hook that decides what
+ * cluster to send a given query to based on the size of
+ * query inputs
+ *
+ * TODO: this needs to be optimized once HIVE-1507 is in
+ * place to reuse the path->summary cache maintained in hive
+ *
+ * TODO: this encodes hadoop cluster info in code. very
+ * undesirable. Need to figure this out better (SMC?)
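+ *
+ * To enable this hook (illustrative configuration, not part of this change),
+ * the inner classes would typically be registered as execution hooks, e.g.
+ * hive.exec.pre.hooks=org.apache.hadoop.hive.ql.hooks.JobTrackerHook$PreExec and
+ * hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.JobTrackerHook$PostExec.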
+ */
+public class JobTrackerHook {
+
+ static final private Log LOG = LogFactory.getLog(JobTrackerHook.class.getName());
+
+ // store the prior location of the hadoop executable. switching this doesn't
+ // matter unless we are using the 'submit via child' feature
+ private static String preHadoopBin = null;
+
+ private static String preJobTracker = null;
+
+ private static Map<String, String> savedValues = null;
+
+ public static class PreExec implements ExecuteWithHookContext {
+
+ private final String dislike = "Not choosing Bronze/Corona because ";
+
+ static final private String POOLS = "pools";
+
+ /**
+ * If the job is on an SLA pool, do not redirect this job.
+ *
+ * @return True if the pool matches an SLA pool, false otherwise
+ */
+ private boolean isOnSlaPool(HiveConf conf) {
+ String pool = conf.get("mapred.fairscheduler.pool");
+
+ // Nothing to be done if pool is not specified
+ if ((pool == null) || (pool.isEmpty())) {
+ return false;
+ }
+
+ // Make sure that SLA jobs are not redirected
+ String[] slaPoolArray =
+ conf.getStrings("mapred.jobtracker.hook.sla.pools");
+ if ((slaPoolArray == null) || (slaPoolArray.length == 0)) {
+ slaPoolArray = new String[]{"rootsla", "incrementalscraping"};
+ }
+ for (int i = 0; i < slaPoolArray.length; ++i) {
+ if (slaPoolArray[i].equals(pool)) {
+ LOG.debug("Pool " + pool + " is on an sla pool");
+ return true;
+ }
+ }
+
+ LOG.debug("Pool " + pool + " is not on an sla pool");
+ return false;
+ }
+
+ /**
+ * The user has specified a mapping table in hive.configs, which is
+ * essentially of the form: pool -> <cluster, hadoopHome, jobTracker>.
+ * Since the cluster will be repeated a lot in these scenarios, the exact
+ * mapping is: cluster -> <hadoopHome, jobTracker, array of pools>.
+ * Going forward, multiple clusters will be used in these mappings, once
+ * silver is broken into silver and silver2. No code changes will be
+ * required, only a configuration change.
+ * @return whether to use the cluster from the SMC config
+ */
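+ // An illustrative value for the "pools" property (cluster names, paths, and
+ // trackers below are hypothetical):
+ //   {"silver2": {"hadoopHome": "/mnt/vol/hive/sites/silver2/hadoop",
+ //                "jobTracker": "silver2-jt:50030",
+ //                "pools": ["rootadhoc", "rootdefault"]},
+ //    "corona":  {"isCorona": true,
+ //                "hadoopHome": "/mnt/vol/hive/sites/corona/hadoop",
+ //                "pools": ["coronaadhoc"]}}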
+ private boolean useClusterFromSmcConfig(HiveConf conf) {
+ try {
+ String pool = conf.get("mapred.fairscheduler.pool");
+
+ // Nothing to be done if pool is not specified
+ if ((pool == null) || (pool.isEmpty())) {
+ return false;
+ }
+
+ ConnectionUrlFactory connectionUrlFactory =
+ HookUtils.getUrlFactory(conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.JOBTRACKER_CONNECTION_FACTORY,
+ FBHiveConf.JOBTRACKER_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.JOBTRACKER_HOST_DATABASE_VAR_NAME);
+
+ if (connectionUrlFactory == null) {
+ return false;
+ }
+
+ String s = connectionUrlFactory.getValue(conf.get(FBHiveConf.HIVE_CONFIG_TIER), POOLS);
+ if (s == null) {
+ return false;
+ }
+
+ JSONObject poolsJSON = new JSONObject(s);
+
+ Iterator<String> i = (Iterator<String>) poolsJSON.keys();
+ while(i.hasNext()) {
+
+ String clusterName = i.next();
+ JSONObject jo = (JSONObject)poolsJSON.get(clusterName);
+
+ String hadoopHome = null;
+ String jobTracker = null;
+ JSONArray poolsObj = null;
+
+ boolean isCorona = false;
+ if (jo.has("isCorona")) {
+ isCorona = jo.getBoolean("isCorona");
+ }
+
+ if (!jo.has("hadoopHome") || !jo.has("pools")) {
+ LOG.error("hadoopHome and pools need to be specified for " +
+ clusterName);
+ return false;
+ } else {
+ hadoopHome = jo.getString("hadoopHome");
+ poolsObj = (JSONArray)jo.get("pools");
+ }
+ if (!isCorona && !jo.has("jobTracker")) {
+ LOG.error(
+ "jobTracker needs to be specified for non-corona cluster " +
+ clusterName);
+ return false;
+ } else {
+ if (jo.has("jobTracker")) {
+ jobTracker = jo.getString("jobTracker");
+ }
+ }
+
+ // Do the pool match
+ for (int idx = 0; idx < poolsObj.length(); idx++) {
+ if (pool.equals(poolsObj.getString(idx))) {
+
+ LOG.info ("Run it on " + clusterName + " due to pool " + pool);
+
+ if (isCorona) {
+ // Parameters are taken from configuration.
+ runCorona(conf, hadoopHome);
+ } else {
+ // Run it on "clusterName"
+ preHadoopBin = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+ conf.setVar(HiveConf.ConfVars.HADOOPBIN,
+ hadoopHome + "/bin/hadoop");
+ preJobTracker = conf.getVar(HiveConf.ConfVars.HADOOPJT);
+ conf.setVar(HiveConf.ConfVars.HADOOPJT, jobTracker);
+ }
+
+ return true;
+ }
+ }
+ }
+
+ // Found nothing
+ return false;
+ } catch (TException e) {
+ LOG.error("Failed to read the pool mapping from the SMC config", e);
+ return false;
+ } catch (JSONException e) {
+ LOG.error("Malformed pool mapping JSON in the SMC config", e);
+ return false;
+ } catch (Exception e) {
+ LOG.error("Unexpected error while checking the SMC pool mapping", e);
+ return false;
+ }
+ }
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ assert(hookContext.getHookType() == HookContext.HookType.PRE_EXEC_HOOK);
+ SessionState sess = SessionState.get();
+ Set<ReadEntity> inputs = hookContext.getInputs();
+ Set<WriteEntity> outputs = hookContext.getOutputs();
+ UserGroupInformation ugi = hookContext.getUgi();
+ Map<String, ContentSummary> inputToCS = hookContext.getInputPathToContentSummary();
+
+ QueryPlan queryPlan = hookContext.getQueryPlan();
+ List<Task<? extends Serializable>> rootTasks = queryPlan.getRootTasks();
+
+ // If it is a pure DDL task,
+ if (rootTasks == null) {
+ return;
+ }
+ if (rootTasks.size() == 1) {
+ Task<? extends Serializable> tsk = rootTasks.get(0);
+ if (tsk instanceof DDLTask) {
+ return;
+ }
+ }
+
+ HiveConf conf = sess.getConf();
+
+ // In case posthook of the previous query was not triggered,
+ // we revert job tracker to clean state first.
+ if (preHadoopBin != null) {
+ conf.setVar(HiveConf.ConfVars.HADOOPBIN, preHadoopBin);
+ preHadoopBin = null;
+ }
+
+ if (preJobTracker != null) {
+ conf.setVar(HiveConf.ConfVars.HADOOPJT, preJobTracker);
+ preJobTracker = null;
+ }
+
+ // A map from a path to the highest percentage that it is sampled by a
+ // map reduce task. If any map reduce task which uses this path does not
+ // sample, this percentage is 100.
+ Map<String, Double> pathToTopPercentage = new HashMap<String, Double>();
+ // A set of inputs we know were not sampled for some task, so we should
+ // ignore any entries for them in pathToTopPercentage
+ Set<ReadEntity> nonSampledInputs = new HashSet<ReadEntity>();
+ boolean isThereSampling = false;
+ if (!hookContext.getQueryPlan().getQueryStr().toUpperCase().
+ contains(" JOIN ")) {
+ isThereSampling = HookUtils.checkForSamplingTasks(
+ hookContext.getQueryPlan().getRootTasks(),
+ pathToTopPercentage,
+ nonSampledInputs);
+ }
+
+ // if we are set on local mode execution (via user or auto) bail
+ if ("local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT))) {
+ return;
+ }
+
+ // The smc hive.configs contains a mapping of pools to the map-reduce
+ // cluster. If the user has specified a pool, and the pool belongs to one
+ // of the clusters for the smc, use that cluster
+ if (useClusterFromSmcConfig(conf)) {
+ return;
+ }
+
+ // If this is an SLA pool, bail
+ if (isOnSlaPool(conf)) {
+ return;
+ }
+
+ // check if we need to run at all
+ if (! "true".equals(conf.get("fbhive.jobtracker.auto", ""))) {
+ return;
+ }
+
+ int bronzePercentage = conf.getInt("fbhive.jobtracker.bronze.percentage",
+ 0);
+ boolean isCoronaEnabled = conf.getBoolean("fbhive.jobtracker.corona.enabled", false);
+ int coronaPercentage = 0;
+ if (isCoronaEnabled) {
+ coronaPercentage = conf.getInt("fbhive.jobtracker.corona.percentage",
+ 0);
+ }
+
+ int percents [] = {bronzePercentage, coronaPercentage};
+ int roll = rollDice(percents);
+ LOG.debug("Dice roll is " + roll);
+ boolean tryBronze = false;
+ boolean tryCorona = false;
+
+ if (roll == -1) {
+ // Don't run bronze/corona
+ LOG.info(dislike + "the coin toss said so");
+ return;
+ } else if (roll == 0) {
+ tryBronze = true;
+ } else if (roll == 1) {
+ tryCorona = true;
+ } else {
+ throw new RuntimeException("Invalid roll! Roll was " + roll);
+ }
+
+ int maxGigaBytes = conf.getInt("fbhive.jobtracker.bronze.maxGigaBytes", 0);
+ if (maxGigaBytes == 0) {
+ LOG.info (dislike + "maxGigaBytes = 0");
+ return;
+ }
+ long maxBytes = maxGigaBytes * 1024L * 1024 * 1024;
+
+ if (maxGigaBytes < 0) {
+ LOG.warn (dislike + "maxGigaBytes value of " + maxGigaBytes + " is invalid");
+ return;
+ }
+
+ String bronzeHadoopHome = conf.get("fbhive.jobtracker.bronze.hadoopHome",
+ "/mnt/vol/hive/sites/bronze/hadoop");
+
+ String bronzeJobTracker = conf.get("fbhive.jobtracker.bronze.tracker",
+ conf.get(FBHiveConf.FBHIVE_BRONZE_JOBTRACKER));
+
+ // assuming we are using CombineHiveInputFormat - we know the # of splits will _at least_
+ // be >= number of partitions/tables. by indicating the max input size - the
+ // admin is also signalling the max # of splits (maxGig*1024MB/256MB, i.e. 4 per GB).
+ // So we limit the number of partitions to the max # of splits.
+
+ int maxSplits = conf.getInt("fbhive.jobtracker.bronze.maxPartitions", maxGigaBytes * 4);
+
+ if (!isThereSampling && inputs.size() > maxSplits) {
+ LOG.info (dislike + "number of input tables/partitions: " + inputs.size() +
+ " exceeded max splits: " + maxSplits);
+ return;
+ }
+
+ if (conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS) > maxSplits) {
+ LOG.info (dislike + "number of reducers: "
+ + conf.getVar(HiveConf.ConfVars.HADOOPNUMREDUCERS)
+ + " exceeded max reducers: " + maxSplits);
+ return;
+ }
+
+ InputInfo info = HookUtils.getInputInfo(inputs, inputToCS, conf,
+ isThereSampling, pathToTopPercentage, nonSampledInputs, maxSplits,
+ maxBytes);
+
+ if (info.getEstimatedNumSplits() > maxSplits) {
+ LOG.info (dislike + "the estimated number of input " +
+ "tables/partitions exceeded max splits: " + maxSplits);
+ return;
+ }
+
+ if (info.getSize() > maxBytes) {
+ LOG.info (dislike + "input length of " + info.getSize() +
+ " is more than " + maxBytes);
+ return;
+ }
+
+ // we have met all the conditions to switch to bronze/corona cluster
+
+ if (tryBronze) {
+ // Run it on Bronze
+ preHadoopBin = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+ conf.setVar(HiveConf.ConfVars.HADOOPBIN, bronzeHadoopHome +
+ "/bin/hadoop");
+ preJobTracker = conf.getVar(HiveConf.ConfVars.HADOOPJT);
+ conf.setVar(HiveConf.ConfVars.HADOOPJT, bronzeJobTracker);
+ } else if (tryCorona){
+ String coronaHadoopHome = conf.get(
+ "fbhive.jobtracker.corona.hadoopHome",
+ "/mnt/vol/hive/sites/corona/hadoop");
+ runCorona(conf, coronaHadoopHome);
+ }
+ }
+
+ private void runCorona(HiveConf conf, String hadoopHome) {
+ // Run it on Corona
+ preHadoopBin = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
+ conf.setVar(HiveConf.ConfVars.HADOOPBIN, hadoopHome + "/bin/hadoop");
+ // No need to set the JT as it's done through the conf
+ Configuration coronaConf = new Configuration(false);
+ // Read the configuration, save old values, replace with new ones
+ coronaConf.addResource("mapred-site-corona.xml");
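+ // The resource above is expected on the classpath; a minimal illustrative
+ // mapred-site-corona.xml (property name and value are hypothetical) might be:
+ //   <configuration>
+ //     <property><name>mapred.job.tracker</name><value>corona-proxy:8021</value></property>
+ //   </configuration>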
+ savedValues = new HashMap<String, String>();
+ for (Entry<String, String> e : coronaConf) {
+ String key = e.getKey();
+ String value = e.getValue();
+ LOG.debug("Saving " + key + "(" + conf.get(key) + ")");
+ savedValues.put(key, conf.get(key));
+ LOG.debug("Setting " + key + "(" + key + ")");
+ conf.set(key, value);
+ }
+ }
+ }
+
+ /**
+ * Randomly picks an index with chance that is indicated by the value in
+ * percentages. Returns -1 for the remaining percentage
+ *
+ * E.g. [60, 20] will return 0 (60% of the time) and 1 (20% of the time) and
+ * -1 (20% of the time)
+ * @param percentages
+ * @return
+ */
+ private static int rollDice(int [] percentages) {
+
+ Random randGen = new Random();
+ int randVal = randGen.nextInt(100) + 1;
+
+ // Make sure that percentages add up to <= 100%
+ int sum = 0;
+ for (int i=0; i < percentages.length; i++) {
+ sum += percentages[i];
+ if (percentages[i] < 0) {
+ throw new RuntimeException("Percentages must be >=0. Got " +
+ percentages[i]);
+ }
+ }
+ if (sum > 100) {
+ throw new RuntimeException("Percentages add up to > 100!");
+ }
+
+ for (int i=0; i < percentages.length; i++) {
+ if (randVal <= percentages[i]) {
+ return i;
+ }
+ randVal = randVal - percentages[i];
+ }
+ return -1;
+ }
+
+ public static class PostExec implements ExecuteWithHookContext {
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ assert(hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK);
+ SessionState ss = SessionState.get();
+ Set<ReadEntity> inputs = hookContext.getInputs();
+ Set<WriteEntity> outputs = hookContext.getOutputs();
+ LineageInfo linfo = hookContext.getLinfo();
+ UserGroupInformation ugi = hookContext.getUgi();
+ this.run(ss,inputs,outputs,linfo,ugi);
+ }
+
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, LineageInfo lInfo,
+ UserGroupInformation ugi) throws Exception {
+ HiveConf conf = sess.getConf();
+
+ if (preHadoopBin != null) {
+ conf.setVar(HiveConf.ConfVars.HADOOPBIN, preHadoopBin);
+ preHadoopBin = null;
+ }
+
+ if (preJobTracker != null) {
+ conf.setVar(HiveConf.ConfVars.HADOOPJT, preJobTracker);
+ preJobTracker = null;
+ }
+
+ // Restore values set for Corona
+ if (savedValues != null) {
+ for (Entry<String,String> e : savedValues.entrySet()) {
+ String key = e.getKey();
+ String value = e.getValue();
+ LOG.debug("Restoring " + key + "(" + value + ")");
+ if (value != null) {
+ conf.set(key, value);
+ } else {
+ conf.set(key, "");
+ }
+ }
+ }
+ }
+ }
+}

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/LineageHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/LineageHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/LineageHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/LineageHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyKey;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+/**
+ * Implementation of a post execute hook that serializes the column lineage
+ * information for the query to JSON and records it in the lineage_log table.
+ */
+public class LineageHook implements PostExecute {
+
+ ConnectionUrlFactory urlFactory = null;
+ public LineageHook() throws Exception {
+ HiveConf conf = new HiveConf(LineageHook.class);
+ urlFactory = HookUtils.getUrlFactory(conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.LINEAGE_CONNECTION_FACTORY,
+ FBHiveConf.LINEAGE_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.LINEAGE_HOST_DATABASE_VAR_NAME);
+ }
+
+ @Override
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, LineageInfo linfo,
+ UserGroupInformation ugi) throws Exception {
+
+ HiveConf conf = sess.getConf();
+
+ if (linfo != null) {
+
+ Iterator<Map.Entry<DependencyKey, Dependency>> iter = linfo.entrySet().iterator();
+ while(iter.hasNext()) {
+ Map.Entry<DependencyKey, Dependency> it = iter.next();
+ Dependency dep = it.getValue();
+ DependencyKey depK = it.getKey();
+
+ /**
+ * Generate json values of the following format:
+ *
+ * {"value": {
+ * "type":"SIMPLE",
+ * "baseCols":[{
+ * "column":{
+ * "name":"col",
+ * "comment":"from serde",
+ * "type":"array<string>"
+ * },
+ * "tabAlias":{
+ * "alias":"athusoo_tmp",
+ * "table":{
+ * "dbName":"default",
+ * "tableName":"athusoo_tmp"
+ * }
+ * }
+ * }]
+ * },
+ * "key":{
+ * "fieldSchema":{
+ * "name":"col",
+ * "comment":"from deserializer",
+ * "type":"array<string>"
+ * },
+ * "dataContainer":{
+ * "isPartition":false,
+ * "table":{
+ * "dbName":"default",
+ * "tableName":"athusoo_tmp2"
+ * }
+ * }
+ * }
+ *}
+ */
+ JSONObject out_json = new JSONObject();
+ JSONObject depk_json = new JSONObject();
+ JSONObject field_json = new JSONObject();
+
+ field_json.put("name", depK.getFieldSchema().getName());
+ field_json.put("type", depK.getFieldSchema().getType());
+ field_json.put("comment", depK.getFieldSchema().getComment());
+ depk_json.put("fieldSchema", field_json);
+
+ JSONObject dc_json = new JSONObject();
+ dc_json.put("isPartition", depK.getDataContainer().isPartition());
+ JSONObject tab_json = new JSONObject();
+ if (depK.getDataContainer().isPartition()) {
+ JSONObject part_json = new JSONObject();
+ Partition part = depK.getDataContainer().getPartition();
+ part_json.put("values", part.getValues());
+
+ tab_json.put("tableName", depK.getDataContainer().getTable().getTableName());
+ tab_json.put("dbName", depK.getDataContainer().getTable().getDbName());
+ JSONArray fs_array = new JSONArray();
+ for (FieldSchema fs : depK.getDataContainer().getTable().getPartitionKeys()) {
+ field_json = new JSONObject();
+ field_json.put("name", fs.getName());
+ field_json.put("type", fs.getType());
+ field_json.put("comment", fs.getComment());
+
+ fs_array.put(field_json);
+ }
+ tab_json.put("partitionKeys", fs_array);
+ part_json.put("table", tab_json);
+ dc_json.put("partition", part_json);
+ }
+ else {
+ tab_json.put("tableName", depK.getDataContainer().getTable().getTableName());
+ tab_json.put("dbName", depK.getDataContainer().getTable().getDbName());
+ dc_json.put("table", tab_json);
+ }
+ depk_json.put("dataContainer", dc_json);
+ out_json.put("key", depk_json);
+
+ JSONObject dep_json = new JSONObject();
+ dep_json.put("type", dep.getType().toString());
+ dep_json.put("expr", dep.getExpr());
+ JSONArray basecol_array = new JSONArray();
+ for(BaseColumnInfo col: dep.getBaseCols()) {
+ JSONObject col_json = new JSONObject();
+
+ field_json = new JSONObject();
+ // A column can be null in the case of aggregations like count(1)
+ // where the value is dependent on the entire row.
+ if (col.getColumn() != null) {
+ field_json.put("name", col.getColumn().getName());
+ field_json.put("type", col.getColumn().getType());
+ field_json.put("comment", col.getColumn().getComment());
+ }
+ col_json.put("column", field_json);
+
+ JSONObject tabAlias_json = new JSONObject();
+ tabAlias_json.put("alias", col.getTabAlias().getAlias());
+
+ tab_json = new JSONObject();
+ tab_json.put("tableName", col.getTabAlias().getTable().getTableName());
+ tab_json.put("dbName", col.getTabAlias().getTable().getDbName());
+
+ tabAlias_json.put("table", tab_json);
+ col_json.put("tabAlias", tabAlias_json);
+ basecol_array.put(col_json);
+ }
+ dep_json.put("baseCols", basecol_array);
+ out_json.put("value", dep_json);
+
+ ArrayList<Object> sqlParams = new ArrayList<Object>();
+ sqlParams.add(StringEscapeUtils.escapeJava(out_json.toString()));
+ String sql = "insert into lineage_log set info = ?";
+
+ HookUtils.runInsert(conf, urlFactory, sql, sqlParams, HookUtils
+ .getSqlNumRetry(conf));
+ }
+ }
+ }
+}

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/Pair.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/Pair.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/Pair.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/Pair.java Tue May 1 22:52:38 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+/**
+ * A generic class for pairs.
+ * @param <T1>
+ * @param <T2>
+ */
+public class Pair<T1, T2>
+{
+ protected T1 first = null;
+ protected T2 second = null;
+
+ /**
+ * Default constructor.
+ */
+ public Pair()
+ {
+ }
+
+ /**
+ * Constructor
+ * @param a operand
+ * @param b operand
+ */
+ public Pair(T1 a, T2 b)
+ {
+ this.first = a;
+ this.second = b;
+ }
+
+ /**
+ * Replace the first element of the pair.
+ * @param a operand
+ */
+ public void setFirst(T1 a)
+ {
+ this.first = a;
+ }
+
+ /**
+ * Replace the second element of the pair.
+ * @param b operand
+ */
+ public void setSecond(T2 b)
+ {
+ this.second = b;
+ }
+
+ /**
+ * Return the first element stored in the pair.
+ * @return T1
+ */
+ public T1 getFirst()
+ {
+ return first;
+ }
+
+ /**
+ * Return the second element stored in the pair.
+ * @return T2
+ */
+ public T2 getSecond()
+ {
+ return second;
+ }
+
+ private boolean equals(Object x, Object y)
+ {
+ return (x == null && y == null) || (x != null && x.equals(y));
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean equals(Object other)
+ {
+ return other instanceof Pair && equals(first, ((Pair)other).first) &&
+ equals(second, ((Pair)other).second);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ // equals() is overridden above, so hashCode() must be consistent with it.
+ return (first == null ? 0 : first.hashCode()) * 31 +
+ (second == null ? 0 : second.hashCode());
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" + getFirst() + "," + getSecond() + "}";
+ }
+}
+

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/PyRulesHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/PyRulesHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/PyRulesHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/PyRulesHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Reader;
+import java.io.File;
+import java.io.FileReader;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * This hook executes python code to update the configuration in
+ * the HookContext using the java scripting abstraction.
+ *
+ * fbhive.pyrules.file has the path of the python file that is to
+ * be executed.
+ * Python code has to define a method updateConf that accepts hookContext
+ * as a parameter
+ *
+ * Python code has to also provide a revertConf method that accepts hookContext
+ * and the old configuration object and reverts the changes made to the
+ * configuration in the updateConf
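+ *
+ * A minimal illustrative rules file (the property used here is only an example):
+ *
+ *   def updateConf(hookContext):
+ *       hookContext.getConf().set("mapred.job.queue.name", "adhoc")
+ *
+ *   def revertConf(hookContext, oldConf):
+ *       hookContext.getConf().set("mapred.job.queue.name",
+ *                                 oldConf.get("mapred.job.queue.name"))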
+ */
+public class PyRulesHook implements ExecuteWithHookContext {
+
+ static final private Log LOG = LogFactory.getLog(PyRulesHook.class);
+ static private HiveConf savedConf = null;
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ HiveConf conf = hookContext.getConf();
+ PyRulesHook.savedConf = new HiveConf(conf);
+ ScriptEngine pythonMod = getPythonModifier(hookContext);
+ if (pythonMod == null) {
+ return;
+ }
+ conf.setBoolean("fbhive.pyrules.modified", true);
+ try {
+ pythonMod.put("hookContext", hookContext);
+ pythonMod.eval("updateConf(hookContext)");
+ } catch (Exception ex) {
+ LOG.error("Error updating the conf", ex);
+ }
+ }
+
+ private static ScriptEngine getPythonModifier(HookContext hookContext)
+ throws Exception {
+ String pyFilePath = hookContext.getConf().get("fbhive.pyrules.file");
+ if (pyFilePath == null)
+ return null;
+
+ File pyFile = new File(pyFilePath);
+ if (!pyFile.exists()) {
+ LOG.warn("The python conf file " + pyFile + " does not exist");
+ return null;
+ }
+
+ Reader reader = new FileReader(pyFile);
+ try {
+ ScriptEngine eng = new ScriptEngineManager().getEngineByName("python");
+ if (eng == null) {
+ LOG.warn("Could not initialize jython engine");
+ return null;
+ }
+ eng.eval(reader);
+
+ return eng;
+ } catch (Exception ex) {
+ LOG.warn("Error updating the conf using python hook", ex);
+ return null;
+ }
+ }
+ public static class CleanupHook implements ExecuteWithHookContext {
+ public void run(HookContext hookContext) throws Exception {
+ if (!hookContext.getConf().getBoolean("fbhive.pyrules.modified", false)) {
+ return;
+ } else {
+ try {
+ ScriptEngine pythonRevert = getPythonModifier(hookContext);
+ pythonRevert.put("hookContext", hookContext);
+ pythonRevert.put("oldConf", PyRulesHook.savedConf);
+ pythonRevert.eval("revertConf(hookContext, oldConf)");
+ } catch (Exception ex) {
+ LOG.error("Error reverting config", ex);
+ }
+ }
+ }
+ }
+}

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/QueryDroppedPartitionsHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/QueryDroppedPartitionsHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/QueryDroppedPartitionsHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/QueryDroppedPartitionsHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A hook which is used to prevent people from querying dropped partitions in
+ * silver. The list of wrongly dropped partitions is in cdb.datahawk - if the
+ * query uses any of these partitions, it should fail.
+ */
+public class QueryDroppedPartitionsHook implements ExecuteWithHookContext {
+ static final private Log LOG =
+ LogFactory.getLog(QueryDroppedPartitionsHook.class);
+
+// private static final String SMC_DATABASE_NAME = "cdb.datahawk";
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+
+ assert(hookContext.getHookType() == HookContext.HookType.PRE_EXEC_HOOK);
+ SessionState sess = SessionState.get();
+ HiveConf conf = sess.getConf();
+ String commandType = StringEscapeUtils.escapeJava(sess.getCommandType());
+
+ // Only check for queries
+ if ((commandType == null) ||
+ (!commandType.equals("QUERY") &&
+ !commandType.equals("CREATETABLE_AS_SELECT"))) {
+ return;
+ }
+
+ Set<ReadEntity> inputs = hookContext.getInputs();
+
+ // Nothing to check
+ if ((inputs == null) || (inputs.isEmpty())) {
+ return;
+ }
+
+ String inputString = getInputs(inputs);
+ if ((inputString == null) || (inputString.isEmpty())) {
+ return;
+ }
+
+ ConnectionUrlFactory urlFactory =
+ HookUtils.getUrlFactory(conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.QUERYDROPPED_PARTITIONS_CONNECTION_FACTORY,
+ FBHiveConf.QUERYDROPPED_PARTITIONS_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.QUERYPLAN_HOST_DATABASE_VAR_NAME);
+
+ // Return silently if you cannot connect for some reason
+ if ((FBHiveConf.QUERYDROPPED_PARTITIONS_MYSQL_TIER_VAR_NAME == null) ||
+ FBHiveConf.QUERYDROPPED_PARTITIONS_MYSQL_TIER_VAR_NAME.isEmpty()) {
+ LOG.warn(FBHiveConf.QUERYPLAN_MYSQL_TIER_VAR_NAME + " is null");
+ return;
+ }
+
+ if (urlFactory == null) {
+ LOG.warn("unable to access " + conf.get(FBHiveConf.QUERYPLAN_MYSQL_TIER_VAR_NAME));
+ return;
+ }
+
+ List<Object> sqlParams = new ArrayList<Object>();
+ sqlParams.add(inputString);
+
+ LOG.info("QueryDroppedPartitionsHook input string: " + inputString);
+
+ // Does the query reference a dropped partition
+ String sql = "select count(*) from 0114_dropped_parts3 " +
+ "where (recovered is null or recovered != 1) and ?";
+
+ List<List<Object>> result =
+ HookUtils.runInsertSelect(conf, urlFactory, sql, sqlParams, false);
+
+ Long numberDroppedPartitions = null;
+
+ if (!result.isEmpty() && result.get(0).get(0) != null) {
+ numberDroppedPartitions = (Long)result.get(0).get(0);
+ }
+
+ if ((numberDroppedPartitions != null) &&
+ (numberDroppedPartitions > 0)) {
+ String exception = "You cannot select from " + inputString + ".";
+ exception += "Look at ";
+ exception +=
+ "https://our.intern.facebook.com/intern/sevmanager/prod/sev/137261279725248";
+ exception += " for details ";
+ throw new Exception(exception);
+ }
+
+ }
+
+ private String getInputs(Set<ReadEntity> inputs) {
+ StringBuilder sb = new StringBuilder();
+
+ Map<String, Set<String>> inputMap = new HashMap<String, Set<String>>();
+
+ for (ReadEntity input : inputs) {
+ Partition inputPartition = input.getP();
+ if (inputPartition == null) {
+ continue;
+ }
+
+ if (!inputMap.containsKey(inputPartition.getTable().getTableName())) {
+ inputMap.put(inputPartition.getTable().getTableName(), new HashSet<String>());
+ }
+ inputMap.get(
+ inputPartition.getTable().getTableName()).add(inputPartition.getName().split("/")[0]);
+ }
+
+ if (inputMap.isEmpty()) {
+ return "";
+ }
+
+ sb.append("(");
+ boolean firstTable = true;
+
+ for (Entry<String, Set<String>> entry : inputMap.entrySet()) {
+ if (!firstTable) {
+ sb.append(" OR ");
+ } else {
+ firstTable = false;
+ }
+
+ sb.append("(table_name = '" + entry.getKey() + "' AND ds IN (");
+
+ boolean firstPartition = true;
+ for (String part : entry.getValue()) {
+ if (!firstPartition) {
+ sb.append(", ");
+ } else {
+ firstPartition = false;
+ }
+
+ sb.append("'" + part + "'");
+ }
+ sb.append("))");
+ }
+ sb.append(")");
+
+ return sb.toString();
+ }
+
+}

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/QueryPlanHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/QueryPlanHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/QueryPlanHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/QueryPlanHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.json.JSONObject;
+
+/**
+ * A hook which populates the query_plan_log MySQL table with
+ * the query plan for the query. The query plan is recorded as a JSON string.
+ */
+public class QueryPlanHook implements ExecuteWithHookContext {
+
+ private ConnectionUrlFactory urlFactory = null;
+ private HiveConf conf;
+
+ public static ConnectionUrlFactory getQueryPlanUrlFactory(HiveConf conf) {
+ return HookUtils.getUrlFactory(
+ conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.QUERYPLAN_CONNECTION_FACTORY,
+ FBHiveConf.QUERYPLAN_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.QUERYPLAN_HOST_DATABASE_VAR_NAME);
+ }
+
+ public QueryPlanHook() throws Exception {
+ conf = new HiveConf(QueryPlanHook.class);
+ }
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+
+ assert(hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK);
+
+ String queryId = "";
+ SessionState sess = SessionState.get();
+
+ if (sess != null) {
+ conf = sess.getConf();
+ queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
+ HiveOperation op = sess.getHiveOperation();
+
+ // No need to log for DDLs
+ if ((op == null) ||
+ ((!op.equals(HiveOperation.CREATETABLE_AS_SELECT)) &&
+ (!op.equals(HiveOperation.CREATEVIEW)) &&
+ (!op.equals(HiveOperation.LOAD)) &&
+ (!op.equals(HiveOperation.QUERY)))) {
+ return;
+ }
+ }
+ // QueryId not present - nothing to do
+ else {
+ return;
+ }
+
+ // Get the list of root tasks
+ List<Task<? extends Serializable>> rootTasks = hookContext.getQueryPlan().getRootTasks();
+ if ((rootTasks == null) || (rootTasks.isEmpty())) {
+ return;
+ }
+
+ ExplainWork explainWork = new ExplainWork(null, rootTasks, null, false, true);
+ JSONObject queryPlan = ExplainTask.getJSONPlan(null, explainWork);
+
+ List<Object> sqlParams = new ArrayList<Object>();
+ sqlParams.add(StringEscapeUtils.escapeJava(queryId));
+ sqlParams.add(StringEscapeUtils.escapeJava(queryPlan.toString()));
+
+ String sql = "insert into query_plan_log set queryId = ?, queryPlan = ?";
+ if (urlFactory == null) {
+ urlFactory = getQueryPlanUrlFactory(conf);
+ if (urlFactory == null) {
+ throw new RuntimeException("DB parameters not set!");
+ }
+ }
+
+ HookUtils.runInsert(conf, urlFactory, sql, sqlParams, HookUtils
+ .getSqlNumRetry(conf));
+ }
+}

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/RegressionTestHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/RegressionTestHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/RegressionTestHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/RegressionTestHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Set;
+import java.util.ArrayList;
+import java.sql.Connection;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.hooks.PostExecute;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Implementation of a post execute hook that prints out some more information
+ * to console to allow regression tests to check correctness.
+ */
+public class RegressionTestHook implements PostExecute {
+ static final private Log LOG = LogFactory
+ .getLog("hive.ql.hooks.RegressionTestHook");
+
+ final static String REGRESSION_TEST_PRINT_SWITCH_VAR_NAME = "fbhive.regressiontesthook.swtich";
+
+ public RegressionTestHook() throws Exception {
+ }
+
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, LineageInfo lInfo,
+ UserGroupInformation ugi) throws Exception {
+ HiveConf conf = sess.getConf();
+
+ String hookSwitch = conf.get(REGRESSION_TEST_PRINT_SWITCH_VAR_NAME, "");
+
+ if (!hookSwitch.equals("1")) {
+ return;
+ }
+
+ String inputStr = "";
+
+ if (inputs != null) {
+ StringBuilder inputsSB = new StringBuilder();
+
+ boolean first = true;
+
+ for (ReadEntity inp : inputs) {
+ if (!first)
+ inputsSB.append(",");
+ first = false;
+ inputsSB.append(inp.toString());
+ }
+ inputStr = StringEscapeUtils.escapeJava(inputsSB.toString());
+ }
+
+ String outputStr = "";
+
+ if (outputs != null) {
+ StringBuilder outputsSB = new StringBuilder();
+
+ boolean first = true;
+
+ for (WriteEntity o : outputs) {
+ if (!first)
+ outputsSB.append(",");
+ first = false;
+ outputsSB.append(o.toString());
+ }
+ outputStr = StringEscapeUtils.escapeJava(outputsSB.toString());
+ }
+
+ String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
+
+ System.out
+ .println("++++++++++Regression Test Hook Output Start+++++++++");
+
+ System.out.println("+++queryId:" + queryId);
+ System.out.println("+++input:" + inputStr);
+ System.out.println("+++output:" + outputStr);
+ System.out
+ .println("++++++++++Regression Test Hook Output End+++++++++");
+ }
+}

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/ReplicationHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/ReplicationHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/ReplicationHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/ReplicationHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.io.CachingPrintStream;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.json.JSONObject;
+
+
+/**
+ * Implementation of a post execute / failure hook that records the command,
+ * its inputs and outputs, and related query metadata in the replication
+ * command log tables.
+ */
+public class ReplicationHook extends BaseReplicationHook implements ExecuteWithHookContext {
+
+ static final private Log LOG = LogFactory.getLog(ReplicationHook.class.getName());
+
+ private HiveConf conf;
+
+ public ReplicationHook() throws Exception {
+ super();
+ conf = new HiveConf(this.getClass());
+ }
+
+ /**
+ * Set this replication hook's hive configuration.
+ * Expose this as a public function in case run() cannot get the HiveConf
+ * from the session, e.g., if ReplicationHook is not called after a CLI query.
+ * @param conf the configuration to use
+ */
+ public void setHiveConf(HiveConf conf) {
+ this.conf = conf;
+ }
+
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, LineageInfo lInfo, UserGroupInformation ugi)
+ throws Exception {
+ run(sess, inputs, outputs, lInfo, ugi, null, HookContext.HookType.POST_EXEC_HOOK);
+ }
+
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, LineageInfo lInfo, UserGroupInformation ugi,
+ List<TaskRunner> completedTasks, HookContext.HookType hookType)
+ throws Exception {
+
+ assert(hookType == HookContext.HookType.POST_EXEC_HOOK ||
+ hookType == HookContext.HookType.ON_FAILURE_HOOK);
+
+ String command = "";
+ String commandType = "";
+ String user_info = "";
+ String inputStr = "";
+ String outputStr = "";
+ String queryId = "";
+ String querySrc = "";
+ String startTimeStr = "";
+ String packageName = "";
+
+ if (sess != null) {
+ command = StringEscapeUtils.escapeJava(sess.getCmd());
+ commandType = StringEscapeUtils.escapeJava(sess.getCommandType());
+ setHiveConf(sess.getConf());
+ queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
+
+ querySrc = conf.get(JobStatsHook.HIVE_QUERY_SOURCE, "");
+ packageName = conf.get(FBHiveConf.FB_CURRENT_CLUSTER);
+ }
+
+ if (ugi != null) {
+ user_info = StringEscapeUtils.escapeJava(ugi.getUserName());
+ }
+
+ if (inputs != null) {
+ inputStr = entitiesToString(inputs);
+ }
+
+ if (outputs != null) {
+ outputStr = entitiesToString(outputs);
+ }
+
+ // Retrieve the time the Driver.run method started from the PerfLogger, as this corresponds
+ // to approximately the time when the query started to be processed, and format it.
+ // If, some how, this time was not set, it will default to 0000-00-00 00:00:00 in the db.
+ Long startTimeMillis = PerfLogger.getPerfLogger().getStartTime(PerfLogger.DRIVER_RUN);
+ if (startTimeMillis != null) {
+ Date startTime = new Date(startTimeMillis.longValue());
+ startTimeStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(startTime);
+ } else {
+ LOG.error("Start time was null in ReplicationHook");
+ }
+
+ ArrayList<Object> sqlParams = new ArrayList<Object>();
+ sqlParams.add(StringEscapeUtils.escapeJava(command));
+ sqlParams.add(StringEscapeUtils.escapeJava(commandType));
+ sqlParams.add(StringEscapeUtils.escapeJava(inputStr));
+ sqlParams.add(outputStr);
+ sqlParams.add(StringEscapeUtils.escapeJava(queryId));
+ sqlParams.add(StringEscapeUtils.escapeJava(user_info));
+ sqlParams.add(StringEscapeUtils.escapeJava(querySrc));
+ sqlParams.add(startTimeStr);
+ sqlParams.add(packageName);
+
+ // Assertion at beginning of method guarantees this string will not remain empty
+ String sql = "";
+ if (hookType == HookContext.HookType.POST_EXEC_HOOK) {
+ sql = "insert into snc1_command_log set command = ?, command_type = ?, inputs = ?, " +
+ "outputs = ?, queryId = ?, user_info = ?, query_src = ?, start_time = ?, " +
+ "package_name = ?";
+ } else if (hookType == HookContext.HookType.ON_FAILURE_HOOK) {
+
+ List<String> errors = ((CachingPrintStream)sess.err).getOutput();
+ String localErrorString = "";
+ if (!errors.isEmpty()) {
+ JSONObject localErrorObj = new JSONObject();
+ localErrorObj.put("localErrors", errors);
+ localErrorString = localErrorObj.toString();
+ }
+
+ sqlParams.add(localErrorString);
+
+ sql = "insert into snc1_failed_command_log set command = ?, command_type = ?, inputs = ?, " +
+ "outputs = ?, queryId = ?, user_info = ?, query_src = ?, start_time = ?, " +
+ "package_name = ?, local_errors = ?";
+ }
+ HookUtils.runInsert(conf, urlFactory, sql, sqlParams, HookUtils
+ .getSqlNumRetry(conf));
+ }
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ SessionState ss = SessionState.get();
+ Set<ReadEntity> inputs = hookContext.getInputs();
+ Set<WriteEntity> outputs = hookContext.getOutputs();
+ LineageInfo linfo = hookContext.getLinfo();
+ UserGroupInformation ugi = hookContext.getUgi();
+ this.run(ss, inputs, outputs, linfo, ugi,
+ hookContext.getCompleteTaskList(), hookContext.getHookType());
+ }
+
+ public static String entitiesToString(Set<? extends Serializable> entities) {
+ StringBuilder stringBuilder = new StringBuilder();
+
+ boolean first = true;
+
+ for (Serializable o : entities) {
+ if (!first) {
+ stringBuilder.append(",");
+ }
+ first = false;
+ stringBuilder.append(o.toString());
+ }
+ return stringBuilder.toString();
+ }
+}

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SMCStatsDBHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SMCStatsDBHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SMCStatsDBHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SMCStatsDBHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+
+public class SMCStatsDBHook extends AbstractSemanticAnalyzerHook {
+
+ @Override
+ public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast) {
+ HiveConf conf;
+ try {
+ conf = (HiveConf) context.getConf();
+ } catch (ClassCastException e) {
+ // Statistics won't be collected for this query,
+ // warning about it will be supplied later, by JDBCStatsPublisher
+ return ast;
+ }
+ ConnectionUrlFactory urlFactory =
+ HookUtils.getUrlFactory(conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.STATS_CONNECTION_FACTORY,
+ FBHiveConf.STATS_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.STATS_HOST_DATABASE_VAR_NAME);
+ String databaseHostName;
+
+ try {
+ databaseHostName = urlFactory.getUrl();
+ } catch (Exception e) {
+ // Statistics won't be collected for this query,
+ // warning about it will be supplied later, by JDBCStatsPublisher
+ return ast;
+ }
+
+ conf.setVar(
+ HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING,
+ getUpdatedConnectionString(conf.getVar(HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING),
+ databaseHostName));
+ return ast;
+ }
+
+ // default visibility for the sake of TestSMCStatsDBHook
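+ // Replaces the leading "jdbc...?" portion of the configured connection string
+ // with the address obtained from SMC, e.g. (hypothetical values):
+ //   "jdbc:mysql://old-host/stats?user=hive" with an addressFromSMC of
+ //   "jdbc:mysql://new-host/stats?" yields "jdbc:mysql://new-host/stats?user=hive".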
+ String getUpdatedConnectionString(String initialConnectionString, String addressFromSMC) {
+ return initialConnectionString.replaceAll("jdbc.*\\?", addressFromSMC);
+ }
+}

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SampleConcurrencyHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SampleConcurrencyHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SampleConcurrencyHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SampleConcurrencyHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Implementation of a compile time hook to enable concurrency for a subset
+ * of queries
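+ *
+ * Illustrative configuration (not part of this change): compile-time hooks of
+ * this kind are typically registered via hive.semantic.analyzer.hook, e.g.
+ * hive.semantic.analyzer.hook=org.apache.hadoop.hive.ql.hooks.SampleConcurrencyHook.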
+ */
+public class SampleConcurrencyHook extends AbstractSemanticAnalyzerHook {
+
+
+ // Set concurrency for a sample of the queries
+ @Override
+ public ASTNode preAnalyze(
+ HiveSemanticAnalyzerHookContext context,
+ ASTNode ast) throws SemanticException {
+ HiveSemanticAnalyzerHookContextImpl ctx = (HiveSemanticAnalyzerHookContextImpl)context;
+ HiveConf conf = (HiveConf)ctx.getConf();
+
+ // If concurrency is disabled, nothing to do
+ boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ if (!supportConcurrency) {
+ return ast;
+ }
+
+ // Do nothing if the statement is show locks
+ if (ast.getToken().getType() == HiveParser.TOK_SHOWLOCKS) {
+ return ast;
+ }
+
+ //
+ // based on sample rate, decide whether to enable concurrency
+ //
+ float pubPercent = conf.getFloat(FBHiveConf.ENABLE_PARTIAL_CONCURRENCY, 0);
+
+ try {
+ if (!HookUtils.rollDice(pubPercent)) {
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ return ast;
+ }
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ return ast;
+ }
+
+ // Nothing to do
+ @Override
+ public void postAnalyze(
+ HiveSemanticAnalyzerHookContext context,
+ List<Task<? extends Serializable>> rootTasks) throws SemanticException {
+ // do nothing
+ }
+}
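
HookUtils.rollDice is referenced above but its body is not part of this hunk. A minimal sketch of the kind of probability gate it implies (an assumption, not the committed helper) and of how a sample rate would play out:

import java.util.Random;

public class RollDiceSketch {
  private static final Random RANDOM = new Random();

  // Returns true with probability p, where p is expected to lie in [0, 1].
  static boolean rollDice(float p) throws Exception {
    if (p < 0.0f || p > 1.0f) {
      throw new Exception("Probability " + p + " is out of range");
    }
    return RANDOM.nextFloat() < p;
  }

  public static void main(String[] args) throws Exception {
    // With a sample rate of 0.25f, roughly a quarter of eligible queries
    // would keep hive.support.concurrency enabled by the hook above.
    System.out.println(rollDice(0.25f));
  }
}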

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SmcConfigDriverRunHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SmcConfigDriverRunHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SmcConfigDriverRunHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SmcConfigDriverRunHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.HiveDriverRunHook;
+import org.apache.hadoop.hive.ql.HiveDriverRunHookContext;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+/**
+ * Retrieves and sets Hive config key/values based on a config stored in the
+ * properties of an SMC tier. This is useful for quick changes to the config
+ * that should apply to a particular package of Hive (e.g. silver.trunk). The
+ * advantage over an XML file edit is that it persists between hotfixes and
+ * we have a history of what changes were made. But since this is a hook that
+ * runs at the very beginning of the Driver.run method, before compilation,
+ * it should be able to effectively change most values that affect query
+ * processing and execution.
+ *
+ * The configs are supposed to be stored in the properties of an SMC tier. The
+ * name of the property corresponds to the hive package. The value of the
+ * property is a JSON object that holds 1) an enabled field that controls
+ * whether the key-value pairs should be applied and 2) a configs field that
+ * holds an array of config entry objects.
+ *
+ * (Property)hivePackageName -> {enabled : boolean,
+ * configs : [
+ * {key : key1,
+ * value : value1,
+ * percentage : 50,
+ * enforce : true
+ * },
+ * {key : key2,
+ * value : value2
+ * }, ...
+ * ]
+ * }
+ *
+ * The key is the config variable's name and value is the config variable's
+ * value. The percentage field is optional; if set, the change is applied to
+ * only approximately that percentage of queries. The enforce field is also
+ * optional; if true, the value is overwritten even if the user explicitly
+ * set this config variable.
+ *
+ * The primary application of this hook is to modify the behavior of Hive clients dynamically,
+ * without a push, and for incremental rollouts of config changes. E.g. if a feature is broken and
+ * can be turned off using a config variable, this hook can be used to turn it off without rolling
+ * back the push. Also, if a change can be controlled via a config and we are
+ * not sure how it will perform at scale, we can turn it on for increasing
+ * percentages of users using this hook.
+ */
+public class SmcConfigDriverRunHook extends AbstractSmcConfigHook implements HiveDriverRunHook {
+
+ static final private Log LOG = LogFactory.getLog(SmcConfigDriverRunHook.class);
+ static final private String KEY_FIELD = "key";
+ static final private String VALUE_FIELD = "value";
+ static final private String PERCENTAGE_FIELD = "percentage";
+ static final private String ENFORCE_FIELD = "enforce";
+
+ @Override
+ public void preDriverRun(HiveDriverRunHookContext hookContext) throws Exception {
+ HiveConf conf = (HiveConf) hookContext.getConf();
+ if (!isEnabled(conf)) {
+ return;
+ }
+
+ Object configObj = getConfigObject(conf);
+
+ if (configObj == null || !(configObj instanceof JSONArray) ) {
+ LOG.error("config not properly set!");
+ return;
+ }
+
+ // Sanity checks pass, apply all the configs.
+ JSONArray configEntries = (JSONArray) configObj;
+ for (int i = 0; i < configEntries.length(); i++) {
+ JSONObject configEntry = configEntries.getJSONObject(i);
+ Object percentage = new Integer(100);
+ Object enforce = new Boolean(false);
+
+ // Get the config key and value
+ String key = configEntry.getString(KEY_FIELD);
+ String value = configEntry.get(VALUE_FIELD).toString();
+
+ LOG.debug("SmcConfigHook found configuration KEY: " + key + " VALUE: " + value);
+
+ // If enforce is set to true, even if the user has set the value of this config variable
+ // explicitly, we will overwrite it
+ if (configEntry.has(ENFORCE_FIELD)) {
+ enforce = configEntry.get(ENFORCE_FIELD);
+ }
+
+ LOG.debug("Enforce for key " + key + " is " + enforce.toString());
+
+ if (!(enforce instanceof Boolean)) {
+ LOG.error("enforce is not properly set for " + key);
+ continue;
+ }
+
+ if (!(Boolean)enforce && SessionState.get() != null &&
+ SessionState.get().getOverriddenConfigurations().containsKey(key)) {
+ continue;
+ }
+
+ // If the percentage field is set to some number n, the configuration change will be made
+ // to approximately n% of queries
+ if (configEntry.has(PERCENTAGE_FIELD)) {
+ percentage = configEntry.getInt(PERCENTAGE_FIELD);
+ }
+
+ LOG.debug("Percentage for key " + key + " is " + percentage.toString());
+
+ if (!(percentage instanceof Integer)) {
+ LOG.error("percentage is not properly set for " + key);
+ continue;
+ }
+
+ if ((Integer)percentage != 100) {
+ boolean diceRoll = false;
+
+ try {
+ diceRoll = HookUtils.rollDice(((Integer)percentage).intValue()/100f);
+ } catch (Exception e) {
+ LOG.error("percentage is not properly set for " + key);
+ LOG.error(e.getMessage());
+ }
+
+ if (!diceRoll) {
+ continue;
+ }
+ }
+
+ conf.set(key, value);
+ }
+ }
+
+ @Override
+ public void postDriverRun(HiveDriverRunHookContext hookContext)
+ throws Exception {
+ // Do nothing
+ }
+
+}
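
To make the JSON layout described in the class comment concrete, here is a hypothetical property value (keys, values, and percentages are invented) parsed with the same org.json classes the hook uses:

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class SmcConfigDriverRunHookExample {
  public static void main(String[] args) throws JSONException {
    String propertyValue =
        "{\"enabled\": true,"
      + " \"configs\": ["
      + "   {\"key\": \"hive.merge.mapfiles\", \"value\": \"false\","
      + "    \"percentage\": 50, \"enforce\": true},"
      + "   {\"key\": \"hive.exec.parallel\", \"value\": \"true\"}"
      + " ]}";
    JSONObject packageConfig = new JSONObject(propertyValue);
    JSONArray configEntries = packageConfig.getJSONArray("configs");
    for (int i = 0; i < configEntries.length(); i++) {
      JSONObject entry = configEntries.getJSONObject(i);
      // preDriverRun would apply each pair, subject to the optional
      // percentage dice roll and the enforce flag.
      System.out.println(entry.getString("key") + " -> " + entry.get("value"));
    }
  }
}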

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SmcConfigHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SmcConfigHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SmcConfigHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SmcConfigHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.json.JSONObject;
+/**
+ * Retrieves and sets Hive config key/values based on a config stored in the
+ * properties of an SMC tier. This is useful for quick changes to the config
+ * that should apply to a particular package of Hive (e.g. silver.trunk). The
+ * advantage over an XML file edit is that it persists between hotfixes and
+ * we have a history of what changes were made. But since this is a hook that
+ * runs after query compilation, it is limited in what values it can effectively
+ * change.
+ *
+ * The configs are supposed to be stored in the properties of an SMC tier. The
+ * name of the property corresponds to the hive package. The value of the
+ * property is a JSON object that holds 1) an enabled field that controls
+ * whether the key-value pairs should be applied and 2) a configs field that holds
+ * the actual key-value pairs.
+ *
+ * (Property)hivePackageName -> {enabled : boolean,
+ * configs : {key1 : value1,
+ * key2 : value2..
+ * }
+ * }
+ *
+ * The primary application of this hook is to modify the behavior of the
+ * jobtracker hook. For the configs to apply, this hook must be listed
+ * before the jobtracker hook in hive.exec.pre.hooks.
+ */
+public class SmcConfigHook extends AbstractSmcConfigHook implements ExecuteWithHookContext {
+
+ static final private Log LOG = LogFactory.getLog(SmcConfigHook.class);
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ HiveConf conf = hookContext.getConf();
+
+ if (!isEnabled(conf)) {
+ return;
+ }
+
+ Object configObj = getConfigObject(conf);
+
+ if (configObj == null || !(configObj instanceof JSONObject) ) {
+ LOG.error("config not properly set!");
+ return;
+ }
+
+ // Sanity checks pass, apply all the configs.
+ JSONObject configJson = (JSONObject) configObj;
+ @SuppressWarnings("unchecked")
+ Iterator<String> i = (Iterator<String>) configJson.keys();
+ while(i.hasNext()) {
+ String key = i.next();
+ Object valueObj = configJson.get(key);
+ String value = valueObj.toString();
+
+ conf.set(key, value);
+ LOG.debug("Setting " + key + " to " + value);
+ }
+ }
+
+}
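
The flat key-value layout this hook expects differs from the array form used by SmcConfigDriverRunHook; below is a hypothetical configs object (keys and values invented) iterated the same way the hook does:

import java.util.Iterator;

import org.json.JSONException;
import org.json.JSONObject;

public class SmcConfigHookExample {
  public static void main(String[] args) throws JSONException {
    JSONObject configs = new JSONObject(
        "{\"mapred.job.tracker\": \"example-jobtracker:50030\","
      + " \"hive.exec.reducers.max\": \"512\"}");
    @SuppressWarnings("unchecked")
    Iterator<String> keys = (Iterator<String>) configs.keys();
    while (keys.hasNext()) {
      String key = keys.next();
      // The hook would call conf.set(key, value) for each pair.
      System.out.println(key + " -> " + configs.get(key).toString());
    }
  }
}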

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SplitSizeHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SplitSizeHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SplitSizeHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SplitSizeHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Random;
+import java.util.List;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+
+/**
+ * Implementation of a compile time hook that sets all split size parameters
+ * from mapred.min.split.size when the input format is CombineHiveInputFormat.
+ */
+public class SplitSizeHook extends AbstractSemanticAnalyzerHook {
+ final static String COMBINE_HIVE_INPUT_FORMAT =
+ "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
+ final static String CONF_MAPRED_MAX_SPLIT_SIZE = "mapred.max.split.size";
+ final static String CONF_MAPRED_MIN_SPLIT_PER_RACK = "mapred.min.split.size.per.rack";
+ final static String CONF_MAPRED_MIN_SPLIT_PER_NODE = "mapred.min.split.size.per.node";
+
+ // If input format is CombineHiveInputFormat, set all 3 related split size parameter to
+ // mapred.min.split.size. mapred.max.split.size remains its old value if it is larger
+ // than the new value.
+ public ASTNode preAnalyze(
+ HiveSemanticAnalyzerHookContext context,
+ ASTNode ast) throws SemanticException {
+ HiveSemanticAnalyzerHookContextImpl ctx = (HiveSemanticAnalyzerHookContextImpl)context;
+ HiveConf conf = (HiveConf)ctx.getConf();
+
+ String hiveInputFormat = conf.getVar(HiveConf.ConfVars.HIVEINPUTFORMAT);
+
+ if (!hiveInputFormat.equals(COMBINE_HIVE_INPUT_FORMAT)) {
+ return ast;
+ }
+
+ long mapredMinSplitSize = conf.getLongVar(HiveConf.ConfVars.MAPREDMINSPLITSIZE);
+
+ conf.setLong(CONF_MAPRED_MIN_SPLIT_PER_NODE, mapredMinSplitSize);
+ conf.setLong(CONF_MAPRED_MIN_SPLIT_PER_RACK, mapredMinSplitSize);
+ long maxSplit = conf.getLong(CONF_MAPRED_MAX_SPLIT_SIZE, (long)-1);
+ if (mapredMinSplitSize > maxSplit) {
+ conf.setLong(CONF_MAPRED_MAX_SPLIT_SIZE, mapredMinSplitSize);
+ }
+
+ return ast;
+ }
+
+ // Nothing to do
+ public void postAnalyze(
+ HiveSemanticAnalyzerHookContext context,
+ List<Task<? extends Serializable>> rootTasks) throws SemanticException {
+ // do nothing
+ }
+}
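
A rough sketch (configuration values invented) of the effect preAnalyze has when CombineHiveInputFormat is the configured input format:

import org.apache.hadoop.hive.conf.HiveConf;

public class SplitSizeExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT,
        "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
    conf.setLongVar(HiveConf.ConfVars.MAPREDMINSPLITSIZE, 256L * 1024 * 1024);
    // With these settings, preAnalyze would copy 268435456 into
    // mapred.min.split.size.per.node and mapred.min.split.size.per.rack,
    // and raise mapred.max.split.size only if its current value is smaller.
  }
}

Like the other semantic analyzer hooks in this commit, it would typically be enabled through hive.semantic.analyzer.hook.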

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/StartFinishHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/StartFinishHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/StartFinishHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/StartFinishHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.conf.FBHiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * This hook records the approx. start and finish times of queries into a table
+ * in MySQL (query_time_log). Useful for debugging and possibly for
+ * performance measurement.
+ *
+ * - Relies on query_id to update rows with the finish time.
+ * - Old entries in this table should be cleaned out on a regular basis.
+ */
+
+/*
+Example table schema:
+
+CREATE TABLE `query_time_log` (
+ `query_id` varchar(512) DEFAULT NULL,
+ `start_time` timestamp NULL DEFAULT NULL,
+ `finish_time` timestamp NULL DEFAULT NULL,
+ `query` mediumtext,
+ `query_type` varchar(32) DEFAULT NULL,
+ `inputs` mediumtext,
+ `outputs` mediumtext,
+ `user_info` varchar(512) DEFAULT NULL,
+ PRIMARY KEY (`query_id`),
+ INDEX(start_time),
+ INDEX(finish_time),
+ INDEX(inputs(256)),
+ INDEX(outputs(256))
+ ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+*/
+
+public class StartFinishHook implements PreExecute, PostExecute {
+
+
+
+ ConnectionUrlFactory urlFactory = null;
+
+ public StartFinishHook() throws Exception {
+ HiveConf conf = new HiveConf(StartFinishHook.class);
+
+
+ urlFactory = HookUtils.getUrlFactory(conf,
+ FBHiveConf.CONNECTION_FACTORY,
+ FBHiveConf.STARTFINISH_CONNECTION_FACTORY,
+ FBHiveConf.STARTFINISH_MYSQL_TIER_VAR_NAME,
+ FBHiveConf.STARTFINISH_HOST_DATABASE_VAR_NAME);
+ }
+
+ /**
+ * Returns a list with the following values extracted from the state:
+ * command, commandType, inputStr, outputStr, queryId, userInfo
+ *
+ * @param sess the current session state
+ * @param inputs the read entities of the query
+ * @param outputs the write entities of the query
+ * @param ugi the user and group information, may be null
+ * @return the extracted values, in the order listed above
+ */
+ private static ArrayList<Object> extractValues(SessionState sess,
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs, UserGroupInformation ugi) {
+ String command = sess.getCmd();
+ String commandType = sess.getCommandType();
+ String userInfo = "";
+ if (ugi != null) {
+ userInfo = ugi.getUserName();
+ }
+ String inputStr = "";
+
+ if (inputs != null) {
+ StringBuilder inputsSB = new StringBuilder();
+
+ boolean first = true;
+
+ for (ReadEntity inp : inputs) {
+ if (!first) {
+ inputsSB.append(",");
+ }
+ first = false;
+ inputsSB.append(inp.toString());
+ }
+ inputStr = inputsSB.toString();
+ }
+
+ String outputStr = "";
+
+ if (outputs != null) {
+ StringBuilder outputsSB = new StringBuilder();
+
+ boolean first = true;
+
+ for (WriteEntity o : outputs) {
+ if (!first) {
+ outputsSB.append(",");
+ }
+ first = false;
+ outputsSB.append(o.toString());
+ }
+ outputStr = outputsSB.toString();
+ }
+
+ String queryId = getQueryId(sess);
+
+ ArrayList<Object> values = new ArrayList<Object>();
+ values.add(command);
+ values.add(commandType);
+ values.add(inputStr);
+ values.add(outputStr);
+ values.add(queryId);
+ values.add(userInfo);
+
+ return values;
+ }
+
+ private static String getQueryId(SessionState sess) {
+ HiveConf conf = sess.getConf();
+ String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
+ return queryId;
+
+ }
+
+ /**
+ * For PreExecute
+ */
+ @Override
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, UserGroupInformation ugi) throws Exception {
+ ArrayList<Object> values = StartFinishHook.extractValues(sess,
+ inputs, outputs, ugi);
+ String query = "INSERT INTO query_time_log SET " +
+ "query = ?, " +
+ "query_type = ?, " +
+ "inputs = ?, " +
+ "outputs = ?, " +
+ "query_id = ?, " +
+ "user_info = ?, " +
+ "start_time = now()";
+
+ HiveConf conf = sess.getConf();
+ // pre-hook doesn't need to retry many times and can fail faster.
+ HookUtils.runInsert(conf, urlFactory, query, values, 5);
+ }
+
+ /**
+ * For PostExecute
+ */
+ @Override
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, LineageInfo info, UserGroupInformation ugi)
+ throws Exception {
+ ArrayList<Object> values = StartFinishHook.extractValues(sess,
+ inputs, outputs, ugi);
+ // Duplicate values for update statement
+ values.addAll(values);
+ // The ON DUPLICATE KEY UPDATE clause preserves start_time in the normal
+ // case where the pre-execute hook already recorded it
+ String valueString =
+ "query = ?, " +
+ "query_type = ?, " +
+ "inputs = ?, " +
+ "outputs = ?, " +
+ "query_id = ?, " +
+ "user_info = ?, " +
+ "finish_time = now()";
+ String query = "INSERT INTO query_time_log SET " + valueString +
+ " ON DUPLICATE KEY UPDATE " + valueString ;
+
+ HiveConf conf = sess.getConf();
+ HookUtils.runInsert(conf, urlFactory, query, values, HookUtils
+ .getSqlNumRetry(conf));
+ }
+
+}
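
HookUtils.runInsert is not part of this hunk; the sketch below (a plain JDBC approximation, not the committed helper, and without its retry logic) shows how the duplicated values list lines up with the twelve '?' placeholders of the upsert built above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

public class UpsertSketch {
  static void runInsert(String jdbcUrl, String sql, List<Object> values) throws Exception {
    Connection conn = DriverManager.getConnection(jdbcUrl);
    try {
      PreparedStatement stmt = conn.prepareStatement(sql);
      // Six parameters for the INSERT clause, then the same six again for the
      // ON DUPLICATE KEY UPDATE clause, matching values.addAll(values) above.
      for (int i = 0; i < values.size(); i++) {
        stmt.setObject(i + 1, values.get(i));
      }
      stmt.executeUpdate();
      stmt.close();
    } finally {
      conn.close();
    }
  }
}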

Added: hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SuggestionPrintingHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SuggestionPrintingHook.java?rev=1332873&view=auto
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SuggestionPrintingHook.java (added)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/ql/hooks/SuggestionPrintingHook.java Tue May 1 22:52:38 2012
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.Random;
+import java.util.ArrayList;
+import java.util.Map.Entry;
+import java.io.Serializable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Implementation of a pre execute hook that prints out a suggestion for users
+ * to use TABLESAMPLE when inputs are large.
+ */
+public class SuggestionPrintingHook implements ExecuteWithHookContext {
+
+ static final private Log LOG = LogFactory.getLog(SuggestionPrintingHook.class
+ .getName());
+
+ static private int timesReported = 0;
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ assert (hookContext.getHookType() == HookContext.HookType.PRE_EXEC_HOOK);
+ SessionState sess = SessionState.get();
+ if (sess.getIsSilent()) {
+ return;
+ }
+ SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+
+ QueryPlan queryPlan = hookContext.getQueryPlan();
+ ArrayList<Task<? extends Serializable>> rootTasks = queryPlan.getRootTasks();
+
+ // If it is a pure DDL query, there is nothing to suggest
+ if (rootTasks == null) {
+ return;
+ }
+ if (rootTasks.size() == 1) {
+ Task<? extends Serializable> tsk = rootTasks.get(0);
+ if (tsk instanceof DDLTask) {
+ return;
+ }
+ }
+
+ // do some simple query matching to avoid showing the suggestion for some
+ // queries.
+ String command = SessionState.get().getCmd().toUpperCase().replace('\n',
+ ' ').replace('\t', ' ');
+ if ((timesReported > 0 && HookUtils.rollDice(0.9f)) ||
+ !command.contains("SELECT ") || command.contains(" TABLESAMPLE")
+ || command.contains(" JOIN ") || command.contains(" LIMIT ")) {
+ return;
+ }
+
+ Set<ReadEntity> inputs = hookContext.getInputs();
+ Map<String, ContentSummary> inputToCS = hookContext
+ .getInputPathToContentSummary();
+
+ HiveConf conf = sess.getConf();
+
+ int maxGigaBytes = conf.getInt("fbhive.suggest.tablesample.gigabytes", 32);
+
+ long maxBytes = maxGigaBytes * 1024 * 1024 * 1024L;
+
+ if (maxGigaBytes < 0) {
+ console.printError("maxGigaBytes value of " + maxGigaBytes
+ + "is invalid");
+ return;
+ }
+
+ long inputSize = HookUtils.getInputSize(inputs, inputToCS, conf);
+
+ if (inputSize > maxBytes) {
+ console.printInfo("");
+ console
+ .printInfo("*** This queries over "
+ + Math.round(maxBytes / 1024D / 1024D / 1024D)
+ + " GB data. Consider TABLESAMPLE: fburl.com/?key=2001210");
+ console.printInfo("");
+ timesReported++;
+ }
+ }
+}
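
As a hypothetical worked example (the 48 GB input size is invented), the threshold arithmetic behind the suggestion:

public class SuggestionThresholdSketch {
  public static void main(String[] args) {
    int maxGigaBytes = 32;                       // fbhive.suggest.tablesample.gigabytes default
    long maxBytes = maxGigaBytes * 1024 * 1024 * 1024L;
    long inputSize = 48L * 1024 * 1024 * 1024;   // hypothetical 48 GB of input
    if (inputSize > maxBytes) {
      System.out.println("*** This query reads over "
          + Math.round(maxBytes / 1024D / 1024D / 1024D)
          + " GB of data. Consider TABLESAMPLE: fburl.com/?key=2001210");
    }
  }
}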
