-
-
Save jimbaker/169945e9ae42ce85409e to your computer and use it in GitHub Desktop.
Python queue question on jython-users
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ python test_queue.py | |
Made it to 'finally' in consumer 0 | |
Made it to 'finally' in consumer 1 | |
Made it to 'finally' in consumer 2 | |
Made it to 'finally' in consumer 3 | |
Made it to 'finally' in populator 0 | |
Made it to 'finally' in populator 1 | |
Made it to 'finally' in populator 2 | |
Waiting to join 3 populator threads | |
Adding sentinel values to end of queue | |
Waiting to join consumer/writer threads | |
Waiting on a consumer/writer | |
Waiting on a consumer/writer | |
Waiting on a consumer/writer | |
Waiting on a consumer/writer | |
Waiting to join queue with 4 items | |
# used control-\ to exit | |
$ jython27 test_queue.py | |
Made it to 'finally' in consumer 0 | |
Made it to 'finally' in consumer 1 | |
Made it to 'finally' in consumer 2 | |
Made it to 'finally' in consumer 3 | |
Made it to 'finally' in populator 0 | |
Made it to 'finally' in populator 1 | |
Waiting to join 3 populator threads | |
Made it to 'finally' in populator 2 | |
Adding sentinel values to end of queue | |
Waiting to join consumer/writer threads | |
Waiting on a consumer/writer | |
Waiting on a consumer/writer | |
Waiting on a consumer/writer | |
Waiting on a consumer/writer | |
Waiting to join queue with 4 items | |
# used control-c to exit |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import time | |
import threading | |
import Queue | |
class PopulatorThread(threading.Thread): | |
def __init__(self, mod, mods, queue): | |
super(PopulatorThread, self).__init__() | |
self.mod = mod | |
self.mods = mods | |
self.queue = queue | |
def run(self): | |
# Create db connection | |
# ... | |
try: | |
pass | |
# Select one segment of records using 'id % mods = mod' | |
# Process these records & slap them onto the queue | |
# ... | |
except: | |
#con.rollback() | |
raise | |
finally: | |
print "Made it to 'finally' in populator %d" % self.mod | |
#con.close() | |
class ConsumerThread(threading.Thread): | |
def __init__(self, mod, queue): | |
super(ConsumerThread, self).__init__() | |
self.mod = mod | |
self.queue = queue | |
def run(self): | |
# Create db connection | |
# ... | |
try: | |
pass | |
# Pull records off the queue and put them into | |
# a different database | |
# ... | |
except: | |
#con.rollback() | |
raise | |
finally: | |
print "Made it to 'finally' in consumer %d" % self.mod | |
#con.close() | |
def main(argv): | |
tread1Count = 3 | |
tread2Count = 4 | |
# This is the notefactsselector data queue | |
nfsQueue = Queue.Queue() | |
# Start consumer/writer threads | |
j = 0 | |
treads2 = [] | |
while j < tread2Count: | |
treads2.append(ConsumerThread(j, nfsQueue)) | |
treads2[-1].start() | |
j += 1 | |
# Start reader/populator threads | |
i = 0 | |
treads1 = [] | |
while i < tread1Count: | |
treads1.append(PopulatorThread(i, tread1Count, nfsQueue)) | |
treads1[-1].start() | |
i += 1 | |
# Wait for reader/populator threads | |
print "Waiting to join %d populator threads" % len(treads1) | |
i = 0 | |
for tread in treads1: | |
"Waiting to join a populator thread %d" % i | |
tread.join() | |
i += 1 | |
#Add one sentinel value to queue for each write thread | |
print "Adding sentinel values to end of queue" | |
for tread in treads2: | |
nfsQueue.put(None) | |
# Wait for consumer/writer threads | |
print "Waiting to join consumer/writer threads" | |
for tread in treads2: | |
print "Waiting on a consumer/writer" | |
tread.join() | |
# Wait for Queue | |
print "Waiting to join queue with %d items" % nfsQueue.qsize() | |
nfsQueue.join() | |
print "Queue has been joined" | |
if __name__ == '__main__': | |
main(sys.argv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment