Skip to content

Instantly share code, notes, and snippets.

View inotify.py
import functools
import logging
import os
import struct
from typing import Dict, cast, Optional, List
import cytoolz
from trio.hazmat import wait_readable
import inotify_simple
import trio
@parity3
parity3 / quart-trio-triopg.py
Created Jan 19, 2020
running quart-trio inside trio_asyncio.run
View quart-trio-triopg.py
import logging
import sys
import trio, trio_asyncio, asyncio
from hypercorn.trio.run import worker_serve
from hypercorn import Config
import contextlib
import math
from types import TracebackType
from quart import current_app, json
@parity3
parity3 / pub_sub.py
Last active Dec 31, 2019
publish-subscribe with sequential async delivery
View pub_sub.py
from contextlib import asynccontextmanager
import trio
class broadcaster:
def __init__(self):
super().__init__()
self.subscribers = {}
async def __aenter__(self):
@parity3
parity3 / InotifyWrapper.py
Created Dec 30, 2019
use of trio wait_readable and inotify_simple
View InotifyWrapper.py
import functools
import logging
import os
import struct
from typing import Dict, cast, Optional, List
import cytoolz
from trio.hazmat import wait_readable
import inotify_simple
import trio
@parity3
parity3 / httpfile_helpers.py
Created Nov 26, 2019
example of de-chunking a http-spec based chunked transfer encoding via a coroutine (entrypoint function: coro_te_with_packets)
View httpfile_helpers.py
import cStringIO
import logging
log = logging.getLogger('log')
class te_lengthfound_exception(StandardError):
def __init__(self, length, extension):
self.length=length
self.extension=extension
class te_trailer_exception(StandardError):
View gzip_stream.py
import functools, cStringIO, sys
import itertools, zlib, gzip, os
class savedata:
# convenience class used only while reading the gzip file header.
# it may not be a gzip file, so this instance will hold whatever was read so we don't have to seek backwards
def __init__(self, fileobj):
self.fileobj = fileobj
self.saved_data = cStringIO.StringIO()
View 42672128.json
{
"account": {
"customerInfo": null,
"giftCardBalance": null,
"giftCardRedeemCode": "",
"giftCardRedeemError": "",
"isGiftCardRedeemed": false,
"isGiftCardRedeemLoading": false,
"isLoading": false
},
View 39332122.json
{
"account": {
"customerInfo": null,
"giftCardBalance": null,
"giftCardRedeemCode": "",
"giftCardRedeemError": "",
"isGiftCardRedeemed": false,
"isGiftCardRedeemLoading": false,
"isLoading": false
},
View ariat-coniston-pro-gtx-insulated-ebony__product__8762583.json
{
"account": {
"customerInfo": null,
"giftCardBalance": null,
"giftCardRedeemCode": "",
"giftCardRedeemError": "",
"isGiftCardRedeemed": false,
"isGiftCardRedeemLoading": false,
"isLoading": false
},
@parity3
parity3 / httplib2_noclose_HEAD.py
Created Oct 6, 2017
prevent HEAD requests from auto-closing the connection
View httplib2_noclose_HEAD.py
import httplib
import socket
import httplib2
# the whole reason this module is necessary is to prevent HEAD requests from auto-closing the connection.
class HTTPResponse(httplib.HTTPResponse):
def read(self, amt=None):
# prevent the HEAD check from closing the connection