Skip to content

Instantly share code, notes, and snippets.

@jimbaker
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jimbaker/169945e9ae42ce85409e to your computer and use it in GitHub Desktop.
Save jimbaker/169945e9ae42ce85409e to your computer and use it in GitHub Desktop.
Python queue question on jython-users
$ python test_queue.py
Made it to 'finally' in consumer 0
Made it to 'finally' in consumer 1
Made it to 'finally' in consumer 2
Made it to 'finally' in consumer 3
Made it to 'finally' in populator 0
Made it to 'finally' in populator 1
Made it to 'finally' in populator 2
Waiting to join 3 populator threads
Adding sentinel values to end of queue
Waiting to join consumer/writer threads
Waiting on a consumer/writer
Waiting on a consumer/writer
Waiting on a consumer/writer
Waiting on a consumer/writer
Waiting to join queue with 4 items
# used control-\ to exit
$ jython27 test_queue.py
Made it to 'finally' in consumer 0
Made it to 'finally' in consumer 1
Made it to 'finally' in consumer 2
Made it to 'finally' in consumer 3
Made it to 'finally' in populator 0
Made it to 'finally' in populator 1
Waiting to join 3 populator threads
Made it to 'finally' in populator 2
Adding sentinel values to end of queue
Waiting to join consumer/writer threads
Waiting on a consumer/writer
Waiting on a consumer/writer
Waiting on a consumer/writer
Waiting on a consumer/writer
Waiting to join queue with 4 items
# used control-c to exit
import sys
import time
import threading
import Queue
class PopulatorThread(threading.Thread):
def __init__(self, mod, mods, queue):
super(PopulatorThread, self).__init__()
self.mod = mod
self.mods = mods
self.queue = queue
def run(self):
# Create db connection
# ...
try:
pass
# Select one segment of records using 'id % mods = mod'
# Process these records & slap them onto the queue
# ...
except:
#con.rollback()
raise
finally:
print "Made it to 'finally' in populator %d" % self.mod
#con.close()
class ConsumerThread(threading.Thread):
def __init__(self, mod, queue):
super(ConsumerThread, self).__init__()
self.mod = mod
self.queue = queue
def run(self):
# Create db connection
# ...
try:
pass
# Pull records off the queue and put them into
# a different database
# ...
except:
#con.rollback()
raise
finally:
print "Made it to 'finally' in consumer %d" % self.mod
#con.close()
def main(argv):
tread1Count = 3
tread2Count = 4
# This is the notefactsselector data queue
nfsQueue = Queue.Queue()
# Start consumer/writer threads
j = 0
treads2 = []
while j < tread2Count:
treads2.append(ConsumerThread(j, nfsQueue))
treads2[-1].start()
j += 1
# Start reader/populator threads
i = 0
treads1 = []
while i < tread1Count:
treads1.append(PopulatorThread(i, tread1Count, nfsQueue))
treads1[-1].start()
i += 1
# Wait for reader/populator threads
print "Waiting to join %d populator threads" % len(treads1)
i = 0
for tread in treads1:
"Waiting to join a populator thread %d" % i
tread.join()
i += 1
#Add one sentinel value to queue for each write thread
print "Adding sentinel values to end of queue"
for tread in treads2:
nfsQueue.put(None)
# Wait for consumer/writer threads
print "Waiting to join consumer/writer threads"
for tread in treads2:
print "Waiting on a consumer/writer"
tread.join()
# Wait for Queue
print "Waiting to join queue with %d items" % nfsQueue.qsize()
nfsQueue.join()
print "Queue has been joined"
if __name__ == '__main__':
main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment