Skip to content

Instantly share code, notes, and snippets.

@alecgorge
Created March 20, 2024 15:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alecgorge/63e52b764b10adc46d61b1773f9a2085 to your computer and use it in GitHub Desktop.
Save alecgorge/63e52b764b10adc46d61b1773f9a2085 to your computer and use it in GitHub Desktop.
Example of how to use Temporal's asyncio-based SDK in a gevent setting. It uses a separate native thread to execute asyncio code and keep it away from gevent. Activities are still executed on the gevent thread. Tested (but not extensively) with real-world workload activities that make heavy use of gevent-patched functionality.
import functools
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Generic, Tuple, TypeVar
from gevent.threadpool import ThreadPoolExecutor as NativeThreadPoolExecutor # type: ignore[attr-defined]
from typing_extensions import ParamSpec
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# T: return type of a callable handed to an executor's submit().
T = TypeVar('T')
# P: parameter specification (positional and keyword arguments) of that same callable.
P = ParamSpec('P')
def gevent_future_to_native_future(fn: Any) -> Tuple[Future, Any]:
    """Pair ``fn`` with a standard ``concurrent.futures.Future`` that tracks its outcome.

    Gevent's returned futures do not map well to Python futures, so we must translate. We can't
    just use set_result/set_exception on them because done callbacks are not always called in
    gevent's case and it doesn't seem to support cancel, so we instead wrap the caller function.

    Args:
        fn: The callable whose execution should drive the future.

    Returns:
        A ``(future, wrapper)`` tuple. Calling ``wrapper(*args, **kwargs)`` runs ``fn`` with the
        same arguments and stores its return value, or the ``Exception`` it raised, on
        ``future``. ``wrapper`` itself always returns ``None``. If ``future`` was cancelled
        before ``wrapper`` starts, ``fn`` is not called at all.
    """
    python_fut: Future = Future()

    @functools.wraps(fn)  # type: ignore[arg-type]
    def wrapper(*w_args: Any, **w_kwargs: Any) -> None:
        # Follow the protocol executors are expected to use: atomically move the future from
        # PENDING to RUNNING. This returns False when the future was already cancelled (so the
        # work is skipped and waiters are notified), and once it returns True the future can
        # no longer be cancelled, so the set_result/set_exception calls below cannot race
        # with a concurrent cancel().
        if not python_fut.set_running_or_notify_cancel():
            return

        try:
            result = fn(*w_args, **w_kwargs)
        except Exception as exc:
            python_fut.set_exception(exc)
        else:
            # Kept outside the try so an error from set_result is never fed to set_exception.
            python_fut.set_result(result)

    return python_fut, wrapper
class NativeThreadpoolExecutor(Generic[T], NativeThreadPoolExecutor):
    """gevent ``threadpool.ThreadPoolExecutor`` whose ``submit`` returns a ``concurrent.futures.Future``."""

    def submit(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> Future:
        """Schedule ``fn(*args, **kwargs)`` on the parent pool and return a standard future for it.

        The object returned by the parent ``submit`` is discarded; callers only ever see the
        ``concurrent.futures.Future`` produced by ``gevent_future_to_native_future``.
        """
        bridge = gevent_future_to_native_future(fn)
        native_future, tracked_fn = bridge
        super().submit(tracked_fn, *args, **kwargs)
        return native_future
# Use the gevent monkeypatched executor
class GeventExecutor(Generic[T], ThreadPoolExecutor):
    """``concurrent.futures.ThreadPoolExecutor`` whose ``submit`` goes through the future bridge.

    NOTE(review): presumably this relies on gevent monkey patching being applied so the pool's
    workers cooperate with gevent -- confirm against the process entry point.
    """

    def submit(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> Future:  # type: ignore[override]
        """Schedule ``fn(*args, **kwargs)`` on the parent pool and return a standard future for it.

        The future created by the parent ``submit`` is dropped; the one handed back is the
        future that ``gevent_future_to_native_future`` ties to the wrapped callable.
        """
        bridged = gevent_future_to_native_future(fn)
        super().submit(bridged[1], *args, **kwargs)
        return bridged[0]
import asyncio
from asyncio import AbstractEventLoop
from typing import TYPE_CHECKING, Awaitable, Optional, Tuple, TypeVar
import gevent.monkey
# Ask gevent for the original `threading.Thread` class rather than whatever `threading.Thread`
# currently resolves to, so the asyncio loop below can be given a thread of its own
# (presumably a real OS thread that gevent does not schedule -- confirm).
_NativeThread = gevent.monkey.get_original('threading', 'Thread') # type: ignore[attr-defined]
if TYPE_CHECKING:
from threading import Thread as NativeThreadType
T = TypeVar('T')
def run_task_on_asyncio_thread(coro: Awaitable[T]) -> T:
    """Run ``coro`` on the shared background asyncio event loop and block until it completes.

    Args:
        coro: The coroutine to schedule on the loop returned by ``_event_loop_thread()``.

    Returns:
        The value the coroutine returns.

    Raises:
        Any exception raised by the coroutine is re-raised here by ``Future.result()``.

    This must be called from a thread other than the one running the loop; waiting on the
    result from the loop's own thread would block the loop that has to produce it.

    I am *pretty* sure that this will yield properly to gevent. In testing it works exactly as I expect.
    Internally, Future.result() uses threading.Condition[1]. Gevent patches threading.Condition[2]. As mentioned on a
    Stack Overflow post[3], gevent doesn't advertise `concurrent.futures` patching so theoretically this could break.
    However, in practice it seems like if it hasn't broken in the last 9 years we should be able to rely on it.
    As the comment[4] on the Stack Overflow says:
    > Indeed, after implementing and testing, everything works as advertised - or at least implied.
    [1]: https://github.com/python/cpython/blob/3.8/Lib/concurrent/futures/_base.py#L318
    [2]: https://github.com/gevent/gevent/blob/22.10.1/src/gevent/_threading.py#L52
    [3]: https://stackoverflow.com/questions/21104177/using-concurrent-futures-future-with-greenlets-gevent
    [4]: https://stackoverflow.com/questions/21104177/using-concurrent-futures-future-with-greenlets-gevent/21104754#comment31798369_21104754
    """
    loop, _ = _event_loop_thread()
    return asyncio.run_coroutine_threadsafe(coro, loop).result()
# Lazily-created singleton pair: the background thread and the event loop it runs.
# Both are None until the first call to _event_loop_thread(), and are set together.
_ASYNCIO_THREAD: 'Optional[NativeThreadType]' = None
_ASYNCIO_LOOP: Optional[AbstractEventLoop] = None


def _event_loop_thread() -> 'Tuple[AbstractEventLoop, NativeThreadType]':
    """Return the shared ``(event loop, thread)`` pair, creating and starting it on first use.

    On the first call a new asyncio event loop is created and a daemon thread built from
    ``_NativeThread`` is started that installs the loop as its current loop and runs it
    forever. Subsequent calls return the cached pair.
    """
    global _ASYNCIO_THREAD, _ASYNCIO_LOOP

    # Invariant: the two globals are either both unset or both set.
    assert (_ASYNCIO_LOOP is None) == (_ASYNCIO_THREAD is None)

    if _ASYNCIO_LOOP is None or _ASYNCIO_THREAD is None:

        def start_background_loop(loop: AbstractEventLoop) -> None:
            # Runs inside the new thread: bind the loop to this thread, then serve it until stopped.
            asyncio.set_event_loop(loop)
            loop.run_forever()

        new_loop = asyncio.new_event_loop()
        runner = _NativeThread(target=start_background_loop, args=(new_loop,), daemon=True)
        runner.start()

        # Publish only after the thread has been started.
        _ASYNCIO_THREAD = runner
        _ASYNCIO_LOOP = new_loop
        return new_loop, runner

    return _ASYNCIO_LOOP, _ASYNCIO_THREAD
from typing import Optional, TypeVar
from temporalio.client import Client, WorkflowHandle
from temporalio.types import MethodAsyncSingleParam
from .gevent_asyncio_native_thread_isolated_asyncio import run_task_on_asyncio_thread
from .worker import DEFAULT_TEMPORAL_TASK_QUEUE
# Address handed to create_temporal_client() by _shared_temporal_client().
# NOTE(review): '...' looks like a placeholder left in the example -- set a real address before use.
TEMPORAL_ADDRESS = '...'
# Type variables used in start_temporal_workflow()'s signature: the workflow class, the type of
# its single argument, and the workflow's result type.
SelfType = TypeVar('SelfType')
ParamType = TypeVar('ParamType')
ReturnType = TypeVar('ReturnType', covariant=True)
def create_temporal_client(temporal_url: str) -> Client:
    """Connect to Temporal at ``temporal_url`` and return the client, blocking until connected.

    The connection coroutine is executed through ``run_task_on_asyncio_thread`` rather than on
    the calling thread.

    NOTE(review): ``pydantic_data_converter`` is not imported anywhere in the visible source;
    confirm where it is expected to come from.
    """

    async def _connect() -> Client:
        connected = await Client.connect(temporal_url, data_converter=pydantic_data_converter)
        return connected

    connect_coro = _connect()
    return run_task_on_asyncio_thread(connect_coro)
# TODO: remove and replace with DI
# Module-wide client cache; stays None until _shared_temporal_client() is first called.
_SHARED_CLIENT: Optional[Client] = None


def _shared_temporal_client() -> Client:
    """Return the cached Temporal client, connecting to ``TEMPORAL_ADDRESS`` on first use.

    Returns:
        The single ``Client`` instance shared by every caller in this module.
    """
    global _SHARED_CLIENT
    # Compare against None explicitly: "not yet created" is the only state that should
    # trigger a connect, independent of how a Client instance evaluates in a boolean context.
    if _SHARED_CLIENT is None:
        _SHARED_CLIENT = create_temporal_client(TEMPORAL_ADDRESS)
    return _SHARED_CLIENT
def start_temporal_workflow(
    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
    arg: ParamType,
    id: str,
) -> WorkflowHandle[SelfType, ReturnType]:
    """Start ``workflow`` on the default task queue and return its handle, blocking until started.

    Args:
        workflow: The workflow run method to start.
        arg: The single argument passed to the workflow.
        id: The workflow id to start it under.

    Returns:
        Whatever ``Client.start_workflow`` resolves to, obtained by running it through
        ``run_task_on_asyncio_thread`` with the shared client.
    """
    shared_client = _shared_temporal_client()
    start_request = shared_client.start_workflow(
        workflow=workflow,
        arg=arg,
        id=id,
        task_queue=DEFAULT_TEMPORAL_TASK_QUEUE,
    )
    handle = run_task_on_asyncio_thread(start_request)
    return handle
import asyncio
import logging
import signal
from typing import Optional
import gevent
from temporalio.client import Client
from temporalio.worker import Worker
from .lib.temporal.client import create_temporal_client
from .gevent_asyncio_executors import GeventExecutor, NativeThreadpoolExecutor
from .gevent_asyncio_native_thread_isolated_asyncio import run_task_on_asyncio_thread
# Task queue used by run_worker() when the caller does not name one.
DEFAULT_TEMPORAL_TASK_QUEUE = 'default'
# Workflow and activity definitions handed to the Worker in async_main(); both are empty here.
ALL_WORKFLOWS = []
ALL_ACTIVITIES = []
# Logger used by the worker entry points below.
logger = logging.getLogger('temporal-worker')
def run_worker(temporal_url: str, task_queue: Optional[str] = None) -> int:
    """Connect to Temporal and run the worker until ``async_main`` returns.

    Args:
        temporal_url: Address passed to ``create_temporal_client``.
        task_queue: Queue to poll; a missing or empty value falls back to
            ``DEFAULT_TEMPORAL_TASK_QUEUE``.

    Returns:
        Always ``0`` once the worker coroutine has finished.
    """
    logger.debug('connecting to temporal client')
    temporal_client = create_temporal_client(temporal_url)
    logger.debug('connected!')

    queue_name = task_queue if task_queue else DEFAULT_TEMPORAL_TASK_QUEUE
    worker_coro = async_main(temporal_client, queue_name)
    run_task_on_asyncio_thread(worker_coro)
    return 0
async def async_main(client: Client, task_queue: str) -> None:
    """Run a Temporal worker on ``task_queue`` until SIGINT is received.

    Args:
        client: Connected Temporal client the worker uses.
        task_queue: Name of the task queue the worker polls.

    Activities are handed to a ``GeventExecutor`` and workflow tasks to a
    ``NativeThreadpoolExecutor``; both executors are shut down when this coroutine leaves
    their ``with`` blocks.
    """
    # Ctrl+C handling: on SIGINT gevent calls loop.call_soon_threadsafe(interrupt_event.set).
    # asyncio calls are not thread safe, so the event must be set through
    # call_soon_threadsafe instead of being set directly from the handler.
    # NOTE(review): run_worker() executes this coroutine via run_task_on_asyncio_thread, i.e.
    # off the gevent thread -- confirm gevent.signal_handler registers on the intended hub.
    interrupt_event = asyncio.Event()
    running_loop = asyncio.get_running_loop()
    gevent.signal_handler(signal.SIGINT, running_loop.call_soon_threadsafe, interrupt_event.set)

    # Pool sizes need enough room for the worker's max concurrent activities/workflow tasks,
    # so the same numbers are used for both.
    activity_slots = 24
    workflow_task_slots = 12

    # Executors dedicated to Temporal. These cannot be the outer one running this async main.
    with GeventExecutor(max_workers=activity_slots) as gevent_executor:  # type: ignore[var-annotated]
        with NativeThreadpoolExecutor(max_workers=workflow_task_slots) as native_executor:  # type: ignore[var-annotated]
            worker = Worker(
                client,
                task_queue=task_queue,
                workflows=ALL_WORKFLOWS,
                activities=ALL_ACTIVITIES,
                # Execute activities in a gevent greenlet on the main hub
                activity_executor=gevent_executor,
                # Execute IO free workflows on separate threads with an asyncio event loop
                workflow_task_executor=native_executor,
                max_concurrent_activities=activity_slots,
                max_concurrent_workflow_tasks=workflow_task_slots,
            )
            # Run the worker for the workflows and activities until interrupted.
            async with worker:
                logger.info('Temporal Worker started, ctrl+c to exit')
                await interrupt_event.wait()
                logger.info('Shutting down')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment