Writing Reducer output to database

  • Adeelmahmood at Feb 4, 2011 at 12:35 am
    I recently started a thread to ask questions about custom Writable
    implementations, which is similar to this one, but that was more about
    understanding the concept. Here I want to ask about my actual problem and
    get help with it.

    I want to read text data line by line in my mapper, create an instance of
    a custom Writable class that holds some information parsed out of the
    line, pass that custom Writable along with its count to the reducer, and
    have the reducer simply insert every entry into a database.

    I am just trying to understand how to accomplish this. Here is what I
    think I need to do, based on my limited understanding of all this custom
    stuff:

    1. Create a custom Writable class that can hold my parsed records. In my
    mapper, create a new instance of it from the text line and output the
    created instance.
    2. Accept this custom Writable in the reducer.
    3. Set the reducer output format to DBOutputFormat. I tried doing that,
    and it seems I am supposed to use the JobConf class, which is deprecated;
    the new approach, where you use the Job object to set the input/output
    formats, doesn't seem to work with DBOutputFormat. Doesn't DBOutputFormat
    work with the new Hadoop API?
    4. Now in the reducer I am confused about what to do. I guess I need to
    convert my custom Writable object into another custom DBWritable object,
    which will then be written to the database. Any hints on how to
    accomplish this? (See the sketch below for the shape I am imagining.)
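
    As a rough sketch of what I mean in point 4 -- just the shape I am
    imagining, with placeholder names, and I am not sure this is right -- one
    class could implement both interfaces:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    // One class serving as both the shuffle payload (Writable) and the
    // database row (DBWritable). ParsedRecord and its field are placeholders.
    public class ParsedRecord implements Writable, DBWritable {
        private long value;

        // Writable: serialization between map and reduce
        public void write(DataOutput out) throws IOException {
            out.writeLong(value);
        }
        public void readFields(DataInput in) throws IOException {
            value = in.readLong();
        }

        // DBWritable: binding to the INSERT statement's columns
        public void write(PreparedStatement stmt) throws SQLException {
            stmt.setLong(1, value);
        }
        public void readFields(ResultSet rs) throws SQLException {
            value = rs.getLong(1);
        }
    }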

    Sorry if the questions aren't very clear. I am just really confused about
    this stuff, and it doesn't help that there is almost no useful
    information available anywhere on this Writable and DBWritable stuff.

    Thanks
    Adeel
    --
    View this message in context: http://old.nabble.com/Writing-Reducer-output-to-database-tp30840930p30840930.html
    Sent from the Hadoop core-user mailing list archive at Nabble.com.
  • Ted Yu at Feb 4, 2011 at 3:30 am
    At least in cdh3b2, there are two DBOutputFormat.java files:

    ./src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
    ./src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java

    You should be able to use the latter.
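
    For reference, a minimal sketch of the new-API wiring (package names as
    in the cdh3b2 paths above):

    // New-API classes: note the package is mapreduce, not mapred.
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    // ... then, on a Job built with the new API:
    job.setOutputFormatClass(DBOutputFormat.class);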
  • Adeel Qureshi at Feb 4, 2011 at 8:01 pm
    Thanks - I switched to using the mapreduce version of DBOutputFormat and
    things look a little better, but now I am getting a ClassCastException.

    Here is my Writable class:
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    public class LogRecord implements Writable, DBWritable {
        private long timestamp;
        private String userId;
        private String action;

        public LogRecord() {
        }

        public LogRecord(long timestamp, String userId, String action,
                String pageType, String pageName, String attrPath, String attrName,
                String forEntity, String forEntityInfo, long rendTime) {
            this.timestamp = timestamp;
            this.userId = userId;
            this.action = action;
        }

        public void clearFields() {
            this.timestamp = 0;
            this.userId = "";
            this.action = "";
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((action == null) ? 0 : action.hashCode());
            result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
            result = prime * result + ((userId == null) ? 0 : userId.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            LogRecord other = (LogRecord) obj;
            if (action == null) {
                if (other.action != null)
                    return false;
            } else if (!action.equals(other.action))
                return false;
            if (timestamp != other.timestamp)
                return false;
            if (userId == null) {
                if (other.userId != null)
                    return false;
            } else if (!userId.equals(other.userId))
                return false;
            return true;
        }

        // Writable: serialization between map and reduce
        @Override
        public void readFields(DataInput in) throws IOException {
            this.timestamp = in.readLong();
            this.userId = Text.readString(in);
            this.action = Text.readString(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.timestamp);
            Text.writeString(out, this.userId);
            Text.writeString(out, this.action);
        }

        // DBWritable: binding to the database columns
        @Override
        public void readFields(ResultSet rs) throws SQLException {
            this.timestamp = rs.getLong(1);
            this.userId = rs.getString(2);
            this.action = rs.getString(3);
        }

        @Override
        public void write(PreparedStatement stmt) throws SQLException {
            stmt.setLong(1, this.timestamp);
            stmt.setString(2, this.userId);
            stmt.setString(3, this.action);
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
        public void setUserId(String userId) {
            this.userId = userId;
        }
        public void setAction(String action) {
            this.action = action;
        }
    }
    **************************************

    Here is my job runner/configuration code:

    // configuration
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Log Parser Job");

    // configure database output
    job.setOutputFormatClass(DBOutputFormat.class);
    DBConfiguration.configureDB(conf,
            "com.microsoft.sqlserver.jdbc.SQLServerDriver",
            "jdbc:sqlserver://..........",
            "...", "...");
    String[] fields = {"timestamp", "userId", "action"};
    DBOutputFormat.setOutput(job, "LogParser", fields);

    // job properties
    job.setJarByClass(Driver.class);

    job.setMapperClass(LogParserMapper.class);
    job.setReducerClass(LogParserReducer.class);

    job.setMapOutputKeyClass(LogRecord.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(LogRecord.class);
    job.setOutputValueClass(NullWritable.class);

    *************

    Mapper code:

    public class LogParserMapper
            extends Mapper<LongWritable, Text, LogRecord, IntWritable> {

        private LogRecord rec = new LogRecord();
        private final static IntWritable _val = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // parse the line into tokens
            ...
            rec.setUserId(userId);
            rec.setAction("test");
            rec.setTimestamp(0);
            // emit the parsed record with a count of 1
            context.write(rec, _val);
        }
    }

    ******************
    Reducer:

    public class LogParserReducer
            extends Reducer<LogRecord, IntWritable, LogRecord, NullWritable> {

        private NullWritable n = NullWritable.get();

        public void reduce(LogRecord key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            context.write(key, n);
        }
    }

    ******************
    Finally, when I run it, I get this error message:

    11/02/04 13:47:55 INFO mapred.JobClient: Task Id :
    attempt_201101241250_0094_m_000000_1, Status : FAILED
    java.lang.ClassCastException: class logparser.model.LogRecord
        at java.lang.Class.asSubclass(Class.java:3018)
        at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:796)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:809)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:549)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:631)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:315)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
        at org.apache.hadoop.mapred.Child.main(Child.java:211)


    My Hadoop version is 0.20.2, so I am not sure why it is using the mapred
    classes at runtime, and whether that is the problem.

    Thanks for your help

  • Adeel Qureshi at Feb 6, 2011 at 4:12 pm
    I am kind of stuck on this problem and really need help from you guys.
    The problem is simple: all my code is in my previous message, and I am
    getting a java.lang.ClassCastException.

    I would appreciate any ideas or hints.
  • Ted Yu at Feb 6, 2011 at 5:00 pm
    I think you have looked at
    src/examples/org/apache/hadoop/examples/DBCountPageView.java, where:

    job.setMapOutputKeyClass(Text.class);

    and Text is declared as:

    public class Text extends BinaryComparable
            implements WritableComparable<BinaryComparable> {
        ...
        static {
            // register this comparator
            WritableComparator.define(Text.class, new Comparator());
        }

    Modify your LogRecord accordingly.
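
    To spell that out, a minimal sketch of the change applied to LogRecord --
    the comparison order (timestamp, then userId, then action) is an
    assumption, and only the parts that change are shown:

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    public class LogRecord implements WritableComparable<LogRecord>, DBWritable {
        private long timestamp;
        private String userId;
        private String action;

        // The shuffle sorts map output keys, so the key class must define a
        // total order. Any order consistent with equals() will do.
        @Override
        public int compareTo(LogRecord other) {
            if (timestamp != other.timestamp) {
                return timestamp < other.timestamp ? -1 : 1;
            }
            int c = userId.compareTo(other.userId);
            if (c != 0) {
                return c;
            }
            return action.compareTo(other.action);
        }

        // readFields/write and the DBWritable methods stay as before.
    }

    Registering a raw comparator, as Text does, is an optimization; without
    one, WritableComparator falls back to deserializing both keys and calling
    compareTo.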
  • Adeel Qureshi at Feb 6, 2011 at 11:29 pm
    Thanks Ted - That makes sense. If I want the mapper to pass around a
    custom object as the key, I need to provide a compareTo method, i.e.,
    implement WritableComparable instead of Writable. I did that, and the map
    stage is working fine now, but the reducer is throwing an error. I am
    assuming it cannot communicate with the database, since it is an
    IOException, but it doesn't provide much useful information:

    11/02/06 17:26:51 INFO mapred.JobClient: Task Id :
    attempt_201102061204_0004_r_000000_0, Status : FAILED
    java.io.IOException
        at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.getRecordWriter(DBOutputFormat.java:180)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:557)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:412)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
        at org.apache.hadoop.mapred.Child.main(Child.java:211)

  • Ted Yu at Feb 7, 2011 at 4:55 am
    Have you noticed this method in DBConfiguration?

    public static void configureDB(Configuration conf, String driverClass,
            String dbUrl, String userName, String passwd) {

    DBCountPageView has this call:

    DBConfiguration.configureDB(job, driverClassName, url);
  • Adeel Qureshi at Feb 7, 2011 at 2:16 pm
    Yeah, and I have that set up correctly:

    // configure database output
    job.setOutputFormatClass(DBOutputFormat.class);
    DBConfiguration.configureDB(conf,
            "com.microsoft.sqlserver.jdbc.SQLServerDriver",
            "jdbc:sqlserver://r0145981\\SQLEXPRESS:3921;databaseName=BR;SelectMethod=cursor",
            "localsqluser", "localsqluser");
    String[] fields = {"timestamp", "userId", "action"};
    DBOutputFormat.setOutput(job, "LogParser", fields);

    I have checked this connection string directly and was able to connect
    to the database. I also tried putting the SQL Server JDBC jar in the
    hadoop/lib folder as well as in the application jar's lib directory. It
    still doesn't work; it keeps giving me the same IOException from
    getRecordWriter. I will continue looking into it to see if I can find
    anything else wrong. Thanks for all your help.
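
    A possible culprit worth checking for this IOException (a guess, not
    confirmed in this thread): in the 0.20 API, new Job(conf) takes a copy of
    the Configuration, so a configureDB() call made on conf after the Job is
    constructed never reaches the running job, leaving the record writer
    without a driver class or URL. A minimal sketch of an ordering that
    avoids the issue (host and credentials are placeholders):

    Configuration conf = new Configuration();
    Job job = new Job(conf, "Log Parser Job");

    // Configure the database on the job's own Configuration (or call
    // configureDB before constructing the Job), so the settings survive
    // the copy that new Job(conf) makes.
    DBConfiguration.configureDB(job.getConfiguration(),
            "com.microsoft.sqlserver.jdbc.SQLServerDriver",
            "jdbc:sqlserver://host\\SQLEXPRESS:3921;databaseName=BR;SelectMethod=cursor",
            "user", "password");

    job.setOutputFormatClass(DBOutputFormat.class);
    String[] fields = {"timestamp", "userId", "action"};
    DBOutputFormat.setOutput(job, "LogParser", fields);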

Discussion Overview
group: common-user@hadoop.apache.org
categories: hadoop
posted: Feb 3, '11 at 10:46p
active: Feb 7, '11 at 2:16p
posts: 9
users: 2
website: hadoop.apache.org...
irc: #hadoop

2 users in discussion: Adeel Qureshi (6 posts), Ted Yu (3 posts)
