Skip to content

Instantly share code, notes, and snippets.

@insomniacslk
Last active March 23, 2016 22:34
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save insomniacslk/ae390e04b8f06cd84286 to your computer and use it in GitHub Desktop.
import os
import sys
import time
if sys.version_info[:2] < (3, 4):
print('Python 3.4+ is required')
sys.exit(os.EX_SOFTWARE)
import ipyparallel as parallel
# Run `ipcluster3 start -n 4` or similar beforehand
client = parallel.Client()
print('Parallel tasks: {p}'.format(p=client.ids))
lview = client.load_balanced_view()
def fibonacci(n):
a, b = 0, 1
for i in range(n):
a, b = b, a + b
return n, a
tick = time.time()
MAX = 100
amr = lview.map(fibonacci, range(MAX), chunksize=4, ordered=False)
pending = set(amr.msg_ids)
new_tasks = []
while pending:
try:
client.wait(pending, 1e-3)
except parallel.TimeoutError:
pass
finished = pending.difference(client.outstanding)
pending = pending.difference(finished)
for msgid in finished:
ares = client.get_result(msgid)
print('== Engine ID: {eid} =='.format(eid=ares.engine_id))
try:
for n, fib in ares.get():
print(' fib({n}): {f}'.format(
n=n,
f=fib,
))
if n % 10 == 0:
print('Injecting new job at n == {n}, '
'computing fibonacci({m})'.format(n=n, m=MAX + n + 1))
new_ares = lview.map(fibonacci, [MAX + n + 1])
pending = pending.union(set(new_ares.msg_ids))
new_tasks.extend(new_ares.msg_ids)
except Exception:
import traceback
traceback.print_exc()
import pdb
pdb.set_trace()
print(new_tasks)
print('Total time: {t}'.format(t=time.time() - tick))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment