FAQ
I would really appreciate any help people can offer on the following matters.

When running a streaming job, -D, -files, -libjars, and -archives don't seem to work, but -jobconf, -file, -cacheFile, and -cacheArchive do. With the first four parameters anywhere in the command I always get a "Streaming Command Failed!" error. The last four work, though. Note that some of those parameters (-files) do work when I run a Hadoop job in the normal framework, just not when I specify the streaming jar.

How do I specify a Java class as the reducer? I have found examples online, but they always reference "built-in" classes. If I try to use my own class, the job tracker produces a "Cannot run program "org.uw.astro.coadd.Reducer2": java.io.IOException: error=2, No such file or directory" error. As you can see from my first question, I am certainly trying to find ways to include the .jar file containing the class in the distributed cache, but -libjars and -archives don't work, and if I upload the .jar to the cluster and use -cacheArchive, the command runs but I still get the "No such file" error. I can use natively compiled programs for the mapper and reducer just fine, but not a Java class. I want a native mapper and a Java reducer. My native mapper runs, but then the Java reducer fails as described.
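
For concreteness, this is roughly the shape of the command I have been trying (the input/output paths, jar location, and mapper name below are placeholders, not my real ones; the reducer class is the one from the error above):

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input /user/me/inputs \
-output /user/me/outputs \
-mapper my_native_mapper \
-file my_native_mapper \
-reducer org.uw.astro.coadd.Reducer2 \
-cacheArchive hdfs:///user/me/coadd.jar#coadd.jar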

How do I force a single record (input file) to be processed by a single mapper to get maximum parallelism? All I found online was this terse description (of an example that gzips files, not my application):
• Generate a file containing the full HDFS path of the input files. Each map task would get one file name as input.
• Create a mapper script which, given a filename, will get the file to local disk, gzip the file and put it back in the desired output directory
I don't understand exactly what that means or how to go about doing it. In the normal Hadoop framework I have achieved this goal by setting mapred.max.split.size small enough that only one input record fits (about 6MB). I tried the same thing with my streaming job via "-jobconf mapred.max.split.size=X", where X is a very low number, roughly the size of a single streaming input record (which in the streaming case is not 6MB but merely ~100 bytes, just a filename referenced via -cacheFile), but it didn't work; it sent multiple records to each mapper anyway. Achieving 1-to-1 parallelism between map tasks, nodes, and input records is very important because my map tasks take a very long time to run, upwards of an hour. I cannot have them queueing up on a small number of nodes while there are numerous unused nodes (task slots) available to be doing work.
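
For reference, the split-size attempt looked roughly like this (the value 200 is purely illustrative, meant as "slightly bigger than one ~100-byte filename record"; the rest of the command is the same shape as my other attempts):

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-jobconf mapred.max.split.size=200 \
... (remaining streaming options as above)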

I realize I'm asking a lot of questions here, but I would greatly appreciate any assistance on these issues.

Thanks.

________________________________________________________________________________
Keith Wiley kwiley@keithwiley.com keithwiley.com music.keithwiley.com

"Luminous beings are we, not this crude matter."
-- Yoda
________________________________________________________________________________


  • Allen Wittenauer at Feb 4, 2011 at 2:41 am

    On Feb 1, 2011, at 11:40 PM, Keith Wiley wrote:

    I would really appreciate any help people can offer on the following matters.

    When running a streaming job, -D, -files, -libjars, and -archives don't seem to work, but -jobconf, -file, -cacheFile, and -cacheArchive do. With the first four parameters anywhere in the command I always get a "Streaming Command Failed!" error. The last four work, though. Note that some of those parameters (-files) do work when I run a Hadoop job in the normal framework, just not when I specify the streaming jar.
    There are some issues with how the streaming jar processes the command line, especially in 0.20, in that the options need to be in the correct order. In general, the -D's need to be *before* the rest of the streaming params. This is what works for me:

    hadoop \
    jar \
    `ls $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar` \
    -Dmapred.reduce.tasks.speculative.execution=false \
    -Dmapred.map.tasks.speculative.execution=false \
    -Dmapred.job.name="oh noes aw is doing perl again" \
    -input ${ATTEMPTIN} \
    -output ${ATTEMPTOUT} \
    -mapper map.pl \
    -reducer reduce.pl \
    -file jobsvs-map1.pl \
    -file jobsvs-reduce1.pl
    I have found examples online, but they always reference "built-in" classes. If I try to use my own class, the job tracker produces a "Cannot run program "org.uw.astro.coadd.Reducer2": java.io.IOException: error=2, No such file or directory" error.
    I wouldn't be surprised if it is a bug. It might be worthwhile to dig into the streaming jar to figure out how it determines whether something is a class or not. [It might even do something dumb like "is it org.apache.blah?"]
    How do I force a single record (input file) to be processed by a single mapper to get maximum parallelism?
    All I found online was this terse description (of an example that gzips files, not my application):
    • Generate a file containing the full HDFS path of the input files. Each map task would get one file name as input.
    • Create a mapper script which, given a filename, will get the file to local disk, gzip the file and put it back in the desired output directory
    These work, but are less than ideal.
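
    For reference, the mapper script in that recipe is only a few lines of shell, something like this (a sketch only; it assumes the hadoop command is on the PATH on the task nodes, and the output directory is a placeholder):

    #!/bin/sh
    # Streaming feeds each map task its input lines on stdin; here each line is one HDFS path.
    while read hdfspath; do
        name=`basename "$hdfspath"`
        hadoop fs -get "$hdfspath" "$name"           # copy the file to local disk
        gzip "$name"                                 # compress it locally, producing $name.gz
        hadoop fs -put "$name.gz" /user/me/gzipped/  # put it back into HDFS
        echo "$hdfspath"                             # emit something so the task produces output
    done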
    I don't understand exactly what that means or how to go about doing it. In the normal Hadoop framework I have achieved this goal by setting mapred.max.split.size small enough that only one input record fits (about 6MB). I tried the same thing with my streaming job via "-jobconf mapred.max.split.size=X", where X is a very low number, roughly the size of a single streaming input record (which in the streaming case is not 6MB but merely ~100 bytes, just a filename referenced via -cacheFile), but it didn't work; it sent multiple records to each mapper anyway.
    What you actually want to do is set mapred.min.split.size to an extremely high value. Setting max.split.size only works on Combined- and MultiFile-InputFormat for some reason.
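
    In a streaming command that would look something like this (the value is arbitrary, just "much larger than any of the inputs"; the rest of the command is the same shape as above):

    hadoop jar `ls $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar` \
    -Dmapred.min.split.size=10000000000 \
    -input ${ATTEMPTIN} -output ${ATTEMPTOUT} -mapper map.pl -reducer reduce.pl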

    Also, you might be able to change the inputformat. My experiences with doing this are Not Good(tm).

    Achieving 1-to-1 parallelism between map tasks, nodes, and input records is very important because my map tasks take a very long time to run, upwards of an hour. I cannot have them queueing up on a small number of nodes while there are numerous unused nodes (task slots) available to be doing work.
    If all the task slots are in use, why would you care if they are queueing up? Also keep in mind that if a node fails, that work will need to get re-done anyway.
  • Keith Wiley at Feb 4, 2011 at 3:47 pm
    That's weird. I thought I responded to this, but I don't see it on the list (and I have only a vague recollection of whether I actually did respond)...anyway...
    On Feb 3, 2011, at 6:41 PM, Allen Wittenauer wrote:
    On Feb 1, 2011, at 11:40 PM, Keith Wiley wrote:

    I would really appreciate any help people can offer on the following matters.

    When running a streaming job, -D, -files, -libjars, and -archives don't seem to work, but -jobconf, -file, -cacheFile, and -cacheArchive do. With the first four parameters anywhere in the command I always get a "Streaming Command Failed!" error. The last four work, though. Note that some of those parameters (-files) do work when I run a Hadoop job in the normal framework, just not when I specify the streaming jar.
    There are some issues with how the streaming jar processes the command line, especially in 0.20, in that the options need to be in the correct order. In general, the -D's need to be *before* the rest of the streaming params. This is what works for me:

    hadoop \
    jar \
    `ls $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar` \
    -Dmapred.reduce.tasks.speculative.execution=false \
    -Dmapred.map.tasks.speculative.execution=false \
    -Dmapred.job.name="oh noes aw is doing perl again" \
    -input ${ATTEMPTIN} \
    -output ${ATTEMPTOUT} \
    -mapper map.pl \
    -reducer reduce.pl \
    -file jobsvs-map1.pl \
    -file jobsvs-reduce1.pl
    I'll give that a shot today. Thanks. I hate deprecation warnings; they make me feel so guilty.
    How do I force a single record (input file) to be processed by a single mapper to get maximum parallelism?
    I don't understand exactly what that means or how to go about doing it. In the normal Hadoop framework I have achieved this goal by setting mapred.max.split.size small enough that only one input record fits (about 6MB). I tried the same thing with my streaming job via "-jobconf mapred.max.split.size=X", where X is a very low number, roughly the size of a single streaming input record (which in the streaming case is not 6MB but merely ~100 bytes, just a filename referenced via -cacheFile), but it didn't work; it sent multiple records to each mapper anyway.
    What you actually want to do is set mapred.min.split.size to an extremely high value.
    I agree, except that the method I described helps force parallelism. Setting mapred.max.split.size to a size slightly larger than a single record does a very good job of forcing 1-to-1 parallelism. Forcing it to just larger than two records forces 2-to-1, etc. It is very nice to be able to achieve perfect parallelism...but it didn't work with streaming.

    I have since discovered that in the case of streaming, mapred.map.tasks is a good way to achieve this goal. Ironically, if I recall correctly, this seemingly obvious method for setting the number of mappers did not work so well in my original nonstreaming case, which is why I resorted to the rather contrived method of calculating and setting mapred.max.split.size instead.
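
    For the record, all it took in the streaming case was one extra option, roughly like this (the count of 100 is just illustrative, whatever matches the number of input records):

    hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
    -jobconf mapred.map.tasks=100 \
    ... (remaining streaming options as usual)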
    Achieving 1-to-1 parallelism between map tasks, nodes, and input records is very important because my map tasks take a very long time to run, upwards of an hour. I cannot have them queueing up on a small number of nodes while there are numerous unused nodes (task slots) available to be doing work.
    If all the task slots are in use, why would you care if they are queueing up? Also keep in mind that if a node fails, that work will need to get re-done anyway.

    Because all slots are not in use. It's a very large cluster, and it's excruciating that Hadoop partially serializes a job by piling multiple map tasks onto a single mapper in a queue even when the cluster is massively underutilized. This occurs when the input records are significantly smaller than the block size (6MB vs 64MB in my case, giving me about a 32x serialization cost!!!). To put it differently, if I let Hadoop do it its own stupid way, the job takes 32 times longer than it should take if it evenly distributed the map tasks across the nodes. Packing the input files into larger sequence files does not help with this problem. The input splits are calculated from the individual files, and thus I still get this undesirable packing effect.

    Thanks a lot. Lots of stuff to think about in your post. I appreciate it.

    Cheers!

    ________________________________________________________________________________
    Keith Wiley kwiley@keithwiley.com keithwiley.com music.keithwiley.com

    "It's a fine line between meticulous and obsessive-compulsive and a slippery
    rope between obsessive-compulsive and debilitatingly slow."
    -- Keith Wiley
    ________________________________________________________________________________
  • Keith Wiley at Feb 4, 2011 at 4:51 pm

    On Feb 4, 2011, at 07:46 , Keith Wiley wrote:
    On Feb 3, 2011, at 6:41 PM, Allen Wittenauer wrote:

    If all the task slots are in use, why would you care if they are queueing up? Also keep in mind that if a node fails, that work will need to get re-done anyway.

    Because all slots are not in use. It's a very large cluster, and it's excruciating that Hadoop partially serializes a job by piling multiple map tasks onto a single mapper in a queue even when the cluster is massively underutilized. This occurs when the input records are significantly smaller than the block size (6MB vs 64MB in my case, giving me about a 32x serialization cost!!!). To put it differently, if I let Hadoop do it its own stupid way, the job takes 32 times longer than it should take if it evenly distributed the map tasks across the nodes. Packing the input files into larger sequence files does not help with this problem. The input splits are calculated from the individual files, and thus I still get this undesirable packing effect.

    Having reread my last paragraph, I am now reconsidering its tone. I apologize. I am entirely open to the possibility that there are smarter ways to achieve my desired goal of minimum job-turnaround time (maximum parallelism), perhaps via various configuration parameters which I have not learned how to use properly. Furthermore, I am willing to admit that the seemingly frustrating and illogical partial serialization that I witnessed in my jobs using Hadoop's default configuration was not necessarily Hadoop's fault but rather originated from some ineptitude on my part w.r.t. configuring, programming, and using Hadoop properly.

    In other words, I am perfectly willing to admit I might just not be using Hadoop correctly and that this problem is therefore basically my fault.

    Sorry.

    ________________________________________________________________________________
    Keith Wiley kwiley@keithwiley.com www.keithwiley.com

    "You can scratch an itch, but you can't itch a scratch. Furthermore, an itch can
    itch but a scratch can't scratch. Finally, a scratch can itch, but an itch can't
    scratch. All together this implies: He scratched the itch from the scratch that
    itched but would never itch the scratch from the itch that scratched."
    -- Keith Wiley
    ________________________________________________________________________________
  • Allen Wittenauer at Feb 7, 2011 at 9:38 pm

    On Feb 4, 2011, at 7:46 AM, Keith Wiley wrote:
    I have since discovered that in the case of streaming, mapred.map.tasks is a good way to achieve this goal. Ironically, if I recall correctly, this seemingly obvious method for setting the number of mappers did not work so well in my original nonstreaming case, which is why I resorted to the rather contrived method of calculating and setting mapred.max.split.size instead.
    mapred.map.tasks basically kicks in if the input size is less than a block. (OK, it is technically more complex than that, but ... whatever). Given what you said in the other thread, this makes a lot more sense now as to what is going on.
    Because all slots are not in use. It's a very large cluster, and it's excruciating that Hadoop partially serializes a job by piling multiple map tasks onto a single mapper in a queue even when the cluster is massively underutilized.
    Well, sort of.

    The only input Hadoop has to go on is your filename input, which is relatively tiny. So of course it is going to underutilize. This makes sense now. :)
  • Keith Wiley at Feb 7, 2011 at 11:39 pm

    On Feb 7, 2011, at 13:39 , Allen Wittenauer wrote:
    On Feb 4, 2011, at 7:46 AM, Keith Wiley wrote:
    [On the topic of why I care if Hadoop funnels and queues multiple input splits into a small number of mappers instead of perfectly parallelizing the job across the available slots...]
    Because all slots are not in use. It's a very large cluster, and it's excruciating that Hadoop partially serializes a job by piling multiple map tasks onto a single mapper in a queue even when the cluster is massively underutilized.
    Well, sort of.

    The only input Hadoop has to go on is your filename input, which is relatively tiny. So of course it is going to underutilize. This makes sense now. :)

    I think we're talking around each other a little bit here. I'm sorry. In my original description, I was referring to the nonstreaming version of my program. The all-Java version doesn't use filenames; it sets up actual Hadoop input splits from files stored on HDFS. These files are about 6MB after decompression. My point, earlier in this thread, was that Hadoop's default behavior, even in that case which used the actual "largish" files as the inputs, still assigned many input splits to a single mapper (since they are smaller than a block) instead of achieving perfect parallelism.

    The degree of queueing seemed perfectly coordinated with the block size of 64MB. That is to say, given my input files of 6MB each, Hadoop would assign about 10 of them per mapper...where I wanted one per mapper and ten times as many mappers.

    Then, my final point was that in the nonstreaming, all-Java case, I could *NOT* achieve the desired behavior simply by setting mapred.map.tasks to a high number, say, one per input file (I honestly don't remember what the behavior was when I tried this; it was a very long time ago). This simply did not work; Hadoop ignored it and queued up all my inputs anyway. What I had to do was set mapred.max.split.size small enough that Hadoop would not be willing to queue inputs up per mapper. Ideally, I would set mapred.max.split.size slightly larger than a single input file, about 6MB. Doing this achieves my desired goal: one input per mapper, perfect parallelism, minimum job turn-around time.
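
    For reference, in the all-Java job I pass the property on the command line, roughly like this (a sketch: it assumes the driver goes through ToolRunner/GenericOptionsParser, the jar and driver class names here are made up, and 6500000 is just "slightly more than one ~6MB file"):

    hadoop jar coadd.jar org.uw.astro.coadd.Driver \
    -D mapred.max.split.size=6500000 \
    /user/me/input /user/me/output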

    Now, all that said, I am perfectly open to discussion or suggestions as to how I ought to better handle this situation, including the notion that mapred.map.tasks should have worked the way I intended in the first place (Did I just do something wrong there? Should it have worked the way I expected it to?). At any rate, what is the proper Hadoop method for evenly distributing inputs across all nodes before doubling up on any given node?

    Sorry, maybe this thread is getting a bit rambling. We can drop it if people prefer...

    Thanks.

    ________________________________________________________________________________
    Keith Wiley kwiley@keithwiley.com www.keithwiley.com

    "I do not feel obliged to believe that the same God who has endowed us with
    sense, reason, and intellect has intended us to forgo their use."
    -- Galileo Galilei
    ________________________________________________________________________________
