Mattijs Ugen akaIDIOT
  • The Hague, The Netherlands
from collections.abc import Mapping


class DictView(Mapping):
    # sentinel value used to check when a KeyError should be raised rather than returning a value
    _no_default = object()

    def __init__(self, source, separator='.'):
        self._source = source
        self._separator = separator
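The preview stops before any lookup logic, so what the separator is used for is not shown. As a rough sketch only, assuming it splits a key such as `'a.b'` into a path through nested mappings, a read-only view could look like this. The class name `NestedView` and all three methods are hypothetical and are not taken from the gist.

```python
from collections.abc import Mapping


class NestedView(Mapping):
    """Hypothetical read-only view that resolves 'a.b' style keys."""

    def __init__(self, source, separator='.'):
        self._source = source
        self._separator = separator

    def __getitem__(self, key):
        # assumption: each part of the key selects one level of nesting
        current = self._source
        for part in key.split(self._separator):
            if not isinstance(current, Mapping) or part not in current:
                raise KeyError(key)
            current = current[part]
        return current

    def __iter__(self):
        # only top-level keys are iterated in this sketch
        return iter(self._source)

    def __len__(self):
        return len(self._source)


view = NestedView({'a': {'b': 1}, 'c': 2})
print(view['a.b'])
print(view.get('a.missing', 'fallback'))
```

Because `Mapping` supplies `get` and `__contains__` on top of `__getitem__`, the sketch needs no sentinel of its own; the gist's `_no_default` suggests the original handles defaults differently.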
akaIDIOT / chunked.py
Last active May 24, 2016 15:50
I/O implementation leveraging requests' chunked response handling
from io import RawIOBase


class ChunkedResponseIO(RawIOBase):
    """
    Turns a generator of chunks into raw I/O, which can in turn be fed to
    something like an `io.BufferedReader`.

    .. code-block:: python
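The preview ends inside the docstring, so the gist's actual implementation is not visible here. A minimal sketch of the idea the docstring describes, assuming all that is needed is `readable()` and `readinto()` on top of `RawIOBase`, might look like this. The name `ChunkIO` is made up for illustration.

```python
from io import BufferedReader, RawIOBase


class ChunkIO(RawIOBase):
    """Hypothetical raw I/O wrapper around an iterable of bytes chunks."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self._leftover = b''

    def readable(self):
        return True

    def readinto(self, buffer):
        # refill from the iterator, skipping empty chunks
        while not self._leftover:
            try:
                self._leftover = next(self._chunks)
            except StopIteration:
                return 0  # 0 bytes read signals end of stream
        # copy as much as fits and keep the rest for the next call
        size = min(len(buffer), len(self._leftover))
        buffer[:size] = self._leftover[:size]
        self._leftover = self._leftover[size:]
        return size


reader = BufferedReader(ChunkIO([b'hello ', b'', b'world']))
print(reader.read())
```

With requests, the chunks would presumably come from something like `response.iter_content(chunk_size=8192)` on a response requested with `stream=True`.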
akaIDIOT / drain.py
Last active March 25, 2016 14:55
Context manager to drain content from a requests.Response object in case its content is not fully read
class drain(object):
    """
    Context manager to make sure a `Response` object is drained of content.

    Can be used to return a connection used to retrieve a response back to the
    connection pool it came from in case the actual response content of the
    response is not needed, e.g.:

    .. code-block:: python

        response = session.get('https://example.com/')
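The body of the class is cut off in the preview. As a sketch of the idea in the docstring, assuming the response exposes `iter_content()` as `requests.Response` does, a context manager could read and discard whatever is left on exit. The name `drained`, the `chunk_size` parameter, and the `FakeResponse` stand-in are all hypothetical; the stand-in only exists so the example runs without a network.

```python
class drained:
    """Hypothetical context manager that discards unread response content."""

    def __init__(self, response, chunk_size=8192):
        self._response = response
        self._chunk_size = chunk_size

    def __enter__(self):
        return self._response

    def __exit__(self, exc_type, exc_value, traceback):
        # read and throw away whatever the caller left behind
        for _ in self._response.iter_content(self._chunk_size):
            pass
        return False  # never swallow exceptions from the with-block


class FakeResponse:
    """Stand-in for a streamed response, used for demonstration only."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self.remaining = len(chunks)

    def iter_content(self, chunk_size=1):
        for chunk in self._chunks:
            self.remaining -= 1
            yield chunk


fake = FakeResponse([b'a', b'b', b'c'])
with drained(fake) as response:
    first = next(response.iter_content())
print(first, fake.remaining)
```

A real `Response` whose stream was already fully consumed may raise when `iter_content()` is called again; that case is not handled in this sketch.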
akaIDIOT / periodic.py
Created March 26, 2015 21:26
Call something periodically using asyncio
import asyncio


def call_periodic(interval, callback, *args, **kwargs):
    # get loop as a kwarg or take the default one
    loop = kwargs.get('loop') or asyncio.get_event_loop()
    # record the loop's time when call_periodic was called
    start = loop.time()

    def run(handle):
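The preview ends at the inner `run` function, so how the gist reschedules itself is not shown. One way to do it, sketched here under the assumption that each call is scheduled with `loop.call_at()` relative to the recorded start time so that delays do not accumulate, is the following. `PeriodicHandle` and `call_every` are hypothetical names, and the loop is passed explicitly rather than through `kwargs`.

```python
import asyncio


class PeriodicHandle:
    """Hypothetical handle that lets the caller stop future calls."""

    def __init__(self):
        self.cancelled = False
        self._timer = None

    def cancel(self):
        self.cancelled = True
        if self._timer is not None:
            self._timer.cancel()


def call_every(loop, interval, callback, *args):
    start = loop.time()
    handle = PeriodicHandle()
    count = 0

    def run():
        nonlocal count
        if handle.cancelled:
            return
        count += 1
        # schedule the next call before running the callback, so the
        # callback itself can cancel the handle
        handle._timer = loop.call_at(start + (count + 1) * interval, run)
        callback(*args)

    handle._timer = loop.call_at(start + interval, run)
    return handle


async def demo():
    ticks = []
    handle = call_every(asyncio.get_running_loop(), 0.01, ticks.append, 'tick')
    await asyncio.sleep(0.055)
    handle.cancel()
    seen = len(ticks)
    await asyncio.sleep(0.03)  # no further ticks should arrive
    return seen, len(ticks)
```

Running `asyncio.run(demo())` returns the tick count at cancellation and again a little later; the two should match.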