I've recently been looking into the go concurrency model to see how it compares to asyncio.
An interesting concept caught my attention: go generators.
import math | |
import asyncio | |
from contextlib import asynccontextmanager | |
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable | |
from anyio import create_memory_object_stream, create_task_group, abc | |
from anyio.streams.memory import MemoryObjectReceiveStream | |
@asynccontextmanager | |
async def amap[ |
"""UDP proxy server.""" | |
import asyncio | |
class ProxyDatagramProtocol(asyncio.DatagramProtocol): | |
def __init__(self, remote_address): | |
self.remote_address = remote_address | |
self.remotes = {} |
"""Provide high-level UDP endpoints for asyncio. | |
Example: | |
async def main(): | |
# Create a local UDP enpoint | |
local = await open_local_endpoint('localhost', 8888) | |
# Create a remote UDP enpoint, pointing to the first one |
#!/usr/bin/env python | |
""" | |
Convert an mp4 video file to uncompressed 80x48 video frames. | |
The output data is meant to be used as direct memory for the 80x24 Turing complete | |
console. In particular, pixels are grouped vertically so 2 pixels can fit into a | |
single `▀` (`\xdf`) character. | |
For instance, the following frame: |
[package] | |
name = "intersection" | |
version = "0.1.0" | |
edition = "2021" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[features] | |
ahash = ["dep:ahash"] |
"""A pytest fixture for running an ssh mock server. | |
Requires pytest and asyncssh: | |
$ pip install pytest asyncssh | |
""" | |
from socket import AF_INET | |
from unittest.mock import Mock | |
from contextlib import asynccontextmanager |
import time | |
import inspect | |
import traceback | |
import trio | |
import structlog | |
logger = structlog.get_logger() | |
I've recently been looking into the go concurrency model to see how it compares to asyncio.
An interesting concept caught my attention: go generators.
# See the coresponding stackoverflow post: | |
# https://stackoverflow.com/a/34827291/2846140 | |
import time | |
import asyncio | |
import selectors | |
import contextlib | |
class TimedSelector(selectors.DefaultSelector): |
#!/usr/bin/env python3 | |
""" | |
Theoretical solver of the LCS35 Time Capsule Crypto-Puzzle | |
See: https://people.csail.mit.edu/rivest/lcs35-puzzle-description.txt | |
Example usage: | |
% time ./timecapsule.py -t 100000 |