Created
March 20, 2024 15:45
-
-
Save alecgorge/63e52b764b10adc46d61b1773f9a2085 to your computer and use it in GitHub Desktop.
Example of how to use Temporal's asyncio-based SDK in a gevent setting. A separate native thread runs the asyncio code, keeping it isolated from gevent, while activities still execute on the gevent thread. Tested (though not extensively) with real-world activity workloads that make heavy use of gevent-patched functionality.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
import logging | |
from concurrent.futures import Future, ThreadPoolExecutor | |
from typing import Any, Callable, Generic, Tuple, TypeVar | |
from gevent.threadpool import ThreadPoolExecutor as NativeThreadPoolExecutor # type: ignore[attr-defined] | |
from typing_extensions import ParamSpec | |
logger = logging.getLogger(__name__) | |
T = TypeVar('T') | |
P = ParamSpec('P') | |
def gevent_future_to_native_future(fn: Any) -> Tuple[Future, Any]: | |
# Gevent's returned futures do not map well to Python futures, so we must translate. We can't just use set_result/ | |
# set_exception because done callbacks are not always called in gevent's case and it doesn't seem to support cancel, | |
# so we instead wrap the caller function. | |
python_fut: Future = Future() | |
@functools.wraps(fn) # type: ignore[arg-type] | |
def wrapper(*w_args: P.args, **w_kwargs: P.kwargs) -> None: | |
try: | |
result = fn(*w_args, **w_kwargs) | |
if not python_fut.cancelled(): | |
# checking cancelled like this is recommended instead of catching the internal InvalidStateException: | |
# https://bugs.python.org/issue21886 | |
python_fut.set_result(result) | |
except Exception as exc: | |
if not python_fut.cancelled(): | |
python_fut.set_exception(exc) | |
else: | |
# TODO: should this re-raise or silently swallow? | |
raise | |
return python_fut, wrapper | |
class NativeThreadpoolExecutor(Generic[T], NativeThreadPoolExecutor):
    """gevent native-thread pool whose ``submit`` returns a standard ``concurrent.futures.Future``.

    The future gevent's own ``submit`` produces is ignored. The caller receives a stdlib
    future that is settled by the wrapped callable itself (see
    ``gevent_future_to_native_future``).
    """

    def submit(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> Future:
        """Schedule ``fn(*args, **kwargs)`` on the pool and return a stdlib Future for it."""
        native_future, runner = gevent_future_to_native_future(fn)
        # Deliberately drop gevent's future; ``runner`` resolves ``native_future``.
        super().submit(runner, *args, **kwargs)
        return native_future
# Stdlib thread pool meant to run under gevent monkey-patching (presumably its worker
# "threads" then become greenlets — confirm patching happens before instances are created).
class GeventExecutor(Generic[T], ThreadPoolExecutor):
    """``ThreadPoolExecutor`` whose ``submit`` hands out a future settled by the call itself.

    The pool's own future is ignored. Callers get the stdlib future produced by
    ``gevent_future_to_native_future``, which the submitted wrapper completes directly.
    """

    def submit(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> Future:  # type: ignore[override]
        """Schedule ``fn(*args, **kwargs)`` and return a stdlib Future for its outcome."""
        native_future, runner = gevent_future_to_native_future(fn)
        # The future returned by the base class is intentionally discarded.
        super().submit(runner, *args, **kwargs)
        return native_future
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from asyncio import AbstractEventLoop | |
from typing import TYPE_CHECKING, Awaitable, Optional, Tuple, TypeVar | |
import gevent.monkey | |
# The original (unpatched) threading.Thread, fetched via gevent.monkey.get_original so
# the background asyncio loop gets its own thread rather than a gevent greenlet.
_NativeThread = gevent.monkey.get_original('threading', 'Thread')  # type: ignore[attr-defined]

if TYPE_CHECKING:
    # Only used in string annotations; never imported at runtime.
    from threading import Thread as NativeThreadType

T = TypeVar('T')


def run_task_on_asyncio_thread(coro: Awaitable[T]) -> T:
    """Run ``coro`` on the shared background asyncio loop and block until it finishes.

    The loop and its thread are created lazily on first use by ``_event_loop_thread``.
    ``coro`` is handed to ``asyncio.run_coroutine_threadsafe``, which requires a coroutine
    object; it raises ``TypeError`` for other awaitables.

    Returns:
        The coroutine's result. Any exception the coroutine raises is re-raised here.

    NOTE(review): calling this from code already running on the background loop will
    presumably deadlock, since ``.result()`` blocks while waiting on that same loop.

    I am *pretty* sure that this will yield properly to gevent. In testing it works exactly as I expect.

    Internally, Future.result() uses threading.Condition[1]. Gevent patches threading.Condition[2]. As mentioned on a
    Stack Overflow post[3], gevent doesn't advertise `concurrent.futures` patching so theoretically this could break.
    However, in practice it seems like if it hasn't broken in the last 9 years we should be able to rely on it.

    As the comment[4] on the Stack Overflow says:
    > Indeed, after implementing and testing, everything works as advertised - or at least implied.

    [1]: https://github.com/python/cpython/blob/3.8/Lib/concurrent/futures/_base.py#L318
    [2]: https://github.com/gevent/gevent/blob/22.10.1/src/gevent/_threading.py#L52
    [3]: https://stackoverflow.com/questions/21104177/using-concurrent-futures-future-with-greenlets-gevent
    [4]: https://stackoverflow.com/questions/21104177/using-concurrent-futures-future-with-greenlets-gevent/21104754#comment31798369_21104754
    """
    loop, _ = _event_loop_thread()
    return asyncio.run_coroutine_threadsafe(coro, loop).result()
# Lazily created background loop and the thread running it.
# Invariant: either both are None (not started yet) or both are set.
_ASYNCIO_THREAD: 'Optional[NativeThreadType]' = None
_ASYNCIO_LOOP: Optional[AbstractEventLoop] = None


def _event_loop_thread() -> 'Tuple[AbstractEventLoop, NativeThreadType]':
    """Return the shared background asyncio loop and its thread, starting them on first call.

    The loop runs ``run_forever`` on a daemon thread, so it never stops on its own and
    does not keep the process alive at exit.

    Raises:
        Whatever ``thread.start()`` raises (e.g. ``RuntimeError`` if no thread can be
        created). In that case the globals are reset and the loop is closed, so a later
        call can retry.
    """
    global _ASYNCIO_THREAD, _ASYNCIO_LOOP
    assert (_ASYNCIO_LOOP is None) == (_ASYNCIO_THREAD is None)
    if _ASYNCIO_LOOP is not None and _ASYNCIO_THREAD is not None:
        return _ASYNCIO_LOOP, _ASYNCIO_THREAD

    def start_background_loop(loop: AbstractEventLoop) -> None:
        asyncio.set_event_loop(loop)
        loop.run_forever()

    eventloop = asyncio.new_event_loop()
    thread = _NativeThread(target=start_background_loop, args=(eventloop,), daemon=True)

    # Publish before starting. thread.start() blocks until the new thread signals it has
    # started, and with gevent patching that wait may yield to other greenlets. Publishing
    # first makes a concurrent caller reuse this loop instead of creating a second one.
    # Work scheduled via run_coroutine_threadsafe before run_forever begins is queued.
    _ASYNCIO_THREAD = thread
    _ASYNCIO_LOOP = eventloop
    try:
        thread.start()
    except BaseException:
        _ASYNCIO_THREAD = None
        _ASYNCIO_LOOP = None
        eventloop.close()
        raise
    return eventloop, thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Optional, TypeVar | |
from temporalio.client import Client, WorkflowHandle | |
from temporalio.types import MethodAsyncSingleParam | |
from .gevent_asyncio_native_thread_isolated_asyncio import run_task_on_asyncio_thread | |
from .worker import DEFAULT_TEMPORAL_TASK_QUEUE | |
# Placeholder Temporal frontend address used by _shared_temporal_client.
# NOTE(review): presumably replaced with the real address in deployment.
TEMPORAL_ADDRESS = '...'

# Type variables matching the MethodAsyncSingleParam[Self, Param, Return] generics used by
# start_temporal_workflow.
SelfType = TypeVar('SelfType')
ParamType = TypeVar('ParamType')
ReturnType = TypeVar('ReturnType', covariant=True)


def create_temporal_client(temporal_url: str) -> Client:
    """Connect a Temporal ``Client`` to ``temporal_url`` and return it.

    The connection is made on the shared background asyncio thread, and the caller blocks
    until it completes. Connection errors are re-raised in the caller.
    """

    async def _connect() -> Client:
        # NOTE(review): pydantic_data_converter is not imported anywhere in the visible
        # source; confirm it is defined in this module, otherwise this raises NameError.
        connected = await Client.connect(temporal_url, data_converter=pydantic_data_converter)
        return connected

    connect_coro = _connect()
    return run_task_on_asyncio_thread(connect_coro)
# Module-wide Temporal client, created lazily by _shared_temporal_client.
# TODO: remove and replace with DI
_SHARED_CLIENT: Optional[Client] = None


def _shared_temporal_client() -> Client:
    """Return the module-wide Temporal client, connecting to ``TEMPORAL_ADDRESS`` on first use.

    NOTE(review): first use is not guarded against concurrency. ``create_temporal_client``
    blocks while connecting, so two greenlets/threads arriving together could each create
    a client, and the later assignment wins.
    """
    global _SHARED_CLIENT
    # Test against None explicitly instead of relying on the client object's truthiness.
    if _SHARED_CLIENT is None:
        _SHARED_CLIENT = create_temporal_client(TEMPORAL_ADDRESS)
    return _SHARED_CLIENT
def start_temporal_workflow(
    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
    arg: ParamType,
    id: str,
) -> WorkflowHandle[SelfType, ReturnType]:
    """Start ``workflow`` with ``arg`` under workflow ID ``id`` and return its handle.

    Uses the shared client and ``DEFAULT_TEMPORAL_TASK_QUEUE``. The ``start_workflow``
    call runs on the background asyncio thread, and this call blocks until it returns.
    """
    client = _shared_temporal_client()
    start_coro = client.start_workflow(
        workflow=workflow,
        arg=arg,
        id=id,
        task_queue=DEFAULT_TEMPORAL_TASK_QUEUE,
    )
    return run_task_on_asyncio_thread(start_coro)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import signal | |
from typing import Optional | |
import gevent | |
from temporalio.client import Client | |
from temporalio.worker import Worker | |
from .lib.temporal.client import create_temporal_client | |
from .gevent_asyncio_executors import GeventExecutor, NativeThreadpoolExecutor | |
from .gevent_asyncio_native_thread_isolated_asyncio import run_task_on_asyncio_thread | |
# Task queue polled by run_worker when no explicit queue is passed.
DEFAULT_TEMPORAL_TASK_QUEUE = 'default'

# Workflow classes and activity callables handed to the Worker in async_main.
# NOTE(review): both start empty here; presumably populated elsewhere before run_worker.
ALL_WORKFLOWS = []
ALL_ACTIVITIES = []

logger = logging.getLogger('temporal-worker')
def run_worker(temporal_url: str, task_queue: Optional[str] = None) -> int:
    """Connect to Temporal at ``temporal_url`` and run a worker until it is interrupted.

    Args:
        temporal_url: Address passed to ``create_temporal_client``.
        task_queue: Queue to poll. Falls back to ``DEFAULT_TEMPORAL_TASK_QUEUE`` when
            falsy (``None`` or empty).

    Returns:
        0 once the worker has shut down.
    """
    logger.debug('connecting to temporal client')
    client = create_temporal_client(temporal_url)
    logger.debug('connected!')

    queue = task_queue if task_queue else DEFAULT_TEMPORAL_TASK_QUEUE
    # async_main runs on the background asyncio thread; this call blocks until it returns.
    run_task_on_asyncio_thread(async_main(client, queue))
    return 0
async def async_main(client: Client, task_queue: str) -> None:
    """Run a Temporal ``Worker`` on ``task_queue`` until SIGINT is received.

    Must run on an active asyncio loop, because it calls ``asyncio.get_running_loop()``.
    Activities go to a ``GeventExecutor`` and workflow tasks to a
    ``NativeThreadpoolExecutor``. Both executors are shut down when this returns.
    """
    # Shared between executor sizing and the Worker's concurrency limits, so each pool
    # always has room for the maximum number of concurrent tasks.
    activity_slots = 24
    workflow_task_slots = 12

    # Ctrl+C handling: gevent's SIGINT handler sets an asyncio event. asyncio objects are
    # not thread-safe, so the set() is scheduled via call_soon_threadsafe.
    # NOTE(review): this registration runs on whichever thread executes this coroutine.
    # When started via run_worker that is the background asyncio thread, not the main
    # thread; confirm gevent delivers SIGINT to a handler registered there.
    stop_requested = asyncio.Event()
    loop = asyncio.get_running_loop()
    gevent.signal_handler(signal.SIGINT, loop.call_soon_threadsafe, stop_requested.set)

    # The executors given to Temporal must be separate from the loop running this coroutine.
    with GeventExecutor(max_workers=activity_slots) as gevent_executor:  # type: ignore[var-annotated]
        with NativeThreadpoolExecutor(max_workers=workflow_task_slots) as native_executor:  # type: ignore[var-annotated]
            async with Worker(
                client,
                task_queue=task_queue,
                workflows=ALL_WORKFLOWS,
                activities=ALL_ACTIVITIES,
                # Activities run via the gevent-backed pool.
                activity_executor=gevent_executor,
                # IO-free workflow tasks run on native threads.
                workflow_task_executor=native_executor,
                max_concurrent_activities=activity_slots,
                max_concurrent_workflow_tasks=workflow_task_slots,
            ):
                logger.info('Temporal Worker started, ctrl+c to exit')
                await stop_requested.wait()
                logger.info('Shutting down')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment