• Winton Davies at Feb 10, 2010 at 11:39 pm
I'm using Hadoop Streaming, installed via Cloudera, on EC2.

My job should be straightforward:

1) Map task: emits two key fields and one value:

<WORD> <FLAG, 0 or 1> <TEXT>

e.g.

AA 0 QUICK BROWN FOX
AA 1 QUICK BROWN FOX
BB 1 QUICK RED DOG


2) Reduce task: assuming all records for a given <WORD>, with their
flags, arrive on its standard input, it reads through stdin. When the
first key changes, it checks whether the flag is 0 or 1: if it is 0, it
emits all records for that key; if it is 1, it skips all records for
that key.


My run script is here:

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.1+152-streaming.jar \
  -D stream.num.map.output.key.fields=2 \
  -D mapred.text.key.partitioner.options="-k1,1" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapred.text.key.comparator.options="-k1,1 -k2,2" \
  -file $files \
  -input input \
  -output output \
  -mapper mapper.rb \
  -reducer reducer.rb \
  -combiner /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
hadoop dfs -get output .

No matter what I do, I don't get the desired effect: partitioning on
the first key, with the reduce input sorted by KEY0 and then by KEY1.
It appears to work just fine in a single-node test case, but as soon as
I run it on a 32-node Hadoop cluster, it breaks. I don't really have
any sense of what is going on, other than that perhaps I don't
understand the subtleties of partitioning versus ordering the input to
the reduce task. It's also possible that I misunderstand how the
reducer is fed its data, but again, my test example doesn't exhibit the
problem.
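
(mapper.rb itself isn't shown in this thread. A minimal hypothetical
Ruby sketch that would emit the two-key format from step 1, assuming
space-separated "WORD FLAG TEXT..." input lines, might look like:

#!/usr/bin/env ruby
# Hypothetical mapper: reads "WORD FLAG TEXT..." lines and emits
# WORD <tab> FLAG <tab> TEXT, so the first two tab-separated fields
# form the composite key (stream.num.map.output.key.fields=2).
STDIN.each_line do |line|
  word, flag, *text = line.split
  next unless word && flag
  puts "#{word}\t#{flag}\t#{text.join(' ')}"
end
)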

The reducer code is here:
#!/usr/bin/env ruby
# Emit all records for a key only when the first record seen after a
# key change has flag 0; otherwise skip all records for that key.
lastkey = nil
noskip = true
STDIN.each_line do |line|
  keyval = line.strip.split("\t")
  # New key: decide whether to emit or skip this key's records.
  if lastkey != keyval[0]
    noskip = (keyval[1] == "0")
    lastkey = keyval[0]
  end
  puts line.strip if noskip
end
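
(A quick way to sanity-check the pipeline locally -- a rough simulation
only, since sort(1) approximates the shuffle's ordering but not
Hadoop's partitioning; $'\t' is bash syntax, and input.txt is a
stand-in for real input:

cat input.txt | ./mapper.rb | sort -t$'\t' -k1,1 -k2,2 | ./reducer.rb
)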



Thanks so much for any comments,
Winton


  • E. Sammer at Feb 11, 2010 at 12:03 am
    Winton:

    I don't know the exact streaming options you're looking for, but
    what you have looks correct. Generally, to do what you want, all
    you should have to do is (1) sort on both field zero and field one
    of the key, and (2) partition on field zero only. This ensures all
    records whose first key field is 'AA' go to the same reducer,
    regardless of the zero or one. Once the reducer code is invoked,
    you're guaranteed to see records in the order they were sorted
    (which, if #1 goes right, is what you're looking for).

    Sorry I can't help much with the streaming options, but hopefully
    this clears up any questions you have around the sort / partition /
    reducer record-order semantics.
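
    To make that concrete, here is a local simulation of those
    semantics (a sketch in Ruby, assuming a plain hash-mod partitioner
    rather than KeyFieldBasedPartitioner's exact hashing):

    #!/usr/bin/env ruby
    # Simulate: partition on field 1 only, sort on fields 1 and 2.
    records = [
      ["AA", "0", "QUICK BROWN FOX"],
      ["AA", "1", "QUICK BROWN FOX"],
      ["BB", "1", "QUICK RED DOG"],
    ]
    num_reducers = 2
    # Partitioning looks only at the first field, so every "AA" record
    # lands on the same reducer regardless of its flag.
    partitions = records.group_by { |word, _flag, _text| word.hash % num_reducers }
    # Within each reducer's input, records are sorted by word then
    # flag, so a flag-0 record (if any) is the first seen for its key.
    partitions.each_value do |recs|
      recs.sort_by! { |word, flag, _text| [word, flag] }
    end
    partitions.each do |r, recs|
      recs.each { |rec| puts "reducer #{r}:\t#{rec.join("\t")}" }
    end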
    On 2/10/10 6:13 PM, Winton Davies wrote:
    [quoted message trimmed]

    --
    Eric Sammer
    eric@lifeless.net
    http://esammer.blogspot.com
  • Winton Davies at Feb 11, 2010 at 3:31 am
    Thanks Eric,

    I think I may have found the cause of the problem, but I have no
    idea how to fix it.

    My mapper does STDOUT.puts "key1 <tab> key2 <tab> text", and the
    job tracker shows the total number of records being emitted as,
    say, 35 million.

    It then goes through -combiner /bin/cat (i.e., a no-op, in theory).

    The job tracker, however, shows 70 million output records.

    So it seems like something isn't working correctly here; perhaps a
    double newline is being inserted? Something else? I don't have a
    clue. Do you know the syntax for not having ANY combiner, or where
    I could find such documentation?

    Cheers,
    Winton



    On Wed, Feb 10, 2010 at 4:02 PM, E. Sammer wrote:
    [quoted message trimmed]
  • Eric Sammer at Feb 11, 2010 at 4:08 am
    Winton:

    The combiner is always optional; simply leave it out to have none.
    The reason you're seeing extra records is that a combiner can run
    multiple times. This means you're growing your dataset after the
    mapper.
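
    Concretely, that just means dropping the -combiner line from your
    invocation and leaving everything else as it was:

    hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.1+152-streaming.jar \
      -D stream.num.map.output.key.fields=2 \
      -D mapred.text.key.partitioner.options="-k1,1" \
      -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
      -D mapred.text.key.comparator.options="-k1,1 -k2,2" \
      -file $files \
      -input input \
      -output output \
      -mapper mapper.rb \
      -reducer reducer.rb \
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner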

    HTH
    Eric

    On Feb 10, 2010, at 10:30 PM, Winton Davies wrote:
    [quoted message trimmed]
  • Winton Davies at Feb 11, 2010 at 5:40 am
    ahhahhahahahahaha... I thought it was single-pass, and in this case, an
    'echo'.

    Thanks!
    W
    On Wed, Feb 10, 2010 at 8:05 PM, Eric Sammer wrote:
    [quoted message trimmed]
  • Eric Sammer at Feb 11, 2010 at 5:46 am

    On 2/11/10 12:40 AM, Winton Davies wrote:
    ahhahhahahahahaha... I thought it was single-pass, and in this case, an
    'echo'.
    Yeah, the combiner can be confusing at first. It may run N times,
    where N is zero or greater. And yes, this means that even if you
    supply a combiner, the framework may opt not to run it at all.
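
    To illustrate the contract: a combiner has to produce the same
    final result whether it runs zero, one, or many times. A
    hypothetical count-summing combiner in Ruby (not part of this job)
    that satisfies it:

    #!/usr/bin/env ruby
    # Sums per-key counts; feeding its own output back through it any
    # number of times yields identical totals, so repeated (or
    # skipped) combine passes are safe.
    counts = Hash.new(0)
    STDIN.each_line do |line|
      key, count = line.chomp.split("\t")
      next if key.nil? || key.empty?
      counts[key] += (count || "1").to_i  # a bare key counts as 1
    end
    counts.each { |key, total| puts "#{key}\t#{total}" }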

    --
    Eric Sammer
    eric@lifeless.net
    http://esammer.blogspot.com
