Hi Experts

I was working with the Hadoop MapReduce 0.18 API for some time. Now I have
tried to migrate an existing application to the Hadoop MapReduce 0.20 API,
but after the migration it seems the reduce logic is not working. Map output
records and reduce output records show the same number on the console, even
though I have an aggregation operation in the reducer. To debug, I migrated
the simple word count example to the 0.20 API, but I still see the same
issue. I feel I'm missing something but am not able to locate what it is.
Please share your thoughts; the sample code is given below.

*Mapper*

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);

        // iterate through all the words in the line, forming a key/value pair for each
        while (tokenizer.hasMoreTokens())
        {
            word.set(tokenizer.nextToken());
            // write to the context, which in turn passes the pair to the reducer
            context.write(word, one);
        }
    }
}

*Reducer*

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    // reduce method intended to sum the counts emitted by the mapper for each key
    public void reduce(Text key, Iterator<Text> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        /* iterate through all the values available for a key, add them
           together, and emit the key along with the sum of its values */
        while (values.hasNext())
        {
            sum += 1;
            values.next();
        }
        context.write(key, new IntWritable(sum));
    }
}

*Driver Class*

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool
{
    public int run(String[] args) throws Exception
    {
        // get the configuration object and set the job name
        Configuration conf = getConf();
        Job job = new Job(conf, "Word Count hadoop-0.20");

        // set the class names
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // set the output data type classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // accept the HDFS input and output dirs at run time
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}


  • Harsh J at Sep 6, 2011 at 1:13 pm
    This has come up before. It's a coding error: using Iterator instead of
    Iterable in the reduce(...) signature. Because the signature doesn't
    match, the method never overrides Reducer.reduce, so the framework falls
    back to the inherited identity reduce; that is why the map output and
    reduce output record counts are equal. Please see
    http://mail-archives.apache.org/mod_mbox/hadoop-mapreduce-user/201107.mbox/%3CCAOcnVr3tdEt9_+PxqcJXUPEygTe7R7-4kV34tA5MBhDqyOX1+A@mail.gmail.com%3E

    P.s. Where did you pick up the new reduce(…) signature in the docs? Maybe
    everyone is reading faulty docs somewhere, and that's why there are so
    many reports of this user error.
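
    For reference, the 0.20 API reduce method takes an Iterable, and adding
    @Override makes the compiler reject a method that doesn't actually
    override anything. A minimal corrected sketch of the reducer (summing the
    IntWritable counts rather than counting iteration steps):

    import java.io.IOException;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        // @Override fails compilation if the signature doesn't match
        // Reducer.reduce, catching exactly this kind of mistake
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException
        {
            int sum = 0;
            // add up the counts emitted by the mapper for this key
            for (IntWritable value : values)
            {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    With this signature the framework invokes the overridden reduce, and the
    reduce output records counter should drop to the number of distinct words.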
    --
    Harsh J
