Skip to content

Instantly share code, notes, and snippets.

@geeknam
Created February 20, 2017 03:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save geeknam/e5b4adf0a955748487f383cbe21211bd to your computer and use it in GitHub Desktop.
Save geeknam/e5b4adf0a955748487f383cbe21211bd to your computer and use it in GitHub Desktop.
Serverless async task via SNS
import os
import json
import importlib
import inspect
import boto3
# ARN handed to SNS publish() as the target for dispatched tasks, read once at
# import time from the environment. When it is unset (None) or empty, the
# ``delay`` helper attached by ``task()`` runs the function in-process instead
# of publishing.
SNS_ARN = os.environ.get('SNS_ARN', None)
def import_and_get_task(task_path):
    """
    Resolve a dotted path such as ``package.module.function`` to an object.

    Everything before the last dot is imported as a module; the part after
    the last dot is looked up as an attribute of that module.

    Args:
        task_path: Dotted path of the form ``module.attribute``.

    Returns:
        The attribute named by the last path component.

    Raises:
        ValueError: ``task_path`` has no module part (no dot, or nothing
            before the last dot).
        ImportError: The module part cannot be imported.
        AttributeError: The module has no attribute with the given name.
    """
    module_path, _, attribute = task_path.rpartition('.')
    if not module_path:
        # Without this guard the failure is still a ValueError, but with an
        # opaque message ("not enough values to unpack" / "Empty module
        # name") that does not mention the offending path.
        raise ValueError(
            "task_path must look like 'module.function', got {!r}".format(
                task_path
            )
        )
    # NOTE(security): this imports whatever module the path names. In this
    # file the path comes from the SNS message body handled by route_task,
    # so anyone able to publish to that topic chooses the code that runs.
    # Consider checking the path against an allow-list before importing.
    task_module = importlib.import_module(module_path)
    return getattr(task_module, attribute)
def route_task(event, context):
    """
    Lambda entry point: run the task described by an incoming SNS event.

    Reads the first entry of ``event['Records']``, JSON-decodes its
    ``['Sns']['Message']`` string, resolves the decoded ``task_path`` with
    ``import_and_get_task`` and calls the result with the decoded ``args``
    (positional) and ``kwargs`` (keyword).

    ``context`` is accepted but not used. Returns whatever the task returns.
    """
    # Only the first record is consulted.
    sns_body = event['Records'][0]['Sns']['Message']
    payload = json.loads(sns_body)
    target = import_and_get_task(payload['task_path'])
    positional = payload['args']
    keyword = payload['kwargs']
    return target(*positional, **keyword)
def send_async_task(task_path, *args, **kwargs):
    """
    Publish a task invocation to SNS.

    The dotted ``task_path`` and the call arguments are packed into a JSON
    object with the keys ``task_path``, ``args`` and ``kwargs`` and published
    to the target named by the module-level ``SNS_ARN``. Arguments therefore
    have to be JSON-serialisable.

    Returns the response of the SNS client's ``publish`` call.
    """
    body = json.dumps({
        'task_path': task_path,
        'args': args,
        'kwargs': kwargs,
    })
    sns = boto3.client('sns')
    return sns.publish(TargetArn=SNS_ARN, Message=body)
def task():
    """
    Decorator factory that gives a function a ``delay`` attribute.

    The decorated function itself is returned unchanged, so calling it
    directly still runs it in-process. Calling ``.delay(...)`` works out the
    function's dotted path (``<module name>.<function name>``) and then:

    * if ``SNS_ARN`` is set, publishes the path and arguments through
      ``send_async_task`` and returns that call's result;
    * otherwise calls the function directly and returns its result.

    Example:

        @task()
        def my_async_func(*args, **kwargs):
            dosomething()

        my_async_func.delay(*args, **kwargs)
    """
    def decorate(target):
        def dispatch(*args, **kwargs):
            # The path is resolved on every call, before SNS_ARN is checked.
            owner = inspect.getmodule(target).__name__
            dotted_path = '{module_path}.{func_name}'.format(
                module_path=owner,
                func_name=target.__name__,
            )
            if not SNS_ARN:
                # No target configured: run synchronously in this process.
                return target(*args, **kwargs)
            return send_async_task(dotted_path, *args, **kwargs)

        target.delay = dispatch
        return target

    return decorate
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment