Skip to content

Instantly share code, notes, and snippets.

@blink1073
Last active April 9, 2024 06:57
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save blink1073/969aeba85f32c285235750626f2eadd8 to your computer and use it in GitHub Desktop.
Save blink1073/969aeba85f32c285235750626f2eadd8 to your computer and use it in GitHub Desktop.
Wrap an Asynchronous Class
Copyright (c) 2022 Steven Silvester
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import asyncio
import atexit
import functools
import inspect
import threading
from concurrent.futures import wait
from typing import Optional
class TaskRunner:
    """A singleton task runner that runs an asyncio event loop on a background thread.

    Use :meth:`getInstance` to obtain the shared instance; constructing the
    class directly once an instance exists raises ``Exception``.  The event
    loop and its daemon thread are created lazily by the first :meth:`run`.
    """

    __instance = None

    @staticmethod
    def getInstance():
        """Return the shared ``TaskRunner``, creating it on first use."""
        if TaskRunner.__instance is None:
            TaskRunner()
        assert TaskRunner.__instance is not None
        return TaskRunner.__instance

    def __init__(self):
        """Register this object as the singleton and hook loop shutdown at exit.

        :Raises:
          - `Exception`: if an instance has already been created.
        """
        if TaskRunner.__instance is not None:
            raise Exception("This class is a singleton!")
        TaskRunner.__instance = self
        self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
        self.__runner_thread: Optional[threading.Thread] = None
        # Guards the lazy creation of the loop and its thread in run().
        self.__lock = threading.Lock()
        atexit.register(self._close)

    def _close(self):
        """Ask the background event loop to stop.

        Registered with ``atexit``, so it executes on a thread other than the
        one running the loop.  Does nothing if the loop was never started or
        has already been closed.
        """
        loop = self.__io_loop
        if loop is None or loop.is_closed():
            return
        try:
            # loop.stop() is not thread-safe; it has to be scheduled onto the
            # loop's own thread, which also wakes the loop so that the stop
            # request is actually processed.
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop was closed between the check above and the call.
            pass

    def _runner(self):
        """Thread target: run the event loop until stopped, then close it."""
        loop = self.__io_loop
        assert loop is not None
        try:
            loop.run_forever()
        finally:
            loop.close()

    def run(self, coro):
        """Synchronously run a coroutine on a background thread.

        Blocks the calling thread until `coro` finishes, then returns its
        result or re-raises the exception it raised.  Because the caller is
        blocked, this must not be called from the loop thread itself.

        :Parameters:
          - `coro`: A coroutine object to execute on the background loop.
        """
        with self.__lock:
            if self.__io_loop is None:
                self.__io_loop = asyncio.new_event_loop()
                self.__runner_thread = threading.Thread(target=self._runner, daemon=True)
                self.__runner_thread.start()
        fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
        # Future.result() already blocks until completion; no separate wait()
        # is needed.
        return fut.result()
def synchronize_method(async_method, doc=None):
    """Build a blocking wrapper around `async_method`.

    Each call of the wrapper creates the coroutine and hands it to the shared
    ``TaskRunner``, returning whatever the runner returns.

    :Parameters:
      - `async_method`: Unbound coroutine method to wrap; it is called with
        the instance as its first argument.
      - `doc`: Optional replacement for the wrapper's docstring.
    """

    @functools.wraps(async_method)
    def sync_wrapper(self, *args, **kwargs):
        return TaskRunner.getInstance().run(async_method(self, *args, **kwargs))

    # Marker consumed when generating documentation with Sphinx.
    sync_wrapper.is_sync_method = True  # type: ignore[attr-defined]
    if doc is not None:
        sync_wrapper.__doc__ = doc
    return sync_wrapper
def synchronize_class(kls):
    """Derive a blocking subclass of the asynchronous class `kls`.

    The subclass is named after `kls` with every "Async" removed, has
    ``_async_marker`` set to ``False``, and has each public coroutine
    function replaced by a synchronous wrapper.  Attributes whose names
    start with an underscore are left untouched.
    """
    sync_name = kls.__name__.replace("Async", "")
    subclass = type(sync_name, (kls,), {})
    subclass._async_marker = False

    public_attrs = (attr for attr in dir(subclass) if not attr.startswith('_'))
    for attr in public_attrs:
        member = getattr(subclass, attr)
        if inspect.iscoroutinefunction(member):
            setattr(subclass, attr, synchronize_method(member))
    return subclass
class AsyncMongoClient:
    """Demo client whose ``find_one`` hands back a cursor of matching flavour."""

    # True on this class; synchronize_class() sets it to False on the
    # generated synchronous subclass.
    _async_marker = True

    async def find_one(self):
        """Return a new ``AsyncCursor`` when async-marked, else a ``Cursor``."""
        if self._async_marker:
            return AsyncCursor()
        return Cursor()
class AsyncCursor:
    """Demo cursor that is always empty, iterable both with ``for`` and ``async for``."""

    # True on this class; synchronize_class() sets it to False on the
    # generated synchronous subclass.
    _async_marker = True

    def __iter__(self):
        """Return the cursor itself as the synchronous iterator."""
        return self

    def __next__(self):
        """Signal exhaustion immediately; the demo cursor holds no items."""
        raise StopIteration

    def __aiter__(self):
        """Return the cursor itself as the asynchronous iterator.

        This has to be a regular method: ``async for`` uses the return value
        of ``__aiter__`` directly and does not await it, so an ``async def``
        here would hand it a coroutine and fail with ``TypeError``.
        """
        return self

    async def __anext__(self):
        """Signal exhaustion immediately for asynchronous iteration."""
        raise StopAsyncIteration

    async def fetch_something(self):
        """Return the fixed demo value ``'foo'``."""
        return 'foo'
# Generate the blocking counterparts of the async demo classes.
MongoClient = synchronize_class(AsyncMongoClient)
Cursor = synchronize_class(AsyncCursor)

# Demo: drive the generated synchronous API without any ``await``.
client = MongoClient()
print(client)

cursor = client.find_one()
print(cursor.fetch_something())

# Plain ``for`` goes through __iter__/__next__, which were not rewritten.
for item in cursor:
    print(item)

print(cursor)
print(str(cursor))
@Teekeks
Copy link

Teekeks commented Aug 9, 2022

Hello!
I would like to use this for my twitchAPI library, if your license allows for that. Which license is this code using?
Thanks in advance!

@blink1073
Copy link
Author

Hi @Teekeks! I added a BSD 3-Clause license to the gist.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment