I'm using Hadoop Streaming, installed via Cloudera on EC2.
My job should be straightforward:
1) Map task: emits 2 key fields and 1 value field, tab-separated:
<WORD> <FLAG, 0 or 1> <TEXT>
AA 0 QUICK BROWN FOX
AA 1 QUICK BROWN FOX
BB 1 QUICK RED DOG
2) Reduce task: assuming all records for a given <WORD> arrive on its
standard input, sorted by word and then by flag, it runs through stdin.
When the first key changes it checks whether the flag is 0 or 1: if it is
0, it emits all records for that key; if it is 1, it skips all records
for that key.
My run script is here:
-D stream.num.map.output.key.fields=2 \
-D mapred.text.key.comparator.options="-k1,1 -k2,2" \
-file $files \
-input input \
-output output \
-mapper mapper.rb \
-reducer reducer.rb \
-combiner /bin/cat \
hadoop dfs -get output .
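For comparison, here is the invocation pattern I believe is needed to pair a two-field key sort with single-field partitioning. The streaming-jar path is an assumption (it varies by install), and note that as far as I know mapred.text.key.comparator.options is only honored when KeyFieldBasedComparator is explicitly set as the output key comparator class:

```shell
# Sketch of a full streaming invocation; jar path is assumed, adjust to
# your Cloudera layout. Partition on field 1 only, sort on fields 1-2.
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar \
  -D stream.num.map.output.key.fields=2 \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapred.text.key.comparator.options="-k1,1 -k2,2" \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input input \
  -output output \
  -mapper mapper.rb \
  -reducer reducer.rb \
  -file $files
```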
No matter what I do, I do not get the desired effect: partitioning on the
first key, with the reduce input sorted by KEY0 and then by KEY1. It
appears to work just fine in a single-node test case, but as soon as I
run it on a 32-node Hadoop cluster, it breaks. I don't really have any
sense of what is going on, other than that perhaps I don't understand the
subtleties between partitioning and ordering the input to the reduce
task. It's possible also that I misunderstand how the reducer is fed its
data, but again, my test example doesn't exhibit the problem.
The reducer code is here:
lastkey = nil
noskip  = false

STDIN.each_line do |line|
  key, flag, _text = line.strip.split("\t", 3)
  # new key!
  # if the flag is 0 after a key change then we output every record
  # for this key; if it is 1 we skip them all.
  if lastkey != key
    noskip  = (flag == "0")
    lastkey = key
  end
  puts line.strip if noskip
end
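For what it's worth, my local test simulates the shuffle by sorting the map output on both key fields before piping it to the reducer, which may be why the single-node case behaves. A minimal sketch using the sample records from above (the trailing reducer step is shown as a comment since reducer.rb isn't reproduced here):

```shell
# Simulate the framework's sort phase: order the sample map output by
# WORD (field 1) and then FLAG (field 2), tab-delimited.
printf 'BB\t1\tQUICK RED DOG\nAA\t1\tQUICK BROWN FOX\nAA\t0\tQUICK BROWN FOX\n' \
  | sort -t "$(printf '\t')" -k1,1 -k2,2
# A full local run would continue:  ... | ruby reducer.rb
```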
Thanks so much for any comments,