Skip to content

Instantly share code, notes, and snippets.


Tom Christie tomchristie

View GitHub Profile
class ThrottleTransport:
def __init__(self, throttle, **kwargs):
self.pool = httpcore.SyncConnectionPool(**kwargs)
self.history = []
# Parse the thottle, which should be a string, like '100/minute'.
count, _, duration = throttle.partition('/')
self.max_in_history = int(count)
self.cutoff = {'second': 1.0, 'minute': 60.0, 'hour': 3600.0}[duration]

Maintainer driven issues.

Hi there - we’re trying out something new on this repository. We’re calling it “maintainer driven” issues & pull requests.

What does this mean for you?

In general: Please do not open issues or pull requests on this repository.

Instead, we’d encourage you to instead talk through any issues or suggestions in the chat room, or on the discussion group. If the maintainance team would like to escalate a discussion into an issue then we may do so, or ask you to do so.

tomchristie /
Created Jan 15, 2020
An ASGI proxy service.
import httpx
from starlette.requests import Request
from starlette.responses import StreamingResponse
class Proxy:
def __init__(self, hostname):
self.hostname = hostname
self.client = httpx.AsyncClient()
class LifespanManager:
def __init__(self, app): = app
self.startup_complete = asyncio.Event()
self.shutdown_complete = asyncio.Event()
self.messages = [{'type': 'lifespan.startup'}, {'type': 'lifespan.shutdown'}]
async def __aenter__(self):
self.task = asyncio.create_task(, self.send))
await self.startup_complete.wait()


A proposal for requests III.

Feature support

  • HTTP/1.1 and HTTP/2 Support.
  • async/await support for non-thread-blocking HTTP requests.
  • Fully type annotated.
  • 98% test coverage.
View gist:74f1ef35fa1ae6516ea81de704b3b421
import aiohttp
from django.views.generic import TemplateView
class FetchView(TemplateView):
template_name = 'index.html'
async def get(self, request, *args, **kwargs):
async with aiohttp.ClientSession() as session:
View gist:b99979c57eea85a2fbdb6b2b68926913
* Werkzeug Debugger
* Routing
* Whitenoise
* JWT?
* Cache?
* ->WSGI / ->ASGI
* Pub/Sub
* Multipart
import asyncio
import time
import threading
import queue
async def foo(q):
i = 0
while True:
View formToData.js
function formToData(form) {
const formData = new FormData(form.get()[0])
var params = new Map()
var errors = []
var inputElements = {}
// Initially iterate through all the inputs
form.find(':input').each(function(key, value) {
var elem = $(this)
var name = elem.attr('name')
View gist:31862fba1c41faf9abea10ffc9993c3e
class ProductPriceSerializer(serializers.Serializer):
title = serializers.CharField(source='product.title')
price = serializers.IntegerField()
You can’t perform that action at this time.