Skip to content

Instantly share code, notes, and snippets.

@mithrandi
Created October 7, 2015 19:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mithrandi/b5d8d4a0887385797b19 to your computer and use it in GitHub Desktop.
Save mithrandi/b5d8d4a0887385797b19 to your computer and use it in GitHub Desktop.
trial / threadpool interaction
from twisted.internet import reactor
from twisted.internet.defer import gatherResults
from twisted.internet.threads import deferToThread
from twisted.trial.unittest import TestCase
from time import sleep
class Case(TestCase):
def test_threads(self):
return start()
def counter(x):
c = 0
while True:
c += x
print c
sleep(1)
def start():
print reactor.getThreadPool().started
d1 = deferToThread(counter, 2)
d2 = deferToThread(counter, 5)
return gatherResults([d1, d2], consumeErrors=True)
if __name__ == '__main__':
reactor.callWhenRunning(start)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment