Skip to content

Instantly share code, notes, and snippets.

@iissme
iissme / interval_runner.py
Created December 26, 2018 08:32
Simple asyncio interval runner
import asyncio
import logging
from abc import ABCMeta, abstractmethod
from datetime import datetime, timedelta
from functools import partial
from typing import Optional, Any
from uuid import uuid4
from dateutil import tz
from ntech.asyncio_utils import async_task
@iissme
iissme / metaclass_example_ru.py
Last active October 7, 2018 13:26
Metaclass example
"""
Создание класса:
1a. Meta.__prepare__(mcs=<class '__main__.Meta'>, name='Class', bases=(), **{'extra': 1})
Возвращает dictionary-like обьект, который будет использован для создания класса. Можно добавить что-то туда сразу (также доступны параметры из обьявления будущего класса в kwargs).
1b. "Проход" по методам Class, все добавляется в dictionary-like обьект.
2. Meta.__new__(mcs=<class '__main__.Meta'>, name='Class', bases=(), attrs=[added, __module__, __qualname__, __new__, __init__, __str__, __classcell__], **{'extra': 1})
Вызывается с заполненным словарем будущего класса в attrs. Kwargs теже, что и в __prepare__. type создает новый класс.
3. Meta.__init__(cls=<class '__main__.Class'>, name='Class', bases=(), attrs=[added, __module__, __qualname__, __new__, __init__, __str__, __classcell__], **{'extra': 1})
Инициализируется только что созданный класс.
Создание экземпляра:
import asyncio
import weakref
import functools
class AsyncCoroQueueDispatcher:
"""
Dispatcher gets coroutine from its iternal queue,
runs is asynchronously (in background) and waits untill it's done,
then gets next one. It's meant to be embed in another class.
Dispatcher can be easily rewritten for asyncio.PriorityQueue if needed.
@iissme
iissme / sync_call.py
Created May 31, 2017 11:39 — forked from sethrh/sync_call.py
Trick to synchronously call a coroutine from a non-asyncio-aware legacy code. The calling thread can treat the coroutine as a regular (synchronous) function call.
#!/Library/Frameworks/Python.framework/Versions/3.4/bin/python3
"""
Synchronously call an asyncio task/coroutine from a non-asyncio thread.
usage:
# block the current thread until the coroutine yields or returns
value = sync_call(loop, coroutine, 1, 2, 3, four=4, five=5)
# control returns here when the coroutine is done
"""
import asyncio