Skip to content

Instantly share code, notes, and snippets.

@Lokaltog
Created March 12, 2013 14:37
Show Gist options
  • Save Lokaltog/5143402 to your computer and use it in GitHub Desktop.
Save Lokaltog/5143402 to your computer and use it in GitHub Desktop.
Powerline client-server memoization test
from multiprocessing.connection import Client
import time
# Client side of the memoization test: open a fresh connection nine times,
# queue one request per remote function, then print every reply until the
# server closes its end of the connection.
for _ in range(1, 10):
    conn = Client('/tmp/pl-mp-test')
    print('**** Query server')
    # Each request is a [function name, keyword arguments] pair.
    for request in (['f1', {'param': 'test'}], ['f2', {}]):
        conn.send(request)
    try:
        while True:
            data = conn.recv()
            if not data:
                # A falsy reply carries no result to unpack; keep reading.
                continue
            func, param, result = data
            print('[{0}] Get result [param={1}]: {2}'.format(func, param, result))
    except EOFError:
        # recv() raises EOFError once the peer has closed the connection.
        print('**** No more results')
    finally:
        print('**** Closing connection\n')
        conn.close()
    # Pause between rounds before querying the server again.
    time.sleep(5)
from multiprocessing.connection import Listener
from functools import wraps
from threading import Thread
from queue import Queue
import time
def default_cache_key(**kwargs):
return frozenset(kwargs.items())
class async(object):
def __init__(self, timeout, cache_key=default_cache_key):
self.q = Queue()
self.timeout = timeout
self.active = True
self.cache = {}
self.cache_key = cache_key
def __call__(self, func):
@wraps(func)
def decorated_func(**kwargs):
key = self.cache_key(**kwargs)
try:
cached = self.cache.get(key, None)
except TypeError:
return func(**kwargs)
if cached is None or time.time() - cached['time'] > self.timeout:
t = Thread(target=self._run_async, args=(func,), kwargs=kwargs)
t.daemon = True
t.start()
cached = self.cache[key] = {
'result': cached['result'] if cached else None,
'time': time.time(),
}
while not self.q.empty():
cached['result'] = self.q.get_nowait()
return cached['result']
return decorated_func
def _run_async(self, func, **kwargs):
self.q.put_nowait(func(**kwargs))
@async(timeout=10)
def f1(param='whatever'):
print('[f1] Doing some heavy lifting')
time.sleep(3)
print('[f1] Done!')
return ['f1', param, time.strftime('%H:%M:%S')]
@async(timeout=10)
def f2(param='whatever'):
print('[f2] Doing some heavy lifting')
time.sleep(3)
print('[f2] Done!')
return ['f2', param, time.strftime('%H:%M:%S')]
# Server side of the memoization test: accept connections forever and answer
# each [function name, keyword arguments] request with the function's current
# (possibly cached, possibly None) result.

# Explicit whitelist of remotely callable functions. Looking names up in
# locals() would let a client invoke any module-level object.
HANDLERS = {'f1': f1, 'f2': f2}

# How long to wait for the next request before treating the client as done.
# A bare poll() returns immediately and can close the connection before the
# client has sent its first request.
POLL_TIMEOUT = 0.5

# NOTE(security): Connection.recv() unpickles whatever the peer sends; only
# expose this socket to trusted local clients.
listener = Listener('/tmp/pl-mp-test')
try:
    while True:
        conn = listener.accept()
        print('**** Got connection')
        try:
            while True:
                if not conn.poll(POLL_TIMEOUT):
                    # No pending request: end the session like a closed peer.
                    raise EOFError
                func, kwargs = conn.recv()
                print('[server] [{0}] Running func with kwargs: {1}'.format(func, kwargs))
                handler = HANDLERS.get(func)
                if handler is None:
                    # Unknown name: answer with an empty result rather than
                    # letting a KeyError take the whole server down.
                    print('[server] [{0}] Unknown function, sending empty result'.format(func))
                    result = None
                else:
                    result = handler(**kwargs)
                print('[server] [{0}] Sending result to client: {1}'.format(func, result))
                conn.send(result)
        except EOFError:
            print('**** No more results')
        finally:
            print('**** Closing connection\n')
            conn.close()
finally:
    # Runs on any exit from the accept loop (e.g. Ctrl-C) so the listening
    # socket is always released.
    listener.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment