Skip to content

Instantly share code, notes, and snippets.

View bench.py
import timeit, random
import heapq
random.seed(42)
left = []
right = []
# Create two sorted lists of integers
n = 0
View getaddrinfo.py
import asyncio
import ipaddress
import pprint
import socket
import sys
from socket import AddressFamily, SocketKind
from jeepney.wrappers import MessageGenerator, new_method_call
View demo.py
#!/usr/bin/env python3
from distributed import default_client
import sys
import asyncio
import concurrent.futures
from concurrent import futures
import contextlib
from tornado.ioloop import IOLoop
import random
import time
@graingert
graingert / bench.py
Created May 20, 2022 — forked from ajdavis/bench.py
asyncio getaddrinfo benchmark
View bench.py
# Test BaseEventLoop.getaddrinfo performance, before and after my commit 39c135b
# Try a variety of getaddrinfo parameters and log the duration of 10k calls
import socket
import time
from asyncio import get_event_loop
try:
from asyncio.base_events import _ipaddr_info
except ImportError:
View warn_sort.py
import dataclasses
import operator
@dataclasses.dataclass
class WarningFilter:
command: str
msg: str
cls: str
module: str | None = None
View demo.py
import sys
import logging
import threading
import collections
class Record:
def __init__(self):
self.levelno = 0
View ki.py
from __future__ import annotations
import sys
import asyncio
import concurrent.futures
import contextlib
class _Client:
def __init__(self, loop):
@graingert
graingert / client.py
Created Sep 3, 2021 — forked from foxmask/client.py
websocket client
View client.py
#!/usr/bin/env python3
import sys
import asyncio
import json
import websockets
async def hello():
uri = "ws://localhost:8888"
@graingert
graingert / anyio.pytb
Created Aug 24, 2021
amirouche-generator-stop
View anyio.pytb
2021-08-24 09:11:54.057 | ERROR | __main__:get:51 - http get failed with exception url: https://hyper.dev, <class 'RuntimeError'>
Traceback (most recent call last):
File "/home/graingert/.virtualenvs/testingpypy37/site-packages/httpcore/_backends/anyio.py", line 60, in read
return await self.stream.receive(n)
│ │ │ └ 65536
│ │ └ <function TLSStream.receive at 0x00007f0102d79920>
│ └ TLSStream(transport_stream=<anyio._backends._asyncio.SocketStream object at 0x00007f010434cb80>, standard_compatible=False, _...
<httpcore._backends.anyio.SocketStream object at 0x00005651a1184608>
View gimcrack.py
import curio
import collections
import threading
class Locals(threading.local):
synchrotron = None
tlocals = Locals()