Skip to content

Instantly share code, notes, and snippets.

View mrzechonek's full-sized avatar

Michał 'Khorne' Lowas-Rzechonek mrzechonek

View GitHub Profile
import asyncio
import itertools
async def repeat(value, *, offset, interval, count=None):
await asyncio.sleep(offset)
for i in (range(count) if count else itertools.cycle([None])):
yield value
await asyncio.sleep(interval)
import inspect
import asyncio
from async_generator import asynccontextmanager
class Response:
def __init__(self, body):
self._body = body
async def body(self):
return self._body
import sys
from functools import partial
from typing import Tuple
from docopt import DocoptExit, docopt
class GatewayConfigClient:
def configuration_get(self, destination: int, net_index: int):
pass
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include "gsmat.h"
#include "log.h"
#include "utils.h"
/*
* This a parser for Hayes command set used to talk with GSM modem.
from collections import defaultdict
from dataclasses import dataclass, field
import pytest
from _pytest.mark import Mark, ParameterSet
from _pytest.python import Metafunc
@dataclass
class Metafunc:
import pytest
from typing import NamedTuple
class FixtureConfig:
FIXTURE = None
@classmethod
def from_request(cls, request):
kwargs = {}
import pytest
from typing import NamedTuple
class SomeFixture(NamedTuple):
first_option: bool = False
second_option: bool = True
@pytest.fixture
import asyncio
import functools
def igather(*awaitables, loop=None, timeout=None):
loop = loop or asyncio.get_event_loop()
queue = asyncio.Queue()
cancelled = False
remaining = set(map(asyncio.ensure_future, awaitables))
import asyncio
import contextlib
import functools
import json
from concurrent.futures import CancelledError
from datetime import datetime
import aiohttp
import pytest
#!python3
import signal
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor
from gi.repository import GLib
from prompt_toolkit import prompt
from prompt_toolkit.eventloop.defaults import get_event_loop, set_event_loop