Skip to content

Instantly share code, notes, and snippets.

@amiller
Created November 11, 2018 06:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amiller/8fdec381d43ccd882e5f62e07135dacd to your computer and use it in GitHub Desktop.
Save amiller/8fdec381d43ccd882e5f62e07135dacd to your computer and use it in GitHub Desktop.
import asyncio
import concurrent.futures
import logging
import time
import cffi
import random
# Examples of running in executor
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
# https://pymotw.com/3/asyncio/executors.html
# Use CFFI just for an example, defining a slow CPU-bound routine
ffi = cffi.FFI()
ffi.set_source("_module", """
int doQuadraticWork(int k) {
//
int *buf = malloc(k * sizeof(int));
for (int i = 0; i < k; i++) {
for (int j = 0; j < k; j++) {
buf[i] += k*j;
}
}
int result = buf[0];
free(buf);
return result;
}
""")
ffi.cdef("int doQuadraticWork(int k);")
ffi.compile()
from _module import ffi, lib
def slow_computation(n):
# Invoke a CFFI function that takes a while
log = logging.getLogger('slow_computation({})'.format(n))
log.info('running')
n = lib.doQuadraticWork(random.randint(50000,100000))
log.info('done')
return n
async def test_threads():
# Create a limited thread pool.
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=2,
)
loop = asyncio.get_event_loop()
# Wrap a slow computation by calling it in the thread pool
def wrap_slow_computation(n):
return loop.run_in_executor(executor, slow_computation, n)
log = logging.getLogger('test_threads')
log.info('creating executor tasks')
tasks = [wrap_slow_computation(i) for i in range(7)]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
if __name__ == '__main__':
# Configure logging to show the name of the thread
# where the log message originates.
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s'
)
loop = asyncio.get_event_loop()
loop.run_until_complete(test_threads())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment