Skip to content

Instantly share code, notes, and snippets.

View mrzechonek's full-sized avatar

Michał 'Khorne' Lowas-Rzechonek mrzechonek

View GitHub Profile
import asyncio
import itertools
async def repeat(value, *, offset, interval, count=None):
await asyncio.sleep(offset)
for i in (range(count) if count else itertools.cycle([None])):
yield value
await asyncio.sleep(interval)
import inspect
import asyncio
from async_generator import asynccontextmanager
class Response:
def __init__(self, body):
self._body = body
async def body(self):
return self._body
import sys
from functools import partial
from typing import Tuple
from docopt import DocoptExit, docopt
class GatewayConfigClient:
def configuration_get(self, destination: int, net_index: int):
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include "gsmat.h"
#include "log.h"
#include "utils.h"
* This a parser for Hayes command set used to talk with GSM modem.
from collections import defaultdict
from dataclasses import dataclass, field
import pytest
from _pytest.mark import Mark, ParameterSet
from _pytest.python import Metafunc
class Metafunc:
import pytest
from typing import NamedTuple
class FixtureConfig:
def from_request(cls, request):
kwargs = {}
import pytest
from typing import NamedTuple
class SomeFixture(NamedTuple):
first_option: bool = False
second_option: bool = True
import asyncio
import functools
def igather(*awaitables, loop=None, timeout=None):
loop = loop or asyncio.get_event_loop()
queue = asyncio.Queue()
cancelled = False
remaining = set(map(asyncio.ensure_future, awaitables))
import asyncio
import contextlib
import functools
import json
from concurrent.futures import CancelledError
from datetime import datetime
import aiohttp
import pytest
import signal
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor
from gi.repository import GLib
from prompt_toolkit import prompt
from prompt_toolkit.eventloop.defaults import get_event_loop, set_event_loop