FAQ
As per a previous list question
(http://mail-archives.apache.org/mod_mbox/hadoop-core-user/200804.mbox/%3Ce75c02ef0804011433x144813e6x2450da7883de3aca@mail.gmail.com%3E)
it looks as though it's not possible for Hadoop to traverse input
directories recursively in order to discover input files.

Just wondering a) if there's any particular reason why this
functionality doesn't exist, and b) if not, if there's any
workaround/hack to make it possible.

Like the OP, I was thinking it would be helpful to partition my input
data by year, month, and day. I figured this would enable me to run jobs
against specific date ranges of input data, and thereby speed up the
execution of my jobs since they wouldn't have to process every single
record.

Any way to make this happen? (Or am I totally going about this the
wrong way for what I'm trying to achieve?)

TIA,

DR
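
One workaround worth noting (a sketch, not from the thread): if the data is laid
out by date, e.g. /data/<year>/<month>/<day>/, a date range can be selected
without any recursion at all, because FileInputFormat expands glob patterns in
its input paths. The layout, class name, and glob below are illustrative
assumptions only.

import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class DateRangeJobSetup {
    public static JobConf configure() {
        JobConf conf = new JobConf(DateRangeJobSetup.class);
        // Every day in May 2009 plus June 1-3, 2009, assuming input files
        // live under /data/<year>/<month>/<day>/.
        FileInputFormat.setInputPaths(conf,
            "/data/2009/05/*,/data/2009/06/0[1-3]");
        return conf;
    }
}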

  • Aaron Kimball at Jun 2, 2009 at 11:23 pm
    There is no technical limit that prevents Hadoop from operating in this
    fashion; it's simply the case that the included InputFormat implementations
    do not do so. This behavior has been set in this fashion for a long time, so
    it's unlikely that it will change soon, as that might break existing
    applications.

    But you can write your own subclass of TextInputFormat or
    SequenceFileInputFormat that overrides the getSplits() method to recursively
    descend through directories and search for files.

    - Aaron
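
    A minimal sketch of the subclass Aaron describes (not from the thread; it
    assumes the old org.apache.hadoop.mapred API of the 0.19/0.20 era, and the
    class name is invented). Rather than overriding getSplits() directly, it
    overrides listStatus(), which getSplits() calls to enumerate the input
    files, and expands any directories it finds:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class RecursiveTextInputFormat extends TextInputFormat {
        @Override
        protected FileStatus[] listStatus(JobConf job) throws IOException {
            // Start from what the stock implementation returns (files plus any
            // immediate directory contents), then expand directories recursively.
            List<FileStatus> result = new ArrayList<FileStatus>();
            for (FileStatus stat : super.listStatus(job)) {
                addRecursively(result, stat, job);
            }
            return result.toArray(new FileStatus[result.size()]);
        }

        private void addRecursively(List<FileStatus> result, FileStatus stat,
                                    JobConf job) throws IOException {
            if (stat.isDir()) {
                FileSystem fs = stat.getPath().getFileSystem(job);
                for (FileStatus sub : fs.listStatus(stat.getPath())) {
                    addRecursively(result, sub, job);
                }
            } else {
                result.add(stat);
            }
        }
    }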
  • Brian Bockelman at Jun 3, 2009 at 12:35 am
    Hey Aaron,

    I had a similar problem. I have log files arranged in the following
    fashion:

    /logs/<hostname>/<date>.log

    I want to analyze a range of dates for all hosts. What I did was
    write a subroutine into my driver class that descends through the
    HDFS file system starting at /logs, builds a list of input files,
    and then feeds that list to the framework.

    Example code below.

    Brian

    // Assumes conf, base (e.g. new Path("/logs")), df (a date format matching
    // the log-name suffix), startDate, and endDate are already defined in the
    // driver.
    FileSystem fs = FileSystem.get(conf);
    Pattern fileNamePattern =
        Pattern.compile(".*datanode-(.*).log.([0-9]+-[0-9]+-[0-9]+)");
    for (FileStatus status : fs.listStatus(base)) {
        Path pathname = status.getPath();
        // Second level: /logs/<hostname>/<date>.log
        for (FileStatus logfile : fs.listStatus(pathname)) {
            Path logFilePath = logfile.getPath();
            Matcher m = fileNamePattern.matcher(logFilePath.getName());
            if (m.matches()) {
                String dateString = m.group(2);
                Date logDate = df.parse(dateString);
                // Keep files whose date falls in [startDate, endDate)
                if ((logDate.equals(startDate) || logDate.after(startDate))
                        && logDate.before(endDate)) {
                    FileInputFormat.addInputPath(conf, logFilePath);
                } else {
                    //System.out.println("Ignoring file: " + logFilePath.getName());
                    //System.out.println("Start Date: " + startDate + ", End Date: "
                    //    + endDate + ", Log date: " + logDate);
                }
            } else {
                System.out.println("Ignoring file: " + logFilePath.getName());
            }
        }
    }
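
    Purely for illustration, the variables Brian's fragment relies on might be
    set up along these lines (the date pattern and range here are guesses, not
    from his post):

    // Hypothetical setup for the fragment above; requires
    // java.text.SimpleDateFormat, java.util.Date, org.apache.hadoop.fs.Path,
    // and handling (or declaring) ParseException.
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
    Date startDate = df.parse("2009-05-01");
    Date endDate = df.parse("2009-06-01");
    Path base = new Path("/logs");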

  • Ian Soboroff at Jun 4, 2009 at 3:06 pm
    Here's how I solved the problem using a custom InputFormat... the key
    part is in listStatus(), where we traverse the directory tree. Since
    HDFS doesn't have links, this code is probably safe, but if you have a
    filesystem with cycles you will get trapped.

    Ian

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.ArrayDeque;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.InvalidInputException;
    import org.apache.hadoop.mapred.LineRecordReader;

    // Note: DocLocation (the key type) and LOG are defined elsewhere in Ian's
    // code and are not shown in this post.
    public class TrecWebInputFormat extends FileInputFormat<DocLocation, Text> {

        @Override
        public boolean isSplitable(FileSystem fs, Path filename) {
            return false;
        }

        @Override
        public RecordReader<DocLocation, Text>
                getRecordReader(InputSplit split, JobConf job, Reporter reporter)
                throws IOException {
            return new TrecWebRecordReader(job, (FileSplit) split);
        }

        // The following are incomprehensibly private in FileInputFormat...
        private static final PathFilter hiddenFileFilter = new PathFilter() {
            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        };

        /**
         * Proxy PathFilter that accepts a path only if all filters given in the
         * constructor do. Used by listStatus() to apply the built-in
         * hiddenFileFilter together with a user-provided one (if any).
         */
        private static class MultiPathFilter implements PathFilter {
            private List<PathFilter> filters;

            public MultiPathFilter(List<PathFilter> filters) {
                this.filters = filters;
            }

            public boolean accept(Path path) {
                for (PathFilter filter : filters) {
                    if (!filter.accept(path)) {
                        return false;
                    }
                }
                return true;
            }
        }

        @Override
        protected FileStatus[] listStatus(JobConf job) throws IOException {
            Path[] dirs = getInputPaths(job);
            if (dirs.length == 0) {
                throw new IOException("No input paths specified in job");
            }

            List<FileStatus> result = new ArrayList<FileStatus>();
            List<IOException> errors = new ArrayList<IOException>();
            ArrayDeque<FileStatus> stats = new ArrayDeque<FileStatus>(dirs.length);

            // Creates a MultiPathFilter with the hiddenFileFilter and the
            // user-provided one (if any).
            List<PathFilter> filters = new ArrayList<PathFilter>();
            filters.add(hiddenFileFilter);
            PathFilter jobFilter = getInputPathFilter(job);
            if (jobFilter != null) {
                filters.add(jobFilter);
            }
            PathFilter inputFilter = new MultiPathFilter(filters);

            // Set up traversal from input paths, which may be globs
            for (Path p : dirs) {
                FileSystem fs = p.getFileSystem(job);
                FileStatus[] matches = fs.globStatus(p, inputFilter);
                if (matches == null) {
                    errors.add(new IOException("Input path does not exist: " + p));
                } else if (matches.length == 0) {
                    errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
                } else {
                    for (FileStatus globStat : matches) {
                        stats.add(globStat);
                    }
                }
            }

            // Depth-first traversal: directories are pushed back onto the stack,
            // plain files go into the result.
            while (!stats.isEmpty()) {
                FileStatus stat = stats.pop();
                if (stat.isDir()) {
                    FileSystem fs = stat.getPath().getFileSystem(job);
                    for (FileStatus sub : fs.listStatus(stat.getPath(), inputFilter)) {
                        stats.push(sub);
                    }
                } else {
                    result.add(stat);
                }
            }

            if (!errors.isEmpty()) {
                throw new InvalidInputException(errors);
            }
            LOG.info("Total input paths to process : " + result.size());
            return result.toArray(new FileStatus[result.size()]);
        }

        public static class TrecWebRecordReader
                implements RecordReader<DocLocation, Text> {
            private CompressionCodecFactory compressionCodecs = null;
            private long start;
            private long end;
            private long pos;
            private Path file;
            private LineRecordReader.LineReader in;

            public TrecWebRecordReader(JobConf job, FileSplit split)
                    throws IOException {
                file = split.getPath();
                start = 0;
                end = split.getLength();
                compressionCodecs = new CompressionCodecFactory(job);
                CompressionCodec codec = compressionCodecs.getCodec(file);

                FileSystem fs = file.getFileSystem(job);
                FSDataInputStream fileIn = fs.open(file);

                if (codec != null) {
                    in = new LineRecordReader.LineReader(codec.createInputStream(fileIn), job);
                } else {
                    in = new LineRecordReader.LineReader(fileIn, job);
                }
                pos = 0;
            }

            public DocLocation createKey() {
                return new DocLocation();
            }

            public Text createValue() {
                return new Text();
            }

            public synchronized boolean next(DocLocation key, Text value)
                    throws IOException {
                Text line = new Text();
                StringBuilder buf = new StringBuilder();
                boolean in_doc = false;

                try {
                    while (true) {
                        int size = in.readLine(line);
                        if (size <= 0)
                            break;
                        pos += size;
                        // A record is everything between <DOC> and </DOC>
                        if (!in_doc && line.find("<DOC>") >= 0) {
                            in_doc = true;
                            key.offset = pos;
                            key.filename = file.toString();
                        }
                        if (in_doc) {
                            buf.append(line.toString()).append("\n");
                            if (line.find("</DOC>") >= 0) {
                                in_doc = false;
                                break;
                            }
                        }
                    }
                } catch (java.io.EOFException e) {
                }

                if (buf.length() > 0) {
                    value.set(buf.toString());
                    key.length = value.getLength();
                    return true;
                } else {
                    return false;
                }
            }

            public synchronized long getPos() throws IOException {
                return pos;
            }

            public float getProgress() {
                return Math.min(1.0f, pos / (float) end);
            }

            public synchronized void close() throws IOException {
                if (in != null) {
                    in.close();
                }
            }
        }
    }
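
    For context, wiring a format like this into a job with the old mapred API
    would look roughly as follows (a sketch only: the driver class name and
    input path are invented, and the mapper/reducer/output settings are
    omitted):

    // Placeholder names; assumes org.apache.hadoop.mapred.{JobConf, JobClient,
    // FileInputFormat} and org.apache.hadoop.fs.Path are imported.
    JobConf conf = new JobConf(TrecWebDriver.class);             // hypothetical driver
    conf.setInputFormat(TrecWebInputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path("/trec/web"));  // hypothetical root
    // ... mapper, reducer, and output settings omitted ...
    JobClient.runJob(conf);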
  • David Rosenstrauch at Jun 3, 2009 at 2:26 pm
    OK, thanks for the pointer.

    If I wind up rolling our own code to handle this I'll make sure to
    contribute it.

    DR

