Skip to content

Instantly share code, notes, and snippets.

@dfee
Last active May 17, 2017 22:41
Show Gist options
  • Save dfee/e1dc510c24cedf7f79fb9e34024cda5f to your computer and use it in GitHub Desktop.
Save dfee/e1dc510c24cedf7f79fb9e34024cda5f to your computer and use it in GitHub Desktop.
Guarding functions in Asphalt that should only run in an executor (i.e. never on the context's event loop thread).
import asyncio
from asphalt.core import Context
import functools
def guard_executor(func):
    """Decorate *func* so that it refuses to run on the context's event loop.

    The wrapper looks for a ``Context`` among the first two positional
    arguments and, failing that, in the ``ctx`` attribute of the first
    positional argument (the ``self`` of an instance method).  It then
    compares the calling thread's event loop with ``ctx.loop``.

    Raises (from the wrapper):
        RuntimeError: if no ``Context`` can be located, or if the calling
            thread's event loop is the same as ``ctx.loop``.
    """
    no_ctx_msg = ('Context must be the first arg, or an instance variable of '
                  '`self` (if wrapping an instance method).')
    not_in_executor_msg = '{} must be run in an executor'.format(func.__name__)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Covers both ``fn(ctx, ...)`` and ``method(self, ctx, ...)``.
        for candidate in args[:2]:
            if isinstance(candidate, Context):
                ctx = candidate
                break
        else:
            # Nothing positional matched: fall back to ``self.ctx``.
            if not args:
                raise RuntimeError(no_ctx_msg)
            ctx = getattr(args[0], 'ctx', None)
            if not isinstance(ctx, Context):
                raise RuntimeError(no_ctx_msg)

        # A thread without an event loop makes get_event_loop() raise;
        # treat that as "no loop" for the comparison below.
        try:
            current_loop = asyncio.get_event_loop()
        except RuntimeError:
            current_loop = None

        if current_loop == ctx.loop:
            raise RuntimeError(not_in_executor_msg)

        return func(*args, **kwargs)

    return wrapper
from asphalt.core import executor
import pytest
from spark.core.utils import guard_executor
class GuardKls:
    """Test helper with a guarded ``run`` method that records being called."""

    def __init__(self, **kwargs):
        """Store each keyword argument as an instance attribute.

        ``called`` is always initialised to ``False``.
        """
        for name, value in kwargs.items():
            setattr(self, name, value)
        # Assigned last, so it overrides any ``called`` keyword argument.
        self.called = False

    @guard_executor
    def run(self, *args):
        """Flag this instance as called; positional arguments are unused."""
        self.called = True
@guard_executor
def guard_fn(*args, **kwargs):
    """Guarded plain function used by the tests; always returns ``True``."""
    return True
class TestGuardExecutor:
    """Tests for the ``guard_executor`` decorator.

    The ``ctx`` argument of the tests is a pytest fixture that is not
    defined in this file (presumably an asphalt ``Context``; confirm against
    the project's conftest).
    """

    # Message expected when ``guard_executor`` cannot locate a Context.
    NO_CTX_MSG = (
        'Context must be the first arg, or an instance variable of '
        '`self` (if wrapping an instance method).'
    )

    @pytest.mark.asyncio
    async def test_class_with_ctx(self, ctx):
        """The Context is found on the instance's ``ctx`` attribute."""
        guarded = GuardKls(ctx=ctx)
        await executor(guarded.run)()
        assert guarded.called

    @pytest.mark.asyncio
    async def test_class_without_ctx(self, ctx):
        """The Context is found as the argument following ``self``."""
        guarded = GuardKls()
        await executor(guarded.run)(ctx)
        assert guarded.called

    @pytest.mark.asyncio
    async def test_class_without_ctx_raises(self, ctx):
        """A method call with no Context anywhere raises and never runs."""
        guarded = GuardKls()
        with pytest.raises(RuntimeError) as excinfo:
            await executor(guarded.run)()
        # Checked after the ``with`` block: statements following the raising
        # call inside it would never execute.
        assert excinfo.value.args[0] == self.NO_CTX_MSG
        assert not guarded.called

    @pytest.mark.asyncio
    async def test_function(self, ctx):
        """A plain function receiving the Context as first argument runs."""
        result = await executor(guard_fn)(ctx)
        assert result

    @pytest.mark.asyncio
    async def test_function_without_ctx_raises(self):
        """A plain function called with no arguments raises."""
        with pytest.raises(RuntimeError) as excinfo:
            await executor(guard_fn)()
        assert excinfo.value.args[0] == self.NO_CTX_MSG

    def test_in_loop_thread_raises(self, ctx):
        """Calling the guarded function directly, not via ``executor``, raises."""
        with pytest.raises(RuntimeError) as excinfo:
            guard_fn(ctx)
        assert excinfo.value.args[0] == 'guard_fn must be run in an executor'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment