i'm getting the wrong output for the 'title' attributes for this
data. the queue holds a data structure (item name, position, and list
to store results in). each thread takes in an item name and queries a
database for various attributes. from the debug statements the item
names are being retrieved correctly, but the attributes returned are
those of other items in the queue - not its own item. however, the
model variable is not a global variable... so i'm not sure what's
wrong.

i've declared a bunch of worker threads (100) and a queue into which
new requests are inserted, like so:

queue = Queue.Queue(0)
WORKERS = 100
for i in range(WORKERS):
    thread = SDBThread(queue)
    thread.setDaemon(True)
    thread.start()

the thread:

class SimpleDBThread ( threading.Thread ):
    def __init__ ( self, queue ):
        self.__queue = queue
        threading.Thread.__init__ ( self )
    def run ( self ):
        while 1:
            item = self.__queue.get()
            if item!=None:
                model = domain.get_item(item[0])
                logger.debug('sdbthread item:'+item[0])
                title = model['title']
                scraped = model['scraped']
                logger.debug("sdbthread title:"+title)

any suggestions?
thanks


  • Castironpi at May 8, 2008 at 11:54 pm

    On May 8, 5:45 pm, skunkwerk wrote:
    <snip>
    any suggestions?
    thanks
    I'll base this on terminology: if a model is in a brain (throughout
    the known universe), and a dollar's a dollar, it may not pay to build
    a computer out of brains.

    If man arises as a tool-carrier, we will carry tools, not people.
    Don't use Python to make people; make money, and not too much. Pick a
    wage and you might get somewhere.
  • Skunkwerk at May 9, 2008 at 4:30 am

    On May 8, 4:54 pm, castiro... at gmail.com wrote:
    On May 8, 5:45 pm, skunkwerk wrote:
    <snip>
    I'll base this on terminology: if a model is in a brain (throughout
    the known universe), and a dollar's a dollar, it may not pay to build
    a computer out of brains.

    If man arises as a tool-carrier, we will carry tools, not people.
    Don't use Python to make people; make money, and not too much. Pick a
    wage and you might get somewhere.
    excuse me?
  • Gabriel Genellina at May 9, 2008 at 5:42 am
    En Fri, 09 May 2008 01:30:32 -0300, skunkwerk <skunkwerk at gmail.com>
    escribió:
    On May 8, 4:54 pm, castiro... at gmail.com wrote:
    <snip>
    If man arises as a tool-carrier, we will carry tools, not people.
    Don't use Python to make people; make money, and not too much. Pick a
    wage and you might get somewhere.
    excuse me?
    (Please forgive our local pet)

    is "item" a list? Perhaps the *same* list as other requests? Be careful
    when you put mutable objects in a queue.
    And what about domain.get_item? is it a thread safe operation?
    You said the model is not a global - but is it a *different* object for
    each request?

    --
    Gabriel Genellina
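
    A minimal sketch (hypothetical, not from the thread) of the shared-mutable pitfall
    Gabriel is pointing at: if every put() reuses the same list object, all queue
    entries end up referring to whatever that list held last.

    import Queue

    q = Queue.Queue()
    shared = ['item-A', 0, []]
    for name in ('item-A', 'item-B', 'item-C'):
        shared[0] = name      # mutates the one shared list
        q.put(shared)         # every entry references that same object

    while not q.empty():
        print q.get()[0]      # prints 'item-C' three times

    # building a fresh list per request avoids this:
    #     q.put([name, counter, modelList])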
  • John Nagle at May 9, 2008 at 7:12 am

    skunkwerk wrote:
    <snip>
    any suggestions?
    thanks
    Hm. We don't have enough code here to see what's wrong.
    For one thing, we're not seeing how items get put on the queue. The
    trouble might be at the "put" end.

    Make sure that "model", "item", "title", and "scraped" are not globals.
    Remember, any assignment to them in a global context makes them a global.

    You should never get "None" from the queue unless you put a "None"
    on the queue. "get()" blocks until there's work to do.

    John Nagle
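
    As an aside, deliberately putting None (or another sentinel) on the queue is the
    one case where a worker should ever see it; a rough sketch of that shutdown
    pattern, with process() standing in for the real per-item work:

    SENTINEL = None

    def worker(q):
        while True:
            item = q.get()          # blocks until work or a sentinel arrives
            if item is SENTINEL:
                break               # main thread puts one sentinel per worker
            process(item)           # hypothetical per-item work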
  • Skunkwerk at May 9, 2008 at 3:40 pm

    On May 9, 12:12 am, John Nagle wrote:
    <snip>
    Hm. We don't have enough code here to see what's wrong.
    For one thing, we're not seeing how items get put on the queue. The
    trouble might be at the "put" end.

    Make sure that "model", "item", "title", and "scraped" are not globals.
    Remember, any assignment to them in a global context makes them a global.

    You should never get "None" from the queue unless you put a "None"
    on the queue. "get()" blocks until there's work to do.

    John Nagle
    thanks John, Gabriel,
    here's the 'put' side of the requests:

    def prepSDBSearch(results):
        modelList = [0]
        counter = 1
        for result in results:
            data = [result.item, counter, modelList]
            queue.put(data)
            counter += 1
        while modelList[0] < len(results):
            print 'waiting...'  #wait for them to come home
        modelList.pop(0)  #now remove '0'
        return modelList

    responses to your follow ups:
    1) 'item' in the threads is a list that corresponds to the 'data'
    list in the above function. it's not global, and the initial values
    seem ok, but i'm not sure if every time i pass in data to the queue it
    passes in the same memory address or declares a new 'data' list (which
    I guess is what I want)
    2) john, i don't think any of the variables you mentioned are
    global. the 'none' check was just for extra safety.
    3) the first item in the modelList is a counter that keeps track of
    the number of threads for this call that have completed - is there any
    better way of doing this?

    thanks again
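
    For what it's worth, a sketch of how the counter-in-a-list busy-wait above could
    be replaced by the queue's own bookkeeping (assumes Python 2.5+, where
    Queue.Queue grew task_done()/join(), and that each worker calls task_done()
    after storing its result):

    def prepSDBSearch(results):
        modelList = [None] * len(results)
        for position, result in enumerate(results):
            queue.put((result.item, position, modelList))
        queue.join()        # returns once every queued item has been task_done()'d
        return modelList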
  • Skunkwerk at May 11, 2008 at 4:16 pm

    On May 10, 1:31 pm, Dennis Lee Bieber wrote:
    On Fri, 9 May 2008 08:40:38 -0700 (PDT), skunkwerk <skunkw... at gmail.com>
    declaimed the following in comp.lang.python:

    Coming in late...
    On May 9, 12:12 am, John Nagle wrote:
    skunkwerk wrote:
    i've declared a bunch of worker threads (100) and a queue into which
    new requests are inserted, like so:
    <snip>

    queue = Queue.Queue(0)
    WORKERS = 100
    for i in range(WORKERS):
        thread = SDBThread(queue)
        thread.setDaemon(True)
        thread.start()
    the thread:
    class SimpleDBThread ( threading.Thread ):
        def __init__ ( self, queue ):
            self.__queue = queue
    Note: double-leading __ means "name mangling" -- typically only
    needed when doing multiple layers of inheritance where different parents
    have similar named items that need to be kept independent; a single _ is
    the convention for "don't touch me unless you know what you are doing"
            threading.Thread.__init__ ( self )
        def run ( self ):
            while 1:
                item = self.__queue.get()
                if item!=None:
                    model = domain.get_item(item[0])
                    logger.debug('sdbthread item:'+item[0])
                    title = model['title']
                    scraped = model['scraped']
                    logger.debug("sdbthread title:"+title)
    any suggestions?
    thanks
    <snip>
    thanks John, Gabriel,
    here's the 'put' side of the requests:
    def prepSDBSearch(results):
        modelList = [0]
        counter = 1
        for result in results:
            data = [result.item, counter, modelList]
            queue.put(data)
            counter += 1
        while modelList[0] < len(results):
            print 'waiting...'  #wait for them to come home
        modelList.pop(0)  #now remove '0'
        return modelList
    My suggestion, if you really want diagnostic help -- follow the
    common recommendation of posting the minimal /runnable (if erroneous)/
    code... If "domain.get_item()" is some sort of RDBM access, you might
    fake it using a pre-loaded dictionary -- anything that allows it to
    return something when given the key value.
    responses to your follow ups:
    1) 'item' in the threads is a list that corresponds to the 'data'
    list in the above function. it's not global, and the initial values
    seem ok, but i'm not sure if every time i pass in data to the queue it
    passes in the same memory address or declares a new 'data' list (which
    I guess is what I want)
    Rather confusing usage... In your "put" you have a list whose first
    element is "result.item", but then in the work thread, you refer to the
    entire list as "item"
    3) the first item in the modelList is a counter that keeps track of
    the number of threads for this call that have completed - is there any
    better way of doing this?
    Where? None of your posted code shows either "counter" or modelList
    being used by the threads.

    And yes, if you have threads trying to update a shared mutable, you
    have a race condition.

    You also have a problem if you are using "counter" to define where
    in modelList a thread is supposed to store its results -- as you can not
    access an element that doesn't already exist...

    a = [0]
    a[3] = 1        #failure, need to create elements 1, 2, 3 first

    Now, if position is irrelevant, and a thread just appends its
    results to modelList, then you don't need some counter, all you need is
    to check the length of modelList against the count expected.

    Overall -- even though you are passing things via the queue, the
    contents being passed via the queue are being treated as if they were
    global entities (you could make modelList a global, remove it from the
    queue entries, and have the same net access)...

    IOWs, you have too much coupling between the threads and the feed
    routine...

    As for me... I'd be using a second queue for return values...

    WORKERTHREADS = 100
    feed = Queue.Queue()
    results = Queue.Queue()

    def worker():
        while True:
            (ID, item) = feed.get()    #I leave the queues globals
                                       #since they perform locking
                                       #internally
            model = domain.get_item(item)
            results.put( (ID, model["title"], model["scraped"]) )

    for i in range(WORKERTHREADS):
        aThread = threading.Thread(target=worker)
            #overkill to subclass as there is now no specialized init
            #and if I really wanted to make the queues non-global
            #I'd pass them as arguments:
            #       threading.Thread(target=worker, args=(feed, results))
            #where worker is now
            #       def worker(feed, results):
        aThread.setDaemon(True)
        aThread.start()

    ...

    def prepSearch(searches):
        modelList = []
        counter = 0
        for searchItem in searches:
            feed.put( (counter, searchItem) )
            counter += 1
            modelList.append(None)  #extend list one element per search
        while counter:
            (ID, title, scraped) = results.get()
            modelList[ID] = (title, scraped)
            counter -= 1
        return modelList

    The only place counter and modelList are modified are within
    prepSearch. I'm passing counter out and back to use as an ID value if
    the final results are supposed to be in order -- that way if one thread
    finishes before another, the items can be placed into the list where
    they'd have been sequentially.

    I can only hope that "domain.get_item" is an activity that is I/O
    bound AND that it supports parallel accesses... Otherwise the above
    worker threads seem to be adding a lot of overhead for queue I/O and
    threading swaps for what is otherwise a rather linear process.

    Perhaps your posts don't reveal enough... Maybe you have multiple
    main threads that are posting to the worker feed queue (and you were
    using separate mutables for storing the results). In this situation, I'd
    remove the results queue from being a global entity, create one queue
    per main processing thread, and pass the queue as one of the parameters.
    This way, a worker can return data to any source thread by using the
    supplied queue for the return...

    Modify prepSearch with:

        myQueue = Queue.Queue()
        ...
        feed.put( (counter, searchItem, myQueue) )
        ...
        (ID, title, scraped) = myQueue.get()

    Modify worker with:

        (ID, item, retqueue) = feed.get()
        ...
        retqueue.put( (ID, model["title"], model["scraped"]) )
    --
    Wulfraed        Dennis Lee Bieber               KD6MOG
    wlfr... at ix.netcom.com               wulfr... at bestiaria.com
            HTTP://wlfraed.home.netcom.com/
    (Bestiaria Support Staff:               web-a... at bestiaria.com)
            HTTP://www.bestiaria.com/
    wow. thanks for the help.
    i seem to have fixed my problem though - it turns out
    domain.get_item was not thread safe as it was using the underlying
    httplib. the solution was to create a new connection to the database
    for each thread (which is ok as the database is meant to be queried in
    a massively parallel fashion). the missing part of the code included
    a part where i inserted the results at the given position into the
    list.

    the only issue i have now is that it takes a long time for 100 threads
    to initialize that connection (>5 minutes) - and as i'm doing this on
    a webserver any time i update the code i have to restart all those
    threads, which i'm doing right now in a for loop. is there any way I
    can keep the thread stuff separate from the rest of the code for this
    file, yet allow access? It wouldn't help having a .pyc or using
    psyco, correct, as the time is being spent in the runtime? something
    along the lines of 'start a new thread every minute until you get to a
    100' without blocking the execution of the rest of the code in that
    file? or maybe any time i need to do a search, start a new thread if
    the #threads is <100?

    thanks again
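
    A rough sketch of the fix described above -- open the connection inside each
    worker so no httplib state is shared between threads; open_sdb_connection()
    and the 'items' domain name are placeholders, not the real SimpleDB API:

    class SimpleDBThread(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.__queue = queue
        def run(self):
            conn = open_sdb_connection()        # hypothetical per-thread connection
            domain = conn.get_domain('items')   # hypothetical domain handle
            while True:
                item = self.__queue.get()
                if item is None:
                    break                       # optional sentinel shutdown
                model = domain.get_item(item[0])
                item[2].append((item[1], model['title'], model['scraped']))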
  • Gabriel Genellina at May 12, 2008 at 4:10 am

    En Sun, 11 May 2008 13:16:25 -0300, skunkwerk <skunkwerk at gmail.com> escribió:

    the only issue i have now is that it takes a long time for 100 threads
    to initialize that connection (>5 minutes) - and as i'm doing this on
    a webserver any time i update the code i have to restart all those
    threads, which i'm doing right now in a for loop. is there any way I
    can keep the thread stuff separate from the rest of the code for this
    file, yet allow access?
    Like using a separate thread to create the other 100?

    --
    Gabriel Genellina
  • Rhamphoryncus at May 12, 2008 at 8:40 am

    On May 11, 10:16 am, skunkwerk wrote:
    <snip>
    the only issue i have now is that it takes a long time for 100 threads
    to initialize that connection (>5 minutes) - and as i'm doing this on
    a webserver any time i update the code i have to restart all those
    threads, which i'm doing right now in a for loop. is there any way I
    can keep the thread stuff separate from the rest of the code for this
    file, yet allow access? It wouldn't help having a .pyc or using
    psyco, correct, as the time is being spent in the runtime? something
    along the lines of 'start a new thread every minute until you get to a
    100' without blocking the execution of the rest of the code in that
    file? or maybe any time i need to do a search, start a new thread if
    the #threads is <100?
    $ python2.5 -m timeit -s 'import threading' 't = threading.Thread();
    t.start(); t.join()'
    10000 loops, best of 3: 131 usec per loop

    Clearly it is not the threads themselves, but something else which is
    expensive.

    It's not clear why you need threads at all. Unless you've got a lot
    of cpus/cores running that DBMS, or it's got fairly high latency (and
    no way to pipeline), attacking it with more threads isn't gonna give
    significant speedups.
  • Skunkwerk at May 12, 2008 at 7:31 pm

    On May 12, 1:40 am, Rhamphoryncus wrote:
    <snip>
    Clearly it is not the threads themselves, but something else which is
    expensive.

    It's not clear why you need threads at all. Unless you've got a lot
    of cpus/cores running that DBMS, or it's got fairly high latency (and
    no way to pipeline), attacking it with more threads isn't gonna give
    significant speedups.
    correct. the threads themselves are not taking up the time, but the
    initialization of each thread (which includes making a new connection
    to the database) - typically this is ~3 seconds. The database is
    amazon's simpleDB, which is meant to support massively parallel
    queries. once the connection has been made, queries are very fast.

    thanks
  • Rhamphoryncus at May 12, 2008 at 8:19 pm

    On May 12, 1:31 pm, skunkwerk wrote:
    <snip>
    correct. the threads themselves are not taking up the time, but the
    initialization of each thread (which includes making a new connection
    to the database) - typically this is ~3 seconds. The database is
    amazon's simpleDB, which is meant to support massively parallel
    queries. once the connection has been made, queries are very fast.
    So it shouldn't take much more than 3 seconds to create all 100
    threads. It certainly shouldn't take 5 minutes. However, 3 seconds *
    100 = 5 minutes, so it sounds like the connection process is getting
    serialized somehow.

    Maybe you're doing the connection in the thread's __init__? Note that
    __init__ is called when the thread *object* is created, by the main
    thread, and not when you *start* the thread.

    I find the threading.Thread API to be vastly over complicated. It's
    much simpler to wrap it like this:

    def start_thread(func, *args, **kwargs):
        t = threading.Thread(target=func, args=args, kwargs=kwargs)
        t.start()
        return t

    Then you can pass a function to start_thread and it will run in the
    new child thread.
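
    A usage sketch for the wrapper above, with the slow (~3 s) connection moved into
    the child thread so that creating 100 workers does not serialize in the main
    thread; open_sdb_connection() and handle() are placeholders:

    import Queue

    item_queue = Queue.Queue()

    def worker(q):
        conn = open_sdb_connection()   # hypothetical connect, now done in the child thread
        while True:
            handle(conn, q.get())      # hypothetical per-item work

    workers = [start_thread(worker, item_queue) for i in range(100)]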
  • Skunkwerk at May 12, 2008 at 7:29 pm

    On May 11, 1:55 pm, Dennis Lee Bieber wrote:
    On Sun, 11 May 2008 09:16:25 -0700 (PDT),skunkwerk
    <skunkw... at gmail.com> declaimed the following in comp.lang.python:


    the only issue i have now is that it takes a long time for 100 threads
    to initialize that connection (>5 minutes) - and as i'm doing this on
    a webserver any time i update the code i have to restart all those
    threads, which i'm doing right now in a for loop. is there any way I
    can keep the thread stuff separate from the rest of the code for this
    file, yet allow access? It wouldn't help having a .pyc or using
    psyco, correct, as the time is being spent in the runtime? something
    along the lines of 'start a new thread every minute until you get to a
    100' without blocking the execution of the rest of the code in that
    file? or maybe any time i need to do a search, start a new thread if
    the #threads is <100?
    Is this running as part of the server process, or as a client
    accessing the server?

    Alternative question: Have you tried measuring the performance using
    /fewer/ threads... 25 or less? I believe I'd mentioned prior that you
    seem to have a lot of overhead code for what may be a short query.

    If the .get_item() code is doing a full sequence of: connect to
    database; format&submit query; fetch results; disconnect from
    database... I'd recommend putting the connect/disconnect outside of the
    thread while loop (though you may then need to put sentinel values into
    the feed queue -- one per thread -- so they can cleanly exit and
    disconnect rather than relying on daemonization for exit).

    thread:
        dbcon = ...
        while True:
            query = Q.get()
            if query == SENTINEL: break
            result = get_item(dbcon, query)
            ...
        dbcon.close()

    Third alternative: Find some way to combine the database queries.
    Rather than 100 threads each doing a single lookup (from your code, it
    appears that only 1 result is expected per search term), run 10 threads
    each looking up 10 items at once...

    thread:
        dbcon = ...
        terms = []
        terminate = False
        while not terminate:
            while len(terms) < 10:
                query = Q.get_nowait()
                if not query: break
                if query == SENTINEL:
                    terminate = True
                    break
                terms.append(query)
            results = get_item(dbcon, terms)
            terms = []
            #however you are returning items; match the query term to the
            #key item in the list of returned data?
        dbcon.close()

    where the final select statement looks something like:

    SQL = """select key, title, scraped from ***
    where key in ( %s )""" % ", ".join("?" for x in terms)
    #assumes database adapter uses ? for placeholder
    dbcur.execute(SQL, terms)
    --
    Wulfraed Dennis Lee Bieber KD6MOG
    wlfr... at ix.netcom.com wulfr... at bestiaria.com
    HTTP://wlfraed.home.netcom.com/
    (Bestiaria Support Staff: web-a... at bestiaria.com)
    HTTP://www.bestiaria.com/
    thanks again Dennis,
    i chose 100 threads so i could do 10 simultaneous searches (where
    each search contains 10 terms - using 10 threads). the .get_item()
    code is not doing the database connection - rather the initialization
    is done in the initialization of each thread. so basically once a
    thread starts the database connection is persistent and .get_item
    queries are very fast. this is running as a server process (using
    django).

    cheers
  • Skunkwerk at May 12, 2008 at 7:33 pm

    On May 11, 9:10 pm, "Gabriel Genellina" wrote:
    En Sun, 11 May 2008 13:16:25 -0300, skunkwerk <skunkw... at gmail.com> escribió:
    <snip>
    Like using a separate thread to create the other 100?

    --
    Gabriel Genellina
    thanks Gabriel,
    i think that could do it - let me try it out. don't know why i
    didn't think of it earlier.
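
    A sketch of Gabriel's suggestion, assuming the queue and SimpleDBThread class
    from earlier in the thread: start the pool from a background thread so the web
    process (django) never blocks on the slow connections.

    def start_pool(n=100):
        for i in range(n):
            t = SimpleDBThread(queue)
            t.setDaemon(True)
            t.start()

    initializer = threading.Thread(target=start_pool)
    initializer.setDaemon(True)
    initializer.start()     # returns immediately; workers connect in the background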
