Author: jssarma
Date: Mon Mar 21 22:01:34 2011
New Revision: 1083984

URL: http://svn.apache.org/viewvc?rev=1083984&view=rev
Log:
HIVE-2051: getInputSummary() to call FileSystem.getContentSummary() in parallel (Siying Dong via jssarma)
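
For readers skimming the diff below: the patch replaces the serial per-path loop with a bounded ThreadPoolExecutor, collects per-path ContentSummary results in a ConcurrentHashMap, and keeps the old serial behavior when parallelism is not warranted. The following is a minimal sketch of that pattern only, not the committed code: the class name ParallelSummarySketch and the summarize() helper are hypothetical stand-ins for the real per-path work, and where the patch retries Future.get() after an interrupt, the sketch simply restores the interrupt flag and propagates an IOException.

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;

  public class ParallelSummarySketch {

    // Hypothetical stand-in for the per-path work the patch performs
    // (FileSystem.getContentSummary() or ContentSummaryInputFormat).
    static long summarize(String path) throws IOException {
      return path.length(); // placeholder
    }

    public static long totalSize(List<String> paths, int maxThreads)
        throws IOException {
      final Map<String, Long> results = new ConcurrentHashMap<String, Long>();
      List<Future<?>> futures = new ArrayList<Future<?>>();

      // Only spin up a pool when there is real work to parallelize,
      // mirroring the patch's size() > 1 && maxThreads > 1 gate.
      ThreadPoolExecutor executor = null;
      if (paths.size() > 1 && maxThreads > 1) {
        int numThreads = Math.min(paths.size(), maxThreads);
        executor = new ThreadPoolExecutor(numThreads, numThreads, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
      }

      for (final String path : paths) {
        Runnable r = new Runnable() {
          public void run() {
            try {
              results.put(path, summarize(path));
            } catch (IOException e) {
              // Like the patch: skip the path, do not cache a bad result.
            }
          }
        };
        if (executor == null) {
          r.run(); // serial fallback, the pre-patch behavior
        } else {
          futures.add(executor.submit(r));
        }
      }

      if (executor != null) {
        try {
          for (Future<?> f : futures) {
            f.get(); // wait for every task before aggregating
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting for summaries", e);
        } catch (ExecutionException e) {
          throw new IOException(e);
        } finally {
          executor.shutdown();
        }
      }

      long total = 0;
      for (long size : results.values()) {
        total += size;
      }
      return total;
    }
  }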

Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1083984&r1=1083983&r2=1083984&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Mar 21 22:01:34 2011
@@ -59,6 +59,12 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -100,8 +106,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde.Constants;
@@ -110,8 +116,8 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -402,8 +408,8 @@ public final class Utilities {
e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());

e.writeObject(t);
- }finally {
- if(null != e){
+ } finally {
+ if (null != e) {
e.close();
}
}
@@ -632,9 +638,9 @@ public final class Utilities {
public static TableDesc getTableDesc(String cols, String colTypes) {
return (new TableDesc(LazySimpleSerDe.class, SequenceFileInputFormat.class,
HiveSequenceFileOutputFormat.class, Utilities.makeProperties(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
- org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
- org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
}

public static PartitionDesc getPartitionDesc(Partition part) throws HiveException {
@@ -1142,7 +1148,8 @@ public final class Utilities {
}

public static void mvFileToFinalPath(String specPath, Configuration hconf,
- boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException, HiveException {
+ boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException,
+ HiveException {

FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
Path tmpPath = Utilities.toTempPath(specPath);
@@ -1158,7 +1165,7 @@ public final class Utilities {
Utilities.rename(fs, tmpPath, intermediatePath);
// Step2: remove any tmp file or double-committed output files
ArrayList<String> emptyBuckets =
- Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+ Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
// create empty buckets if necessary
if (emptyBuckets.size() > 0) {
createEmptyBuckets(hconf, emptyBuckets, conf);
@@ -1176,13 +1183,18 @@ public final class Utilities {
/**
* Check the existence of buckets according to bucket specification. Create empty buckets if
* needed.
- * @param specPath The final path where the dynamic partitions should be in.
- * @param conf FileSinkDesc.
- * @param dpCtx dynamic partition context.
+ *
+ * @param specPath
+ * The final path where the dynamic partitions should reside.
+ * @param conf
+ * FileSinkDesc.
+ * @param dpCtx
+ * dynamic partition context.
* @throws HiveException
* @throws IOException
*/
- private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths, FileSinkDesc conf)
+ private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths,
+ FileSinkDesc conf)
throws HiveException, IOException {

JobConf jc;
@@ -1209,7 +1221,7 @@ public final class Utilities {
throw new HiveException(e);
}

- for (String p: paths) {
+ for (String p : paths) {
Path path = new Path(p);
RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
@@ -1503,6 +1515,8 @@ public final class Utilities {
}
}

+ public static Object getInputSummaryLock = new Object();
+
/**
* Calculate the total size of input files.
*
@@ -1520,9 +1534,13 @@ public final class Utilities {

long[] summary = {0, 0, 0};

- // For each input path, calculate the total size.
- for (String path : work.getPathToAliases().keySet()) {
- try {
+ List<String> pathNeedProcess = new ArrayList<String>();
+
+ // Since multiple threads could call this method concurrently, locking
+ // it keeps the number of threads from growing out of control.
+ synchronized (getInputSummaryLock) {
+ // For each input path, calculate the total size.
+ for (String path : work.getPathToAliases().keySet()) {
Path p = new Path(path);

if (filter != null && !filter.accept(p)) {
@@ -1531,37 +1549,118 @@ public final class Utilities {

ContentSummary cs = ctx.getCS(path);
if (cs == null) {
- JobConf jobConf = new JobConf(ctx.getConf());
- PartitionDesc partDesc = work.getPathToPartitionInfo().get(
- p.toString());
- Class<? extends InputFormat> inputFormatCls = partDesc
- .getInputFileFormatClass();
- InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
- inputFormatCls, jobConf);
- if(inputFormatObj instanceof ContentSummaryInputFormat) {
- cs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p, jobConf);
- } else {
- FileSystem fs = p.getFileSystem(ctx.getConf());
- cs = fs.getContentSummary(p);
+ if (path == null) {
+ continue;
}
- ctx.addCS(path, cs);
- LOG.info("Cache Content Summary for " + path + " length: " + cs.getLength() + " file count: "
- + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
+ pathNeedProcess.add(path);
+ } else {
+ summary[0] += cs.getLength();
+ summary[1] += cs.getFileCount();
+ summary[2] += cs.getDirectoryCount();
}
+ }
+
+ // Process the case when name node call is needed
+ final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
+ ArrayList<Future<?>> results = new ArrayList<Future<?>>();
+ ThreadPoolExecutor executor = null;
+ int maxThreads = ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0);
+ if (pathNeedProcess.size() > 1 && maxThreads > 1) {
+ int numExecutors = Math.min(pathNeedProcess.size(), maxThreads);
+ LOG.info("Using " + numExecutors + " threads for getContentSummary");
+ executor = new ThreadPoolExecutor(numExecutors, numExecutors, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ }
+
+ // Create one summary task per path that still needs a name node call.
+ Configuration conf = ctx.getConf();
+ JobConf jobConf = new JobConf(conf);
+ for (String path : pathNeedProcess) {
+ final Path p = new Path(path);
+ final String pathStr = path;
+ // All threads share the same Configuration and JobConf, on the
+ // assumption that both are thread safe as long as only read
+ // operations are performed. Hadoop's javadoc does not state this,
+ // but the source code clearly makes an effort to support it, so we
+ // believe the assumption holds. Revisit this code if the assumption
+ // turns out to be incorrect.
+ final Configuration myConf = conf;
+ final JobConf myJobConf = jobConf;
+ final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
+ p.toString());
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ ContentSummary resultCs;
+
+ Class<? extends InputFormat> inputFormatCls = partDesc
+ .getInputFileFormatClass();
+ InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
+ inputFormatCls, myJobConf);
+ if (inputFormatObj instanceof ContentSummaryInputFormat) {
+ resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
+ myJobConf);
+ } else {
+ FileSystem fs = p.getFileSystem(myConf);
+ resultCs = fs.getContentSummary(p);
+ }
+ resultMap.put(pathStr, resultCs);
+ } catch (IOException e) {
+ // We safely ignore this exception for summary data. We don't
+ // update the cache, so a failed path cannot pollute other usages.
+ // In the worst case the IOException simply recurs on the next
+ // getInputSummary() call, which is acceptable since IOExceptions
+ // are not the common case.
+ LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
+ }
+ }
+ };
+
+ if (executor == null) {
+ r.run();
+ } else {
+ Future<?> result = executor.submit(r);
+ results.add(result);
+ }
+ }
+
+ if (executor != null) {
+ for (Future<?> result : results) {
+ boolean executorDone = false;
+ do {
+ try {
+ result.get();
+ executorDone = true;
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted when waiting threads: ", e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ } while (!executorDone);
+ }
+ executor.shutdown();
+ }
+
+ for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
+ ContentSummary cs = entry.getValue();

summary[0] += cs.getLength();
summary[1] += cs.getFileCount();
summary[2] += cs.getDirectoryCount();

- } catch (IOException e) {
- LOG.info("Cannot get size of " + path + ". Safely ignored.");
+ ctx.addCS(entry.getKey(), cs);
+ LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength()
+ + " file count: "
+ + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
}
+
+ return new ContentSummary(summary[0], summary[1], summary[2]);
}
- return new ContentSummary(summary[0], summary[1], summary[2]);
}

public static boolean isEmptyPath(JobConf job, String dirPath, Context ctx)
- throws Exception {
+ throws Exception {
ContentSummary cs = ctx.getCS(dirPath);
if (cs != null) {
LOG.info("Content Summary " + dirPath + "length: " + cs.getLength() + " num files: "
@@ -1712,17 +1811,17 @@ public final class Utilities {
}

public static String generateTarURI(String baseURI, String filename) {
- String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename+".tar.gz");
+ String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
return tmpFileURI;
}

public static String generateTarURI(Path baseURI, String filename) {
- String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename+".tar.gz");
+ String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
return tmpFileURI;
}

public static String generateTarFileName(String name) {
- String tmpFileURI = new String(name+".tar.gz");
+ String tmpFileURI = new String(name + ".tar.gz");
return tmpFileURI;
}

@@ -1738,7 +1837,7 @@ public final class Utilities {
}

public static double showTime(long time) {
- double result = (double) time / (double)1000;
+ double result = (double) time / (double) 1000;
return result;
}
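
A closing note on tuning and concurrency control: the fan-out is read from mapred.dfsclient.parallelism.max with a default of 0, so the parallel path stays off until that setting is raised above 1; with N uncached paths the pool is sized min(N, maxThreads). Separately, the new getInputSummaryLock serializes concurrent getInputSummary() callers, so at most one pool's worth of name node requests is in flight at a time.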
