Tom Christie tomchristie

View throttle_transport.py
import httpcore


class ThrottleTransport:
    def __init__(self, throttle, **kwargs):
        self.pool = httpcore.SyncConnectionPool(**kwargs)
        self.history = []
        # Parse the throttle, which should be a string, like '100/minute'.
        count, _, duration = throttle.partition('/')
        self.max_in_history = int(count)
        self.cutoff = {'second': 1.0, 'minute': 60.0, 'hour': 3600.0}[duration]
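The preview above stops after parsing the throttle string. As a rough illustration of how `history`, `max_in_history`, and `cutoff` could be used together, here is a minimal sliding-window sketch. It is not the gist's actual implementation: `parse_throttle`, `SlidingWindowThrottle`, `delay_needed`, `record`, and the injectable `clock` are all hypothetical names introduced for this example, and it leaves `httpcore` out entirely.

```python
import time
from collections import deque


def parse_throttle(throttle):
    """Split a rate string such as '100/minute' into (count, window_seconds)."""
    count, _, duration = throttle.partition('/')
    windows = {'second': 1.0, 'minute': 60.0, 'hour': 3600.0}
    return int(count), windows[duration]


class SlidingWindowThrottle:
    """Hypothetical helper: keeps request timestamps inside a sliding window."""

    def __init__(self, throttle, clock=time.monotonic):
        self.max_in_history, self.cutoff = parse_throttle(throttle)
        self.history = deque()
        self.clock = clock  # injectable so the logic can be checked without sleeping

    def delay_needed(self):
        """Return how many seconds to wait before another request is allowed."""
        now = self.clock()
        # Drop timestamps that have fallen out of the window.
        while self.history and now - self.history[0] >= self.cutoff:
            self.history.popleft()
        if len(self.history) < self.max_in_history:
            return 0.0
        # The oldest entry has to expire before another request fits.
        return self.cutoff - (now - self.history[0])

    def record(self):
        """Register that a request is being sent now."""
        self.history.append(self.clock())
```

A transport built on this idea would presumably call `delay_needed()` before each request, sleep for that long, and then call `record()`.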
View CONTRIBUTING.md

Maintainer driven issues.

Hi there - we’re trying out something new on this repository. We’re calling it “maintainer driven” issues & pull requests.

What does this mean for you?

In general: Please do not open issues or pull requests on this repository.

Instead, we’d encourage you to talk through any issues or suggestions in the chat room, or on the discussion group. If the maintenance team would like to escalate a discussion into an issue then we may do so, or ask you to do so.

tomchristie / proxy.py
Created Jan 15, 2020
An ASGI proxy service.
View proxy.py
import httpx
from starlette.requests import Request
from starlette.responses import StreamingResponse


class Proxy:
    def __init__(self, hostname):
        self.hostname = hostname
        self.client = httpx.AsyncClient()
View lifespan.py
import asyncio


class LifespanManager:
    def __init__(self, app):
        self.app = app
        self.startup_complete = asyncio.Event()
        self.shutdown_complete = asyncio.Event()
        self.messages = [{'type': 'lifespan.startup'}, {'type': 'lifespan.shutdown'}]

    async def __aenter__(self):
        self.task = asyncio.create_task(self.app(self.receive, self.send))
        await self.startup_complete.wait()
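The preview cuts off before `receive`, `send`, and `__aexit__` are shown. The sketch below is one way the full startup/shutdown handshake might look. It is an assumption-laden illustration, not the gist's code: it uses the ASGI 3 single-callable form `app(scope, receive, send)`, which differs from the call in the preview, and the names `LifespanSketch`, `shutdown_requested`, `startup_sent`, and `run_demo` are hypothetical. It also ignores the `lifespan.startup.failed` and `lifespan.shutdown.failed` messages.

```python
import asyncio


class LifespanSketch:
    """Hypothetical async context manager driving an ASGI lifespan cycle."""

    def __init__(self, app):
        self.app = app
        self.startup_sent = False
        self.startup_complete = asyncio.Event()
        self.shutdown_requested = asyncio.Event()
        self.shutdown_complete = asyncio.Event()

    async def receive(self):
        # First call delivers the startup message; later calls wait for shutdown.
        if not self.startup_sent:
            self.startup_sent = True
            return {'type': 'lifespan.startup'}
        await self.shutdown_requested.wait()
        return {'type': 'lifespan.shutdown'}

    async def send(self, message):
        # The app acknowledges each phase with a '*.complete' message.
        if message['type'] == 'lifespan.startup.complete':
            self.startup_complete.set()
        elif message['type'] == 'lifespan.shutdown.complete':
            self.shutdown_complete.set()

    async def __aenter__(self):
        scope = {'type': 'lifespan'}
        self.task = asyncio.create_task(self.app(scope, self.receive, self.send))
        await self.startup_complete.wait()
        return self

    async def __aexit__(self, *exc_info):
        self.shutdown_requested.set()
        await self.shutdown_complete.wait()
        await self.task


async def run_demo():
    """Run a tiny lifespan-aware app and return the order of events."""
    events = []

    async def demo_app(scope, receive, send):
        while True:
            message = await receive()
            if message['type'] == 'lifespan.startup':
                events.append('startup')
                await send({'type': 'lifespan.startup.complete'})
            elif message['type'] == 'lifespan.shutdown':
                events.append('shutdown')
                await send({'type': 'lifespan.shutdown.complete'})
                return

    async with LifespanSketch(demo_app):
        events.append('running')
    return events
```

Calling `asyncio.run(run_demo())` exercises the whole cycle: the app starts up inside `__aenter__`, the body of the `async with` block runs, and shutdown is requested on exit.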
View README.md

HTTPCore

A proposal for requests III.

Feature support

  • HTTP/1.1 and HTTP/2 Support.
  • async/await support for non-thread-blocking HTTP requests.
  • Fully type annotated.
  • 98% test coverage.
View gist:74f1ef35fa1ae6516ea81de704b3b421
import aiohttp
from django.views.generic import TemplateView


class FetchView(TemplateView):
    template_name = 'index.html'

    async def get(self, request, *args, **kwargs):
        async with aiohttp.ClientSession() as session:
View gist:b99979c57eea85a2fbdb6b2b68926913
* CORS
* Werkzeug Debugger
* Routing
* Whitenoise
* JWT?
* Cache?
* ->WSGI / ->ASGI
* Pub/Sub
* Multipart
View async-to-block.py
import asyncio
import time
import threading
import queue


async def foo(q):
    i = 0
    while True:
        q.put(i)
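The preview ends inside the producer loop, so the blocking side is not visible. Judging only from the file name and the imports, the idea appears to be handing values from a coroutine to ordinary synchronous code through a thread-safe queue. The sketch below shows that pattern under that assumption. `produce`, `iterate_blocking`, the `limit` parameter, and the `None` sentinel are hypothetical and do not come from the gist.

```python
import asyncio
import queue
import threading


async def produce(q, limit):
    """Async producer that hands values to blocking code via a thread-safe queue."""
    for i in range(limit):
        q.put(i)  # put() on an unbounded queue.Queue returns immediately
        await asyncio.sleep(0)  # yield to the event loop between items
    q.put(None)  # sentinel marking the end of the stream


def iterate_blocking(limit):
    """Consume the async producer from plain synchronous code."""
    q = queue.Queue()
    # Run the event loop in a background thread so this thread can block freely.
    worker = threading.Thread(
        target=lambda: asyncio.run(produce(q, limit)),
        daemon=True,
    )
    worker.start()
    while True:
        item = q.get()  # blocks the calling thread until a value arrives
        if item is None:
            break
        yield item
    worker.join()
```

The queue here is unbounded, so the producer is never held back; a bounded queue would make `put()` block inside the event loop and would need a different hand-off.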
View formToData.js
function formToData(form) {
  const formData = new FormData(form.get()[0])
  var params = new Map()
  var errors = []
  var inputElements = {}
  // Initially iterate through all the inputs
  form.find(':input').each(function(key, value) {
    var elem = $(this)
    var name = elem.attr('name')
View gist:31862fba1c41faf9abea10ffc9993c3e
class ProductPriceSerializer(serializers.Serializer):
    title = serializers.CharField(source='product.title')
    price = serializers.IntegerField()