FAQ
Author: sdong
Date: Tue Jul 26 17:13:29 2011
New Revision: 1151175

URL: http://svn.apache.org/viewvc?rev=1151175&view=rev
Log:
HIVE 2282. Local mode needs to work well with block sampling (Kevin Wilfong via Siying Dong)

Added:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java
hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q
hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1151175&r1=1151174&r2=1151175&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Tue Jul 26 17:13:29 2011
@@ -60,6 +60,10 @@ public class MapRedTask extends ExecDriv
private transient ContentSummary inputSummary = null;
private transient boolean runningViaChild = false;

+ private transient boolean inputSizeEstimated = false;
+ private transient long totalInputFileSize;
+ private transient long totalInputNumFiles;
+
public MapRedTask() {
super();
}
@@ -91,16 +95,21 @@ public class MapRedTask extends ExecDriv
inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
}

+ // set the values of totalInputFileSize and totalInputNumFiles, estimating them
+ // if percentage block sampling is being used
+ estimateInputSize();
+
// at this point the number of reducers is precisely defined in the plan
int numReducers = work.getNumReduceTasks();

if (LOG.isDebugEnabled()) {
LOG.debug("Task: " + getId() + ", Summary: " +
- inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
+ totalInputFileSize + "," + totalInputNumFiles + ","
+ numReducers);
}

- String reason = MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers);
+ String reason = MapRedTask.isEligibleForLocalMode(conf, numReducers,
+ totalInputFileSize, totalInputNumFiles);
if (reason == null) {
// clone configuration before modifying it on per-task basis
cloneConf();
@@ -366,9 +375,50 @@ public class MapRedTask extends ExecDriv
inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
}

- long totalInputFileSize = inputSummary.getLength();
-
// if all inputs are sampled, we should shrink the size of reducers accordingly.
+ estimateInputSize();
+
+ if (totalInputFileSize != inputSummary.getLength()) {
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
+ } else {
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+ }
+
+ int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+ reducers = Math.max(1, reducers);
+ reducers = Math.min(maxReducers, reducers);
+ return reducers;
+ }
+
+ /**
+ * Sets the values of totalInputFileSize and totalInputNumFiles. If percentage
+ * block sampling is used, these values are estimates based on the highest
+ * percentage being used for sampling multiplied by the value obtained from the
+ * input summary. Otherwise, these values are set to the exact value obtained
+ * from the input summary.
+ *
+ * Once the function completes, inputSizeEstimated is set so that the logic is
+ * never run more than once.
+ */
+ private void estimateInputSize() {
+ if (inputSizeEstimated) {
+ // If we've already run this function, return
+ return;
+ }
+
+ // Initialize the values to be those taken from the input summary
+ totalInputFileSize = inputSummary.getLength();
+ totalInputNumFiles = inputSummary.getFileCount();
+
+ if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+ // If percentage block sampling wasn't used, we don't need to do any estimation
+ inputSizeEstimated = true;
+ return;
+ }
+
+ // if all inputs are sampled, we should shrink the size of the input accordingly
double highestSamplePercentage = 0;
boolean allSample = false;
for (String alias : work.getAliasToWork().keySet()) {
@@ -385,42 +435,38 @@ public class MapRedTask extends ExecDriv
}
if (allSample) {
// This is a little bit dangerous if inputs turns out not to be able to be sampled.
- // In that case, we significantly underestimate number of reducers.
- // It's the same as other cases of estimateNumberOfReducers(). It's just our best
+ // In that case, we significantly underestimate the input.
+ // It's the same as estimateNumberOfReducers(). It's just our best
// guess and there is no guarantee.
totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
, totalInputFileSize);
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
- + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
- } else {
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
- + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+ totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
+ , totalInputNumFiles);
}

- int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
- reducers = Math.max(1, reducers);
- reducers = Math.min(maxReducers, reducers);
- return reducers;
+ inputSizeEstimated = true;
}

/**
* Find out if a job can be run in local mode based on it's characteristics
*
* @param conf Hive Configuration
- * @param inputSummary summary about the input files for this job
* @param numReducers total number of reducers for this job
+ * @param inputLength the size of the input
+ * @param inputFileCount the number of files of input
* @return String null if job is eligible for local mode, reason otherwise
*/
public static String isEligibleForLocalMode(HiveConf conf,
- ContentSummary inputSummary,
- int numReducers) {
+ int numReducers,
+ long inputLength,
+ long inputFileCount) {

long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);

// check for max input size
- if (inputSummary.getLength() > maxBytes) {
- return "Input Size (= " + inputSummary.getLength() + ") is larger than " +
+ if (inputLength > maxBytes) {
+ return "Input Size (= " + inputLength + ") is larger than " +
HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
}

@@ -428,8 +474,8 @@ public class MapRedTask extends ExecDriv
// in the absence of an easy way to get the number of splits - do this
// based on the total number of files (pessimistically assumming that
// splits are equal to number of files in worst case)
- if (inputSummary.getFileCount() > maxTasks) {
- return "Number of Input Files (= " + inputSummary.getFileCount() +
+ if (inputFileCount > maxTasks) {
+ return "Number of Input Files (= " + inputFileCount +
") is larger than " +
HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1151175&r1=1151174&r2=1151175&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Jul 26 17:13:29 2011
@@ -7997,7 +7997,8 @@ public class SemanticAnalyzer extends Ba
+ numReducers);
}

- if(MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers) != null) {
+ if(MapRedTask.isEligibleForLocalMode(conf, numReducers,
+ inputSummary.getLength(), inputSummary.getFileCount()) != null) {
hasNonLocalJob = true;
break;
}else{

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java?rev=1151175&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java Tue Jul 26 17:13:29 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+
+public class VerifyIsLocalModeHook implements ExecuteWithHookContext {
+
+ public void run(HookContext hookContext) {
+ if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+ List<TaskRunner> taskRunners = hookContext.getCompleteTaskList();
+ for (TaskRunner taskRunner : taskRunners) {
+ Task task = taskRunner.getTask();
+ if (task.isMapRedTask()) {
+ Assert.assertTrue("VerifyIsLocalModeHook fails because a isLocalMode was not set for a task.",
+ task.isLocalMode());
+ }
+ }
+ }
+ }
+}

Added: hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q?rev=1151175&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q Tue Jul 26 17:13:29 2011
@@ -0,0 +1,42 @@
+drop table if exists sih_i_part;
+drop table if exists sih_src;
+drop table if exists sih_src2;
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+set mapred.max.split.size=300;
+set mapred.min.split.size=300;
+set mapred.min.split.size.per.node=300;
+set mapred.min.split.size.per.rack=300;
+set hive.exec.mode.local.auto=true;
+set hive.merge.smallfiles.avgsize=1;
+
+-- create file inputs
+create table sih_i_part (key int, value string) partitioned by (p string);
+insert overwrite table sih_i_part partition (p='1') select key, value from src;
+insert overwrite table sih_i_part partition (p='2') select key+10000, value from src;
+insert overwrite table sih_i_part partition (p='3') select key+20000, value from src;
+create table sih_src as select key, value from sih_i_part;
+create table sih_src2 as select key, value from sih_src;
+
+set hive.exec.post.hooks = org.apache.hadoop.hive.ql.hooks.VerifyIsLocalModeHook ;
+set mapred.job.tracker=does.notexist.com:666;
+set hive.exec.mode.local.auto.tasks.max=1;
+
+-- sample split, running locally limited by num tasks
+select count(1) from sih_src tablesample(1 percent);
+
+set mapred.job.tracker=does.notexist.com:666;
+
+-- sample two tables
+select count(1) from sih_src tablesample(1 percent)a join sih_src2 tablesample(1 percent)b on a.key = b.key;
+
+set hive.exec.mode.local.auto.inputbytes.max=1000;
+set hive.exec.mode.local.auto.tasks.max=4;
+set mapred.job.tracker=does.notexist.com:666;
+
+-- sample split, running locally limited by max bytes
+select count(1) from sih_src tablesample(1 percent);
+
+drop table sih_i_part;
+drop table sih_src;
+drop table sih_src2;

Added: hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out?rev=1151175&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out Tue Jul 26 17:13:29 2011
@@ -0,0 +1,116 @@
+PREHOOK: query: drop table if exists sih_i_part
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists sih_i_part
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists sih_src
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists sih_src
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists sih_src2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists sih_src2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: -- create file inputs
+create table sih_i_part (key int, value string) partitioned by (p string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- create file inputs
+create table sih_i_part (key int, value string) partitioned by (p string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@sih_i_part
+PREHOOK: query: insert overwrite table sih_i_part partition (p='1') select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@sih_i_part@p=1
+POSTHOOK: query: insert overwrite table sih_i_part partition (p='1') select key, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@sih_i_part@p=1
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table sih_i_part partition (p='2') select key+10000, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@sih_i_part@p=2
+POSTHOOK: query: insert overwrite table sih_i_part partition (p='2') select key+10000, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@sih_i_part@p=2
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table sih_i_part partition (p='3') select key+20000, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@sih_i_part@p=3
+POSTHOOK: query: insert overwrite table sih_i_part partition (p='3') select key+20000, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@sih_i_part@p=3
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table sih_src as select key, value from sih_i_part
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@sih_i_part@p=1
+PREHOOK: Input: default@sih_i_part@p=2
+PREHOOK: Input: default@sih_i_part@p=3
+POSTHOOK: query: create table sih_src as select key, value from sih_i_part
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@sih_i_part@p=1
+POSTHOOK: Input: default@sih_i_part@p=2
+POSTHOOK: Input: default@sih_i_part@p=3
+POSTHOOK: Output: default@sih_src
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table sih_src2 as select key, value from sih_src
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@sih_src
+POSTHOOK: query: create table sih_src2 as select key, value from sih_src
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@sih_src
+POSTHOOK: Output: default@sih_src2
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: -- sample split, running locally limited by num tasks
+select count(1) from sih_src tablesample(1 percent)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sih_src
+PREHOOK: Output: file:/data/users/kevinwilfong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2011-07-22_10-31-00_619_5856650519690274700/-mr-10000
+500
+PREHOOK: query: -- sample two tables
+select count(1) from sih_src tablesample(1 percent)a join sih_src2 tablesample(1 percent)b on a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sih_src
+PREHOOK: Input: default@sih_src2
+PREHOOK: Output: file:/data/users/kevinwilfong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2011-07-22_10-31-04_069_8883954538684297085/-mr-10000
+0
+PREHOOK: query: -- sample split, running locally limited by max bytes
+select count(1) from sih_src tablesample(1 percent)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sih_src
+PREHOOK: Output: file:/data/users/kevinwilfong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2011-07-22_10-31-10_389_1530285320067549321/-mr-10000
+500
+PREHOOK: query: drop table sih_i_part
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@sih_i_part
+PREHOOK: Output: default@sih_i_part
+PREHOOK: query: drop table sih_src
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@sih_src
+PREHOOK: Output: default@sih_src
+PREHOOK: query: drop table sih_src2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@sih_src2
+PREHOOK: Output: default@sih_src2

Search Discussions

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categorieshive, hadoop
postedJul 26, '11 at 5:13p
activeJul 26, '11 at 5:13p
posts1
users1
websitehive.apache.org

1 user in discussion

Sdong: 1 post

People

Translate

site design / logo © 2021 Grokbase