Last active
December 17, 2015 10:58
-
-
Save wuub/5598688 to your computer and use it in GitHub Desktop.
explanation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## | |
## I want a worker that can perform jobs declared in 'tasks' entry_points | |
## of installed packages. | |
## | |
## This allows external services to schedule [package_name].[task_name] | |
## w/o knowledge about internal layout of python package providing the task. | |
## | |
## With this pattern it's also relatively easy to disable support | |
## for arbitrary code execution (think os.unlink) | |
## | |
class UnknownTaskError(RuntimeError): | |
pass | |
class EntryPointsAwareJob(rq.job.Job): | |
"""rq.job.Job subclass that understands entry points""" | |
@property | |
def func(self): | |
func_name = self.func_name | |
if func_name is None: | |
return None | |
if self.instance: | |
return getattr(self.instance, func_name) | |
package_name, task_name = func_name.split('.', 1) | |
try: | |
entry_map = pkg_resources.get_entry_map(package_name, 'tasks') | |
except (pkg_resources.DistributionNotFound): | |
module_name, func_name = func_name.rsplit('.', 1) | |
module = importlib.import_module(module_name) | |
func = getattr(module, func_name) | |
else: | |
try: | |
func = entry_map[task_name].load() | |
except KeyError: | |
raise UnknownTaskError(package_name, task_name) | |
return func | |
## Then in setup.py | |
setup( | |
name='test', | |
entry_points={'tasks': ['fix_something = test.sth.fixers.dangerous:fix_something'} | |
) | |
## ... and finally | |
rq.enqueue_call('tasks.fix_something', ...) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment