This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import async_timeout | |
import quattro | |
import contextlib | |
import anyio | |
async def main(note, cmgr): | |
try: | |
async with cmgr(-11): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def main(): | |
try: | |
async with asyncio.timeout(0): | |
await asyncio.sleep(0) | |
except TimeoutError: | |
print("PASS: great a timeout is here!") | |
else: | |
print("FAILED: expected a timeout") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psutil | |
import multiprocessing | |
import socket | |
import sys | |
import concurrent.futures | |
ctx = multiprocessing.get_context("spawn") | |
def target(sock): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import timeit, random | |
import heapq | |
random.seed(42) | |
left = [] | |
right = [] | |
# Create two sorted lists of integers | |
n = 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import ipaddress | |
import pprint | |
import socket | |
import sys | |
from socket import AddressFamily, SocketKind | |
from jeepney.wrappers import MessageGenerator, new_method_call | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from distributed import default_client | |
import sys | |
import asyncio | |
import concurrent.futures | |
from concurrent import futures | |
import contextlib | |
from tornado.ioloop import IOLoop | |
import random | |
import time |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Test BaseEventLoop.getaddrinfo performance, before and after my commit 39c135b | |
# Try a variety of getaddrinfo parameters and log the duration of 10k calls | |
import socket | |
import time | |
from asyncio import get_event_loop | |
try: | |
from asyncio.base_events import _ipaddr_info | |
except ImportError: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dataclasses | |
import operator | |
@dataclasses.dataclass | |
class WarningFilter: | |
command: str | |
msg: str | |
cls: str | |
module: str | None = None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import logging | |
import threading | |
import collections | |
class Record: | |
def __init__(self): | |
self.levelno = 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import sys | |
import asyncio | |
import concurrent.futures | |
import contextlib | |
class _Client: | |
def __init__(self, loop): |