Everybody, thanks for all the help.
Chris/Jason, assumption 1) is actually incorrect for my situation.
Nonetheless, I can see how one would basically use a dynamic-typing approach
to send the additional data as the first keys for each partition. It seems
less than elegant but doable.
The solution Harsh J provides seems to be more elegant, but I need to spend
some more time understanding exactly how paths interact with committing a
job when using MultipleOutputs. My first and second passes have failed, but
it does seem to be committing the output of the map task even when it is in
a map+reduce job. Like most things, I just need to spend some more time
digging through the code at a more detailed level. I will come back if I
have more questions.
Regarding the final assumption that Harsh J mentions, "the framework
guarantees that the Reduce operation never starts until all Map tasks have
finished": it seems that, while pre-shuffling could be used to get ready for
running the reduce, the reduce would always have to wait unless a
Partitioner could guarantee that a particular partition was complete before
all maps were completed. Is there talk of this changing somehow?
Thanks again,
Jacques
On Sun, Feb 13, 2011 at 8:22 PM, Jason wrote:
I think this kind of partitioner is a little hackish. A more straightforward
approach is to emit the extra data N times under special keys and write a
partitioner that recognizes these keys and dispatches them accordingly
between partitions 0..N-1.
Also, if this data needs to be shipped to reducers upfront, that can easily
be done using a custom sort comparator.
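The special-key idea above can be sketched in plain Java, without Hadoop on the classpath, so that it runs standalone. Everything named here (`TaggedKey`, `SummaryAwarePartitioning`, `SUMMARY_FIRST`) is hypothetical and only illustrates the logic. In an actual job the key would presumably implement `WritableComparable`, the dispatch logic would sit in a `Partitioner` subclass, and the ordering would be registered as the job's sort comparator.

```java
import java.util.Comparator;

// Hypothetical composite key: either an ordinary key or one copy of the
// per-map summary data addressed to a specific reducer.
final class TaggedKey {
    final boolean summary;      // true for the extra summary data
    final int targetPartition;  // reducer this summary copy is meant for (ignored for ordinary keys)
    final String naturalKey;    // split index for summaries, the ordinary key otherwise

    TaggedKey(boolean summary, int targetPartition, String naturalKey) {
        this.summary = summary;
        this.targetPartition = targetPartition;
        this.naturalKey = naturalKey;
    }
}

final class SummaryAwarePartitioning {
    // Summary copies go exactly where they are addressed; ordinary keys
    // fall back to non-negative hash partitioning.
    static int getPartition(TaggedKey key, int numPartitions) {
        if (key.summary) {
            return key.targetPartition;
        }
        return (key.naturalKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Sorts all summary keys ahead of ordinary keys, so each reducer would
    // see the summaries before any regular record.
    static final Comparator<TaggedKey> SUMMARY_FIRST =
        Comparator.comparing((TaggedKey k) -> !k.summary)
                  .thenComparing(k -> k.naturalKey);
}
```

A mapper following this sketch would emit one `TaggedKey(true, i, splitIndex)` for each i in 0..N-1 once its summary is ready, alongside its ordinary output.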
On Feb 13, 2011, at 8:05 PM, Chris Douglas wrote:
If these assumptions are correct:
0) Each map outputs one result, a few hundred bytes
1) The map output is deterministic, given an input split index
2) Every reducer must see the result from every map
Then just output the result N times, where N is the number of
reducers, using a custom Partitioner that assigns the result to
(records_seen++ % N), where records_seen is an int field on the
partitioner.
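A minimal sketch of the round-robin assignment described above, in plain Java so it runs without Hadoop. The class name `RoundRobinPartitioner` and the one-argument `getPartition` are illustrative assumptions; in a real job the same counter would live in a subclass of Hadoop's `Partitioner`, whose `getPartition` also receives the key and value.

```java
// Hypothetical stand-in for a Hadoop Partitioner; only the counter logic is shown.
class RoundRobinPartitioner {
    // Number of records this partitioner instance has assigned so far.
    private int recordsSeen = 0;

    // Ignores the record contents: the i-th record seen goes to
    // partition i % numPartitions.
    int getPartition(int numPartitions) {
        int partition = recordsSeen % numPartitions;
        recordsSeen++;
        return partition;
    }
}
```

With N reducers, a map that emits its single result N times in a row sends exactly one copy to each partition. This relies on assumption (0): if the map also emitted ordinary records through the same partitioner, the copies would no longer line up with partitions 0..N-1.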
If (1) does not hold, then write the first stage as a job with a single
(optional) reduce, and the second stage as a map-only job processing
the result. -C
On Sun, Feb 13, 2011 at 12:18 PM, Jacques wrote:
I'm outputting a small amount of secondary summary information from a map
task that I want to use in the reduce phase of the job. This information is
keyed on a custom input split index.
Each map task outputs this summary information (less than a hundred bytes
per input task). Note that the summary information isn't ready until the
completion of the map task.
Each reduce task needs to read this information (for all input splits) to
complete its task.
What is the best way to pass this information to the Reduce stage? I'm
working in Java using cdhb2. Ideas I had include:
1. Output this data to MapContext.getWorkOutputPath(). However, that data
is not available anywhere in the reduce stage.
2. Output this data to "mapred.output.dir". The problem here is that the
map task writes immediately to this so failed jobs and speculative execution
could cause collision issues.
3. Output this data as in (1) and then use Mapper.cleanup() to copy these
files to "mapred.output.dir". Could work but I'm still a little concerned
about collision/race issues as I'm not clear about when a Map task becomes
"the" committed map task for that split.
4. Use an external system to hold this information and then just call that
system from both phases. This is basically an alternative of #3 and has the
same issues.
Are there suggested approaches of how to do this?
It seems like (1) might make the most sense if there is a defined way to
stream secondary outputs from all the mappers within the Reducer.setup()
method.
Thanks for any ideas.
Jacques