Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
vxgmichel / amap.py
Created April 19, 2024 11:13
Async map with task limit using anyio
import math
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable
from anyio import create_memory_object_stream, create_task_group, abc
from anyio.streams.memory import MemoryObjectReceiveStream
@asynccontextmanager
async def amap[
@vxgmichel
vxgmichel / udpproxy.py
Created February 2, 2017 10:05
UDP proxy server using asyncio
"""UDP proxy server."""
import asyncio
class ProxyDatagramProtocol(asyncio.DatagramProtocol):
def __init__(self, remote_address):
self.remote_address = remote_address
self.remotes = {}
@vxgmichel
vxgmichel / aioudp.py
Last active February 15, 2024 00:12
High-level UDP endpoints for asyncio
"""Provide high-level UDP endpoints for asyncio.
Example:
async def main():
# Create a local UDP enpoint
local = await open_local_endpoint('localhost', 8888)
# Create a remote UDP enpoint, pointing to the first one
@vxgmichel
vxgmichel / convert_cp437.py
Last active October 22, 2023 22:46
Video conversion for Turing Complete 80x48 display.
#!/usr/bin/env python
"""
Convert an mp4 video file to uncompressed 80x48 video frames.
The output data is meant to be used as direct memory for the 80x24 Turing complete
console. In particular, pixels are grouped vertically so 2 pixels can fit into a
single `▀` (`\xdf`) character.
For instance, the following frame:
@vxgmichel
vxgmichel / Cargo.toml
Last active October 11, 2023 11:56
Comparing rust and python on a specific set intersection problem
[package]
name = "intersection"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
ahash = ["dep:ahash"]
@vxgmichel
vxgmichel / sshmockserver.py
Last active August 31, 2023 14:56
A pytest fixture for running an ssh mock server
"""A pytest fixture for running an ssh mock server.
Requires pytest and asyncssh:
$ pip install pytest asyncssh
"""
from socket import AF_INET
from unittest.mock import Mock
from contextlib import asynccontextmanager
@vxgmichel
vxgmichel / trio_task_monitoring.py
Last active August 28, 2023 18:22
Trio instrument to detect and log blocking tasks
import time
import inspect
import traceback
import trio
import structlog
logger = structlog.get_logger()
@vxgmichel
vxgmichel / go-style-generators.md
Last active July 26, 2023 06:50
Go-style generators in asyncio

Go-style generators in asyncio

I've recently been looking into the go concurrency model to see how it compares to asyncio.

An interesting concept caught my attention: go generators.

Generators in Go

@vxgmichel
vxgmichel / asyncio_timing.py
Last active November 16, 2022 05:35
A timing context for asyncio
# See the coresponding stackoverflow post:
# https://stackoverflow.com/a/34827291/2846140
import time
import asyncio
import selectors
import contextlib
class TimedSelector(selectors.DefaultSelector):
@vxgmichel
vxgmichel / timecapsule.py
Last active October 15, 2022 14:55
Theoretical solver of the LCS35 Time Capsule Crypto-Puzzle
#!/usr/bin/env python3
"""
Theoretical solver of the LCS35 Time Capsule Crypto-Puzzle
See: https://people.csail.mit.edu/rivest/lcs35-puzzle-description.txt
Example usage:
% time ./timecapsule.py -t 100000