Refactor reduce shuffle code
----------------------------

Key: HADOOP-5223
URL: https://issues.apache.org/jira/browse/HADOOP-5223
Project: Hadoop Core
Issue Type: Improvement
Components: mapred
Reporter: Owen O'Malley
Assignee: Owen O'Malley
Fix For: 0.21.0


The reduce shuffle code has become very complex and entangled. I think we should move it out of ReduceTask and into a separate package (org.apache.hadoop.mapred.task.reduce). Details to follow.

--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

  • Owen O'Malley (JIRA) at Feb 11, 2009 at 6:41 pm
    [ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12672713#action_12672713 ]

    Owen O'Malley commented on HADOOP-5223:
    ---------------------------------------

    Roughly, I think the flow should look like:

    EventFetcher -> HostPlanner -> FetcherPool -> OutputMerger

    There is also a main shuffle object that tracks the progress of the shuffle. Each of these should be a separate class. The EventFetcher gets the map completion events from the TaskTracker. The HostPlanner keeps track of the available map outputs and the penalty box, and hands out ready hosts to the fetchers. The FetcherPool is a pool of threads that do the actual copying of data. The OutputMerger manages the in-memory and on-disk data and has a thread to do merges.
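
    A minimal sketch of how those pieces might fit together, to make the division of labor concrete. The interfaces and method names below are illustrative guesses at the shape of the design, not the actual patch:

    {code}
    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.mapred.RawKeyValueIterator;
    import org.apache.hadoop.mapred.TaskAttemptID;
    import org.apache.hadoop.mapred.TaskCompletionEvent;

    // Polls the TaskTracker for map-completion events.
    interface EventFetcher {
      List<TaskCompletionEvent> getMapCompletionEvents() throws IOException;
    }

    // Tracks available map outputs and the penalty box;
    // hands hosts that are ready out to the fetcher threads.
    interface HostPlanner {
      void addKnownMapOutput(String host, TaskAttemptID mapId);
      void penalize(String host);              // back off from a misbehaving host
      String getReadyHost() throws InterruptedException;
    }

    // A pool of threads that perform the actual copies from a host.
    interface FetcherPool {
      void fetchFrom(String host);
    }

    // Manages in-memory and on-disk map outputs and owns the merge thread.
    interface OutputMerger {
      void add(TaskAttemptID mapId, byte[] data) throws IOException;
      RawKeyValueIterator close() throws IOException;  // merged input for the reduce
    }
    {code}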

    We'll post a patch with the api soon.
  • Owen O'Malley (JIRA) at Feb 11, 2009 at 6:47 pm
    [ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12672716#action_12672716 ]

    Owen O'Malley commented on HADOOP-5223:
    ---------------------------------------

    We also plan to incorporate HADOOP-1338 into this patch: the fetchers will request a list of map outputs, and the response for each map looks like:

    {code}
    <vint id length> <task id>
    <vint compressed size>
    <vint raw size>
    <map data>
    {code}
    ... repeated for each map.
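
    For illustration, here is how one such header could be written with Hadoop's variable-length integer encoding in WritableUtils (vints and vlongs share the same encoding). This is a sketch of the proposed layout, not code from the patch:

    {code}
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    class ShuffleHeaderSketch {
      // Write the proposed per-map header; the map data itself is streamed next,
      // and the whole pattern repeats for each requested map.
      static void writeHeader(DataOutput out, String taskId,
                              long compressedSize, long rawSize) throws IOException {
        byte[] id = taskId.getBytes("UTF-8");
        WritableUtils.writeVInt(out, id.length);        // <vint id length>
        out.write(id);                                  // <task id>
        WritableUtils.writeVLong(out, compressedSize);  // <vint compressed size>
        WritableUtils.writeVLong(out, rawSize);         // <vint raw size>
      }
    }
    {code}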
  • Devaraj Das (JIRA) at Feb 11, 2009 at 8:03 pm
    [ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12672749#action_12672749 ]

    Devaraj Das commented on HADOOP-5223:
    -------------------------------------

    This looks nice. One other thing that should probably be added is failure handling (failed-fetch notifications, etc., on getting a CopyResult with an error).
  • Arun C Murthy (JIRA) at Feb 11, 2009 at 10:34 pm
    [ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

    Arun C Murthy updated HADOOP-5223:
    ----------------------------------

    Attachment: HADOOP-5233_api.patch

    Here is the rough draft of the api we've been working on...
  • Arun C Murthy (JIRA) at Feb 12, 2009 at 9:06 am
    [ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

    Arun C Murthy updated HADOOP-5223:
    ----------------------------------

    Attachment: HADOOP-5233_part0.patch

    Another puzzle piece: this includes a completed Fetcher along with some minor changes to the api and a newly added 'ShuffleContext' to pass around...
  • Arun C Murthy (JIRA) at Feb 12, 2009 at 9:10 am
    [ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12672923#action_12672923 ]

    Arun C Murthy commented on HADOOP-5223:
    ---------------------------------------

    Forgot to add that we felt it's better to move the responsibility of tracking the shuffle into a new ShuffleScheduler; the Fetcher directly informs the ShuffleScheduler of the success or failure of each map-output fetch, instead of going through another queue of CopyResult objects as it does today.
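
    A rough sketch of what that direct reporting could look like; the method names are guesses at the shape of the API, not the actual patch. This is also a natural place for the failure handling Devaraj mentioned:

    {code}
    import org.apache.hadoop.mapred.TaskAttemptID;

    // Fetcher threads call the scheduler directly on each result,
    // replacing the shared CopyResult queue and its consumer loop.
    interface ShuffleScheduler {
      void copySucceeded(TaskAttemptID mapId, String host, long bytes);
      // On failure the scheduler can penalize the host, re-schedule the
      // fetch, or ultimately fail the reduce.
      void copyFailed(TaskAttemptID mapId, String host);
      // Returns true once every map output has been fetched.
      boolean waitUntilDone(int millis) throws InterruptedException;
    }
    {code}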
  • Scott Carey (JIRA) at Jun 10, 2009 at 6:07 pm
    [ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12718153#action_12718153 ]

    Scott Carey commented on HADOOP-5223:
    -------------------------------------

    I've had issues with the shuffle on 0.19. On a cluster with new hardware capable of running 13+ maps and 11+ reduces concurrently (dual quad-core with hyperthreading = 16 hardware threads, 24GB RAM, 4 drives), the shuffle is always my bottleneck on any job where the maps aren't huge or where they condense the data down significantly before the reduce. During this bottleneck, disk, network, and CPU are calm. I collected quite a few thread dumps in this state on many different jobs. Increasing parallel copies and tasktracker http threads had no effect. For the most part, the thread dumps had the shuffle fetch threads idle and the main thread here:

    "main" prio=10 tid=0x000000005eed3800 nid=0x62a2 waiting on condition [0x0000000040eb4000]
    java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.getMapCompletionEvents(ReduceTask.java:2244)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.fetchOutputs(ReduceTask.java:1716)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:368)
    at org.apache.hadoop.mapred.Child.main(Child.java:158)


    There is a one-line fix. However, this JIRA is refactoring the code that the fix applies to, so I'll post my info here to make sure the improvements are contained in it.
    The fix improves times by 40% on a composite set of jobs (a cascading flow consisting of 25+ map/reduce jobs, with up to 7 concurrent ones).

    First, the fix I made is below:

    Comment out or delete the line:
    {code} break; //we have a map from this host {code}
    in ReduceTask.java, in ReduceCopier.fetchOutputs()
    -- line 1804 in 0.19.2-dev, line 1911 in 0.20.1-dev, and line 1954 currently on trunk.
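
    To make the effect of that line concrete, here is a simplified paraphrase of the scheduling pass (not the actual ReduceTask source; the names are illustrative):

    {code}
    import java.util.List;
    import java.util.Map;

    class SchedulingSketch {
      // One pass over the known-but-unfetched map outputs, grouped by host.
      static void schedulePass(Map<String, List<String>> knownOutputsByHost,
                               List<String> scheduledCopies) {
        for (List<String> outputsOnHost : knownOutputsByHost.values()) {
          for (String mapOutput : outputsOnHost) {
            scheduledCopies.add(mapOutput);
            // break;  // the removed line: schedule only ONE output per host
            //         // per pass, then sleep and re-poll the TaskTracker
          }
        }
      }
    }
    {code}

    With the break gone, each pass drains everything the reduce already knows about before it sleeps.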

    Reduces that averaged 1 to 2 minutes in the shuffle phase while copying little data now average 1 to 3 seconds on my set of jobs.

    Here is the problem: the shuffle currently limits itself to copying only one shard from a given host per pass, and then sleeps. As long as the number of map shards is much larger than the number of hosts, this requires quite a few sleep delays. For servers that can handle many map and reduce tasks each, this gets out of hand quickly, especially on small or medium-sized clusters where the ideal number of concurrent shuffle copies per reduce is on the order of, or larger than, the number of hosts.

    A more sophisticated fix such as this JIRA will do more, but the low-hanging-fruit performance fix is to fetch every shard reported by the last ping before sleeping again and checking for more. This not only improves the shuffle speed but also reduces the total number of pings needed to find out which shards are available, which reduces load elsewhere. It makes little sense to do what happens now on a small cluster:
    discover that, say, 100 shards need to be fetched, grab 8 of them, sleep, ask again what is available, grab only 8, sleep...
    At the very least, if there are 100 map outputs available to a reducer, it should keep draining that list before sleeping and asking for an updated set.

    Some may object to opening more than one concurrent connection to a host on the grounds that it could overload a tasktracker, but that seems like a false assumption to me. First, tasktrackers throttle this with the configuration parameter for the number of http threads. Second, reduces throttle this with the number of concurrent shuffle fetch threads. There is no difference between a reduce opening 10 concurrent shuffle connections to 10 hosts and opening 10 to one host; when all reduces are doing this concurrently and choosing hosts at random, the average number of concurrent connections on any one TT remains the same.
    If it is a serious concern for other reasons (the 'penalty box'? or other error handling?), then the shuffle queue could be filled in a better order than one host at a time, or at least not sleep and re-fetch the list without first draining it. A more significant refactor may do better than the one-liner -- but I suspect this alone is most of the performance gain.


    Here is a sample log before and after the change on 0.19, on a small dev cluster with newer hardware -- a particularly bad case for this:
    3 TTs, each configured for 13 concurrent maps, 11 concurrent reduces, 10 concurrent shuffle copies, and 40 TT http threads:

    Before: {code}
    2009-06-09 22:13:53,657 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Need another 51 map output(s) where 0 is already in progress
    2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082206_0006_m_000050_0'
    2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0: Got 51 new map-outputs
    2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0: Got 1 obsolete map-outputs from tasktracker
    2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup hosts)
    2009-06-09 22:13:53,689 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 54394 bytes (54398 raw bytes) into RAM from attempt_200906082206_0006_m_000014_0
    2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 70736 bytes (70740 raw bytes) into RAM from attempt_200906082206_0006_m_000003_0
    2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 73540 bytes (73544 raw bytes) into RAM from attempt_200906082206_0006_m_000001_0
    2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 54394 bytes from map-output for attempt_200906082206_0006_m_000014_0
    2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 70736 bytes from map-output for attempt_200906082206_0006_m_000003_0
    2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 73540 bytes from map-output for attempt_200906082206_0006_m_000001_0
    2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000014_0 -> (21, 205) from 10.3.0.142
    2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000003_0 -> (21, 240) from 10.3.0.143
    2009-06-09 22:13:53,693 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000001_0 -> (21, 204) from 10.3.0.141
    2009-06-09 22:13:55,662 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup hosts)

    -- SNIP --

    2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Read 79913 bytes from map-output for attempt_200906082206_0006_m_000042_0
    2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000042_0 -> (21, 237) from 10.3.0.141
    2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
    2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
    2009-06-09 22:14:50,752 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 51 files left.
    2009-06-09 22:14:50,813 INFO org.apache.hadoop.mapred.Merger: Merging 51 sorted segments
    2009-06-09 22:14:50,817 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 51 segments left of total size: 3325252 bytes
    {code}

    After -- (slightly different job): {code}
    2009-06-08 23:51:07,057 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Thread waiting: Thread for merging on-disk files
    2009-06-08 23:51:07,058 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Need another 68 map output(s) where 0 is already in progress
    2009-06-08 23:51:07,069 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000060_1'
    2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000014_0'
    2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0: Got 68 new map-outputs
    2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0: Got 2 obsolete map-outputs from tasktracker
    2009-06-08 23:51:07,071 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Scheduled 68 outputs (0 slow hosts and0 dup hosts)
    2009-06-08 23:51:07,106 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 674904 bytes (674908 raw bytes) into RAM from attempt_200906082336_0014_m_000005_0
    2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Read 674904 bytes from map-output for attempt_200906082336_0014_m_000005_0
    2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082336_0014_m_000005_0 -> (61, 26) from 10.3.0.143

    -- SNIP --

    2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Read 1439739 bytes from map-output for attempt_200906082336_0014_m_000012_1
    2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082336_0014_m_000012_1 -> (50, 25) from 10.3.0.141
    2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
    2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
    2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 68 files left.
    2009-06-08 23:51:09,122 INFO org.apache.hadoop.mapred.Merger: Merging 68 sorted segments
    2009-06-08 23:51:09,126 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 52 segments left of total size: 44450255 bytes
    {code}

