Python Threading and sys.exit() codes
I'm working on my first multithreaded Python program. This program
compares the contents of an ftp site with a local copy. (It is not a
pure mirror site, but there is a 1-1 correspondence of files.) It then
ftp's only the files which are newer on the source site than on the
target site. There are often many files, so the ftp is set up to use
multiple threads, each of which repeatedly locks a global semaphore,
pops a task off a stack, releases the semaphore, and executes the task
(a file to ftp), ad nauseam.
My problem is this: I want to run this program as a scheduled task
under a job scheduler (currently testing cronDsys from #ifdef). The
single-threaded version of the program posts an appropriate code to the
environment via sys.exit(). The job scheduler traps this and takes
conditional action. So far, so good.
In the multi-threaded version of the program, the exit codes are not
getting posted and the main process is not terminating cleanly, at
least from the perspective of the job scheduler -- it appears to run
until manually terminated from the scheduler control console although
the NT command window disappears from the target desktop as expected.
The code fragments below are the ftp thread class and the __main__
section.
Can anybody help me with this?
# imports added for this fragment; the Stack class, the stacklen() and
# getstack() helpers, the DateTime module (mxDateTime), and the
# read_source()/read_target()/lookup_source() functions are defined or
# imported elsewhere in the full script
import os
import sys
import string
import ftplib
import smtplib
import threading

class FTPThread(threading.Thread):
    # def __init__(self,group,target,name,args,kwargs):
    #     print "in Init"
    def run(self):
        # note: self._Thread__name and self._Thread__args reach into
        # the name-mangled private attributes of threading.Thread
        print self._Thread__name
        print self._Thread__args
        try:
            ftp = ftplib.FTP('ftp.microsoft.com')
            ftp.login('anonymous', 'dickenson at serviceware.com')
            ftp.cwd('misc/kb')
        except:
            sys.exit(3)
        subdir = self._Thread__args[0]
        # print self, self._Thread__args
        b = stacklen(self._Thread__args[1])
        print "b at start=", b
        while b > 0:
            # print "thread", self._Thread__name, "attempting to get semaphore"
            a = stacksem.acquire()
            # print "thread", self._Thread__name, "got semaphore"
            msfilepath = getstack(self._Thread__args[1])
            newtime = getstack(self._Thread__args[2])
            # print "thread", self._Thread__name, "attempting to release semaphore"
            a = stacksem.release()
            # print "thread", self._Thread__name, "released semaphore"
            # print "msfilepath", msfilepath
            local0 = string.replace(msfilepath, ".", "", 1)
            local1 = string.replace(local0, "\\", "")
            localdrive = Target_Root + "updates\\" + subdir + "\\"
            local = [localdrive, local1]
            local2 = string.join(local)
            localfilepath = string.replace(local2, " ", "")
            file = open(localfilepath, "wb")  # binary mode for retrbinary
            cmdstrings = ["retr ", msfilepath]
            cmd = string.join(cmdstrings)
            try:
                ftp.retrbinary(cmd, file.write)
                print b, "- file", msfilepath, "retrieved successfully by thread", self._Thread__name
                if subdir == "revised":
                    revisecount = revisecount + 1
                if subdir == "new":
                    newcount = newcount + 1
                # print "success by thread", self._Thread__name
                # filecount = filecount + 1
                # print "filecount=", filecount
            except NameError:
                # the counter updates above lack a 'global' statement,
                # so they raise NameError, silently swallowed here
                pass
            except:
                print sys.exc_info()
                print "file", msfilepath, "was not retrieved"
                # print "failure by thread", self._Thread__name
                file.close()
                os.system("erase " + localfilepath)
                sys.exit(1)
            file.close()
            # print "newtime", newtime
            os.utime(localfilepath, (newtime, newtime))
            b = stacklen(self._Thread__args[1])
            # print "b at exit=", b
        ftp.quit()
        sys.exit(0)
def notify(target, newcount, revisecount):
    host = "SNERT"
    fromstring = "Python Auto-FTP"
    # tostring = ["dickenson at serviceware.com"]
    tostring = ["dickenson at serviceware.com", "hlewis at serviceware.com",
                "gkindel at serviceware.com", "mciaramitaro at serviceware.com"]
    msg = ("Subject: AutoFTP results\n\nUpdated files: " + str(revisecount)
           + "\nNew files: " + str(newcount)
           + "\nlocated in subdirs off " + target + "\\updates")
    s = smtplib.SMTP(host)
    s.sendmail(fromstring, tostring, msg)
if __name__ == '__main__':
    # the value of Threadpool was lost in the archived post; 6 matches
    # the "12 worker threads" mentioned later in the thread (two are
    # started per pass of the loop below)
    Threadpool = 6
    stacksem = threading.Semaphore(1)
    Month_Num_Dict = {'Jan':1, 'Feb':2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6,
                      'Jul':7, 'Aug':8, 'Sep':9, 'Oct':10, 'Nov':11, 'Dec':12}
    Target_Dict = {}
    Source_Dict = {}
    Fix_These = Stack(500)
    Fix_Time = Stack(500)
    New_These = Stack(500)
    New_Time = Stack(500)
    newcount = 0
    revisecount = 0
    Target_Root = "S:\\Microsoft\\"
    #Target_Root = "M:\\msftp\\"
    print "read source"
    a = DateTime.now()
    print a.hour, a.minute, a.second
    read_source()
    print "read target"
    a = DateTime.now()
    print a.hour, a.minute, a.second
    read_target()
    print "lookup source"
    a = DateTime.now()
    print a.hour, a.minute, a.second
    newcount, revisecount = lookup_source()
    print "start ftp..."
    a = DateTime.now()
    print a.hour, a.minute, a.second
    threads = []
    for i in range(Threadpool):
        my_thread = FTPThread(group=None, target=None, name=None,
                              args=("revised", "Fix_These", "Fix_Time"),
                              kwargs={})
        # my_thread.__init__(group=None, target=None, name="Revised 1",
        #                    args=("revised", "Fix_These", "Fix_Time"),
        #                    kwargs={})
        my_thread.start()
        threads.append(my_thread)
        # for i in range(6):
        my_thread = FTPThread(group=None, target=None, name=None,
                              args=("new", "New_These", "New_Time"),
                              kwargs={})
        # my_thread.__init__(group=None, target=None, name="New 1",
        #                    args=("new", "New_These", "New_Time"),
        #                    kwargs={})
        my_thread.start()
        threads.append(my_thread)
    for thread in threads:
        thread.join()
    print "all threads finished"
    try:
        notify(Target_Root, newcount, revisecount)
        print "notify done"
    finally:
        sys.exit(0)



  • David Fisher at Apr 1, 2000 at 4:52 am
    ----- Original Message -----
    From: <dickenson at serviceware.com>
    Newsgroups: comp.lang.python
    To: <python-list at python.org>
    Sent: Friday, March 31, 2000 1:26 PM
    Subject: Python Threading and sys.exit() codes
    My problem is this: I want to run this program as a scheduled task
    under a job scheduler (currently testing cronDsys from #ifdef). The
    single-threaded version of the program posts an appropriate code to the
    environment via sys.exit(). The job scheduler traps this and takes
    conditional action. So far, so good.
    In the multi-threaded version of the program, the exit codes are not
    getting posted and the main process is not terminating cleanly, at
    least from the perspective of the job scheduler -- it appears to run
    until manually terminated from the scheduler control console although
    the NT command window disappears from the target desktop as expected.
    Well, for one thing, sys.exit() doesn't do what you think it does.
    sys.exit() raises a SystemExit exception which, if not caught, will cause
    the thread to exit silently. Read the docs for thread, which threading is
    built on. I don't know of a way to catch an exception that another thread
    raises. That doesn't mean there isn't one, just that I don't know one.
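
    One way to keep those exit codes anyway is to catch the SystemExit inside
    each worker and hand the code back to the main thread yourself. A minimal
    sketch (the status queue and helper names here are invented for
    illustration, not code from the thread):

    import sys
    import threading
    import Queue

    status_q = Queue.Queue(0)   # workers report (name, code) here

    def run_guarded(func, name):
        # sys.exit() inside a thread just raises SystemExit; trap it
        # and forward the code instead of letting it vanish
        try:
            func()
            status_q.put((name, 0))
        except SystemExit, e:
            status_q.put((name, e.code or 0))

    def job():
        sys.exit(3)             # stands in for the real ftp work

    t = threading.Thread(target=run_guarded, args=(job, 'worker-1'))
    t.start()
    t.join()
    print status_q.get()        # -> ('worker-1', 3)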

    Another thing you might look at is the Queue module. That's what got my
    attention, because you are setting a global lock and popping stuff off of
    a stack, and Queue does all that for you. Here's some code:

    import Queue
    import thread  # yes, I know I'm supposed to use threading

    def worker(jobqueue):
        while 1:
            job = jobqueue.get()  # this blocks until a job is available
            print job             # or whatever

    def main(joblist):
        jq = Queue.Queue(0)
        for i in range(10):
            thread.start_new_thread(worker, (jq,))
        for job in joblist:
            jq.put(job)

    There are lots of things that could be prettified in this code, but I hope
    it gives you some ideas. You might try a second Queue to gather return
    information.
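
    For instance, the sketch above could be extended with a result queue
    along these lines (a hypothetical elaboration, not code from the thread):

    import Queue
    import thread

    def worker(jobqueue, resultqueue):
        while 1:
            job = jobqueue.get()
            try:
                # ... do the ftp work for this job here ...
                resultqueue.put((job, 'ok'))
            except:
                resultqueue.put((job, 'failed'))

    def main(joblist):
        jq = Queue.Queue(0)
        rq = Queue.Queue(0)
        for i in range(10):
            thread.start_new_thread(worker, (jq, rq))
        for job in joblist:
            jq.put(job)
        # one result comes back per job, so this loop also serves as
        # the "wait until everything is done" step
        for i in range(len(joblist)):
            print rq.get()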

    Good luck,
    David
  • Bob Dickenson at Apr 1, 2000 at 3:56 pm
    These are good suggestions. A couple of questions/comments.

    I thought about using the Queue model, but I was experimenting with MxStack
    at the time. It seemed to work fine, so...

    I think I need the global lock to get proper thread sync because I have to
    pop values off two stacks for each "job" - the file to be ftp'd and its
    time stamp on the source machine (for resetting it locally after the
    ftp.retrbinary, so that the next time the job runs it does valid date
    comparisons). If I'm missing something with this logic, I'd appreciate a
    clarification. (I suppose I could compose a tuple and push/pop that, but
    otherwise....)
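
    A tuple per job would indeed remove the need for the semaphore, since a
    single Queue.get() then hands back both values atomically. A sketch (the
    path and timestamp are made up):

    import Queue

    jobq = Queue.Queue(0)

    # producer: one put per job replaces the two paired stack pushes
    jobq.put(('/misc/kb/q123456.txt', 954547200))

    # consumer: one get returns the pair, so no lock is needed around
    # the two reads
    msfilepath, newtime = jobq.get()
    print msfilepath, newtime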

    I have read the thread doc and know that the sys.exit() in the FTPThread
    class def is bogus on my part -- part of the "prettifying" the code needs
    (mea culpa).

    It's the sys.exit() (explicit or implied) from the __main__ thread, after
    the iterative thread.join() of the ftp worker threads finishes, that seems
    to be causing me problems. The process isn't "finishing" from the
    perspective of the job scheduler. The single-threaded version surfaces
    either an implied or an explicit exit code to the OS; the multi-threaded
    version does not surface anything. The SMTP message gets sent (the last
    item before the sys.exit() in __main__) AND the command shell window
    closes, but something seems to be at loose ends on the process-termination
    cleanup side.
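
    One cheap way to see what is dangling at that point is to list the live
    threads just before the final sys.exit(0); a diagnostic sketch using the
    standard threading module:

    import threading

    # place just before the sys.exit(0) in __main__: anything listed
    # here besides the main thread is what the interpreter is still
    # waiting on at shutdown
    for t in threading.enumerate():
        print t.getName(), t.isAlive()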

    (BTW - the single threaded version of this program had about 60 files per
    minute throughput on the ftp; with 12 worker threads I'm getting between
    800-1000 files per minute. The flux rate on the target site is several
    thousand files per day, so it's really worth it to use the multi-threaded
    version)

    Bob
    "For a list of the ways which technology has failed to improve our
    quality of life, press 3"



  • David Fisher at Apr 1, 2000 at 6:40 pm
    ----- Original Message -----
    From: "Bob Dickenson" <bdickenson at serviceware.com>
    To: "'David Fisher'" <python at rose164.wuh.wustl.edu>; "Bob Dickenson"
    <bdickenson at serviceware.com>
    Cc: <python-list at python.org>
    Sent: Saturday, April 01, 2000 9:56 AM
    Subject: RE: Python Threading and sys.exit() codes

    These are good suggestions. A couple of questions/comments.
    You got lucky, normally my suggestions are pretty lame. :)
    I thought about using the Queue model, but I was experimenting with MxStack
    at the time. It seemed to work fine, so...

    I think I need the global lock to get proper thread sync because I have to
    pop values off two stacks for each "job" - the file to be ftp'd and it's
    time stamp on the source machine (for resetting it locally after the
    ftp.retrbinary so that the next time the job runs it does valid date
    compares.) If I'm missing something with this logic, I'd appreciate a
    clarification. (I suppose I could compose a tuple and push/pop that, but
    otherwise....)
    Yes, that was exactly what I was thinking of, pushing a tuple or list. Hey,
    if you've already done the work and it's doing the job, don't mess with it.
    I just think Queues are cool. I'll attach a program that was inspired by
    your first post.
    I have read the thread doc and know that the sys.exit() in the FTPthread
    class def is bogus on my part--part of the "prettifying" the code needs (mea
    culpa).

    It's the sys.exit() (explicit or implied) from the __main__ thread after the
    iterative thread.join of the ftp worker threads finishes that seems to be
    causing me problems. The process isn't "finishing" from the perspective of
    the job scheduler. The single-threaded version surfaces both an implied or
    explicit exit code to the OS, the multi-threaded version does not surface
    anything. The SMTP message gets sent (last item before the sys.exit() in
    __main__) AND the command shell window closes, but something seems to be at
    loose ends on the process termination cleanup side.
    Well, one thing you might try is to call setDaemon(1) on the worker
    threads before start(). In threading.py, sys.exitfunc() is overridden for
    the main thread. sys.exitfunc() is called by the interpreter for cleanup
    after a sys.exit(). It sounds like your program is hanging inside there.
    Setting all the threads to daemonic should bypass the logic in the new
    sys.exitfunc(). I'm still a little confused, though. The only thing
    sys.exitfunc() seems to do is join() all the non-daemon threads, and
    you've already done that, so it shouldn't hang. But it's worth a try. If
    it were me, I would hack threading.py to not override sys.exitfunc(), to
    see if that fixed the problem.
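
    Applied to the loop in __main__ from the original post, the suggestion
    would look roughly like this (a sketch only; note that daemonic threads
    are killed abruptly at interpreter exit, so the explicit join()s are still
    what guarantees the transfers finish):

    for i in range(Threadpool):
        my_thread = FTPThread(group=None, target=None, name=None,
                              args=("revised", "Fix_These", "Fix_Time"),
                              kwargs={})
        my_thread.setDaemon(1)   # must be set before start()
        my_thread.start()
        threads.append(my_thread)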
    (BTW - the single threaded version of this program had about 60 files per
    minute throughput on the ftp; with 12 worker threads I'm getting between
    800-1000 files per minute. The flux rate on the target site is several
    thousand files per day, so it's really worth it to use the multi-threaded
    version)
    Yeah, threads kick butt.
    After your first post, I got to thinkin' about Queue and threads and wrote
    this program to scan the internet randomly for NNTP servers that allow
    public access. So thanks, I got some inspiration from you.

    Good luck,
    David

    from nntplib import NNTP
    from Queue import Queue
    import socket
    import thread
    import whrandom
    ##import daemonize

    def checkserv(servq, repq):
        while 1:
            serv = servq.get()
            try:
                nn = NNTP(serv)
            except:
                repq.put((serv, ''))
            else:
                repq.put((serv, 'connect'))

    def randurl(servq):
        # build a random dotted-quad and try to resolve it to a hostname
        rip = str(whrandom.randint(0, 255))
        for i in range(3):
            rip = rip + '.' + str(whrandom.randint(0, 255))
        try:
            rurl = socket.gethostbyaddr(rip)[0]
        except:
            servq.put(rip)
        else:
            servq.put(rurl)

    def main():
        N_THREADS = 100
        ## daemonize.become_daemon()
        f = open('openserv.txt', 'w')
        servq = Queue(0)
        repq = Queue(0)
        for i in range(N_THREADS * 2):
            thread.start_new_thread(randurl, (servq,))
        for i in range(N_THREADS):
            thread.start_new_thread(checkserv, (servq, repq))
        while 1:
            resp = repq.get()
            if resp[1]:
                f.write(`resp[0]` + '\n')  # one server per line
                f.flush()
            thread.start_new_thread(randurl, (servq,))
