Hi,

I am adding support for parallel processing to an existing program
which fetches some data and then performs some computation with
results saved to a database. Everything went just fine until I wanted
to gather all of the results from the subprocesses.

The first idea that came to my mind was to use a queue. I've got many
producers (all of the workers) and one consumer. Seems quite simple,
but it isn't, at least for me. I presumed that each worker would put()
its results into the queue and finally close() it, while the parent
process would get() them for as long as there is an active
subprocess. So I did this:
>>> from multiprocessing import Process, Queue, active_children
>>> def f(q):
...     q.put(1)
...     q.close()
...
>>> queue = Queue()
>>> Process(target=f, args=(queue,)).start()
>>> while len(active_children()) > 0:
...     print queue.get()
...
1

This (of course?) hangs after the first iteration of the loop. Delaying
the second iteration with a sleep() call fixes the problem, since the
result of active_children() gets refreshed in the meantime, but that's
not a real solution. One could suggest iterating exactly as many times
as there are subprocesses, but let's presume such information isn't
available.

Due to my failure with queues I decided to give pipes a try, and
again I found behavior which is at least surprising and not obvious.
When I use a pipe within a single process everything works perfectly:
>>> from multiprocessing import Pipe
>>> parent, child = Pipe()
>>> child.send(1)
>>> child.close()
>>> parent.recv()
1
>>> child.closed
True
>>> parent.recv()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
EOFError

The problems appear when communicating with a subprocess over a pipe, though.
>>> from multiprocessing import Process, Pipe
>>> def f(child):
...     child.send(1)
...     child.close()
...
>>> parent, child = Pipe()
>>> Process(target=f, args=(child,)).start()
>>> parent.recv()
1
>>> child.closed
False
>>> parent.recv()
... and it hangs. I have no idea how to fix this, nor even of a
workaround which would solve my problem.

Most probably I'm missing something in the philosophy of
multiprocessing, but I couldn't find anything covering such a
situation. I'd appreciate any kind of hint on this topic, as it has
become a riddle I just have to solve. :-)

Best regards,
Michal Chruszcz

  • MRAB at Apr 22, 2009 at 4:33 pm

    Michal Chruszcz wrote:
    [most of the original message snipped]

    Most probably I'm missing something in the philosophy of
    multiprocessing, but I couldn't find anything covering such a
    situation. I'd appreciate any kind of hint on this topic, as it has
    become a riddle I just have to solve. :-)
    You could do this:

        from multiprocessing.queues import Empty

        queue = Queue()
        Process(target=f, args=(queue,)).start()
        while active_children():
            try:
                print queue.get(timeout=1)
            except Empty:
                pass
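
    For reference, a self-contained sketch of this polling pattern might
    look like the following (the worker function and the number of worker
    processes are just assumptions for the example, not taken from the
    original program):

        from multiprocessing import Process, Queue, active_children
        from multiprocessing.queues import Empty

        def worker(q):
            # Illustrative stand-in for the real fetch-and-compute step.
            q.put(1)
            q.close()

        if __name__ == '__main__':
            queue = Queue()
            for _ in range(4):
                Process(target=worker, args=(queue,)).start()

            results = []
            while active_children():
                try:
                    # Wait up to one second, then re-check for live workers.
                    results.append(queue.get(timeout=1))
                except Empty:
                    pass
            # Drain anything put just before the last worker exited.
            while True:
                try:
                    results.append(queue.get_nowait())
                except Empty:
                    break
            print results

    The __main__ guard matters if this ever runs on Windows, where
    multiprocessing re-imports the main module in each child process.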
  • Scott David Daniels at Apr 22, 2009 at 8:30 pm

    Michal Chruszcz wrote:
    ... The first idea that came to my mind was to use a queue. I've got
    many producers (all of the workers) and one consumer. Seems quite
    simple, but it isn't, at least for me. I presumed that each worker
    would put() its results into the queue and finally close() it, while
    the parent process would get() them for as long as there is an active
    subprocess....
    Well, if the protocol for a worker is:

        <someloop>:
            <calculate>
            queue.put(result)
        queue.put(<worker_end_sentinel>)
        queue.close()


    Then you can keep count of how many have finished in the consumer.

    --Scott David Daniels
    Scott.Daniels at Acm.Org
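
    For illustration, a minimal self-contained sketch of this
    sentinel-counting protocol might be (the worker body, the None
    sentinel, and NUM_WORKERS are assumptions for the example):

        from multiprocessing import Process, Queue

        NUM_WORKERS = 4
        SENTINEL = None  # assumed end-of-worker marker

        def worker(q):
            for i in range(3):       # stand-in for the real computation
                q.put(i)
            q.put(SENTINEL)          # tell the consumer this worker is done
            q.close()

        if __name__ == '__main__':
            queue = Queue()
            for _ in range(NUM_WORKERS):
                Process(target=worker, args=(queue,)).start()

            finished = 0
            results = []
            while finished < NUM_WORKERS:
                item = queue.get()   # blocks until something arrives
                if item is SENTINEL:
                    finished += 1    # one more worker has ended
                else:
                    results.append(item)
            print results

    The consumer never has to poll or guess about liveness; it simply
    counts one sentinel per worker it started.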
  • Michal Chruszcz at Apr 23, 2009 at 9:15 am

    On Apr 22, 10:30 pm, Scott David Daniels wrote:
    Michal Chruszcz wrote:
    ... The first idea that came to my mind was to use a queue. I've got
    many producers (all of the workers) and one consumer. Seems quite
    simple, but it isn't, at least for me. I presumed that each worker
    would put() its results into the queue and finally close() it, while
    the parent process would get() them for as long as there is an active
    subprocess....
    Well, if the protocol for a worker is:

        <someloop>:
            <calculate>
            queue.put(result)
        queue.put(<worker_end_sentinel>)
        queue.close()

    Then you can keep count of how many have finished in the consumer.
    Yes, I could, but I don't like the idea of using a sentinel if I
    still need to close the queue. I mean, if I mark the queue closed, or
    close a connection through a pipe, why do I still have to "mark" it
    closed using a sentinel? From my point of view that's duplication.
    Thus I dare to say the multiprocessing module misses something quite
    important.

    It is probably possible to keep track of the pipe's state using a
    multiprocessing manager, thus avoiding the duplicated state exchange,
    but I haven't tried it yet.

    Best regards,
    Michal Chruszcz
  • Jesse Noller at Apr 23, 2009 at 11:18 am

    On Thu, Apr 23, 2009 at 5:15 AM, Michal Chruszcz wrote:
    [quoted discussion snipped]
    Yes, I could, but I don't like the idea of using a sentinel if I
    still need to close the queue. I mean, if I mark the queue closed, or
    close a connection through a pipe, why do I still have to "mark" it
    closed using a sentinel? From my point of view that's duplication.
    Thus I dare to say the multiprocessing module misses something quite
    important.
    Using a sentinel, or looping on the get/Empty pattern, are both valid
    and correct suggestions.

    If you think it's a bug, or you want a new feature, post it,
    preferably with a patch, to bugs.python.org. Add me to the nosy list,
    or assign it to me if you can.

    Jesse
  • Michal Chruszcz at Apr 23, 2009 at 8:21 am

    On Apr 22, 6:33 pm, MRAB wrote:
    You could do this:

        from multiprocessing.queues import Empty

        queue = Queue()
        Process(target=f, args=(queue,)).start()
        while active_children():
            try:
                print queue.get(timeout=1)
            except Empty:
                pass
    This one isn't generic. When I have tasks that all finish within 0.1
    seconds, the above gives 10x overhead. On the other hand, if I know
    the results will only be available after 10 hours, there's no point
    in checking every second.

    Best regards,
    Michal Chruszcz
  • MRAB at Apr 23, 2009 at 12:09 pm

    Michal Chruszcz wrote:
    On Apr 22, 6:33 pm, MRAB wrote:
    You could do this:
    [code snipped]
    This one isn't generic. When I have tasks that all finish within 0.1
    seconds, the above gives 10x overhead. On the other hand, if I know
    the results will only be available after 10 hours, there's no point
    in checking every second.
    If there is a result in the queue then get() will return immediately;
    if not, then it will wait for up to 1 second for a result to arrive,
    and if none arrives in that time it will raise an Empty exception.
    Raising at most one exception per second consumes a negligible amount
    of processing power (I wouldn't worry about it).
  • Paul Boddie at Apr 23, 2009 at 1:02 pm

    On 22 Apr, 17:43, Michal Chruszcz wrote:
    I am adding support for parallel processing to an existing program
    which fetches some data and then performs some computation with
    results saved to a database. Everything went just fine until I wanted
    to gather all of the results from the subprocesses.
    [Queue example]

    I have to say that I'm not familiar with the multiprocessing API, but
    for this kind of thing, there needs to be some reasonably complicated
    stuff happening in the background to test for readable conditions on
    the underlying pipes or sockets. In the pprocess module [1], I had to
    implement a poll-based framework (probably quite similar to Twisted
    and asyncore) to avoid deadlocks and other undesirable conditions.

    [Pipe example]

    Again, it's really awkward to monitor pipes between processes and to
    have them "go away" when closed. Indeed, I found that you don't really
    want them to disappear before everyone has finished reading from them,
    but Linux (at least) tends to break pipes quite readily. I got round
    this problem by having acknowledgements in pprocess, but it felt like
    a hack.
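
    As a side note on the original pipe example: a plausible explanation
    for the hang is that the parent process still holds its own copy of
    the writable child end, so parent.recv() never sees end-of-file even
    after the worker closes its copy. A minimal sketch, assuming that is
    indeed the cause:

        from multiprocessing import Process, Pipe

        def f(conn):
            conn.send(1)
            conn.close()           # worker closes its copy of the connection

        if __name__ == '__main__':
            parent, child = Pipe()
            Process(target=f, args=(child,)).start()
            child.close()          # parent drops its copy of the child end
            print parent.recv()    # -> 1
            try:
                parent.recv()      # raises EOFError instead of hanging
            except EOFError:
                print 'worker closed its end of the pipe'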
    Most probably I'm missing something in the philosophy of
    multiprocessing, but I couldn't find anything covering such a
    situation. I'd appreciate any kind of hint on this topic, as it has
    become a riddle I just have to solve. :-)
    The multiprocessing module appears to offer map-based conveniences
    (Pool objects) where you indicate that you want the same callable
    executed multiple times and the results to be returned, so perhaps
    this is really what you want. In pprocess, there's a certain amount of
    flexibility exposed in the API, so that you can choose to use a map-
    like function, or you can open everything up and use the
    communications primitives directly (which would appear to be similar
    to the queue-oriented programming mentioned in the multiprocessing
    documentation).
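
    For what it's worth, a minimal sketch of that map-based style with
    multiprocessing.Pool (the compute function and its inputs are
    assumptions for the example) could look like:

        from multiprocessing import Pool

        def compute(item):
            # stand-in for the real fetch-and-compute step
            return item * item

        if __name__ == '__main__':
            pool = Pool(processes=4)
            results = pool.map(compute, range(10))  # blocks until all finish
            pool.close()
            pool.join()
            print results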

    One thing that pprocess exposes (and which must be there in some form
    in the multiprocessing module) is the notion of an "exchange" which is
    something that monitors a number of communications channels between
    processes and is able to detect and act upon readable channels in an
    efficient way. If it's not the Pool class in multiprocessing that
    supports such things, I'd take a look for the component which does
    support them, if I were you, because this seems to be the
    functionality you need.

    Paul

    [1] http://www.boddie.org.uk/python/pprocess.html
