Skip to content

Instantly share code, notes, and snippets.

@utilitarianexe
Last active December 27, 2015 15:19
Show Gist options
  • Save utilitarianexe/7346938 to your computer and use it in GitHub Desktop.
Save utilitarianexe/7346938 to your computer and use it in GitHub Desktop.
Python Decorator to turn a normal function into a function that splits its work among threads. First argument of original function is a list that is split into pieces for each thread to run separately.
import itertools
import threading
from functools import wraps
def threadify(num_threads,is_method=True,outputs_list=False,outputs_bool = False,outputs_gen=False):
    '''Build a decorator that splits a function's work among threads.

    The decorated function must take the work list as its first argument
    (or as its second argument, right after ``self``, when ``is_method``
    is true). That list is dealt out into ``num_threads`` pieces with
    ``split_job`` and the original function is called once per piece, each
    call in its own thread. All other positional and keyword arguments are
    passed unchanged to every call.

    The per-thread return values are combined automatically:

    * default: each call returns ``None`` or a dict; the dicts are merged
      with ``dict.update`` and the merged dict is returned.
    * ``outputs_list``: each call returns ``None`` or a list; the lists are
      concatenated. The order of the pieces is not guaranteed, so a dict is
      usually the better choice.
    * ``outputs_bool``: the result is ``True`` as soon as any call returns
      a truthy value, otherwise ``False`` once every call has finished.
      On an early ``True`` the remaining threads are not interrupted; they
      keep running in the background and their results are ignored.
    * ``outputs_gen``: each call returns an iterable; the iterables are
      combined with ``itertools.chain`` (order not guaranteed).

    An exception raised by the function inside a worker thread is not
    re-raised in the caller; that piece simply contributes nothing.
    '''
    def threadify_real(func):
        @wraps(func)
        def threaded(*args,**kwargs):
            threads = []
            # One-element lists act as mutable cells so the nested worker can
            # rebind the value (Python 2 closures have no ``nonlocal``).
            if outputs_list:
                return_data = [[]]
            elif outputs_bool:
                return_data = [False]
            else:
                return_data = [{}]
            pending = [0]
            # Guards the read-modify-write merges and the pending counter.
            merge_lock = threading.Lock()
            # Set when the boolean answer is known: either a worker found a
            # truthy result or every worker has finished.
            finished = threading.Event()

            def new_func(*worker_args,**worker_kwargs):
                try:
                    thread_results = func(*worker_args,**worker_kwargs)
                    if outputs_bool:
                        if thread_results:
                            return_data[0] = True
                            # Wake the caller right away instead of poking at
                            # private Thread internals to "stop" the siblings.
                            finished.set()
                        return
                    if not outputs_gen and (thread_results is None or len(thread_results) == 0):
                        return
                    with merge_lock:
                        if outputs_list:
                            return_data[0].extend(thread_results)
                        elif outputs_gen:
                            return_data[0] = itertools.chain(thread_results,return_data[0])
                        else:
                            return_data[0].update(thread_results)
                finally:
                    # Runs even if func raised, so the caller is never left
                    # waiting on a worker that died.
                    with merge_lock:
                        pending[0] -= 1
                        if pending[0] == 0:
                            finished.set()

            # Index of the work list among the positional arguments.
            job_index = 1 if is_method else 0
            tasks = split_job(args[job_index],num_threads)
            leading = list(args[:job_index])
            trailing = list(args[job_index + 1:])
            pending[0] = len(tasks)
            for task in tasks:
                worker = threading.Thread(target=new_func,
                                          args=leading + [task] + trailing,
                                          kwargs=kwargs)
                worker.start()
                threads.append(worker)
            if outputs_bool:
                # Return as soon as the answer is known rather than joining
                # every worker.
                if threads:
                    finished.wait()
            else:
                for worker in threads:
                    worker.join()
            return return_data[0]
        return threaded
    return threadify_real
def split_job(job,num_workers):
    '''Deal the items of ``job`` round-robin into ``num_workers`` lists.

    Returns a list of ``num_workers`` lists. Item 0 goes to the first list,
    item 1 to the second, and so on, wrapping back to the first list after
    the last one. Lists stay empty when there are fewer items than workers.
    '''
    buckets = [[] for _ in range(num_workers)]
    slot = 0
    for item in job:
        buckets[slot].append(item)
        # Advance the cursor, wrapping to the first bucket after the last.
        slot = (slot + 1) % len(buckets)
    return buckets
#example
@thread_tools.threadify(30,is_method=False)
def resource_feeds(resource_names):
    '''Example: map each resource name to ``get_resource(name, use_json)``.

    The decorator splits ``resource_names`` across 30 threads and merges
    the per-thread dicts into one. ``get_resource`` and ``use_json`` are
    expected to be defined by the surrounding module.
    '''
    return {name: get_resource(name,use_json) for name in resource_names}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment