Hi
I'm running on Hadoop 0.20.2 and I have a job with the following characteristics:
* Mapper outputs very large records (50 to 200 MB)
* A single Reducer merges all of those records together
* Map output key is a constant (it could be a NullWritable, but currently it's
a LongWritable(1)); see the sketch after this list
* Reducer doesn't care about the keys at all
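To make this concrete, the mapper side looks roughly like this (a minimal
sketch using the new org.apache.hadoop.mapreduce API with placeholder class
and value types; my real values are the library's 50-200 MB records, not
plain BytesWritable):

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class ConstantKeyMapper
    extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {

  // The key is constant; the single Reducer ignores it and just merges the values.
  private static final LongWritable ONE = new LongWritable(1);

  @Override
  protected void map(LongWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {
    context.write(ONE, value);
  }
}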
I'm running into OOMs during the Sort phase. Previously, on Hadoop 0.20.1, I
ran into OOMs during the Shuffle phase, but that was due to a bug that was
fixed in MAPREDUCE-1182. The OOMs I'm seeing now come from IFile, line 342 in
readNextBlock, where it tries to allocate a byte[] the size of the key+value
output and fails. Inspecting the code, I noticed it depends on
io.file.buffer.size -- it allocates a buffer of that size and attempts to
reuse it. However, if a map output key+value is larger than that buffer, it
allocates another byte[] of the full record size, which is devastating.
I'm running with mapred.child.java.opts set to -Xmx4096m, each machine in my
cluster has 16 GB of RAM, and this is the only job running at the moment, so
there's definitely plenty of RAM available.
So I limited the Mappers' output records to less than 64 MB, which is my
io.file.buffer.size, and now the job succeeds, but I don't feel this is the
best way to deal with it. First, the job I write is a library, so someone
could easily configure it with a smaller io.file.buffer.size, or with larger
map outputs, and things would break again. Second, I think there must be a
better way to handle it.
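Concretely, the cap is derived from the task's configuration, roughly like
this (a sketch only; the record-splitting logic itself is specific to my
library and omitted here):

// Use io.file.buffer.size (4096 is its documented default) as a hard cap on
// the size of any single record the mapper emits; when an accumulated record
// would exceed the cap, it is written out and a new one is started.
int maxRecordBytes =
    context.getConfiguration().getInt("io.file.buffer.size", 4096);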
Searching around, I found several parameters that might be related, but I'm
not sure they'll help much, among them controlling the spill percent and
lowering the size of the largest map output that can be held in memory during
the shuffle phase. They feel like voodoo to me, but I may be wrong :).
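For reference, these are the knobs I came across, if I identified them
correctly (the values below are only illustrative, not recommendations):

// Spill the map-side sort buffer earlier (default 0.80):
conf.setFloat("io.sort.spill.percent", 0.60f);
// Shrink the portion of the reduce task's heap used to hold map outputs in
// memory during the shuffle, which also lowers the largest single map output
// kept in memory (default 0.70):
conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.30f);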
It would be best if I could avoid the shuffle and sort phases entirely - they
are not needed for this job. I wrote a *NoSorter implements IndexedSorter* in
the hope that it would improve things, but it didn't (it replaces QuickSort
with a sorter that doesn't sort at all).
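For completeness, the NoSorter is essentially this, and (if I read MapTask
correctly) it is picked up via the map.sort.class property:

import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progressable;

// Satisfies the IndexedSorter contract but leaves the map output records in
// the order they arrived, i.e. it does no sorting at all.
public class NoSorter implements IndexedSorter {
  public void sort(IndexedSortable s, int l, int r) {
    // intentionally a no-op
  }
  public void sort(IndexedSortable s, int l, int r, Progressable rep) {
    if (rep != null) {
      rep.progress(); // report progress so the task isn't considered hung
    }
  }
}

// Registered with:
// conf.setClass("map.sort.class", NoSorter.class, IndexedSorter.class);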
Also, I don't understand why the Reducer's values need to be read into memory
at all, since (unless you really insist) they don't participate in the sort
and merge phases - only the keys matter (again, unless you provide a
Comparator for the values). If it must load data into memory, then loading
the keys/values in small chunks of, say, 64 KB would be better, because the
JVM can really struggle to allocate 100 MB of consecutive byte[].
Another thing that would work great is if Hadoop fed the Reducer as soon as
the Mappers' outputs are ready. This job does not care about sorting and
merging the outputs, so I could gain a lot of performance back if the Reducer
kicked in immediately, rather than only after all the Mappers are done.
I couldn't measure it precisely, but of the 50 minutes it took the Reducer to
finish (the Mappers finished in 6 minutes!), more than half was spent in the
copy, shuffle and sort phases, which is insane.
Any help / advice / best-practice instructions would be greatly appreciated.
If any of these issues are fixed in a newer Hadoop release, I'd love to hear
about that too. I cannot upgrade from 0.20.2 at the moment, but it would be
nice to know they are already addressed.
Shai