Skip to content

Instantly share code, notes, and snippets.

@markrwilliams
Last active June 4, 2016 12:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markrwilliams/2e40ed0fa06a9e653609dd61fd80ca95 to your computer and use it in GitHub Desktop.
Save markrwilliams/2e40ed0fa06a9e653609dd61fd80ca95 to your computer and use it in GitHub Desktop.
import sys
from threading import Thread
from time import time
from Queue import Queue
def f(a):
    """Trivial payload for the benchmark: hand the argument straight back."""
    result = a
    return result
def worker(inq, outq):
    """
    Serve benchmark jobs forever.

    Each job taken from C{inq} is a C{(timestamp, callable)} pair.  The
    callable is invoked with C{1} and the timestamp is then echoed onto
    C{outq} so the submitter can measure the round trip.
    """
    while 1:
        try:
            job = inq.get()
            submittedAt, call = job
            call(1)
            outq.put(submittedAt)
        finally:
            # Always balance the get() so that inq.join() can return.
            inq.task_done()
def testRun(inq, outq, results, iterations=10000):
    """
    Compare the cost of calling C{f} through the worker queues with the cost
    of calling it directly, and append a one-line summary to C{results}.

    @param inq: queue the worker threads read C{(timestamp, callable)} jobs
        from.
    @param outq: queue the worker threads echo the job timestamps onto.
    @param results: list that receives the formatted summary string.
    @param iterations: number of samples to take (defaults to the original
        hard-coded 10000).
    """
    t1 = 0
    t2 = 0
    for _ in range(iterations):
        inq.put((time(), f))
        try:
            # Round trip: time from submission until a worker echoes a
            # timestamp back.
            startTime = outq.get()
            t1 += time() - startTime
            # Baseline: the same call made directly on this thread.
            startTime = time()
            f(1)
            t2 += time() - startTime
        finally:
            outq.task_done()
    inq.join()
    outq.join()
    # Average over the number of samples taken.  The loop index must not be
    # used here: it ends one short of the sample count.
    avgPool = t1 / iterations if iterations else 0.0
    avgSync = t2 / iterations if iterations else 0.0
    # A coarse clock can leave the synchronous total at exactly zero; report
    # an infinite ratio instead of raising ZeroDivisionError.
    ratio = t1 / t2 if t2 else float("inf")
    results.append("threadpool: avg %f us, sync: avg %f us, %.2fx increase" % (
        avgPool * 1000**2, avgSync * 1000**2, ratio))
def run():
    """
    Drive the plain-threads benchmark.

    Starts 64 daemon worker threads sharing one pair of queues, then starts
    C{int(sys.argv[1])} tester threads running L{testRun}, waits for all of
    the testers and prints every summary line they produced.
    """
    jobs = Queue()
    echoes = Queue()

    pool = []
    for _ in range(64):
        serving = Thread(target=worker, args=(jobs, echoes))
        # Daemon threads: the workers loop forever and must not keep the
        # process alive once the testers are done.
        serving.daemon = True
        serving.start()
        pool.append(serving)

    results = []
    testers = []
    for _ in range(int(sys.argv[1])):
        tester = Thread(target=testRun, args=(jobs, echoes, results))
        tester.start()
        testers.append(tester)

    for tester in testers:
        tester.join()

    for line in results:
        print(line)
# Script entry point: run the benchmark (the number of tester threads is
# read from sys.argv[1] inside run()).
run()
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet.task import react
from time import time
import sys
def func(a):
    """Trivial payload for the benchmark: hand the argument straight back."""
    result = a
    return result
@inlineCallbacks
def dtt(reactor, pool):
    """
    Endlessly benchmark C{deferToThreadPool} against a direct call, using
    C{inlineCallbacks}.

    Each pass takes 100000 samples and prints the average cost of the
    deferred call, the average cost of the direct call (both in
    microseconds) and the ratio between the two.

    @param reactor: the reactor passed to C{deferToThreadPool}.
    @param pool: the thread pool passed to C{deferToThreadPool}.
    """
    count = 100000
    while True:
        t1 = 0
        t2 = 0
        for i in xrange(count):
            # Cost of calling func through the thread pool.
            st = time()
            yield deferToThreadPool(reactor, pool, func, i)
            t1 += time() - st
            # Cost of calling func directly.
            st = time()
            func(i)
            t2 += time() - st
        # Average over the number of samples taken; the loop index ends one
        # short of that count.  A zero synchronous total is reported as an
        # infinite ratio instead of raising ZeroDivisionError, which would
        # end the benchmark loop.
        ratio = t1 / t2 if t2 else float("inf")
        print("deferToThread: avg %f us, sync: avg %f us, %.2fx increase" % (
            t1 / count * 1000**2, t2 / count * 1000**2, ratio))
def deferredDTT(reactor, pool):
    """
    Benchmark C{deferToThreadPool} against a direct call using explicit
    Deferred callbacks instead of C{inlineCallbacks}.

    Takes 10000 samples one after another, prints the averages (in
    microseconds) and their ratio, then schedules itself again through
    C{reactor.callWhenRunning}.

    @param reactor: the reactor passed to C{deferToThreadPool}.
    @param pool: the thread pool passed to C{deferToThreadPool}.
    """
    count = 10000
    # times[0]: total spent in deferred calls; times[1]: total spent in
    # direct calls.  A list is used so the closures below can mutate it.
    times = [0, 0]
    remaining = iter(xrange(count))

    def recordSample(ignored, index, dispatchedAt):
        # The deferred call has completed: account for it, then time the
        # equivalent direct call.
        times[0] += time() - dispatchedAt
        directStart = time()
        func(index)
        times[1] += time() - directStart

    def report(failure):
        # Only exhaustion of the sample counter is expected here; trap()
        # re-raises anything else.
        failure.trap(StopIteration)
        t1, t2 = times
        avgDeferred = t1 / count
        avgSync = t2 / count
        print("deferToThread: avg %f us, sync: avg %f us, %.2fx increase" % (
            avgDeferred * 1000**2, avgSync * 1000**2, avgDeferred / avgSync))
        reactor.callWhenRunning(deferredDTT, reactor, pool)

    def dispatch(ignored):
        # next() raises StopIteration after the last sample, which sends the
        # callback chain to report().
        index = next(remaining)
        dispatchedAt = time()
        (deferToThreadPool(reactor, pool, func, index)
         .addCallback(recordSample, index, dispatchedAt)
         .addCallback(dispatch)
         .addErrback(report))

    dispatch(None)
def run(reactor, *argv):
    """
    Start the Twisted benchmark.

    C{argv[1]} is the number of concurrent benchmark chains to start.  If
    C{argv[2]} is C{'deferred'} the explicit-callback implementation
    (L{deferredDTT}) is used, otherwise the C{inlineCallbacks} one (L{dtt}).

    @return: a fresh L{Deferred}; nothing in this function ever fires it.
    """
    pressure = int(argv[1])
    wantsDeferred = len(argv) > 2 and argv[2] == 'deferred'
    benchmark = deferredDTT if wantsDeferred else dtt

    pool = ThreadPool(minthreads=64, maxthreads=64)
    pool.start()
    # Stop the pool's threads when the reactor shuts down.
    reactor.addSystemEventTrigger("before", "shutdown", pool.stop)

    for _ in range(pressure):
        benchmark(reactor, pool)
    return Deferred()
# Script entry point: hand run() and the command-line arguments to
# Twisted's react().
react(run, sys.argv)
@markrwilliams
Copy link
Author

PyPy vmprof results:

deferredDTT (no inlineCallbacks):
http://vmprof.com/#/01d0b89a510e875e12957407ece70e81

dtt (inlineCallbacks):
http://vmprof.com/#/69bed0aeee29887d117952879bff27af

@markrwilliams
Copy link
Author

inlineCallbacks performs well in the above vmprof trace because the benchmark (dtt) never calls twisted.internet.defer.returnValue. See: https://twistedmatrix.com/trac/ticket/6278

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment