
[Python] Suggested generator to add to threading module.

Andrae Muys
Jan 16, 2004 at 4:58 am
Found myself needing serialised access to a shared generator from
multiple threads. Came up with the following

import threading

def serialise(gen):
    lock = threading.Lock()
    while 1:
        lock.acquire()
        try:
            next = gen.next()
        finally:
            lock.release()
        yield next

I considered suggesting it for itertools, but really it's thread
specific so I am suggesting it for the threading module.

Andrae Muys

18 responses

  • Martin v. Löwis at Jan 16, 2004 at 7:44 am

    Andrae Muys wrote:
    I considered suggesting it for itertools, but really it's thread
    specific so I am suggesting it for the threading module.
    If you are willing to contribute this function, please submit
    a patch to sf.net/projects/python, including documentation changes,
    and test cases.

    Regards,
    Martin
  • Aahz at Jan 16, 2004 at 6:07 pm
    In article <7934d084.0401152058.164a240c at posting.google.com>,
    Andrae Muys wrote:
    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next
    I'm not sure this is generic enough to go in the standard library.
    Usually, I'd recommend that someone wanting this functionality consider
    other options in addition to this (such as using Queue.Queue()).
    --
    Aahz (aahz at pythoncraft.com) <*> http://www.pythoncraft.com/

    A: No.
    Q: Is top-posting okay?
  • Andrae Muys at Jan 17, 2004 at 9:08 am

    Aahz wrote:
    In article <7934d084.0401152058.164a240c at posting.google.com>,
    Andrae Muys wrote:
    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next

    I'm not sure this is generic enough to go in the standard library.
    Usually, I'd recommend that someone wanting this functionality consider
    other options in addition to this (such as using Queue.Queue()).
    While I fully appreciate the importance of a Queue.Queue in implementing
    a producer/consumer task relationship, this particular function provides
    serialised access to a *passive* data-stream. With the increasing
    sophistication of itertools, I feel there may be an argument for
    supporting shared access to a generator.

    Anyway I thought it was worth offering as a possible bridge between the
    itertools and threading modules. If I'm mistaken, then it's no major loss.

    Andrae
  • Andrae Muys at Jan 19, 2004 at 2:08 am
    aahz at pythoncraft.com (Aahz) wrote in message news:<bu998t$o62$1 at panix1.panix.com>...
    In article <7934d084.0401152058.164a240c at posting.google.com>,
    Andrae Muys wrote:
    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next
    I'm not sure this is generic enough to go in the standard library.
    Usually, I'd recommend that someone wanting this functionality consider
    other options in addition to this (such as using Queue.Queue()).
    I'm curious to know how a Queue.Queue() provides the same
    functionality? I have always considered a Queue.Queue() to be an
    inter-thread communication primitive. serialise() (at least the
    corrected version discussed later in this thread) is strictly a
    synchronisation primitive.

    Andrae
  • Aahz at Jan 19, 2004 at 5:37 am
    In article <7934d084.0401181808.6e698042 at posting.google.com>,
    Andrae Muys wrote:
    aahz at pythoncraft.com (Aahz) wrote in message news:<bu998t$o62$1 at panix1.panix.com>...
    In article <7934d084.0401152058.164a240c at posting.google.com>,
    Andrae Muys wrote:
    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next
    I'm not sure this is generic enough to go in the standard library.
    Usually, I'd recommend that someone wanting this functionality consider
    other options in addition to this (such as using Queue.Queue()).
    I'm curious to know how a Queue.Queue() provides the same
    functionality? I have always considered a Queue.Queue() to be an
    inter-thread communication primitive. serialise() (at least the
    corrected version discussed later in this thread) is strictly a
    synchronisation primitive.
    Well, yes; Queue.Queue() provides both synchronization *and* data
    protection. In some ways, it's overkill for this specific problem, but
    my experience is that there are so many different ways to approach this
    class of problems and so many ways to screw up threaded applications,
    it's best to learn one swiss-army knife that can handle almost everything
    you want to throw at it.
    --
    Aahz (aahz at pythoncraft.com) <*> http://www.pythoncraft.com/

    A: No.
    Q: Is top-posting okay?
  • Alan Kennedy at Jan 19, 2004 at 12:02 pm
    [Andrae Muys]
    I'm curious to know how a Queue.Queue() provides the same
    functionality? I have always considered a Queue.Queue() to be an
    inter-thread communication primitive.
    Not exactly.

    Queue.Queue is a *thread-safe* communication primitive: you're not
    required to have seperate threads at both ends of a Queue.Queue, but
    it is guaranteed to work correctly if you do have multiple threads.
    From the module documentation:
    """
    The Queue module implements a multi-producer, multi-consumer FIFO
    queue. It is especially useful in threads programming when information
    must be exchanged safely between multiple threads. The Queue class in
    this module implements all the required locking semantics. It depends
    on the availability of thread support in Python.
    """

    http://www.python.org/doc/current/lib/module-Queue.html
    serialise() (at least the
    corrected version discussed later in this thread) is strictly a
    synchronisation primitive.
    Just as Queue.Queue is a synchronisation primitive: a very flexible
    and useful primitive that happens to be usable in a host of different
    scenarios.

    I think I'm with Aahz on this one: when faced with this kind of
    problem, I think it is best to use a tried and tested inter-thread
    communication paradigm, such as Queue.Queue. In this case, Queue.Queue
    fits the problem (which is just a variation of the producer/consumer
    problem) naturally. Also, I doubt very much that there is significant
    resource overhead in using Queue.Queues.

    As you've already seen from your first cut of the code, writing
    thread-safe code is an error-prone process, and it's sometimes
    difficult to figure out all the possible calling combinations when
    multiple threads are involved.

    But if you'd used Queue.Queue, well this whole conversation would
    never have come up, would it ;-)

    regards,

    --
    alan kennedy
    ------------------------------------------------------
    check http headers here: http://xhaus.com/headers
    email alan: http://xhaus.com/contact/alan
  • Ype Kingma at Jan 16, 2004 at 7:42 pm

    Andrae Muys wrote:

    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next
    Is there any reason why the lock is not shared among threads?
    From the looks of this, it doesn't synchronize anything
    between different threads. Am I missing something?

    Kind regards,
    Ype

    email at xs4all.nl
  • Jeff Epler at Jan 16, 2004 at 8:02 pm

    On Fri, Jan 16, 2004 at 08:42:36PM +0100, Ype Kingma wrote:
    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next
    Is there any reason why the lock is not shared among threads?
    From the looks of this, it doesn't synchronize anything
    between different threads. Am I missing something?
    Yes, I think so. You'd use the same "serialise" generator object in
    multiple threads, like this:

    p = serialise(producer_generator())
    threads = [thread.start_new(worker_thread, (p,))
               for t in range(num_workers)]

    Jeff
  • Alan Kennedy at Jan 16, 2004 at 10:57 pm
    [Andrae Muys]
    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next
    [Ype Kingma]
    Is there any reason why the lock is not shared among threads?
    From the looks of this, it doesn't synchronize anything
    between different threads. Am I missing something?
    [Jeff Epler]
    Yes, I think so. You'd use the same "serialise" generator object in
    multiple threads, like this:

    p = serialise(producer_generator())
    threads = [thread.start_new(worker_thread, (p,))
               for t in range(num_workers)]
    Hmm. I think Ype is right: the above code does not correctly serialise
    access to a generator.

    The above serialise function is a generator which wraps a generator.
    This presumably is in order to prevent the wrapped generator's .next()
    method being called simultaneously from multiple threads (which is
    barred: PEP 255: "Restriction: A generator cannot be resumed while it
    is actively running")

    http://www.python.org/peps/pep-0255.html

    However, the above implementation re-creates the problem by using an
    outer generator to wrap the inner one. The outer's .next() method will
    then potentially be called simultaneously by multiple threads. The
    following code illustrates the problem

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import time
    import thread
    import threading

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next

    def squares(n):
        i = 1
        while i < n:
            yield i*i
            i = i+1

    def worker_thread(iter, markers):
        markers[thread.get_ident()] = 1
        results = [] ; clashes = 0
        while 1:
            try:
                results.append(iter.next())
            except StopIteration:
                break
            except ValueError, ve:
                if str(ve) == "generator already executing":
                    clashes = clashes + 1
        del markers[thread.get_ident()]
        print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
            len(results), clashes)

    numthreads = 10 ; threadmarkers = {}
    serp = serialise(squares(100))
    threads = [thread.start_new_thread(worker_thread,\
        (serp, threadmarkers)) for t in xrange(numthreads)]
    while len(threadmarkers.keys()) > 0:
        time.sleep(0.1)
    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    I believe that the following definition of serialise will correct the
    problem (IFF I've understood the problem correctly :-)

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import time
    import thread
    import threading

    class serialise:
        "Wrap a generator in an iterator for thread-safe access"

        def __init__(self, gen):
            self.lock = threading.Lock()
            self.gen = gen

        def __iter__(self):
            return self

        def next(self):
            self.lock.acquire()
            try:
                return self.gen.next()
            finally:
                self.lock.release()

    def squares(n):
        i = 1
        while i < n:
            yield i*i
            i = i+1

    def worker_thread(iter, markers):
        markers[thread.get_ident()] = 1
        results = [] ; clashes = 0
        while 1:
            try:
                results.append(iter.next())
            except StopIteration:
                break
            except ValueError, ve:
                if str(ve) == "generator already executing":
                    clashes = clashes + 1
        del markers[thread.get_ident()]
        print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
            len(results), clashes)

    numthreads = 10 ; threadmarkers = {}
    serp = serialise(squares(100))
    threads = [thread.start_new_thread(worker_thread,\
        (serp, threadmarkers)) for t in xrange(numthreads)]
    while len(threadmarkers.keys()) > 0:
        time.sleep(0.1)
    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    Also, I don't know if I'm happy with relying on the fact that the
    generator raises StopIteration for *every* .next() call after the
    actual generated sequence has ended. The above code depends on the
    exhausted generator raising StopIteration in every thread. This seems
    to me the kind of thing that might be python-implementation specific.
    For example, the original "Simple Generators" specification, PEP 255,
    makes no mention of the expected behaviour of a generator when multiple
    calls are made to its .next() method after the iteration is
    exhausted. None that I can see, anyway. Am I wrong?

    http://www.python.org/peps/pep-0255.html

    regards,

    --
    alan kennedy
    ------------------------------------------------------
    check http headers here: http://xhaus.com/headers
    email alan: http://xhaus.com/contact/alan
  • Ype Kingma at Jan 17, 2004 at 7:37 pm
    Alan,

    you wrote:
    [Andrae Muys]
    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next
    [Ype Kingma]
    Is there any reason why the lock is not shared among threads?
    From the looks of this, it doesn't synchronize anything
    between different threads. Am I missing something?
    [Jeff Epler]
    Yes, I think so. You'd use the same "serialise" generator object in
    multiple threads, like this:

    p = serialise(producer_generator())
    threads = [thread.start_new(worker_thread, (p,))
               for t in range(num_workers)]
    Hmm. I think Ype is right: the above code does not correctly serialise
    access to a generator.
    Well, I just reread PEP 255, and I can assure you I was missing something...
    The above serialise function is a generator which wraps a generator.
    This presumably is in order to prevent the wrapped generator's .next()
    method being called simultaneously from multiple threads (which is
    barred: PEP 255: "Restriction: A generator cannot be resumed while it
    is actively running")

    http://www.python.org/peps/pep-0255.html

    However, the above implementation re-creates the problem by using an
    outer generator to wrap the inner one. The outer's .next() method will
    then potentially be called simultaneously by multiple threads. The
    I agree (after rereading the PEP.)
    following code illustrates the problem

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import time
    import thread
    import threading

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next

    def squares(n):
        i = 1
        while i < n:
            yield i*i
            i = i+1

    def worker_thread(iter, markers):
        markers[thread.get_ident()] = 1
        results = [] ; clashes = 0
        while 1:
            try:
                results.append(iter.next())
            except StopIteration:
                break
            except ValueError, ve:
                if str(ve) == "generator already executing":
                    clashes = clashes + 1
        del markers[thread.get_ident()]
        print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
            len(results), clashes)

    numthreads = 10 ; threadmarkers = {}
    serp = serialise(squares(100))
    threads = [thread.start_new_thread(worker_thread,\
        (serp, threadmarkers)) for t in xrange(numthreads)]
    while len(threadmarkers.keys()) > 0:
        time.sleep(0.1)
    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    I believe that the following definition of serialise will correct the
    problem (IFF I've understood the problem correctly :-)

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import time
    import thread
    import threading

    class serialise:
        "Wrap a generator in an iterator for thread-safe access"

        def __init__(self, gen):
            self.lock = threading.Lock()
            self.gen = gen

        def __iter__(self):
            return self

        def next(self):
            self.lock.acquire()
            try:
                return self.gen.next()
            finally:
                self.lock.release()
    Looks like a candidate for inclusion in a standard library to me.
    def squares(n):
        i = 1
        while i < n:
            yield i*i
            i = i+1

    def worker_thread(iter, markers):
        markers[thread.get_ident()] = 1
        results = [] ; clashes = 0
        while 1:
            try:
                results.append(iter.next())
            except StopIteration:
                break
            except ValueError, ve:
                if str(ve) == "generator already executing":
                    clashes = clashes + 1
        del markers[thread.get_ident()]
        print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),\
            len(results), clashes)

    numthreads = 10 ; threadmarkers = {}
    serp = serialise(squares(100))
    threads = [thread.start_new_thread(worker_thread,\
        (serp, threadmarkers)) for t in xrange(numthreads)]
    while len(threadmarkers.keys()) > 0:
        time.sleep(0.1)
    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    Also, I don't know if I'm happy with relying on the fact that the
    generator raises StopIteration for *every* .next() call after the
    actual generated sequence has ended. The above code depends on the
    exhausted generator raising StopIteration in every thread. This seems
    to me the kind of thing that might be python-implementation specific.
    For example, the original "Simple Generators" specification, PEP 255,
    makes no mention of the expected behaviour of a generator when multiple
    calls are made to its .next() method after the iteration is
    exhausted. None that I can see, anyway. Am I wrong?
    Quoting from PEP 234:
    http://www.python.org/peps/pep-0234.html

    "Once a particular iterator object has raised StopIteration, will
    it also raise StopIteration on all subsequent next() calls?
    ...
    Resolution: once StopIteration is raised, calling it.next()
    continues to raise StopIteration."
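
    For what it's worth, that resolution is easy to confirm at an
    interactive prompt; a CPython session looks along these lines:

    >>> def empty():
    ...     if 0: yield None
    ...
    >>> g = empty()
    >>> g.next()
    Traceback (most recent call last):
      File "<stdin>", line 1, in ?
    StopIteration
    >>> g.next()
    Traceback (most recent call last):
      File "<stdin>", line 1, in ?
    StopIteration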

    Thanks to all for the help,

    Ype
  • Alan Kennedy at Jan 18, 2004 at 4:49 pm
    [Andrae Muys]
    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next
    [Ype Kingma]
    Is there any reason why the lock is not shared among threads?
    From the looks of this, it doesn't synchronize anything
    between different threads. Am I missing something?
    [Jeff Epler]
    Yes, I think so. You'd use the same "serialise" generator object in
    multiple threads, like this:

    p = serialise(producer_generator())
    threads = [thread.start_new(worker_thread, (p,))
               for t in range(num_workers)]
    [Alan Kennedy]
    Hmm. I think Ype is right: the above code does not correctly serialise
    access to a generator.
    [Ype Kingma]
    Well, I just reread PEP 255, and I can assure you I was missing
    something...
    Ype,

    Ah: I see now. You thought it didn't work, but for a different reason
    than the one I pointed out. You thought that the lock was not shared
    between threads, though as Jeff pointed out, it is if you use it the
    right way.

    But it still doesn't work.

    [Alan Kennedy]
    I believe that the following definition of serialise will correct the
    problem (IFF I've understood the problem correctly :-)

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import time
    import thread
    import threading

    class serialise:
        "Wrap a generator in an iterator for thread-safe access"

        def __init__(self, gen):
            self.lock = threading.Lock()
            self.gen = gen

        def __iter__(self):
            return self

        def next(self):
            self.lock.acquire()
            try:
                return self.gen.next()
            finally:
                self.lock.release()
    [Ype Kingma]
    Looks like a candidate for inclusion in a standard library to me.
    Well, maybe :-)

    To be honest, I don't have the time to write test cases, docs and
    patches. So I think I'll just leave it for people to find in the
    Google Groups archives ...

    [Alan Kennedy]
    Also, I don't know if I'm happy with relying on the fact that the
    generator raises StopIteration for *every* .next() call after the
    actual generated sequence has ended. The above code depends on the
    exhausted generator raising StopIteration in every thread. This seems
    to me the kind of thing that might be python-implementation specific.
    For example, the original "Simple Generators" specification, PEP 255,
    makes no mention of the expected behaviour of a generator when multiple
    calls are made to its .next() method after the iteration is
    exhausted. None that I can see, anyway. Am I wrong?
    [Ype Kingma]
    Quoting from PEP 234:
    http://www.python.org/peps/pep-0234.html

    "Once a particular iterator object has raised StopIteration, will
    it also raise StopIteration on all subsequent next() calls?
    ...
    Resolution: once StopIteration is raised, calling it.next()
    continues to raise StopIteration."
    Yes, that clears the issue up nicely. Thanks for pointing that out.

    So the same code will run correctly in Jython 2.3 and IronPython
    (awaited with anticipation).

    regards,

    --
    alan kennedy
    ------------------------------------------------------
    check http headers here: http://xhaus.com/headers
    email alan: http://xhaus.com/contact/alan
  • Andrae Muys at Jan 19, 2004 at 2:05 am
    [Subject line changed to allow thread to be found more easily in
    google-groups]

    Alan Kennedy <alanmk at hotmail.com> wrote in message:
    [Alan Kennedy]
    I believe that the following definition of serialise will correct the
    problem (IFF I've understood the problem correctly :-)
    It does look like the following version will work; I was too focused
    on synchronising the underlying generator, and forgot that my code
    also needed to be re-entrant. Thanks for catching my mistake.
    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import time
    import thread
    import threading

    class serialise:
        "Wrap a generator in an iterator for thread-safe access"

        def __init__(self, gen):
            self.lock = threading.Lock()
            self.gen = gen

        def __iter__(self):
            return self

        def next(self):
            self.lock.acquire()
            try:
                return self.gen.next()
            finally:
                self.lock.release()
    [Ype Kingma]
    Looks like a candidate for inclusion in a standard library to me.
    Well, maybe :-)

    To be honest, I don't have the time to write test cases, docs and
    patches. So I think I'll just leave it for people to find in the
    Google Groups archives ...
    Andrae Muys
  • Alan Kennedy at Jan 21, 2004 at 8:53 pm
    [Andrae Muys]
    Moved to email for higher bandwidth. Feel free to quote to usenet if
    you desire.
    [Alan Kennedy]
    I think I'm with Aahz on this one: when faced with this kind of
    problem, I think it is best to use a tried and tested inter-thread
    communication paradigm, such as Queue.Queue. In this case, Queue.Queue
    fits the problem (which is just a variation of the producer/consumer
    problem) naturally. Also, I doubt very much that there is significant
    resource overhead in using Queue.Queues.
    [Andrae Muys]
    Well, I'm puzzled, because I couldn't see an easy way to use Queue.Queue
    to achieve this: it isn't a strict producer/consumer problem.
    I am trying to synchronise multiple consumers, but I don't have a
    producer. So the only way I can see to use Queue.Queue to achieve
    this is to spawn a thread specifically to convert the iterator into
    a producer.
    Andrae,

    I thought it best to continue this discussion on UseNet, to perhaps
    get more opinions.

    Yes, you're right. Using a Queue in this situation does require the
    use of a dedicated thread for the producer. There is no way to "pull"
    values from a generator to multiple consumers through a Queue.Queue.
    The values have to be "pushed" onto the Queue.Queue by some producing
    thread of execution.

    The way I see it, the options are

    Option 1. Spawn a separate thread to execute the producing generator.
    However, this has problems:-

    A: How do the threads recognise the end of the generated sequence?
    This is not a simple problem: the Queue simply being empty does not
    necessarily signify the end of the sequence (e.g., the producer thread
    might not be getting its fair share of CPU time).

    B: The Queue acts as a (potentially infinite) buffer for the generated
    values, thus eliminating one of the primary benefits of generators:
    their efficient "generate when required" nature. This can be helped
    somewhat by limiting the number of entries in the Queue, but it is
    still slightly unsatisfactory.

    C: A thread of execution has to be dedicated to the producer, thus
    consuming resources.
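
    For concreteness, here is a sketch of Option 1 with one common
    mitigation for problems A and B (the names below are illustrative,
    not code from elsewhere in this thread): the producer puts a
    sentinel object onto the queue once per consumer to mark the end
    of the sequence, and a bounded Queue caps the buffering.

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import threading
    import Queue

    END = object()   # sentinel: never a value the generator yields

    def producer(gen, q, numconsumers):
        # Dedicated producer thread. The bounded queue makes put()
        # block when full, restoring some generate-on-demand behaviour.
        for value in gen:
            q.put(value)
        for i in range(numconsumers):
            q.put(END)   # one sentinel per consumer answers problem A

    def consumer(q):
        while 1:
            value = q.get()
            if value is END:
                break
            # ... use value ...

    def squares(n):
        i = 1
        while i < n:
            yield i*i
            i = i+1

    numconsumers = 4
    q = Queue.Queue(maxsize=10)
    threading.Thread(target=producer,
                     args=(squares(100), q, numconsumers)).start()
    for i in range(numconsumers):
        threading.Thread(target=consumer, args=(q,)).start()
    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    Problem C remains, of course: a thread is still dedicated to the
    producer.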

    Option 2. Fill the Queue with values from a main thread which executes
    the generator to exhaustion. The consuming threads simply peel values
    from the Queue. Although this saves on thread overhead, it is the
    least desirable in terms of memory overhead: the number of values
    generated by the generator and buffered in the Queue could be very
    large.

    Option 3. Use the same paradigm as your original code, i.e. there
    is no producer thread and the consuming threads are themselves
    responsible for calling the generator.next() method: access to this
    method is synchronised on a threading.Lock. I really like this
    solution, because values are only generated on demand, with no
    temporary storage of values required.

    I think that an ideal solution would be to create a dedicated class
    for synchronising a generator, as my example did, BUT to implement the
    same interface as Queue.Queue, so that client code would remain
    ignorant that it was dealing with a generator.

    Here is my version of such a beast

    # -=-=-=-=-= file GenQueue.py =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import threading

    class Empty(Exception): pass
    class Exhausted(StopIteration): pass
    class IllegalOperation(Exception): pass

    class GenQueue:
        "Simulate a Queue.Queue, with values produced from a generator"

        def __init__(self, gen):
            self.lock = threading.Lock()
            self.gen = gen

        def __iter__(self):
            return self

        def _get(self, block=1):
            if self.lock.acquire(block):
                try:
                    try:
                        return self.gen.next()
                    except StopIteration:
                        raise Exhausted
                finally:
                    self.lock.release()
            else:
                raise Empty

        def next(self):
            return self._get(1)

        def get(self, block=1):
            return self._get(block)

        def get_nowait(self):
            return self._get(0)

        def put(self, item, block=1):
            raise IllegalOperation

        def put_nowait(self, item):
            self.put(item, 0)

        def full(self):
            return False

        def empty(self):
            return False

        def qsize(self):
            return 1j

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    And here is some code that tests it

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    import sys
    import time
    import thread
    import GenQueue

    def squares(n):
        i = 1
        while i <= n:
            yield i*i
            i = i+1

    class TestFailure(Exception): pass

    def test_blockget(results, queue):
        while 1:
            try:
                results.append(queue.get())
            except GenQueue.Empty:
                raise TestFailure
            except GenQueue.Exhausted:
                break

    def test_iter(results, queue):
        for v in queue:
            results.append(v)

    def test_noblockget(results, queue):
        while 1:
            try:
                results.append(queue.get_nowait())
            except GenQueue.Empty:
                pass
            except GenQueue.Exhausted:
                break

    def threadwrap(func, queue, markers):
        markers[thread.get_ident()] = 1
        results = []
        func(results, queue)
        print "Test %s: Thread %5s: %d results." % (func.__name__, \
            thread.get_ident(), len(results))
        del markers[thread.get_ident()]

    def test():
        numthreads = 10
        for tfunc in (test_blockget, test_iter, test_noblockget):
            print "Test: %s ------------------------------->" % tfunc.__name__
            threadmarkers = {}
            q = GenQueue.GenQueue(squares(100))
            threads = [thread.start_new_thread(threadwrap,\
                (tfunc, q, threadmarkers)) for t in xrange(numthreads)]
            while len(threadmarkers.keys()) > 0:
                time.sleep(0.1)

    if __name__ == "__main__":
        test()

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    I find the combination of the iteration protocol and a Queue
    intriguing: in this case, Queue.get() and iter.next() mean the same
    thing. Or maybe I'm just being sucked in by the syntactic niceties of
    something like

    def worker(inq, outq):
        for thing in inq: outq.put(thing.work())

    I'm interested to hear other opinions about the commonalities and
    differences between Queues and iterators.

    One problem that is always in the back of my mind these days is how
    one could write a dispatch-based coroutine scheduler that would work
    efficiently when in communication (through Queue.Queues?) with
    independently executing coroutine schedulers running on other
    processors in the box. (And before you jump in shouting "Global
    Interpreter Lock!", remember jython + generators will be able to do
    this).

    Not that I need such a thing: it's just a fun thing to think about,
    like crosswords :-)

    cheers,

    --
    alan kennedy
    ------------------------------------------------------
    check http headers here: http://xhaus.com/headers
    email alan: http://xhaus.com/contact/alan
  • Josiah Carlson at Jan 21, 2004 at 9:17 pm
    Even easier:

    Q = Queue.Queue()
    Q.put(gen)
    def thread():
        a = Q.get()
        use = a.next()
        Q.put(a)
        # do whatever you need

    Of course you could just as easily use a single lock and a class:

    class lockedgen:
        def __init__(self, gen):
            self.g = gen
            self.l = threading.Lock()
        def get(self):
            self.l.acquire()
            a = self.g.next()
            self.l.release()
            return a

    generator = lockedgen(gen)
    def thread():
        use = generator.get()
        # do what you need


    - Josiah
  • Aahz at Jan 22, 2004 at 12:53 am
    In article <20040121131300.D8E9.JCARLSON at uci.edu>,
    Josiah Carlson wrote:
    Of course you could just as easily use a single lock and a class:

    class lockedgen:
        def __init__(self, gen):
            self.g = gen
            self.l = threading.Lock()
        def get(self):
            self.l.acquire()
            a = self.g.next()
            self.l.release()
            return a
    This is one case where you *DEFINITELY* want to use try/finally:

    class lockedgen:
        def __init__(self, gen):
            self.g = gen
            self.l = threading.Lock()
        def get(self):
            self.l.acquire()
            try:
                a = self.g.next()
            finally:
                self.l.release()
            return a
    --
    Aahz (aahz at pythoncraft.com) <*> http://www.pythoncraft.com/

    A: No.
    Q: Is top-posting okay?
  • Aahz at Jan 22, 2004 at 1:01 am
    In article <400EE6B5.27C6B694 at hotmail.com>,
    Alan Kennedy wrote:
    Yes, you're right. Using a Queue in this situation does require the
    use of a dedicated thread for the producer. There is no way to "pull"
    values from a generator to multiple consumers through a Queue.Queue.
    The values have to be "pushed" onto the Queue.Queue by some producing
    thread of execution.
    Correct.
    The way I see it, the options are

    Option 1. Spawn a separate thread to execute the producing generator.
    However, this has problems:-

    A: How do the threads recognise the end of the generated sequence?
    This is not a simple problem: the Queue simply being empty does not
    necessarily signify the end of the sequence (e.g., the producer thread
    might not be getting its fair share of CPU time).

    B: The Queue acts as a (potentially infinite) buffer for the generated
    values, thus eliminating one of the primary benefits of generators:
    their efficient "generate when required" nature. This can be helped
    somewhat by limiting the number of entries in the Queue, but it is
    still slightly unsatisfactory.

    C: A thread of execution has to be dedicated to the producer, thus
    consuming resources.
    There are a number of ways of mitigating A and B; they mostly involve
    using an extra Queue.Queue to send tokens to the generator thread when a
    consumer wants data. The generator thread then sends back a token that
    (among other things) contains an attribute specifically for notifying
    the consumer that the generator is exhausted. See
    http://www.pythoncraft.com/OSCON2001/ThreadPoolSpider.py
    and
    http://www.pythoncraft.com/OSCON2001/FibThreaded.py
    for examples that show the technique, though they're not directly
    relevant to this case.
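
    In outline, that token technique looks something like the condensed
    sketch below (illustrative names; the linked examples differ in
    detail). Each consumer owns a private reply queue, sends it to the
    generator thread as a request, and receives back a token carrying
    either a value or an exhausted flag:

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import threading
    import Queue

    class Token:
        # One generated value, plus a flag telling the consumer
        # that the generator is exhausted.
        def __init__(self, value=None, exhausted=0):
            self.value = value
            self.exhausted = exhausted

    def generator_thread(gen, requests):
        # Serve one value per request; each request is the private
        # reply queue of the consumer that wants data.
        while 1:
            replyq = requests.get()
            try:
                replyq.put(Token(gen.next()))
            except StopIteration:
                replyq.put(Token(exhausted=1))

    def consume(requests):
        replyq = Queue.Queue()
        results = []
        while 1:
            requests.put(replyq)        # token out: "I want data"
            token = replyq.get()        # token back: value or end-flag
            if token.exhausted:
                break
            results.append(token.value)

    def squares(n):
        i = 1
        while i < n:
            yield i*i
            i = i+1

    requests = Queue.Queue()
    t = threading.Thread(target=generator_thread,
                         args=(squares(100), requests))
    t.setDaemon(1)   # service thread; needn't block interpreter exit
    t.start()
    for i in range(4):
        threading.Thread(target=consume, args=(requests,)).start()
    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=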

    My point is that I haven't (yet) seen many good use cases for sharing a
    generator between threads, and I'm guessing that many people will try
    using generators inappropriately for problems that really are better
    suited to Queue.Queue.
    --
    Aahz (aahz at pythoncraft.com) <*> http://www.pythoncraft.com/

    A: No.
    Q: Is top-posting okay?


  • Jim Jewett at Jan 22, 2004 at 4:00 pm
    aahz at pythoncraft.com (Aahz) wrote in message news:<bun7cn$le7$1 at panix1.panix.com>...
    My point is that I haven't (yet) seen many good use cases for sharing a
    generator between threads, and I'm guessing that many people will try
    using generators inappropriately for problems that really are better
    suited to Queue.Queue.
    A globally unique ID, such as:

    "What filename should I store this page under?"

    The standard library has (several versions of) similar functionality
    for temporary filenames. They aren't all threadsafe, they often
    enforce the "temporary" aspect, they run into hashing collision
    problems eventually, there is no good way to include even approximate
    ordering information, etc...

    The fact that these are in the standard library suggests that it is a
    common use case. The fact that there are several different versions
    each with their own problems suggests that the problem is hard enough
    to justify putting a good solution in the library.
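
    As a sketch of what that might look like (the names and filename
    format here are hypothetical), the serialise wrapper from earlier
    in this thread could back such a filename source, with a timestamp
    supplying approximate ordering information:

    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    import time
    import threading

    class serialise:
        "Wrap a generator in an iterator for thread-safe access"
        def __init__(self, gen):
            self.lock = threading.Lock()
            self.gen = gen
        def __iter__(self):
            return self
        def next(self):
            self.lock.acquire()
            try:
                return self.gen.next()
            finally:
                self.lock.release()

    def filenames(prefix):
        # The sequence number guarantees uniqueness; the timestamp
        # carries approximate ordering information.
        n = 0
        while 1:
            yield "%s-%d-%06d.html" % (prefix, int(time.time()), n)
            n = n + 1

    names = serialise(filenames("page"))
    # Any thread may now call names.next() safely to get a fresh name.
    #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=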

    --

    -jJ Take only memories. Leave not even footprints.
  • Aahz at Jan 31, 2004 at 2:00 am
    In article <cab22418.0401220800.1d5f8594 at posting.google.com>,
    Jim Jewett wrote:
    aahz at pythoncraft.com (Aahz) wrote in message news:<bun7cn$le7$1 at panix1.panix.com>...
    My point is that I haven't (yet) seen many good use cases for sharing a
    generator between threads, and I'm guessing that many people will try
    using generators inappropriately for problems that really are better
    suited to Queue.Queue.
    A globally unique ID, such as:

    "What filename should I store this page under?"

    The standard library has (several versions of) similar functionality
    for temporary filenames. They aren't all threadsafe, they often
    enforce the "temporary" aspect, they run into hashing collision
    problems eventually, there is no good way to include even approximate
    ordering information, etc...

    The fact that these are in the standard library suggests that it is a
    common use case. The fact that there are several different versions
    each with their own problems suggests that the problem is hard enough
    to justify putting a good solution in the library.
    You've got a good point. All right, I suggest you subscribe to
    python-dev (if you're not) and bring it up there so we can hash out
    which location would be best for this functionality.
    --
    Aahz (aahz at pythoncraft.com) <*> http://www.pythoncraft.com/

    "The joy of coding Python should be in seeing short, concise, readable
    classes that express a lot of action in a small amount of clear code --
    not in reams of trivial code that bores the reader to death." --GvR
