@idettman
Last active March 12, 2020 05:26
Python Multithreading: Queues

Python Multithreading and Queues

A queue is similar to a list:

my_list = []
my_list.append(1)
my_list.append(2)
my_list.append(3)
print(my_list.pop(0))
# Outputs: 1

The above code creates a list, appends three values, then removes the first value, so the list now holds only 2 values (2 and 3).

from queue import Queue  # Python 2: from Queue import Queue

my_queue = Queue(maxsize=0)
my_queue.put(1)
my_queue.put(2)
my_queue.put(3)
print(my_queue.get())
my_queue.task_done()
# Outputs: 1

There are a couple of differences in how queues look in code. First, we set a maximum size for the queue, where 0 means no limit (any maxsize less than or equal to 0 makes the queue unbounded).
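To see what maxsize actually changes, here's a small sketch using Python 3's `queue` module (in Python 2 the module is called `Queue`). With a limit set, `put()` waits once the queue is full. `put_nowait()` raises `queue.Full` instead, which is easier to show without hanging the script. The variable names are just for illustration.

```python
from queue import Queue, Full

# maxsize=0 (or any value <= 0) means the queue is unbounded
unbounded = Queue(maxsize=0)
for i in range(1000):
    unbounded.put(i)  # never blocks
print(unbounded.qsize())  # 1000

# A bounded queue holds at most maxsize items
bounded = Queue(maxsize=2)
bounded.put("a")
bounded.put("b")
try:
    # put() would block here until a slot frees up; put_nowait() raises instead
    bounded.put_nowait("c")
except Full:
    print("queue is full")
```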

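The second difference is the task_done() call at the end. It tells the queue that not only have I retrieved the item, but I’ve finished with it. Under the hood the queue counts unfinished tasks: every put() adds one, every task_done() takes one away, and q.join() (more on that later) waits until the count hits zero. If I don’t call task_done(), a threaded program that waits on join() will hang forever. So let’s just say that with Queues, you have to call this.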
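Here's a single-threaded sketch of that bookkeeping. Each item gets a task_done(), so join() returns right away. Calling task_done() one time too many raises ValueError, as the `queue` docs describe. The `processed` list is just there to show what came out.

```python
from queue import Queue

q = Queue()
for item in (1, 2, 3):
    q.put(item)  # each put() adds one unfinished task

processed = []
while not q.empty():
    processed.append(q.get())
    q.task_done()  # each task_done() marks one task finished

# join() blocks until the unfinished-task count reaches zero;
# every item got a task_done(), so this returns immediately
q.join()

# More task_done() calls than items put raises ValueError
try:
    q.task_done()
except ValueError as exc:
    print("ValueError:", exc)
```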

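The big important point about Queues is that they work really well with threading. A Queue does its own locking, so many threads can put() and get() at the same time safely. Also, get() blocks until an item is available, so a waiting thread doesn’t have to keep polling. A plain list gives you neither of those, which is why I’m even bothering to bring queues up here.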
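A minimal sketch of that blocking behaviour: a consumer thread sits inside `q.get()` until the main thread puts something on the queue. With a list you would have to loop and check it yourself. The `consumer` function and `received` list are illustrative names, not anything from the queue API.

```python
import threading
import time
from queue import Queue

q = Queue()
received = []

def consumer():
    # get() blocks until an item is available: no polling loop needed
    item = q.get()
    received.append(item)
    q.task_done()

t = threading.Thread(target=consumer)
t.start()

time.sleep(0.1)   # the consumer is now waiting inside q.get()
q.put("hello")    # this wakes the consumer up
t.join()
print(received)   # ['hello']
```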

Here’s an example of a simple program that uses Queues:

from queue import Queue

def do_stuff(q):
  while not q.empty():
    print(q.get())
    q.task_done()

q = Queue(maxsize=0)

for x in range(20):
  q.put(x)

do_stuff(q)

It outputs 0-19, in about the most complicated way possible.

Notice how do_stuff() is just whipping through the whole queue. But what if it was trying to do a big task, or a task that required a lot of waiting? Assume, for example, that do_stuff() takes 30 seconds to run each time and it’s just waiting on APIs to return something. The function would take 30 seconds every time it ran, and it would run 20 times, so it would take 10 minutes to get through just 20 items.
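To put numbers on that, here's a scaled-down sketch. The hypothetical `slow_task` sleeps for 0.05 s to stand in for a 30-second API wait, and 20 of them run one after another. Total time is roughly 20 × 0.05 = 1 s, the same ratio as 20 × 30 s = 10 minutes.

```python
import time
from queue import Queue

TASK_SECONDS = 0.05  # assumption: stand-in for the 30-second API wait, scaled down

def slow_task(x):
    # Hypothetical task: it only waits, like a call to a slow API
    time.sleep(TASK_SECONDS)
    return x

q = Queue()
for x in range(20):
    q.put(x)

start = time.perf_counter()
while not q.empty():
    slow_task(q.get())
    q.task_done()
elapsed = time.perf_counter() - start

# The waits happen one after another, so this takes about 20 * 0.05 s
print(f"sequential: {elapsed:.2f} s")
```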

Enter Python Threading.

from queue import Queue
from threading import Thread

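Threads run a worker function to get stuff done. They run at the same time (concurrently), and you can pull them all together with join() when they’re done. One caveat: in CPython only one thread executes Python code at a time (the GIL), so threads pay off when the work is mostly waiting, like those API calls.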
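A bare-bones sketch of start() and join() with no queue yet. Five threads each run a hypothetical `worker` that sleeps 0.1 s. Because the sleeps overlap, the whole thing typically takes a little over 0.1 s rather than 0.5 s. The lock is there because all the threads write to one shared list.

```python
import threading
import time

results = []
lock = threading.Lock()

def worker(n):
    time.sleep(0.1)   # pretend to wait on I/O
    with lock:        # protect the shared list
        results.append(n)

threads = [threading.Thread(target=worker, args=(n,)) for n in range(5)]
start = time.perf_counter()
for t in threads:
    t.start()         # all five begin running concurrently
for t in threads:
    t.join()          # wait for each one to finish
elapsed = time.perf_counter() - start

print(sorted(results))  # [0, 1, 2, 3, 4]
print(f"{elapsed:.2f} s")
```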

So first you need to set up a worker function:

def do_stuff(q):
  while True:
    print(q.get())
    q.task_done()

We’re more or less just stealing the function from the last bit, except that we’re setting it up as an infinite loop (while True). That means I want my threads always ready to accept new tasks: when the queue is empty, q.get() simply blocks until something new is put on it.

Now I want to create the actual threads and set them running. Before I do that, though, I need to give them a Queue to work with. The Queue doesn’t have to have anything on it, it just needs to be defined so that my threads know what they’ll be working on. Here’s how I set my (10) threads running:

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
  worker = Thread(target=do_stuff, args=(q,), daemon=True)
  worker.start()

So you see the Queue set up (as “q”), then a loop that runs the thread-creation bits 10 times. The first line in the loop sets up a thread, points it at the do_stuff function, and passes it “q”, the Queue we just defined. The thread is also marked as a daemon. Daemon threads don’t keep the program alive, so when the main thread finishes, these never-ending workers are shut down instead of hanging the program. Then we start the bugger. That’s 10 threads running (remember the infinite loop in do_stuff()?) and waiting for me to put something in the Queue.
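A quick sketch of the daemon flag in Python 3. You pass `daemon=True` to the constructor; `setDaemon()` still works but has been deprecated since Python 3.10. The worker below never returns. Because it is a daemon, the interpreter can still exit while the worker sits blocked in `q.get()`.

```python
import threading
from queue import Queue

def do_stuff(q):
    while True:
        q.get()
        q.task_done()

q = Queue()

# Python 3 spelling of worker.setDaemon(True)
worker = threading.Thread(target=do_stuff, args=(q,), daemon=True)
worker.start()

print(worker.daemon)      # True
print(worker.is_alive())  # True: it is blocked in q.get(), waiting for work

# A non-daemon worker stuck in this infinite loop would keep the
# program running forever; a daemon one is stopped at interpreter exit.
```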

The rest of the code is the same as the Queue example so I’m just going to put it all together and let you figure it out:

from queue import Queue
from threading import Thread

def do_stuff(q):
  while True:
    print(q.get())
    q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
  worker = Thread(target=do_stuff, args=(q,), daemon=True)
  worker.start()
  worker.start()

for x in range(100):
  q.put(x)

q.join()

The only bit that should be new is the q.join() call right at the end. It blocks until every item put on the queue has been matched by a task_done() call. At that point the queue is empty and the threads have finished working on everything they took. If you were running a program in batches, you might use q.join() to wait for a batch to finish, write the results to a file, and then throw more tasks into the queue.

Consider revising the last 3 lines into a loop:

for y in range(10):
  for x in range(100):
    q.put(x + y * 100)
  q.join()
  print("Batch " + str(y) + " Done")

It’s cool that tasks can be added to a Queue willy-nilly and these Threads will just pick them up. Whenever I want, I can stop and wait for all the queued work to finish, so I can check in, maybe write to a file or database, or just let the user know that I’m still working away.
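One way to get results back out for that check-in is a second queue. This is a sketch under my own assumptions: the "work" is squaring a number, and there are 4 workers and 3 batches of 10. Each worker puts its result before calling task_done(), so once tasks.join() returns, the whole batch's results are waiting in `results`.

```python
import threading
from queue import Queue

tasks = Queue()
results = Queue()  # second queue that collects output from the workers

def do_stuff(tasks, results):
    while True:
        x = tasks.get()
        results.put(x * x)   # hypothetical "work": square the number
        tasks.task_done()    # called only after the result is stored

for _ in range(4):
    threading.Thread(target=do_stuff, args=(tasks, results), daemon=True).start()

batch_totals = []
for y in range(3):
    for x in range(10):
        tasks.put(x + y * 10)
    tasks.join()  # wait for this batch to finish

    # Drain this batch's results; a real program might write them
    # to a file or database here instead.
    batch = []
    while not results.empty():
        batch.append(results.get())
    batch_totals.append(sum(batch))
    print(f"Batch {y} done: {len(batch)} results")
```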

Remember the example I gave before about each run of do_stuff() taking 30 seconds? Since I had to run it 20 times, it’d take 10 minutes. Now I can just run 20 different threads and the whole program will be done in about 30 seconds rather than 10 minutes. That works because the threads spend their time waiting on the APIs, so the waits overlap. Your results may vary, and CPU-heavy work won’t speed up like this, but for this kind of job it’s much faster.
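The same scaled-down experiment, with threads this time. The hypothetical `slow_task` sleeps 0.05 s in place of the 30-second API wait, and 20 daemon workers pull from one queue. Sequentially this would take about 1 s. Here the sleeps overlap, so it should finish in a small fraction of that, though exact timings depend on the machine.

```python
import threading
import time
from queue import Queue

TASK_SECONDS = 0.05  # assumption: stand-in for the 30-second API wait, scaled down

def slow_task(x):
    # Hypothetical I/O-bound task: sleeping releases the GIL,
    # so other threads can run meanwhile
    time.sleep(TASK_SECONDS)

def do_stuff(q):
    while True:
        slow_task(q.get())
        q.task_done()

q = Queue()
for _ in range(20):
    threading.Thread(target=do_stuff, args=(q,), daemon=True).start()

start = time.perf_counter()
for x in range(20):
    q.put(x)
q.join()
elapsed = time.perf_counter() - start

print(f"threaded: {elapsed:.2f} s")  # compare with about 1.0 s sequentially
```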

Python threading Timer: repeat a function every n seconds

Q

I want to fire off a function every 0.5 seconds and be able to start, stop, and reset the timer. I'm not too knowledgeable about how Python threads work and am having difficulties with Python's timer.

However, I keep getting RuntimeError: threads can only be started once when I execute threading.Timer.start() twice. Is there a workaround for this? I tried calling threading.Timer.cancel() before each start.

Pseudo code:

import threading

t = threading.Timer(0.5, function)
while True:
    t.cancel()
    t.start()  # raises RuntimeError on the second pass
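The error isn't specific to Timer. `threading.Timer` is a subclass of `Thread`, and any thread object can be started only once, even after `cancel()`. Here's a short sketch that reproduces it; the `tick` callback is just for illustration.

```python
import threading

def tick():
    print("tick")

t = threading.Timer(0.5, tick)
t.start()
t.cancel()  # cancels the pending call, but this thread object is now used up

try:
    t.start()  # a Thread (and so a Timer) can only be started once
except RuntimeError as exc:
    error = str(exc)
    print("RuntimeError:", error)

# Starting again means creating a fresh Timer object
t = threading.Timer(0.5, tick)
t.start()
t.cancel()
```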

A

The best way is to start the timer thread once. Inside your timer thread you'd code the following:

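from threading import Event, Thread

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        # wait() returns False after each 0.5 s timeout and True once the event is set
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function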

In the code that started the timer, you can then set the stopped event to stop the timer.

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# ... later, when you want the timer to stop:
stopFlag.set()
thread.join()  # wait for run() to return
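Here's a runnable version of the answer, with a few assumptions of mine:

* an `interval` parameter (the answer hard-codes 0.5 s);
* a `ticks` counter standing in for "call a function";
* `daemon=True`, so a forgotten thread can't keep the program alive.

It lets the timer fire a few times, stops it, and waits for it to finish.

```python
import time
from threading import Event, Thread

class MyThread(Thread):
    def __init__(self, event, interval=0.5):
        super().__init__(daemon=True)
        self.stopped = event
        self.interval = interval  # assumption: configurable period
        self.ticks = 0

    def run(self):
        # Event.wait(timeout) returns False when the timeout expires
        # and True as soon as the event is set, which ends the loop.
        while not self.stopped.wait(self.interval):
            self.ticks += 1  # stand-in for "call a function"

stop_flag = Event()
thread = MyThread(stop_flag, interval=0.1)
thread.start()

time.sleep(0.35)  # let it fire about three times
stop_flag.set()   # stop the timer
thread.join()     # wait for run() to return
print(thread.ticks)
```

To "reset" or restart, create a new Event and a new MyThread. Like Timer, a finished thread object can't be started again.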