Created
June 4, 2022 03:00
-
-
Save darkerego/119990a990ca0828c3571aec7eaa239e to your computer and use it in GitHub Desktop.
Python Function Helpers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import threading | |
import time | |
from concurrent.futures import ThreadPoolExecutor | |
from typing import List, Any | |
class AsyncThreadPool(ThreadPoolExecutor): | |
_futures: List[asyncio.Future] | |
_loop: asyncio.AbstractEventLoop | |
def __init__(self, max_workers=None): | |
super().__init__(max_workers) | |
self._futures = [] | |
def queue(self, fn): | |
self._loop = asyncio.get_event_loop() | |
fut = self._loop.create_future() | |
self._futures.append(fut) | |
self.submit(self._entry, fn, fut) | |
def queueAsync(self, coroutine): | |
def newLoop(): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
return loop.run_until_complete(coroutine) | |
self.queue(newLoop) | |
def _entry(self, fn, fut: asyncio.Future): | |
try: | |
result = fn() | |
self._loop.call_soon_threadsafe(fut.set_result, result) | |
except Exception as e: | |
self._loop.call_soon_threadsafe(fut.set_exception, e) | |
async def gather(self) -> List[Any]: | |
return await asyncio.gather(*self._futures) | |
async def run_in_executor(func, *args): | |
""" | |
Wrapper that handles legacy sync functions and coroutines! | |
:param func: function to run | |
:param args: arguments for func | |
:return: results of func | |
""" | |
if asyncio.iscoroutinefunction(func): | |
return await func(*args) | |
loop = asyncio.get_event_loop() | |
return await loop.run_in_executor(None, func, *args) | |
"""async def func_in_event_loop(corofn, *args): | |
loop = asyncio.new_event_loop() | |
try: | |
coro = await corofn(*args) | |
asyncio.set_event_loop(loop) | |
return loop.run_until_complete(coro) | |
finally: | |
loop.close()""" | |
def catch_all(func): | |
def silenceit(*args,**kwargs): | |
try: | |
func(*args, **kwargs) | |
except: | |
print('Error') | |
return silenceit | |
async def func_in_event_loop(func, *args, **kwargs): | |
def wrap(): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
loop.run_until_complete(func(*args, **kwargs)) | |
return wrap | |
def threaded_event_loop(func): | |
""" | |
Run a coroutine in a thread, inside a new event loop | |
:param func: coroutine | |
:return: None | |
coro = my_async_function() | |
threaded_event_loop(coro) | |
""" | |
def launch(): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
loop.run_until_complete(func) | |
threading.Thread(target=launch).start() | |
class FunctionTimer: | |
def __init__(self, description=None): | |
self.description = description | |
def func_timer(self, func): | |
'''Decorator that reports the execution time.''' | |
def wrap(*args, **kwargs): | |
start = time.time() | |
result = func(*args, **kwargs) | |
end = time.time() | |
if self.description: | |
print(f'{self.description} : {end-start}') | |
else: | |
print(func.__name__, end - start) | |
return result | |
return wrap |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment