Skip to content

Instantly share code, notes, and snippets.

@josemarcosrf
Last active October 13, 2023 16:14
Show Gist options
  • Save josemarcosrf/6ce2037ae927f1c519ef4c96b7a069b2 to your computer and use it in GitHub Desktop.
Save josemarcosrf/6ce2037ae927f1c519ef4c96b7a069b2 to your computer and use it in GitHub Desktop.
Python helper functions, decorators and context managers
import signal
from contextlib import contextmanager
from time import perf_counter
@contextmanager
def timeout(duration: int):
    """Abort the enclosed ``with`` block if it runs longer than ``duration`` seconds.

    The limit is enforced with ``SIGALRM``, so this is only usable where that
    signal exists and from a thread that is allowed to install signal handlers.
    The handler that was active before entering the block is put back on exit,
    so using this helper does not permanently hijack ``SIGALRM``.

    Args:
        duration (int): number of seconds after which the block is interrupted.
            ``signal.alarm(0)`` cancels any alarm, so 0 means "no limit".

    Raises:
        TimeoutError: raised inside the block when the alarm fires.
    """

    def timeout_handler(signum, frame):
        raise TimeoutError(f"Block timed out after {duration} seconds")

    # signal.signal returns the handler it replaces; keep it for restoration
    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    try:
        yield
    finally:
        # Cancel the pending alarm first so it cannot fire into the old handler
        signal.alarm(0)
        # None means the old handler was not installed from Python; it cannot
        # be passed back to signal.signal, so fall back to the default action
        signal.signal(
            signal.SIGALRM,
            signal.SIG_DFL if previous_handler is None else previous_handler,
        )
class catchtime:
    """Context manager that times the enclosed block and prints the duration.

    While the block runs, ``time`` holds the start timestamp; after exit it
    holds the elapsed seconds and ``readout`` holds the printed message.

    NOTE(review): ``rprint`` is not defined in the visible part of this file;
    presumably ``from rich import print as rprint`` -- confirm it is in scope.
    """

    def __enter__(self):
        started_at = perf_counter()
        self.time = started_at
        return self

    def __exit__(self, type, value, traceback):
        # Replace the start timestamp with the elapsed duration
        elapsed = perf_counter() - self.time
        self.time = elapsed
        self.readout = f"⏳️ Done in: {self.time:.3f} seconds"
        rprint(self.readout)
import asyncio
import os
import re
import requests
import yaml
import numpy as np
from concurrent.futures.thread import ThreadPoolExecutor
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry # noqa
from itertools import chain
from typing import Any
from typing import Dict
from typing import Callable
from typing import List
from typing import Text
from typing import Tuple
from typing import Union
# Any character followed by a capitalised word (upper-case letter + lower-case run)
REPL_1 = re.compile(r"(.)([A-Z][a-z]+)")
# A lower-case letter or digit directly followed by an upper-case letter
REPL_2 = re.compile(r"([a-z0-9])([A-Z])")


def camel_to_snake(name):
    """Convert a CamelCase / mixedCase identifier to snake_case.

    An underscore is inserted at every boundary matched by ``REPL_1`` and then
    ``REPL_2``, and the result is lower-cased.
    """
    with_word_breaks = REPL_1.sub(r"\1_\2", name)
    with_all_breaks = REPL_2.sub(r"\1_\2", with_word_breaks)
    return with_all_breaks.lower()
def read_config(conf_file: Text) -> Dict[Any, Any]:
    """Load a YAML file and substitute ``$NAME`` strings with environment values.

    Every string value that starts with ``$`` (at any depth inside dicts and
    lists) is replaced by the environment variable named by the rest of the
    string. Dict keys are left untouched.

    Args:
        conf_file (Text): path of the YAML file to read.

    Returns:
        The parsed document with substitutions applied; an empty dict when the
        file parses to a falsy value (e.g. an empty file).
    """

    def env_replace(elem: Any) -> Any:
        # Containers are rebuilt recursively; everything else passes through
        if isinstance(elem, dict):
            return {key: env_replace(val) for key, val in elem.items()}
        if isinstance(elem, list):
            return [env_replace(item) for item in elem]
        if isinstance(elem, str) and elem.startswith("$"):
            # os.getenv yields None when the variable is not set
            return os.getenv(elem[1:])
        return elem

    with open(conf_file, "r") as f:
        parsed = yaml.safe_load(f)
    return env_replace(parsed or {})
def configure_request_session(
    total: int = 10, connect: int = 3, backoff: int = 1
) -> requests.Session:
    """Build a ``requests.Session`` whose HTTP and HTTPS adapters retry requests.

    Args:
        total (int): forwarded to ``Retry`` as ``total``. Defaults to 10.
        connect (int): forwarded to ``Retry`` as ``connect``. Defaults to 3.
        backoff (int): forwarded to ``Retry`` as ``backoff_factor``. Defaults to 1.

    Returns:
        requests.Session: a new session with one shared retrying adapter
        mounted on both the ``https://`` and ``http://`` prefixes.
    """
    retry_policy = Retry(total=total, connect=connect, backoff_factor=backoff)
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    session = requests.Session()
    for prefix in ("https://", "http://"):
        session.mount(prefix, retrying_adapter)
    return session
async def parallel_requests(
    func_calls: List[Tuple[Union[Text, int], Callable, Tuple[Any, ...]]],
    max_workers: int = 10,
) -> Dict[Union[Text, int], Any]:
    """Run a series of blocking calls concurrently on a thread pool.

    Args:
        func_calls (List[Tuple[Union[Text, int], Callable, Tuple[Any, ...]]]):
            list of tuples; (function ID, function, positional parameters)
        max_workers (int, optional): size of the thread pool. Defaults to 10.

    Returns:
        Dict mapping each function ID to the value its function returned.
        When the same ID appears more than once, the last entry wins.

    Raises:
        Whatever exception one of the functions raises; ``asyncio.gather`` is
        used with its default settings, so it is propagated to the caller.
    """
    # Inside a coroutine the running loop is the one to schedule work on
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        func_ids = []
        tasks = []
        for f_id, func, f_params in func_calls:
            func_ids.append(f_id)
            tasks.append(loop.run_in_executor(executor, func, *f_params))

        # gather returns results in the order the awaitables were passed,
        # so they line up with func_ids
        values = await asyncio.gather(*tasks)

    return dict(zip(func_ids, values))
def flatten(lol: List[List[Any]]) -> List[Any]:
    """Concatenate the inner iterables of ``lol`` into one flat list (one level only)."""
    return [item for inner in lol for item in inner]
def as_batch(x: Union[List, np.ndarray]) -> np.ndarray:
    """Convert ``x`` to a numpy array with a leading axis added when needed.

    Inputs with fewer than two dimensions get a new axis of length 1 at
    position 0; inputs with two or more dimensions are only converted.
    """
    arr = np.array(x)
    if arr.ndim >= 2:
        return arr
    # Prepend the new axis
    return arr[np.newaxis, ...]
class dotdict(dict):
    """dict whose items can also be read, written and deleted as attributes."""

    def __getattr__(self, name):
        # Same lookup as dict.get: a missing key yields None, not AttributeError
        return dict.get(self, name)

    def __setattr__(self, name, value):
        # Attribute assignment stores an item under the attribute name
        dict.__setitem__(self, name, value)

    def __delattr__(self, name):
        # Attribute deletion removes the item; a missing key raises KeyError
        dict.__delitem__(self, name)
import asyncio
import time
import signal
import numpy as np
from functools import wraps
from typing import Callable
from pstats import SortKey
from pstats import Stats
def profiled(*filters):
    """Decorator factory that profiles every call of the decorated function.

    Each call runs under ``cProfile`` and, once it returns, the collected stats
    are printed sorted by cumulative time. If the function raises, the
    exception propagates and nothing is printed.

    Args:
        *filters: restrictions forwarded unchanged to ``Stats.print_stats``
            (see the pstats documentation for the accepted values).

    Returns:
        A decorator; the decorated function returns whatever the original does.
    """
    # Local import: Profile is not among this module's top-level imports
    from cProfile import Profile

    def wrapper(func: Callable) -> Callable:
        @wraps(func)
        def profile_wrapper(*args, **kwargs):
            # https://docs.python.org/3/library/profile.html#module-cProfile
            with Profile(builtins=False) as profile:
                res = func(*args, **kwargs)

            (
                Stats(profile)
                .strip_dirs()
                .sort_stats(SortKey.CUMULATIVE)
                .print_stats(*filters)  # output limited by the caller's filters
            )
            return res

        return profile_wrapper

    return wrapper
def force_async(fn):
    """
    Make a blocking function awaitable by running each call on a worker thread.

    A thread pool is created once per decorated function; every call submits
    `fn` to it and returns an asyncio-awaitable wrapper around that future.
    """
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor()

    @wraps(fn)
    def wrapper(*args, **kwargs):
        # Bridge the concurrent.futures future into asyncio so it can be awaited
        return asyncio.wrap_future(executor.submit(fn, *args, **kwargs))

    return wrapper
def force_sync(fn):
    """
    Turn an async function into a sync function.

    The wrapper calls `fn`; when the result is a coroutine it is run to
    completion on the current thread's event loop and its result returned.
    Non-coroutine results are returned unchanged, so plain functions pass
    through untouched.

    If the thread has no event loop (`asyncio.get_event_loop` raises
    RuntimeError), a new loop is created and registered for the thread so
    later calls reuse it.

    Raises:
        RuntimeError: from `run_until_complete` when called while the
            thread's loop is already running.
    """
    import asyncio

    @wraps(fn)
    def wrapper(*args, **kwargs):
        res = fn(*args, **kwargs)
        if not asyncio.iscoroutine(res):
            return res
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No loop for this thread: create one instead of failing
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(res)

    return wrapper
def fire_and_forget(f):
    """Run ``f`` on the event loop's default executor without waiting for it.

    Calling the decorated function schedules ``f(*args, **kwargs)`` via
    ``loop.run_in_executor(None, ...)`` and immediately returns the resulting
    future, which the caller may await or ignore.
    """
    # Local import: only `wraps` is imported from functools at module level
    from functools import partial

    @wraps(f)
    def wrapped(*args, **kwargs):
        # run_in_executor only forwards positional arguments, so keyword
        # arguments are bound up front with partial
        call = partial(f, *args, **kwargs)
        return asyncio.get_event_loop().run_in_executor(None, call)

    return wrapped
def as_numpy_array(func, dtype=np.float32):
    """Decorate a method so that its return value is converted to a numpy array.

    The wrapped callable receives ``self`` as its first argument. The return
    value is accepted when its type *name* is one of ``ndarray``,
    ``EagerTensor``, ``Tensor`` or ``list`` (presumably the tensor names refer
    to TensorFlow / PyTorch objects -- confirm against callers).

    Args:
        func: the method to wrap.
        dtype: dtype passed to ``np.array``. Plain ``@as_numpy_array`` usage
            always gets the default, ``np.float32``.

    Raises:
        TypeError: when the wrapped method returns any other type.
    """
    convertible = {"ndarray", "EagerTensor", "Tensor", "list"}

    @wraps(func)
    def arg_wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        type_name = type(result).__name__
        if type_name not in convertible:
            raise TypeError(
                "unrecognized type {}: {}".format(type_name, type(result))
            )
        return np.array(result, dtype=dtype)

    return arg_wrapper
def as_numpy_batch(func, dtype=np.float32):
    """Decorate a method so that it returns a numpy array with at least 2 dimensions.

    The wrapped callable receives ``self`` as its first argument. Its return
    value is accepted when its type *name* is one of ``ndarray``,
    ``EagerTensor``, ``Tensor`` or ``list``, converted with ``np.array`` and,
    when the result has fewer than two dimensions, given a new leading axis.

    Args:
        func: the method to wrap.
        dtype: dtype passed to ``np.array``. Plain ``@as_numpy_batch`` usage
            always gets the default, ``np.float32``.

    Raises:
        TypeError: when the wrapped method returns any other type.
    """
    convertible = {"ndarray", "EagerTensor", "Tensor", "list"}

    @wraps(func)
    def arg_wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        type_name = type(result).__name__
        if type_name not in convertible:
            raise TypeError(
                "unrecognized type {}: {}".format(type_name, type(result))
            )
        batch = np.array(result, dtype=dtype)
        # Prepend the new axis only when it is needed
        return batch if batch.ndim >= 2 else batch[np.newaxis, ...]

    return arg_wrapper
def batched(batch_size):
    """Decorator factory that feeds a function its first argument in chunks.

    The decorated function is called once per consecutive slice of at most
    ``batch_size`` items taken from the first positional argument. Any further
    positional and keyword arguments are passed along unchanged on every call.
    A progress line is printed before each call.

    Args:
        batch_size: maximum number of items per chunk.

    Returns:
        A decorator. The decorated function returns None; the return values of
        the individual chunk calls are discarded.
    """

    def wrapper(func):
        @wraps(func)
        def batcher(*args, **kwargs):
            long_list = args[0]
            extra_args = args[1:]
            start = 0
            end = start + batch_size
            chunk = long_list[start:end]
            # len() rather than truthiness so array-like chunks work as well
            while len(chunk) > 0:
                print(f"sending from {start} to {end}")
                func(chunk, *extra_args, **kwargs)
                start = end
                end += batch_size
                chunk = long_list[start:end]

        return batcher

    return wrapper
def f_timeout(timeout_secs: int):
    """Decorator factory that interrupts a function running longer than allowed.

    The limit is enforced with ``SIGALRM``, so this is only usable where that
    signal exists and from a thread that is allowed to install signal handlers.
    The handler that was active before the call is put back afterwards.

    Args:
        timeout_secs (int): seconds the decorated function may run.
            ``signal.alarm(0)`` cancels any alarm, so 0 means "no limit".

    Returns:
        A decorator; the decorated function returns whatever the original does.

    Raises:
        TimeoutError: raised inside the decorated function when the alarm fires.
    """

    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            def handler(signum, frame):
                raise TimeoutError(f"Timeout for function '{func.__name__}'")

            # signal.signal returns the handler it replaces; keep it for later
            previous_handler = signal.signal(signal.SIGALRM, handler)
            signal.alarm(timeout_secs)
            try:
                return func(*args, **kwargs)
            finally:
                # Disable the alarm before giving the signal back
                signal.alarm(0)
                # None means the old handler was not installed from Python and
                # cannot be re-installed; fall back to the default action
                signal.signal(
                    signal.SIGALRM,
                    signal.SIG_DFL
                    if previous_handler is None
                    else previous_handler,
                )

        return time_limited

    return wrapper
def timeit(func: Callable) -> Callable:
    """Decorate ``func`` so that each call also reports how long it took.

    Returns:
        The decorated function, which returns a ``(result, seconds)`` tuple:
        the original return value and the elapsed time as a float.
    """

    @wraps(func)
    def timed_wrapper(*args, **kwargs):
        # perf_counter is meant for measuring intervals; unlike time.time it
        # is not affected by system clock adjustments
        start = time.perf_counter()
        res = func(*args, **kwargs)
        delta = time.perf_counter() - start
        return res, delta

    return timed_wrapper
@josemarcosrf
Copy link
Author

Added a timeout wrapper to interrupt a function's execution

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment