I'm using Hadoop streaming, installed via Cloudera on EC2.
My job should be straightforward:
1) Map task: emits 2 keys and 1 value per record:
<WORD><FLAG, 0 or 1><TEXT>
e.g.
AA 0 QUICK BROWN FOX
AA 1 QUICK BROWN FOX
BB 1 QUICK RED DOG
2) Reduce task: assuming all records for a given <WORD>, plus the flag,
arrive on its standard input, it runs through stdin. When the first key
changes, it checks whether the flag is 0 or 1: if it is 0, it emits all
records for that key; if it is 1, it skips all records for that key.
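(mapper.rb itself isn't shown below; a minimal sketch consistent with the
description, assuming each input line already looks like "WORD FLAG TEXT...",
might be something like this -- the helper name is mine, not from the job:)

```ruby
#!/usr/bin/env ruby
# Hypothetical mapper sketch -- not the actual mapper.rb from the job.
# Turns "WORD FLAG TEXT..." into a tab-separated WORD<tab>FLAG<tab>TEXT
# record, so the first two fields can act as the composite key.
def to_record(line)
  word, flag, *text = line.strip.split
  [word, flag, text.join(" ")].join("\t")
end

# Streaming driver (commented out so the sketch loads cleanly):
# STDIN.each_line { |line| puts to_record(line) }
```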
My run script is here:
hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.1+152-streaming.jar \
  -D stream.num.map.output.key.fields=2 \
  -D mapred.text.key.partitioner.options="-k1,1" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapred.text.key.comparator.options="-k1,1 -k2,2" \
  -file $files \
  -input input \
  -output output \
  -mapper mapper.rb \
  -reducer reducer.rb \
  -combiner /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
hadoop dfs -get output .
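(For reference, the comparator options above are asking for reduce input
ordered by field 1, then field 2. Locally that ordering looks like this --
a quick Ruby check using the sample records from above:)

```ruby
# Reduce input ordered the way "-k1,1 -k2,2" is meant to order it:
# by the first field, then the second. Sample records from the post.
records = [
  "AA\t1\tQUICK BROWN FOX",
  "BB\t1\tQUICK RED DOG",
  "AA\t0\tQUICK BROWN FOX",
]
# Sort by [WORD, FLAG]; arrays compare element-wise, so this mirrors
# a lexicographic sort on the first two tab-separated fields.
ordered = records.sort_by { |r| r.split("\t").first(2) }
```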
No matter what I do, I do not get the desired effect of partitioning on the
key, with the reduce input sorted by KEY0 and then by KEY1 -- it appears to
work just fine on a single-node test case, but as soon as I run it on a
32-node Hadoop cluster, it breaks. I don't really have any sense of what is
going on, other than that perhaps I do not understand the subtleties between
partitioning and ordering the input to the reduce task. It's also possible
that I misunderstand how the reducer is fed its data, but again, my test
example doesn't exhibit the problem.
The reducer code is here:
#!/usr/bin/env ruby
lastkey = nil
noskip = true
STDIN.each_line do |line|
  keyval = line.strip.split("\t")
  # New key: if the flag is 0 after a key change, we emit that group.
  if lastkey != keyval[0]
    noskip = (keyval[1] == "0")
    lastkey = keyval[0]
  end
  puts line.strip if noskip
end
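(The intended behavior of that reducer can be checked locally -- here is the
same logic wrapped in a method so it can be driven without STDIN, fed the
sample records in the order the cluster is supposed to deliver them:)

```ruby
# Same algorithm as the reducer above, wrapped in a method for local testing.
def reduce(lines)
  out = []
  lastkey = nil
  noskip = true
  lines.each do |line|
    keyval = line.strip.split("\t")
    if lastkey != keyval[0]
      noskip = (keyval[1] == "0")
      lastkey = keyval[0]
    end
    out << line.strip if noskip
  end
  out
end

# With input partitioned and sorted as intended, the AA group (whose first
# flag is 0) is kept and the BB group (whose first flag is 1) is dropped.
sorted_input = [
  "AA\t0\tQUICK BROWN FOX",
  "AA\t1\tQUICK BROWN FOX",
  "BB\t1\tQUICK RED DOG",
]
kept = reduce(sorted_input)
```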
Thanks so much for any comments,
Winton