can jobs be launched recursively within a mapper ?
Hi,

Can I recursively launch a job within the map() method of a mapper?
The task I am facing involves extracting metadata for filenames and
catalog names from hierarchical catalogs, and for each catalog name
extracted, I must recursively do the same thing until no more catalog
names are present. Therefore, it would be convenient if I could launch
a job within map(), but I want to make certain that it will work
before investing too much effort into the coding.

If it won't work, could you suggest the proper way to go about this?
Much appreciated in advance.

-- Jim


  • Lohit Vijayarenu at Oct 29, 2007 at 9:00 pm
    Can I recursively launch a job within the map() method of a mapper?
    Interesting. If your launch is blocking, I see the possibility of deadlock. Let's say you have N nodes and your initial map tasks are scheduled on all N nodes. Now if each of those map tasks launches a few more jobs, the new ones will sit there waiting for free nodes that will never become available.

    Thanks,
    Lohit
  • Jim the Standing Bear at Oct 29, 2007 at 9:19 pm
    Hi Lohit,

    Thank you for the response. If jobs cannot be launched recursively,
    does it mean that I have to pre-process all the catalogs, put only
    the filenames into a sequence file, and then use Hadoop from that
    point on?

    -- Jim
  • Owen O'Malley at Oct 29, 2007 at 9:25 pm

    As Lohit wrote, launching recursive jobs that block the current task
    doesn't work. A much better approach is to have a map/reduce job that
    generates the work list for the next iteration, and to keep iterating
    until there are no new items to process.

    -- Owen
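
    A minimal sketch of that driver pattern, against the old org.apache.hadoop.mapred API (exact method availability varies between Hadoop releases): each pass reads the catalog names discovered by the previous pass and stops once a pass produces nothing new. CatalogMapper, the path layout, and the emptiness check are assumptions for illustration only.

        import java.io.IOException;

        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.FileInputFormat;
        import org.apache.hadoop.mapred.FileOutputFormat;
        import org.apache.hadoop.mapred.JobClient;
        import org.apache.hadoop.mapred.JobConf;

        public class CatalogDriver {

          public static void main(String[] args) throws Exception {
            JobConf baseConf = new JobConf(CatalogDriver.class);
            FileSystem fs = FileSystem.get(baseConf);

            // Iteration 0 reads the seed list of catalogs; every later iteration
            // reads the catalog names discovered by the previous pass.
            Path input = new Path("catalogs/seed");
            int iteration = 0;

            while (true) {
              Path output = new Path("catalogs/iter-" + iteration);

              JobConf conf = new JobConf(baseConf);
              conf.setJobName("catalog-scan-" + iteration);
              conf.setMapperClass(CatalogMapper.class); // hypothetical mapper that emits (catalog name, metadata)
              conf.setOutputKeyClass(Text.class);
              conf.setOutputValueClass(Text.class);
              FileInputFormat.setInputPaths(conf, input);
              FileOutputFormat.setOutputPath(conf, output);

              JobClient.runJob(conf); // blocks until this pass finishes

              // Stop when the pass discovered no new catalog names.
              if (isEmpty(fs, output)) {
                break;
              }
              input = output;
              iteration++;
            }
          }

          // True if every part file written by the job is empty.
          private static boolean isEmpty(FileSystem fs, Path dir) throws IOException {
            for (FileStatus status : fs.listStatus(dir)) {
              if (status.getPath().getName().startsWith("part-") && status.getLen() > 0) {
                return false;
              }
            }
            return true;
          }
        }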
  • David Balatero at Oct 29, 2007 at 9:33 pm
    Aren't these questions a little advanced for a bear to be asking?
    I'll be here all night...

    But seriously, if your job is inherently recursive, one possible way
    to do it would be to make sure that your output is in the same format
    as your input. Then you can keep feeding each job's output back into
    a new map/reduce job, until you hit some base case and terminate.
    I've had a main method that kicked off a bunch of jobs in a row --
    but I wouldn't recommend starting another map/reduce job within the
    scope of a running map() or reduce() method.

    - David

  • Jim the Standing Bear at Oct 29, 2007 at 9:46 pm
    Thanks, Owen and David,

    I also thought of making a queue so that I can push catalog names onto
    the end of it, while the job control loop keeps removing items from the
    queue until there are none left.

    However, the problem is that I don't see how to do this within the
    map/reduce context. All the code examples are one-shot deals with no
    iteration involved.

    Furthermore, what David said makes sense, but to avoid an infinite loop,
    the code must remove the record it just read from the input file. How
    do I do that using Hadoop's FileSystem, or does Hadoop take care of it
    automatically?

    -- Jim


  • Ted Dunning at Oct 30, 2007 at 4:52 pm
    Jim,

    Think like a web crawler.

    You have a list of catalogs you already know about and you can create a list
    of catalogs as you discover them.

    Then you can merge your old list with your new list, marking entries as
    old or new, and keep only the new ones that do not already have an old version.

    When you run out of new ones, you are done.

    You will have to do some file copying so that you have your new list where
    you want it and have all of your old generations handy as well, but it isn't
    hard. The outer loop of your program will be, as people have pointed out,
    in the main program.
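
    A sketch of that merge step, assuming catalog names keyed by "old"/"new" tags as the record layout (the tag values and class name are illustrative): a reducer that groups records by catalog name and emits the name only when no earlier generation already contained it.

        import java.io.IOException;
        import java.util.Iterator;

        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.MapReduceBase;
        import org.apache.hadoop.mapred.OutputCollector;
        import org.apache.hadoop.mapred.Reducer;
        import org.apache.hadoop.mapred.Reporter;

        // Input:  key = catalog name, value = "old" (listed in a previous generation)
        //         or "new" (discovered in the current pass).
        // Output: only the catalog names that are genuinely new in this pass.
        public class FrontierMergeReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

          private static final Text NEW = new Text("new");

          public void reduce(Text catalog, Iterator<Text> tags,
                             OutputCollector<Text, Text> output, Reporter reporter)
              throws IOException {
            boolean seenBefore = false;
            while (tags.hasNext()) {
              if (!NEW.equals(tags.next())) {
                seenBefore = true; // an older generation already listed this catalog
              }
            }
            if (!seenBefore) {
              output.collect(catalog, NEW); // feed it to the next pass
            }
          }
        }

    The maps feeding this reducer would simply tag each record with where it came from: the previous generations' lists versus the current pass's discoveries.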

  • Stu Hood at Oct 29, 2007 at 10:01 pm
    The iteration would take place in your control code (your 'main' method, as shown in the examples).

    In order to prevent records from looping infinitely, each iteration would need to use a separate output/input directory.

    Thanks,
    Stu


  • Jim the Standing Bear at Oct 30, 2007 at 1:57 am
    Thanks, Stu... Maybe my mind is way off track, but I still see a
    problem with the mapper sending feedback to the job controller. That
    is, when a mapper has reached the terminal condition, how can it tell
    the job controller to stop?

    If I keep a JobConf object in the mapper and set a property
    "stop.processing" to true when a mapping task has reached the terminal
    condition, will it cause synchronization problems, since there could be
    other map tasks that still wish to go on?

    I tried to find a way for the job controller to open the file in the
    output path at the end of the loop and read its contents, but so far
    I haven't found a way to achieve this.

    Does this mean I have hit a dead-end?

    -- Jim


  • Aaron Kimball at Oct 30, 2007 at 2:40 am
    If you modify the JobConf for a running job within the context of a
    mapper, the changes will not propagate back to the other machines.
    JobConfs are serialized to XML and then distributed to the mapping nodes
    where they are read back into the running Java tasks. There is no
    "refresh" function that I am aware of.

    - Aaron
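
    To make the one-way flow concrete, here is a small sketch (the CatalogMapper class and the "catalog.scan.verbose" property are assumptions): a task can read its shipped JobConf in configure(), but any set() call there only touches the task's local, deserialized copy.

        import java.io.IOException;

        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.JobConf;
        import org.apache.hadoop.mapred.MapReduceBase;
        import org.apache.hadoop.mapred.Mapper;
        import org.apache.hadoop.mapred.OutputCollector;
        import org.apache.hadoop.mapred.Reporter;

        public class CatalogMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

          private boolean verbose;

          public void configure(JobConf job) {
            // Reads the XML-serialized copy of the JobConf shipped to this task.
            verbose = job.getBoolean("catalog.scan.verbose", false); // hypothetical property

            // This only mutates the local copy; the JobTracker and the other
            // tasks never see the change.
            job.setBoolean("stop.processing", true);
          }

          public void map(LongWritable key, Text value,
                          OutputCollector<Text, Text> output, Reporter reporter)
              throws IOException {
            if (verbose) {
              reporter.setStatus("processing record at offset " + key);
            }
            // ... extract catalog names / filenames from the record ...
            output.collect(new Text("catalog"), value);
          }
        }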

  • Jim the Standing Bear at Oct 30, 2007 at 3:04 pm
    Thank you for all the help. I think I am beginning to get a clearer
    picture of Hadoop. I will try the file solution.
  • Johnson, Jorgen at Oct 30, 2007 at 3:51 pm
    Hey all,

    Any reason why something like this wouldn't work?

    Create a QueueInputFormat, which provides a RecordReader implementation that pops values off a globally accessible queue*. This would require filling the queue with values prior to loading the map/red job. This would allow the mappers to cram values back into the queue for further processing when necessary.

    We could even avoid pre-loading the queue by combining this with some form of hybrid InputFormat which first reads a normal input file, but after each reader has finished reading its piece of the input file (and before it receives the OK to quit), it would try to pull items off the queue for further processing.

    At the bottom is a pseudo-implementation to help illustrate what I'm saying.

    -jorgenj

    *A simple DB table or a file stored in HDFS might be a way to implement this globally accessible queue, or something like SQS might work: http://www.amazon.com/Simple-Queue-Service-home-page/b?ie=UTF8&node=13584001 =)


    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    /** InputFormat which allows map/red jobs to process values
     * out of a globally accessible queue.
     *
     * Note: The queue must be primed prior to kicking off any job which
     * attempts to process values in the queue.
     */
    public class QueueInputFormat implements InputFormat<LongWritable, Text> {

      public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf job,
          Reporter reporter) throws IOException {
        return new QueueRecordReader();
      }

      public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        // Allow the number of queue-reading map tasks to be overridden in the job config.
        int overrideNumSplits = job.getInt("input.queue.hardcoded.num.maps", numSplits);

        InputSplit[] splits = new InputSplit[overrideNumSplits];
        for (int i = 0; i < overrideNumSplits; i++) {
          splits[i] = new QueueInputSplit(i);
        }
        return splits;
      }

      public void validateInput(JobConf job) throws IOException {
        throw new UnsupportedOperationException("Implement your validation, if needed");
      }

      private static final class QueueRecordReader implements RecordReader<LongWritable, Text> {

        public void close() throws IOException {
          // close the queue
        }

        public LongWritable createKey() {
          // this might be the id of this object from the queue, or whatever you want
          return new LongWritable();
        }

        public Text createValue() {
          // replace this with a type representing what's in your queue
          return new Text();
        }

        public long getPos() throws IOException {
          return (queueIsExhausted() ? 1 : 0);
        }

        public float getProgress() throws IOException {
          // could change this to report progress based on size of queue...
          return (queueIsExhausted() ? 1 : 0);
        }

        public boolean next(LongWritable key, Text value) throws IOException {
          // Returning false is what ends a map task, so stop once the queue
          // is exhausted instead of handing out records forever.
          if (queueIsExhausted()) {
            return false;
          }
          key.set(1);
          value.set(fetchValueFromQueue());
          return true;
        }

        /** Use this to determine when the queue is exhausted, which in
         * turn determines when the mappers will stop (no more items to process).
         */
        private boolean queueIsExhausted() {
          /*
           * This could be as simple as checking if the queue is empty,
           * or this could block until all mappers have signalled that
           * they are finished AND the queue is empty...
           */
          throw new UnsupportedOperationException("Depends on your queueing impl");
        }

        /** Fetches a value from the queue */
        private String fetchValueFromQueue() {
          throw new UnsupportedOperationException("Depends on your queueing impl");
        }
      }

      private static final class QueueInputSplit implements InputSplit {
        private final int id;

        private QueueInputSplit(int id) {
          this.id = id;
        }

        public long getLength() throws IOException {
          return 1;
        }

        public String[] getLocations() throws IOException {
          return new String[] { "queueIdentifierGoesHere?" + id };
        }

        public void readFields(DataInput in) throws IOException {
          throw new UnsupportedOperationException("Real implementation needs to handle this");
        }

        public void write(DataOutput out) throws IOException {
          throw new UnsupportedOperationException("Real implementation needs to handle this");
        }
      }
    }
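
    For what it's worth, wiring an InputFormat like this into a job would then just be a matter of something along these lines in the driver (the split-count property name is the one used above; the rest is a sketch):

        JobConf conf = new JobConf(QueueInputFormat.class);
        conf.setInputFormat(QueueInputFormat.class);
        conf.setInt("input.queue.hardcoded.num.maps", 10); // number of queue-reading map tasks
        JobClient.runJob(conf);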



  • Doug Cutting at Oct 30, 2007 at 4:56 pm

    Johnson, Jorgen wrote:
    Create a QueueInputFormat, which provides a RecordReader implementation that pops values off a globally accessible queue*. This would require filling the queue with values prior to loading the map/red job. This would allow the mappers to cram values back into the queue for further processing when necessary.
    Maintaining such a queue would be tricky, I think. One concern is that
    one might pop the last item from the queue and prematurely terminate the
    job. To fix this you would need to leave things in the queue until
    their map processing completes, but also ensure that no other map task
    removes them from the queue while they're being processed. Then you'd
    need to worry about handling failed tasks. I think it would be far
    simpler to do this iteratively, with multiple mapreduce passes, each
    writing files to a new temporary directory that's the input for the next
    pass, thus performing a breadth-first traversal of the space, with a
    mapreduce stage at each depth.

    Doug
  • Jim the Standing Bear at Oct 30, 2007 at 7:20 pm
    Hi jorgen,

    Thank you for taking the time to show me the pseudo-code for a
    QueueInputFormat. I will see if I can fill in the details and use
    it for my project.

    -- Jim
  • Ted Dunning at Oct 30, 2007 at 5:03 pm
    When there are no new catalogs to examine, the main code can exit.

    The easiest way to report this back to the controller is to use the
    counter capability. That way the controller can look at the results of a
    map-reduce step to determine how many new catalogs were found.

    You haven't hit a dead end. This is really a pretty simple program that is
    very similar to what nutch does all the time to crawl web sites.
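
    A sketch of the counter approach (the enum, class, and method names are assumptions): the mapper bumps a counter for every newly discovered catalog, and the driver reads that counter from the completed job to decide whether another pass is needed.

        import org.apache.hadoop.mapred.JobClient;
        import org.apache.hadoop.mapred.JobConf;
        import org.apache.hadoop.mapred.Reporter;
        import org.apache.hadoop.mapred.RunningJob;

        public class CatalogCounterExample {

          /** Counter bumped by the mapper for every newly discovered catalog. */
          public enum CatalogCounters { NEW_CATALOGS }

          /** Called from map() whenever a previously unseen catalog name is emitted. */
          static void recordNewCatalog(Reporter reporter) {
            reporter.incrCounter(CatalogCounters.NEW_CATALOGS, 1);
          }

          /** Runs one pass and reports how many new catalogs it found. */
          static long runPass(JobConf conf) throws Exception {
            RunningJob job = JobClient.runJob(conf); // blocks until completion
            return job.getCounters().getCounter(CatalogCounters.NEW_CATALOGS);
          }
        }

    The outer loop then just keeps calling runPass() with fresh input/output paths until it returns zero.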

  • Jim the Standing Bear at Oct 30, 2007 at 5:18 pm
    Thanks for jumping in and giving me input, Ted. Yes, intuitively it
    is an easy project (we had a conversation a few days back), but when
    it comes to implementation, I am having trouble with the details.

    I tried to look at Nutch's source code, but frankly it wasn't trivial.
    I guess I will try again, with what you said in these emails as a
    guide.

    -- Jim
  • Ted Dunning at Oct 30, 2007 at 5:47 pm
    There is a slide show on Nutch that is much clearer; I mentioned it
    some time ago. If you go to the Hadoop presentations page on the wiki
    (http://wiki.apache.org/lucene-hadoop/HadoopPresentations), you will be
    able to find one of the slide shows that walks through the Nutch MapReduce steps.

  • Jim the Standing Bear at Oct 30, 2007 at 7:19 pm
    Yes, you pointed me to the slides in a previous thread. I looked at
    them, but when I was reading the Nutch source code, it escaped my mind.
    Thank you so much for the reminder, Ted.

    -- Jim
  • Stu Hood at Oct 30, 2007 at 2:39 am

    Maybe my mind is way off track - but I still sense a
    problem with the mapper sending feedbacks to the job controller. That
    is, when a mapper has reached the terminal condition, how can it tell
    the job controller to stop?
    I'm pretty sure that there is no way (within the Hadoop framework) for the mapper to send feedback to the job controller, and there is no way for the mapper to stop itself without the job being marked killed/failed. MapReduce is not really suited to that kind of interactive task. If I'm off base here, I'm sure someone will jump in...

    Any jobs you run should be allowed to finish properly, and then it should be up to your control code to start another job (or not, if the control code decides that you have performed enough iterations).

    I tried to find a way so that the job controller can open the file in
    the output path at the end of the loop to read the contents; but thus
    far, I haven't seen a way to achieve this.
    Yes, you will need to have your control code read a job's output at the end of each iteration to decide whether to start another. See the FileSystem API (specifically the open(Path) method): http://lucene.apache.org/hadoop/api/org/apache/hadoop/fs/FileSystem.html#open(org.apache.hadoop.fs.Path)

    Definitely not a dead-end... good luck!

    Thanks,
    Stu
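
    A small sketch of that check in the control code, using the FileSystem.open(Path) call mentioned above (the class name and the assumption of plain text output are illustrative):

        import java.io.BufferedReader;
        import java.io.InputStreamReader;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class OutputCheck {

          /** True if the given (text) output file contains at least one record. */
          static boolean hasRecords(Configuration conf, Path partFile) throws Exception {
            FileSystem fs = FileSystem.get(conf);
            BufferedReader reader =
                new BufferedReader(new InputStreamReader(fs.open(partFile)));
            try {
              return reader.readLine() != null;
            } finally {
              reader.close();
            }
          }
        }

    The driver would call this on the part files of the latest iteration's output directory (e.g. part-00000) and stop looping when nothing comes back.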




