InputFiles, Splits, Maps, Tasks Questions 1.3 Base
I am struggling to control the behavior of the framework. The first
problem is simple: I want to run many simultaneous mapper tasks on each
node. I've scoured the forums, done the obvious, and I still typically get
only 2 tasks per node at execution time. If it is a big job, sometimes I
see 3. Note that the administrator reports 40 Tasks/Node in the config,
but the most I've ever seen running is 3 (and this with a single input file
of 10,000 records, magically yielding 443 maps).

And "magically" is the next issue: I want fine-grained control over the
input file / number of input records / number of maps relationship. For my
immediate problem, I want to use a single input file with a number of records
yielding the exact same number of maps (all kicked off simultaneously, BTW).
Since I did not get this behavior with the standard InputFileFormat, I
created my own input format class and record reader, and am now getting the
"1 file with n recs to n maps" relationship... but the problem is that I am
not even sure why.

Any guidance appreciated.

  • Arun C Murthy at Oct 17, 2007 at 8:06 am
    Lance,
    On Tue, Oct 16, 2007 at 11:27:54PM -0700, Lance Amundsen wrote:

    I am struggling to control the behavior of the framework. The first
    problem is simple: I want to run many simultaneous mapper tasks on each
    node. I've scoured the forums, done the obvious, and I still typically get
    only 2 tasks per node at execution time. If it is a big job, sometimes I
    see 3. Note that the administrator reports 40 Tasks/Node in the config,
    but the most I've ever seen running is 3 (and this with a single input file
    of 10,000 records, magically yielding 443 maps).

    And "magically" is the next issue: I want fine-grained control over the
    input file / number of input records / number of maps relationship. For my
    immediate problem, I want to use a single input file with a number of records
    yielding the exact same number of maps (all kicked off simultaneously, BTW).
    Since I did not get this behavior with the standard InputFileFormat, I
    created my own input format class and record reader, and am now getting the
    "1 file with n recs to n maps" relationship... but the problem is that I am
    not even sure why.
    I'm in the process of documenting these better (http://issues.apache.org/jira/browse/HADOOP-2046); meanwhile, here are some pointers:
    http://wiki.apache.org/lucene-hadoop/HowManyMapsAndReduces
    and
    http://wiki.apache.org/lucene-hadoop/FAQ#10

    Hope this helps...

    Arun
    Any guidance appreciated.
  • Lance Amundsen at Oct 17, 2007 at 5:38 pm
    That info certainly gives me the ability to eliminate splitting, but fine
    control when splitting is in play remains a mystery. Is it possible to
    control at the level of Input(nFiles, mSplits) -> MapperInvocations(yMaps,
    xRecordsPerMap)?

    Consider the following examples (and pls. verify my conclusions):

    1 file per map, 1 record per file, isSplitable(true or false): yields 1 record per mapper
    1 file total, n records, isSplitable(true): yields n records spread across a variable number m of mappers
    1 file total, n records, isSplitable(false): yields n records into 1 mapper

    What I am immediately looking for is a way to do:

    1 file total, n records, isSplitable(true): yields 1 record into n mappers

    But ultimately I need full control over the file/record distributions.
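    For reference, the switch being toggled above is the isSplitable() hook on FileInputFormat in the old org.apache.hadoop.mapred API. A minimal sketch of forcing the "1 file, n records -> 1 mapper" case; the class name is illustrative and exact base-class signatures varied slightly across releases of that era:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.TextInputFormat;

    // Illustrative only: refuse to split, so each input file feeds exactly one map task.
    public class WholeFileTextInputFormat extends TextInputFormat {
      @Override
      protected boolean isSplitable(FileSystem fs, Path file) {
        return false;   // one split per file => one mapper per file
      }
    }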



    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    Arun C Murthy <arunc@yahoo-inc.com> wrote to hadoop-user@lucene.apache.org on 10/17/2007 at 01:05 AM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    Lance,
    On Tue, Oct 16, 2007 at 11:27:54PM -0700, Lance Amundsen wrote:

    I am struggling to control the behavior of the framework. The first
    problem is simple: I want to run many simultaneous mapper tasks on each
    node. I've scoured the forums, done the obvious, and I still typically get
    only 2 tasks per node at execution time. If it is a big job, sometimes I
    see 3. Note that the administrator reports 40 Tasks/Node in the config,
    but the most I've ever seen running is 3 (and this with a single input file
    of 10,000 records, magically yielding 443 maps).

    And "magically" is the next issue: I want fine-grained control over the
    input file / number of input records / number of maps relationship. For my
    immediate problem, I want to use a single input file with a number of records
    yielding the exact same number of maps (all kicked off simultaneously, BTW).
    Since I did not get this behavior with the standard InputFileFormat, I
    created my own input format class and record reader, and am now getting the
    "1 file with n recs to n maps" relationship... but the problem is that I am
    not even sure why.
    I'm in the process of documenting these better
    (http://issues.apache.org/jira/browse/HADOOP-2046); meanwhile, here are some
    pointers:
    http://wiki.apache.org/lucene-hadoop/HowManyMapsAndReduces
    and
    http://wiki.apache.org/lucene-hadoop/FAQ#10

    Hope this helps...

    Arun
    Any guidance appreciated.
  • Ted Dunning at Oct 17, 2007 at 6:36 pm

    On 10/17/07 10:37 AM, "Lance Amundsen" wrote:

    1 file per map, 1 record per file, isSplitable(true or false): yields 1 record per mapper
    Yes.
    1 file total, n records, isSplitable(true): yields n records spread across a variable number m of mappers
    Yes.
    1 file total, n records, isSplitable(false): yields n records into 1 mapper
    Yes.
    What I am immediately looking for is a way to do:

    1 file total, n records, isSplitable(true): yields 1 record into n mappers

    But ultimately I need full control over the file/record distributions.
    Why in the world do you need this level of control? Isn't that the point of
    frameworks like Hadoop? (to avoid the need for this)
  • Lance Amundsen at Oct 17, 2007 at 6:47 pm
    For right now, I am testing boundary conditions related to startup costs.
    I want to build a mapper interface that performs relatively flatly with
    respect to the number of mappers. My goal is to dramatically improve startup
    costs for one mapper, and then make sure that that startup cost does not
    increase dramatically as nodes, maps, and records are increased.

    Example: let's say I have 10K one-second jobs and I want the whole thing to
    run in 2 seconds. I currently see no way for Hadoop to achieve this, but I
    also see how to get there, and this level of granularity would be one of
    the requirements... I believe.

    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    Ted Dunning <tdunning@veoh.com> wrote to hadoop-user@lucene.apache.org on 10/17/2007 at 11:34 AM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    On 10/17/07 10:37 AM, "Lance Amundsen" wrote:

    1 file per map, 1 record per file, isSplitable(true or false): yields 1 record per mapper
    Yes.
    1 file total, n records, isSplitable(true): yields n records spread across a variable number m of mappers
    Yes.
    1 file total, n records, isSplitable(false): yields n records into 1 mapper
    Yes.
    What I am immediately looking for is a way to do:

    1 file total, n records, isSplitable(true): yields 1 record into n mappers
    But ultimately I need full control over the file/record distributions.

    Why in the world do you need this level of control? Isn't that the point of
    frameworks like Hadoop? (to avoid the need for this)
  • Ted Dunning at Oct 17, 2007 at 7:49 pm
    In practice, most jobs involve many more records than there are available
    mappers (even for large clusters).

    This means every mapper handles many records and mapper startup is amortized
    pretty widely.

    It would still be nice to have a smaller startup cost, but the limiting
    factor is likely to be the job tracker spitting all of the jar files to the
    task trackers, not actually the map construction time.

    If you really care about map instantiation time, you could start by making
    the map run in the same VM. That doesn't sound like a good trade-off to me,
    which in turn tells me that I don't care about startup costs so much.

    It is not all that surprising if small jobs are not something that can be
    sped up. The fact that parallelism is generally easier to attain for large
    problems has been noticed for some time.

    On 10/17/07 11:47 AM, "Lance Amundsen" wrote:

    For right now, I am testing boundary conditions related to startup costs.
    I want to build a mapper interface that performs relatively flatly with
    respect to the number of mappers. My goal is to dramatically improve startup
    costs for one mapper, and then make sure that that startup cost does not
    increase dramatically as nodes, maps, and records are increased.

    Example: let's say I have 10K one-second jobs and I want the whole thing to
    run in 2 seconds. I currently see no way for Hadoop to achieve this, but I
    also see how to get there, and this level of granularity would be one of
    the requirements... I believe.

    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    Ted Dunning <tdunning@veoh.com> wrote to hadoop-user@lucene.apache.org on 10/17/2007 at 11:34 AM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    On 10/17/07 10:37 AM, "Lance Amundsen" wrote:

    1 file per map, 1 record per file, isSplitable(true or false): yields 1 record per mapper
    Yes.
    1 file total, n records, isSplitable(true): yields n records spread across a variable number m of mappers
    Yes.
    1 file total, n records, isSplitable(false): yields n records into 1 mapper
    Yes.
    What I am immediately looking for is a way to do:

    1 file total, n records, isSplitable(true): yields 1 record into n mappers
    But ultimately I need full control over the file/record distributions.

    Why in the world do you need this level of control? Isn't that the point of
    frameworks like Hadoop? (to avoid the need for this)

  • Lance Amundsen at Oct 18, 2007 at 10:06 pm
    There are lots of references to decreasing the DFS block size to increase the
    map-to-record ratio. What is the easiest way to do this? Is it possible with
    the standard SequenceFile class?

    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    Ted Dunning <tdunning@veoh.com> wrote to hadoop-user@lucene.apache.org on 10/17/2007 at 12:49 PM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    In practice, most jobs involve many more records than there are available
    mappers (even for large clusters).

    This means every mapper handles many records and mapper startup is
    amortized
    pretty widely.

    It would still be nice to have a smaller startup cost, but the limiting
    factor is likely to be the job tracker spitting all of the jar files to the
    task trackers, not actually the map construction time.

    If you really care about map instantiation time, you could start by making
    the map run in the same VM. That doesn't sound like a good trade-off to me,
    which in turn tells me that I don't care about startup costs so much.

    It is not all that surprising if small jobs are not something that can be
    sped up. The fact that parallelism is generally easier to attain for large
    problems has been noticed for some time.

    On 10/17/07 11:47 AM, "Lance Amundsen" wrote:

    For right now, I am testing boundary conditions related to startup costs.
    I want to build a mapper interface that performs relatively flatly with
    respect to the number of mappers. My goal is to dramatically improve startup
    costs for one mapper, and then make sure that that startup cost does not
    increase dramatically as nodes, maps, and records are increased.

    Example: let's say I have 10K one-second jobs and I want the whole thing to
    run in 2 seconds. I currently see no way for Hadoop to achieve this, but I
    also see how to get there, and this level of granularity would be one of
    the requirements... I believe.

    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    Ted Dunning
    <tdunning@veoh.co
    m> To
    <hadoop-user@lucene.apache.org>
    10/17/2007 11:34 cc
    AM Subject
    Re: InputFiles, Splits, Maps, Tasks
    Please respond to Questions 1.3 Base
    hadoop-user@lucen
    e.apache.org










    On 10/17/07 10:37 AM, "Lance Amundsen" wrote:

    1 file per map, 1 record per file, isSplitable(true or false): yields 1 record per mapper
    Yes.
    1 file total, n records, isSplitable(true): yields n records spread across a variable number m of mappers
    Yes.
    1 file total, n records, isSplitable(false): yields n records into 1 mapper
    Yes.
    What I am immediately looking for is a way to do:

    1 file total, n records, isSplitable(true): yields 1 record into n mappers
    But ultimately I need full control over the file/record distributions.

    Why in the world do you need this level of control? Isn't that the point of
    frameworks like Hadoop? (to avoid the need for this)

  • Doug Cutting at Oct 18, 2007 at 10:22 pm

    Lance Amundsen wrote:
    There are lots of references to decreasing the DFS block size to increase the
    map-to-record ratio. What is the easiest way to do this? Is it possible with
    the standard SequenceFile class?
    You could specify the block size in the Configuration parameter to
    SequenceFile#createWriter() using the dfs.block.size parameter. But if
    you simply want to create sub-block-size splits, then increasing the
    number of map tasks should do that.

    Doug
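    A minimal sketch of Doug's suggestion (the path, key/value types, and block size are placeholders; the dfs.block.size override only applies to files written through this particular Configuration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SmallBlockSequenceFileWriter {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Request a small HDFS block size (in bytes) for files created with this conf,
        // so the default splitter produces more, smaller splits.
        conf.setLong("dfs.block.size", 1024 * 1024);

        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/tmp/records.seq");   // placeholder path
        SequenceFile.Writer writer =
            SequenceFile.createWriter(fs, conf, out, LongWritable.class, Text.class);
        try {
          for (long i = 0; i < 10000; i++) {
            writer.append(new LongWritable(i), new Text("record-" + i));
          }
        } finally {
          writer.close();
        }
      }
    }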
  • Lance Amundsen at Oct 18, 2007 at 10:40 pm
    Thanks, I'll give that a try. It seems to me that a method to tell Hadoop to
    split a file every n key/value pairs would be logical. Or maybe a
    createSplitBoundary call when appending key/value records?

    I just want a way, and not a really complex way, of directing the number of
    maps and the breakdown of records going to them. Creating a separate file per
    record group is too slow for my purposes.

    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    Doug Cutting <cutting@apache.org> wrote to hadoop-user@lucene.apache.org on 10/18/2007 at 03:21 PM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    Lance Amundsen wrote:
    There are lots of references to decreasing the DFS block size to increase the
    map-to-record ratio. What is the easiest way to do this? Is it possible with
    the standard SequenceFile class?
    You could specify the block size in the Configuration parameter to
    SequenceFile#createWriter() using the dfs.block.size parameter. But if
    you simply want to create sub-block-size splits, then increasing the
    number of map tasks should do that.

    Doug
  • Doug Cutting at Oct 18, 2007 at 11:05 pm

    Lance Amundsen wrote:
    Thanks, I'll give that a try. It seems to me that a method to tell Hadoop to
    split a file every n key/value pairs would be logical. Or maybe a
    createSplitBoundary call when appending key/value records?
    Splits should not require examining the data: that's not scalable. So
    they're instead on arbitrary byte boundaries.
    I just want a way, and not a really complex way, of directing the number of
    maps and the breakdown of records going to them. Creating a separate file per
    record group is too slow for my purposes.
    Just set the number of map tasks. That should mostly do what you want
    in this case. If you want finer-grained control, implement your own
    InputFormat.

    Doug
  • Lance Amundsen at Oct 19, 2007 at 12:08 am
    You said arbitrary... maybe I missed something. Can I construct a
    getSplits() method that chunks up the file however I want? I assumed I
    needed to return a split map that corresponded to key/value boundaries,
    which I am having trouble doing since the input format for the value can
    change (I am now using ObjectWritable, for example... GenericWritable makes
    this impossible, I think). Then there is the file header to take into
    account... but if I am making this too complicated, let me know.

    Setting the number of maps as a method to force record/file/mapper
    relationships appears dicey at best. This is the sort of stuff I am seeing
    in my 9-node setup:

    1 file, 1000 records, 1000 maps requested yields 43 actual maps
    1 file, 10,000 records, 10,000 maps requested yields 430 actual maps

    In all of these cases I can only get 2 tasks/node running at the same
    time... once in a while 3 run... even though I have specified a higher
    number to be allowed.

    I want 1 map per record, from one file, for any number of records, and I
    want it guaranteed. Later I may want 10 records, or 100, but right now I
    want to force a one-record-per-mapper relationship, and I do not want
    to pay the file creation overhead of, say, 1000 files, just to get 1000
    maps.

    BTW, I have started working on my own InputFormat and InputFileClasses as
    well.... and these questions are helping me with context, but tactically I
    am just trying to understand the Hadoop mapper startup overhead with the
    goals of a) reducing it, and b) making the overhead stay constant (flat)
    out to n mappers on m nodes.


    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    Doug Cutting <cutting@apache.org> wrote to hadoop-user@lucene.apache.org on 10/18/2007 at 04:04 PM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    Lance Amundsen wrote:
    Thanks, I'll give that a try. It seems to me that a method to tell Hadoop to
    split a file every n key/value pairs would be logical. Or maybe a
    createSplitBoundary call when appending key/value records?
    Splits should not require examining the data: that's not scalable. So
    they're instead on arbitrary byte boundaries.
    I just want a way, and not a really complex way, of directing the number of
    maps and the breakdown of records going to them. Creating a separate file per
    record group is too slow for my purposes.
    Just set the number of map tasks. That should mostly do what you want
    in this case. If you want finer-grained control, implement your own
    InputFormat.

    Doug
  • Doug Cutting at Oct 19, 2007 at 12:57 am

    Lance Amundsen wrote:
    I want 1 map per record, from one file, for any number of records, and I
    want it guaranteed.
    Then you need to write your own InputFormat implementation. The default
    implementations are designed to optimize for input i/o, which is not
    your concern.

    Doug
  • Owen O'Malley at Oct 19, 2007 at 4:46 am

    On Oct 18, 2007, at 5:04 PM, Lance Amundsen wrote:

    You said arbitrary.. maybe I missed something. Can I construct a
    getSplits() method that chunks up the file however I want?
    Yes. The application specifies an InputFormat class, which has a
    getSplits method that returns a list of InputSplits. The "standard"
    input formats extend FileInputFormat, which has the behavior we have
    been describing. However, your InputFormat can generate InputSplits
    however it wants. For an example of an unusual variation, look at the
    RandomWriter example. It creates input splits that aren't based on
    any files at all. It just creates a split for each map that it wants.
    I assumed I needed to return a split map that corresponded to key/value
    boundaries,
    SequenceFileInputFormat and TextInputFormat don't need the splits to
    match the record boundaries. They both start at the first record
    after the split's start offset and continue to the next record after
    the split's end. TextInputFormat always treats records as newline
    ("\n")-terminated lines, and SequenceFile uses sync markers (constant
    blocks of bytes) to find record boundaries.
    1 file, 1000 records, 1000 maps requested yields 43 actual maps
    1 file, 10,000 records, 10,000 maps requested yields 430 actual maps
    I don't understand how this is happening. What are the data size,
    block size, and minimum split size in your job?
    In all of these cases I can only get 2 tasks/node running at the same
    time... once in a while 3 run... even though I have specified a
    higher number to be allowed.
    Are your maps finishing quickly (< 20 seconds)?
    I want 1 map per record, from one file, for any number of records, and I
    want it guaranteed. Later I may want 10 records, or 100, but right now I
    want to force a one-record-per-mapper relationship, and I do not want
    to pay the file creation overhead of, say, 1000 files, just to get 1000
    maps.
    That is completely doable. Although to make it perform well, you
    either need an index from row number to file offset or fixed-width
    records... In any case, you'll need to write your own InputFormat.

    -- Owen
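    A minimal sketch of the RandomWriter-style approach Owen describes, written against the later generic org.apache.hadoop.mapred API (the 2007-era interfaces were non-generic and also required validateInput, so treat this as illustrative rather than drop-in): an InputFormat that ignores files entirely and manufactures one split, and therefore one map, per requested task.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    public class OneMapPerSplitInputFormat
        implements InputFormat<LongWritable, NullWritable> {

      // A split that carries nothing but its own index; it must be Writable so the
      // framework can serialize it out to the task.
      public static class IndexSplit implements InputSplit {
        long index;
        public IndexSplit() { }                       // needed for deserialization
        public IndexSplit(long index) { this.index = index; }
        public long getLength() { return 0; }         // no bytes backing this split
        public String[] getLocations() { return new String[0]; }  // no locality hint
        public void write(DataOutput out) throws IOException { out.writeLong(index); }
        public void readFields(DataInput in) throws IOException { index = in.readLong(); }
      }

      public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        // Honor the hint exactly: one split (hence one map) per requested task.
        InputSplit[] splits = new InputSplit[numSplits];
        for (int i = 0; i < numSplits; i++) {
          splits[i] = new IndexSplit(i);
        }
        return splits;
      }

      public RecordReader<LongWritable, NullWritable> getRecordReader(
          final InputSplit split, JobConf job, Reporter reporter) {
        // Each map sees exactly one record: the split's index as its key.
        return new RecordReader<LongWritable, NullWritable>() {
          private boolean done = false;
          public boolean next(LongWritable key, NullWritable value) {
            if (done) return false;
            key.set(((IndexSplit) split).index);
            done = true;
            return true;
          }
          public LongWritable createKey() { return new LongWritable(); }
          public NullWritable createValue() { return NullWritable.get(); }
          public long getPos() { return done ? 1 : 0; }
          public float getProgress() { return done ? 1.0f : 0.0f; }
          public void close() { }
        };
      }
    }

    Wiring it in with conf.setInputFormat(OneMapPerSplitInputFormat.class) and conf.setNumMapTasks(n) should then yield exactly n map invocations, since nothing else caps the split count; each map's RecordReader could just as easily fetch its real record from a file offset or an external index keyed by the split's index.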
  • Lance Amundsen at Oct 23, 2007 at 12:56 am
    OK, I spent forever playing with overriding SequenceFileInputFormat
    behavior, and attempting my own completely different input format
    (extending SFIF)... but I finally just decided to download the Hadoop
    source and see exactly what the heck it is doing. It turns out that there
    is a constant value in SequenceFile called SYNC_INTERVAL and that the SFIF
    constructor calls setMinSplitSize with this value (2000). So getting a
    split size less than 2000 was impossible... so I just hard-coded a
    split size equal to my record size in FileInputFormat, and now I am getting
    exactly what I want: 1 map invocation per record per "one" input file.

    Next I want to increase the concurrent number of tasks being executed on
    each node... currently it seems like 2 or 3 is the upper limit (at least on
    the earlier binaries I was running).

    Any comments appreciated... searching the code now.

    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    "Owen O'Malley"
    <oom@yahoo-inc.co
    m> To
    hadoop-user@lucene.apache.org
    10/18/2007 09:44 cc
    PM
    Subject
    Re: InputFiles, Splits, Maps, Tasks
    Please respond to Questions 1.3 Base
    hadoop-user@lucen
    e.apache.org








    On Oct 18, 2007, at 5:04 PM, Lance Amundsen wrote:

    You said arbitrary.. maybe I missed something. Can I construct a
    getSplits() method that chunks up the file however I want?
    Yes. The application specifies an InputFormat class, which has a
    getSplits method that returns a list of InputSplits. The "standard"
    input formats extend FileInputFormat, which has the behavior we have
    been describing. However, your InputFormat can generate InputSplits
    however it wants. For an example of an unusual variation, look at the
    RandomWriter example. It creates input splits that aren't based on
    any files at all. It just creates a split for each map that it wants.
    I assumed I needed to return a split map that corresponded to key/value
    boundaries,
    SequenceFileInputFormat and TextInputFormat don't need the splits to
    match the record boundaries. They both start at the first record
    after the split's start offset and continue to the next record after
    the split's end. TextInputFormat always treats records as newline
    ("\n")-terminated lines, and SequenceFile uses sync markers (constant
    blocks of bytes) to find record boundaries.
    1 file, 1000 records, 1000 maps requested yields 43 actual maps
    1 file, 10,000 records, 10,000 maps requested yields 430 actual maps
    I don't understand how this is happening. What are the data size,
    block size, and minimum split size in your job?
    In all of these cases I can only get 2 tasks/node running at the same
    time... once in a while 3 run... even though I have specified a
    higher number to be allowed.
    Are your maps finishing quickly (< 20 seconds)?
    I want 1 map per record, from one file, for any number of records, and I
    want it guaranteed. Later I may want 10 records, or 100, but right now I
    want to force a one-record-per-mapper relationship, and I do not want
    to pay the file creation overhead of, say, 1000 files, just to get 1000
    maps.
    That is completely doable. Although to make it perform well, you
    either need an index from row number to file offset or fixed-width
    records... In any case, you'll need to write your own InputFormat.

    -- Owen
  • Michael Bieniosek at Oct 23, 2007 at 1:03 am
    You can tune the number of map tasks per node with the config variable "mapred.tasktracker.tasks.maximum" on the jobtracker (there is a patch to make it configurable on the tasktracker: see https://issues.apache.org/jira/browse/HADOOP-1245).

    -Michael

    On 10/22/07 5:53 PM, "Lance Amundsen" wrote:

    OK, I spent forever playing with overriding SequenceFileInputFormat
    behavior, and attempting my own completely different input format
    (extending SFIF)... but I finally just decided to download the Hadoop
    source and see exactly what the heck it is doing. It turns out that there
    is a constant value in SequenceFile called SYNC_INTERVAL and that the SFIF
    constructor calls setMinSplitSize with this value (2000). So getting a
    split size less than 2000 was impossible... so I just hard-coded a
    split size equal to my record size in FileInputFormat, and now I am getting
    exactly what I want: 1 map invocation per record per "one" input file.

    Next I want to increase the concurrent number of tasks being executed on
    each node... currently it seems like 2 or 3 is the upper limit (at least on
    the earlier binaries I was running).

    Any comments appreciated... searching the code now.

    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    "Owen O'Malley"
    <oom@yahoo-inc.co
    m> To
    hadoop-user@lucene.apache.org
    10/18/2007 09:44 cc
    PM
    Subject
    Re: InputFiles, Splits, Maps, Tasks
    Please respond to Questions 1.3 Base
    hadoop-user@lucen
    e.apache.org








    On Oct 18, 2007, at 5:04 PM, Lance Amundsen wrote:

    You said arbitrary.. maybe I missed something. Can I construct a
    getSplits() method that chunks up the file however I want?
    Yes. The application specifies an InputFormat class, which has a
    getSplits method that returns a list of InputSplits. The "standard"
    input formats extend FileInputFormat, which has the behavior we have
    been describing. However, your InputFormat can generate InputSplits
    however it wants. For an example of an unusual variation, look at the
    RandomWriter example. It creates input splits that aren't based on
    any files at all. It just creates a split for each map that it wants.
    I assumed I needed to return a split map that corresponded to key/value
    boundaries,
    SequenceFileInputFormat and TextInputFormat don't need the splits to
    match the record boundaries. They both start at the first record
    after the split's start offset and continue to the next record after
    the split's end. TextInputFormat always treats records as newline
    ("\n")-terminated lines, and SequenceFile uses sync markers (constant
    blocks of bytes) to find record boundaries.
    1 file, 1000 records, 1000 maps requested yields 43 actual maps
    1 file, 10,000 records, 10,000 maps requested yields 430 actual maps
    I don't understand how this is happening. What are the data size,
    block size, and minimum split size in your job?
    In all of these cases I can only get 2 tasks/node running at the same
    time... once in a while 3 run... even though I have specified a
    higher number to be allowed.
    Are your maps finishing quickly (< 20 seconds)?
    I want 1 map per record, from one file, for any number of records, and I
    want it guaranteed. Later I may want 10 records, or 100, but right now I
    want to force a one-record-per-mapper relationship, and I do not want
    to pay the file creation overhead of, say, 1000 files, just to get 1000
    maps.
    That is completely doable. Although to make it perform well, you
    either need an index from row number to file offset or fixed-width
    records... In any case, you'll need to write your own InputFormat.

    -- Owen
  • Ted Dunning at Oct 23, 2007 at 2:30 am
    You probably have determined by now that there is a parameter that
    determines how many concurrent maps there are.

    <property>
      <name>mapred.tasktracker.tasks.maximum</name>
      <value>3</value>
      <description>The maximum number of tasks that will be run
      simultaneously by a task tracker.
      </description>
    </property>

    Btw... I am still curious about your approach. Isn't it normally better to
    measure marginal costs such as this startup cost by linear regression as you
    change parameters? It seems that otherwise, you will likely be misled by
    what happens at the boundaries when what you really want is what happens in
    the normal operating region.



    On 10/22/07 5:53 PM, "Lance Amundsen" wrote:

    ...

    Next I want to increase the concurrent # of tasks being executed for each
    node... currently it seems like 2 or 3 is the upper limit (at least on the
    earlier binaries I was running).
  • Lance Amundsen at Oct 23, 2007 at 3:34 am
    What I am trying to do is see what it would take to modify the Hadoop
    framework for transaction-based processing. So right now I am just trying
    to get to the point where I can start looking at the hard stuff. I am
    still blocked by simple control at this point. The workload I am trying to
    measure at this point is nothing more than a print statement... i.e.,
    nothing. Startup costs currently are linear with respect to nodes, dependent
    on the input model. I first need to find or create a model that has a flat
    startup cost for n nodes, at which point I can start to tackle the actual
    path length and latency issues of the startup itself.

    This is all just investigative at this point, but I can already envision
    changes that would allow process startup and teardown in less than 1 second
    and grow nearly flat as nodes increase.





    Ted Dunning <tdunning@veoh.com> wrote to hadoop-user@lucene.apache.org on 10/22/2007 at 07:29 PM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    You probably have determined by now that there is a parameter that
    determines how many concurrent maps there are.

    <property>
      <name>mapred.tasktracker.tasks.maximum</name>
      <value>3</value>
      <description>The maximum number of tasks that will be run
      simultaneously by a task tracker.
      </description>
    </property>

    Btw... I am still curious about your approach. Isn't it normally better to
    measure marginal costs such as this startup cost by linear regression as you
    change parameters? It seems that otherwise, you will likely be misled by
    what happens at the boundaries when what you really want is what happens in
    the normal operating region.



    On 10/22/07 5:53 PM, "Lance Amundsen" wrote:

    ...

    Next I want to increase the concurrent # of tasks being executed for each
    node... currently it seems like 2 or 3 is the upper limit (at least on the
    earlier binaries I was running).
  • Owen O'Malley at Oct 19, 2007 at 2:50 am

    On Oct 18, 2007, at 3:30 PM, Lance Amundsen wrote:

    Thanks, I'll give that a try. It seems to me that a method to tell Hadoop to
    split a file every n key/value pairs would be logical. Or maybe a
    createSplitBoundary call when appending key/value records?
    The problem is that the split generator doesn't want to read the data
    files. So it picks byte ranges as a reasonable proxy. I know of some
    applications that have custom input formats that use md5 ranges as
    input splits and read multiple files for each split. You could
    equivalently use rows, as long as you had an index.

    -- Owen
  • Doug Cutting at Oct 18, 2007 at 10:16 pm

    Lance Amundsen wrote:
    Example: let's say I have 10K one-second jobs and I want the whole thing to
    run in 2 seconds. I currently see no way for Hadoop to achieve this,
    That's right. That has not been a design goal to date. Tasks are
    typically expected to last at least several seconds. To fix this we'd
    need to permit a single JVM to handle a sequence of tasks, which would
    be possible. Currently tasks are always run in a separate JVM since
    they load user code and may fail in arbitrary ways. So if someone
    wishes to implement this, it should probably be optional.

    Doug
  • Arun C Murthy at Oct 17, 2007 at 7:38 pm
    Lance,
    On Wed, Oct 17, 2007 at 10:37:58AM -0700, Lance Amundsen wrote:
    That info certainly gives me the ability to eliminate splitting, but fine
    control when splitting is in play remains a mystery. Is it possible to
    control at the level of Input(nFiles, mSplits) -> MapperInvocations(yMaps,
    xRecordsPerMap)?
    Consider the following examples (and pls. verify my conclusions):

    1 file per map, 1 record per file, isSplitable(true or false): yields 1 record per mapper
    1 file total, n records, isSplitable(true): yields n records spread across a variable number m of mappers
    1 file total, n records, isSplitable(false): yields n records into 1 mapper
    Correct, all of them.
    What I am immediately looking for is a way to do:

    1 file total, n records, isSplitable(true): Yields 1 record into n mappers
    There is a slightly obscure reference to it there; anyway, I've just uploaded a documentation patch at HADOOP-2046 which should help.

    Specifically the updated documentation for JobConf.setNumMapTasks (for now, in the patch only) reads:

    -*-*-

    *setNumMapTasks*

    public void setNumMapTasks(int n)

    Set the number of map tasks for this job.

    Note: This is only a hint to the framework. The actual number of spawned map tasks depends on the number of InputSplits generated by the job's InputFormat.getSplits(JobConf, int). A custom InputFormat is typically used to accurately control the number of map tasks for the job.
    How many maps?

    The number of maps is usually driven by the total size of the inputs i.e. total number of HDFS blocks of the input files.

    The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 or so for very cpu-light map tasks. Task setup takes awhile, so it is best if the maps take at least a minute to execute.

    The default InputFormat behavior is to split the total number of bytes into the right number of fragments. However, the HDFS block size of the input files is treated as an upper bound for input splits.
    A lower bound on the split size can be set via mapred.min.split.size.

    Thus, if you expect 10TB of input data and have 128MB HDFS blocks, you'll end up with 82,000 maps, unless setNumMapTasks(int) is used to set it even higher.

    Parameters:
    n - the number of map tasks for this job.

    -*-*-
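    A minimal driver sketch showing where that hint gets set (paths and job name are placeholders, and this uses the later mapred API's FileInputFormat/FileOutputFormat path helpers; as the documentation above notes, the hint only becomes exact when the InputFormat's getSplits honors it):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class MapCountDriver {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MapCountDriver.class);
        conf.setJobName("map-count-hint");

        FileInputFormat.setInputPaths(conf, new Path("/tmp/input"));    // placeholder
        FileOutputFormat.setOutputPath(conf, new Path("/tmp/output"));  // placeholder

        // Only a hint: the actual map count is whatever InputFormat.getSplits() returns.
        conf.setNumMapTasks(1000);
        conf.setNumReduceTasks(0);

        JobClient.runJob(conf);
      }
    }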

    hth,
    Arun
    But ultimately I need full control over the file/record distributions.



    Lance

    IBM Software Group - Strategy
    Performance Architect
    High-Performance On Demand Solutions (HiPODS)

    650-678-8425 cell





    Arun C Murthy <arunc@yahoo-inc.com> wrote to hadoop-user@lucene.apache.org on 10/17/2007 at 01:05 AM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    Lance,
    On Tue, Oct 16, 2007 at 11:27:54PM -0700, Lance Amundsen wrote:

    I am struggling to control the behavior of the framework. The first
    problem is simple: I want to run many simultaneous mapper tasks on each
    node. I've scoured the forums, done the obvious, and I still typically get
    only 2 tasks per node at execution time. If it is a big job, sometimes I
    see 3. Note that the administrator reports 40 Tasks/Node in the config,
    but the most I've ever seen running is 3 (and this with a single input file
    of 10,000 records, magically yielding 443 maps).

    And "magically" is the next issue: I want fine-grained control over the
    input file / number of input records / number of maps relationship. For my
    immediate problem, I want to use a single input file with a number of records
    yielding the exact same number of maps (all kicked off simultaneously, BTW).
    Since I did not get this behavior with the standard InputFileFormat, I
    created my own input format class and record reader, and am now getting the
    "1 file with n recs to n maps" relationship... but the problem is that I am
    not even sure why.
    I'm in the process of documenting these better
    (http://issues.apache.org/jira/browse/HADOOP-2046); meanwhile, here are some
    pointers:
    http://wiki.apache.org/lucene-hadoop/HowManyMapsAndReduces
    and
    http://wiki.apache.org/lucene-hadoop/FAQ#10

    Hope this helps...

    Arun
    Any guidance appreciated.
  • Lance Amundsen at Oct 23, 2007 at 2:45 am
    That has had no effect for me, however... not sure why. The admin reports 10 tasks per node possible, but I am not seeing it.


    ----- Original Message -----
    From: Ted Dunning [tdunning@veoh.com]
    Sent: 10/22/2007 08:29 PM
    To: <hadoop-user@lucene.apache.org>
    Subject: Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base




    You probably have determined by now that there is a parameter that
    determines how many concurrent maps there are.

    <property>
      <name>mapred.tasktracker.tasks.maximum</name>
      <value>3</value>
      <description>The maximum number of tasks that will be run
      simultaneously by a task tracker.
      </description>
    </property>

    Btw... I am still curious about your approach. Isn't it normally better to
    measure marginal costs such as this startup cost by linear regression as you
    change parameters? It seems that otherwise, you will likely be misled by
    what happens at the boundaries when what you really want is what happens in
    the normal operating region.



    On 10/22/07 5:53 PM, "Lance Amundsen" wrote:

    ...

    Next I want to increase the concurrent # of tasks being executed for each
    node... currently it seems like 2 or 3 is the upper limit (at least on the
    earlier binaries I was running).
  • Lance Amundsen at Oct 23, 2007 at 4:05 am
    Just had a thought... I may not be seeing the additional tasks because
    the startup time is washing things out. In other words, when the
    tasktracker is starting job 3, say, job 1 is already finishing. I'll try a
    pause in the mapper and see what happens.




    Lance Amundsen (Lance Amundsen/Rochester/IBM@IBMUS) wrote to hadoop-user@lucene.apache.org on 10/22/2007 at 07:42 PM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    That has had no effect for me, however... not sure why. The admin reports 10
    tasks per node possible, but I am not seeing it.


    ----- Original Message -----
    From: Ted Dunning [tdunning@veoh.com]
    Sent: 10/22/2007 08:29 PM
    To: <hadoop-user@lucene.apache.org>
    Subject: Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base




    You probably have determined by now that there is a parameter that
    determines how many concurrent maps there are.

    <property>
      <name>mapred.tasktracker.tasks.maximum</name>
      <value>3</value>
      <description>The maximum number of tasks that will be run
      simultaneously by a task tracker.
      </description>
    </property>

    Btw... I am still curious about your approach. Isn't it normally better to
    measure marginal costs such as this startup cost by linear regression as you
    change parameters? It seems that otherwise, you will likely be misled by
    what happens at the boundaries when what you really want is what happens in
    the normal operating region.



    On 10/22/07 5:53 PM, "Lance Amundsen" wrote:

    ...

    Next I want to increase the concurrent # of tasks being executed for each
    node... currently it seems like 2 or 3 is the upper limit (at least on the
    earlier binaries I was running).
  • Lance Amundsen at Oct 23, 2007 at 11:24 pm
    I am starting to wonder if it might indeed be impossible to get map jobs
    running without writing to the file system... as in, not without some major
    changes to the job and task tracker code.

    I was thinking about creating an InputFormat that does no file I/O but is
    instead queue-based. As mappers start up, their getRecordReader calls get
    redirected to a remote queue to pull one or more records off of. But I am
    starting to wonder if the file system dependencies in the code are such
    that I could never completely avoid using files. Specifically, even if I
    completely rewrite an InputFormat, the framework is still going to try to
    do filesystem stuff on everything I return (the extensive internal use of
    splits is baffling me some).

    Looking for some enlightening thoughts.





    Lance Amundsen (Lance Amundsen/Rochester/IBM@IBMUS) wrote to hadoop-user@lucene.apache.org on 10/22/2007 at 09:02 PM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    Just had a thought... I may not be seeing the additional tasks because
    the startup time is washing things out. In other words, when the
    tasktracker is starting job 3, say, job 1 is already finishing. I'll try a
    pause in the mapper and see what happens.




    Lance Amundsen (Lance Amundsen/Rochester/IBM@IBMUS) wrote to hadoop-user@lucene.apache.org on 10/22/2007 at 07:42 PM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    That has had no effect for me, however... not sure why. The admin reports 10
    tasks per node possible, but I am not seeing it.


    ----- Original Message -----
    From: Ted Dunning [tdunning@veoh.com]
    Sent: 10/22/2007 08:29 PM
    To: <hadoop-user@lucene.apache.org>
    Subject: Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base




    You probably have determined by now that there is a parameter that
    determines how many concurrent maps there are.

    <property>
      <name>mapred.tasktracker.tasks.maximum</name>
      <value>3</value>
      <description>The maximum number of tasks that will be run
      simultaneously by a task tracker.
      </description>
    </property>

    Btw... I am still curious about your approach. Isn't it normally better to
    measure marginal costs such as this startup cost by linear regression as you
    change parameters? It seems that otherwise, you will likely be misled by
    what happens at the boundaries when what you really want is what happens in
    the normal operating region.



    On 10/22/07 5:53 PM, "Lance Amundsen" wrote:

    ...

    Next I want to increase the concurrent # of tasks being executed for each
    node... currently it seems like 2 or 3 is the upper limit (at least on the
    earlier binaries I was running).
  • Doug Cutting at Oct 24, 2007 at 4:03 pm

    Lance Amundsen wrote:
    I am starting to wonder if it might indeed be impossible to get map jobs
    running without writing to the file system... as in, not without some major
    changes to the job and task tracker code.

    I was thinking about creating an InputFormat that does no file I/O but is
    instead queue-based. As mappers start up, their getRecordReader calls get
    redirected to a remote queue to pull one or more records off of. But I am
    starting to wonder if the file system dependencies in the code are such
    that I could never completely avoid using files. Specifically, even if I
    completely rewrite an InputFormat, the framework is still going to try to
    do filesystem stuff on everything I return (the extensive internal use of
    splits is baffling me some).
    Nothing internally should depend on an InputSplit representing a file.
    You do need to be able to generate the complete set of splits when the
    job is launched. So if you wanted maps to poll a queue for each task
    then you'd need to know how long the queue is when the job is launched
    so that you could generate the right number of polling splits.

    Doug
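    A hedged fragment of what Doug describes; QueueClient and QueuePollSplit are hypothetical stand-ins (not a real Hadoop or queueing API), and the split/reader plumbing would look much like the one-split-per-map sketch earlier in the thread:

    // Inside a queue-backed InputFormat (illustrative only; QueueClient is hypothetical).
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
      // The queue depth has to be known at job-submission time, so the framework
      // can create one polling split per pending record.
      int pending = QueueClient.connect(job.get("queue.address")).pendingRecords();
      InputSplit[] splits = new InputSplit[pending];
      for (int i = 0; i < pending; i++) {
        splits[i] = new QueuePollSplit(i);  // trivial Writable split carrying only an index
      }
      return splits;
    }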
  • Lance Amundsen at Oct 24, 2007 at 7:36 pm
    OK, that is encouraging. I'll take another pass at it. I succeeded
    yesterday with an in-memory-only InputFormat, but only after I commented
    out some of the split-referencing code, like the following in MapTask.java:

    if (instantiatedSplit instanceof FileSplit) {
      FileSplit fileSplit = (FileSplit) instantiatedSplit;
      job.set("map.input.file", fileSplit.getPath().toString());
      job.setLong("map.input.start", fileSplit.getStart());
      job.setLong("map.input.length", fileSplit.getLength());
    }

    But maybe I simply need to override more methods in more of the embedded
    classes. You can see why I was wondering about the file system
    dependencies.




    Doug Cutting <cutting@apache.org> wrote to hadoop-user@lucene.apache.org on 10/24/2007 at 09:02 AM (Re: InputFiles, Splits, Maps, Tasks Questions 1.3 Base):

    Lance Amundsen wrote:
    I am starting to wonder if it might indeed be impossible to get map jobs
    running without writing to the file system... as in, not without some major
    changes to the job and task tracker code.

    I was thinking about creating an InputFormat that does no file I/O but is
    instead queue-based. As mappers start up, their getRecordReader calls get
    redirected to a remote queue to pull one or more records off of. But I am
    starting to wonder if the file system dependencies in the code are such
    that I could never completely avoid using files. Specifically, even if I
    completely rewrite an InputFormat, the framework is still going to try to
    do filesystem stuff on everything I return (the extensive internal use of
    splits is baffling me some).
    Nothing internally should depend on an InputSplit representing a file.
    You do need to be able to generate the complete set of splits when the
    job is launched. So if you wanted maps to poll a queue for each task
    then you'd need to know how long the queue is when the job is launched
    so that you could generate the right number of polling splits.

    Doug
  • Doug Cutting at Oct 24, 2007 at 7:42 pm

    Lance Amundsen wrote:
    OK, that is encouraging. I'll take another pass at it. I succeeded
    yesterday with an in-memory-only InputFormat, but only after I commented
    out some of the split-referencing code, like the following in MapTask.java:

    if (instantiatedSplit instanceof FileSplit) {
      FileSplit fileSplit = (FileSplit) instantiatedSplit;
      job.set("map.input.file", fileSplit.getPath().toString());
      job.setLong("map.input.start", fileSplit.getStart());
      job.setLong("map.input.length", fileSplit.getLength());
    }
    Yes, that code should not exist, but it shouldn't affect you either.
    You should be subclassing InputSplit, not FileSplit, so this code
    shouldn't operate on your splits.

    Doug
  • Owen O'Malley at Oct 24, 2007 at 8:06 pm

    On Oct 24, 2007, at 12:42 PM, Doug Cutting wrote:

    Lance Amundsen wrote:
    OK, that is encouraging. I'll take another pass at it. I succeeded
    yesterday with an in-memory-only InputFormat, but only after I commented
    out some of the split-referencing code, like the following in MapTask.java:

    if (instantiatedSplit instanceof FileSplit) {
      FileSplit fileSplit = (FileSplit) instantiatedSplit;
      job.set("map.input.file", fileSplit.getPath().toString());
      job.setLong("map.input.start", fileSplit.getStart());
      job.setLong("map.input.length", fileSplit.getLength());
    }
    Yes, that code should not exist, but it shouldn't affect you
    either. You should be subclassing InputSplit, not FileSplit, so
    this code shouldn't operate on your splits.
    That code doesn't do anything if they are non-FileSplits, so it
    absolutely shouldn't break anything. Applications depend on those
    attributes to know which split they are working on, and there isn't a
    better fix until we move to context objects. I know that non-FileSplits
    work because there are unit tests to make sure they don't
    break anything.

    -- Owen
  • Lance Amundsen at Oct 25, 2007 at 5:19 pm
    So I managed to get my fast InputFormat working... it does still use the
    FS, but in such a way that it improves mapper startup by over 2X. And last
    night I got a prototype working that allows the map task to run under the
    JVM of the TaskTracker, rather than spawning a new JVM.

    The initial performance looks really, really good. I just ran a 1000-map,
    single-input-record job (mappers doing no work, however) in a one-master,
    one-slave setup... on my laptop. It completed in a couple thousand
    seconds, or a couple of seconds per map. Earlier I did a smaller 100-map job
    with a stable, quiesced system and it came in at about 130 seconds.

    So this prototype can start and end map jobs in 1-2 seconds, and should
    scale flatly with respect to the number of nodes in the setup.




    "Owen O'Malley"
    <oom@yahoo-inc.co
    m> To
    hadoop-user@lucene.apache.org
    10/24/2007 01:05 cc
    PM
    Subject
    Re: InputFiles, Splits, Maps, Tasks
    Please respond to Questions 1.3 Base
    hadoop-user@lucen
    e.apache.org








    On Oct 24, 2007, at 12:42 PM, Doug Cutting wrote:

    Lance Amundsen wrote:
    OK, that is encouraging. I'll take another pass at it. I succeeded
    yesterday with an in-memory-only InputFormat, but only after I commented
    out some of the split-referencing code, like the following in MapTask.java:

    if (instantiatedSplit instanceof FileSplit) {
      FileSplit fileSplit = (FileSplit) instantiatedSplit;
      job.set("map.input.file", fileSplit.getPath().toString());
      job.setLong("map.input.start", fileSplit.getStart());
      job.setLong("map.input.length", fileSplit.getLength());
    }
    Yes, that code should not exist, but it shouldn't affect you
    either. You should be subclassing InputSplit, not FileSplit, so
    this code shouldn't operate on your splits.
    That code doesn't do anything if they are non-FileSplits, so it
    absolutely shouldn't break anything. Applications depend on those
    attributes to know which split they are working on, and there isn't a
    better fix until we move to context objects. I know that non-FileSplits
    work because there are unit tests to make sure they don't
    break anything.

    -- Owen
  • Benjamin Reed at Oct 25, 2007 at 5:40 pm
    I did a patch last year that got similar improvements but still using an
    external process. (I really like the idea of keeping user code out of the
    JobTracker and the TaskTracker. It makes things more stable.) See HADOOP-249.
    It reuses the JVM for a task, which avoids the JVM restart hit. This hit is
    really bad for cases such as yours. It also avoids the performance hit of
    doing socket I/O for progress and task info, and instead uses the process
    pipe, which also gives a big performance improvement.

    Unfortunately, it was never incorporated and now the patch no longer applies.
    It's really not a big change, but the Hadoop code path to spawn the JVM is a
    bit convoluted, which made it hard to do the change and makes it hard to
    bring the patch up-to-date.

    ben
    On Thursday 25 October 2007 10:19:59 Lance Amundsen wrote:
    So I managed to get my fast InputFormat working... it does still use the
    FS, but in such a way that it improves mapper startup by over 2X. And last
    night I got a prototype working that allows the map task to run under the
    JVM of the TaskTracker, rather than spawning a new JVM.

    The initial performance looks really, really good. I just ran a 1000-map,
    single-input-record job (mappers doing no work, however) in a one-master,
    one-slave setup... on my laptop. It completed in a couple thousand
    seconds, or a couple of seconds per map. Earlier I did a smaller 100-map job
    with a stable, quiesced system and it came in at about 130 seconds.

    So this prototype can start and end map jobs in 1-2 seconds, and should
    scale flatly with respect to the number of nodes in the setup.




    "Owen O'Malley"
    <oom@yahoo-inc.co
    m> To
    hadoop-user@lucene.apache.org
    10/24/2007 01:05 cc
    PM
    Subject
    Re: InputFiles, Splits, Maps, Tasks
    Please respond to Questions 1.3 Base
    hadoop-user@lucen
    e.apache.org
    On Oct 24, 2007, at 12:42 PM, Doug Cutting wrote:
    Lance Amundsen wrote:
    OK, that is encouraging. I'll take another pass at it. I succeeded
    yesterday with an in-memory-only InputFormat, but only after I commented
    out some of the split-referencing code, like the following in MapTask.java:

    if (instantiatedSplit instanceof FileSplit) {
      FileSplit fileSplit = (FileSplit) instantiatedSplit;
      job.set("map.input.file", fileSplit.getPath().toString());
      job.setLong("map.input.start", fileSplit.getStart());
      job.setLong("map.input.length", fileSplit.getLength());
    }
    Yes, that code should not exist, but it shouldn't affect you
    either. You should be subclassing InputSplit, not FileSplit, so
    this code shouldn't operate on your splits.
    That code doesn't do anything if they are non-FileSplits, so it
    absolutely shouldn't break anything. Applications depend on those
    attributes to know which split they are working on, and there isn't a
    better fix until we move to context objects. I know that non-FileSplits
    work because there are unit tests to make sure they don't
    break anything.

    -- Owen
