from __future__ import annotations
import cProfile
import pstats
from collections import deque
from dataclasses import dataclass, field
from random import Random
from timeit import default_timer
from typing import Awaitable, Callable, Deque, Iterable, List, Optional, Protocol, Union
@richardsheridan
richardsheridan / universal_queue.py
Last active November 7, 2022 04:09 — forked from njsmith/universal-trio-queue.py
Universal cross-thread unbuffered queue for trio, asyncio, and threads
# Rough draft of a Queue object that can be used simultaneously from
# sync threads + *multiple* trio and asyncio threads, all at once.
#
# If you don't have multiple threads each doing their own separate calls to Xio.run,
# then don't use this; there are simpler solutions. This was mostly an exercise to
# figure out if and how this could be done.
#
# Currently, the test provides 94% coverage given a sufficient timeout. The
# remaining uncovered lines are (apparently rare) races and the durable aio shielding.
import random
import trio
import functools
class ReplicatedBroadcastFailed(Exception):
    pass


class ReplicatedBroadcast:
    def __init__(self, nursery, async_iterable):
        self._nursery = nursery
        self._async_iterable = async_iterable
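The universal-queue comment above describes one queue shared by sync threads and async event loops at once. The core bridging trick can be sketched with the stdlib alone; this is an assumed simplification for a single asyncio loop, not the gist's multi-loop design, and `HybridQueue` and its method names are hypothetical:

```python
import asyncio
import queue


class HybridQueue:
    """Minimal sketch: a queue usable from plain threads and from one
    asyncio loop. The real gist supports multiple trio/asyncio loops;
    this only shows the core bridging idea."""

    def __init__(self):
        self._q = queue.Queue()  # thread-safe buffer shared by everyone

    def put(self, item):
        # Safe to call from any thread (never blocks on an unbounded queue).
        self._q.put(item)

    def get(self):
        # Blocking get, for use from sync threads.
        return self._q.get()

    async def aget(self):
        # Async get: run the blocking get on an executor thread so the
        # event loop stays responsive while we wait.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._q.get)
```

The executor hop is what lets one object serve both worlds: sync callers block directly, async callers block a worker thread instead of the loop.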
@iamzoltan
iamzoltan / pussthescrapper.py
Last active July 15, 2021 21:38
basic scraping
import httpx
import feedparser
from bs4 import BeautifulSoup
from sqlalchemy.exc import IntegrityError
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime
# create db engine
engine = create_engine('sqlite:///crypto_articles.db', echo=True)
meta = MetaData()
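The snippet imports `IntegrityError`, which suggests the usual scraper pattern: a unique constraint on the article URL, with duplicate inserts caught and skipped. A minimal sketch of that pattern using stdlib `sqlite3` instead of SQLAlchemy (the `articles` schema and `save_article` helper are hypothetical, not from the gist):

```python
import sqlite3

# Hypothetical schema mirroring the crypto_articles.db idea above.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE articles ("
    " id INTEGER PRIMARY KEY,"
    " url TEXT UNIQUE,"  # UNIQUE makes a repeat insert raise IntegrityError
    " title TEXT)"
)


def save_article(url, title):
    """Insert an article, silently skipping already-seen URLs."""
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "INSERT INTO articles (url, title) VALUES (?, ?)",
                (url, title),
            )
        return True
    except sqlite3.IntegrityError:  # duplicate url: already scraped
        return False
```

With SQLAlchemy the shape is the same, just with `sqlalchemy.exc.IntegrityError` caught around the insert.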
@richardsheridan
richardsheridan / map_concurrently_in_subthread_trio.py
Last active January 2, 2023 22:14
map_concurrently_in_subthread_trio
import queue
import random
from functools import partial
from time import sleep, perf_counter
import trio
CONCURRENCY_LIMIT = 8
limiter = trio.CapacityLimiter(CONCURRENCY_LIMIT)
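The gist caps in-flight work with `trio.CapacityLimiter(CONCURRENCY_LIMIT)`. The same bounded-concurrency map can be sketched with the stdlib's `asyncio.Semaphore`; `map_concurrently` is a hypothetical name for illustration, not the gist's API:

```python
import asyncio

CONCURRENCY_LIMIT = 8


async def map_concurrently(fn, items, limit=CONCURRENCY_LIMIT):
    # Semaphore plays the role of trio.CapacityLimiter: at most
    # `limit` calls to fn run at any moment.
    sem = asyncio.Semaphore(limit)

    async def run(item):
        async with sem:
            return await fn(item)

    # gather preserves input order in the result list.
    return await asyncio.gather(*(run(i) for i in items))
```

Results come back in input order even though completion order varies, which is usually what a concurrent `map` should guarantee.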
@basak
basak / aioevent.md
Created April 6, 2021 02:27
Trio broadcast implementation with "slowest consumer" backpressure

I've been using this code "in production" for a while now. It dates back to https://groups.google.com/g/python-tulip/c/J7tCcBU5TPA/m/NM7iBhhhEAAJ, except that I converted it to Trio a while ago. It is intended to be lossless: if desired, you can ensure that you catch every message from the moment you start listening, without losing anything. Producers are blocked on send() until the slowest consumer has received the message.

Since new consumers won't receive messages from before they began to listen, the point at which a consumer "begins listening" is important. This happens when the async iterator is created, i.e. when the for loop runs the implicit aiter(). If you do this as the first thing in a coroutine, you might expect all messages following a nursery.start_soon() call starting that coroutine to be picked up. But in practice, the for loop won't run the implicit aiter() until some time later, and so you won't see messages sent prior to that point. To avoid this, you must call aiter() yourself and pass that in.
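The behavior described above (lossless from the moment of subscription, producer paced by the slowest consumer) can be sketched with stdlib asyncio, using one small bounded queue per listener; `Broadcast`, `listen`, and `send` are hypothetical names, and with size-1 queues the producer runs at most one message ahead of the slowest consumer rather than being strictly unbuffered:

```python
import asyncio


class Broadcast:
    """Sketch of a lossless broadcast with slowest-consumer backpressure."""

    def __init__(self):
        self._queues = []

    def listen(self):
        # A new listener only sees messages sent after this call,
        # matching the "begins listening" point discussed above.
        q = asyncio.Queue(maxsize=1)
        self._queues.append(q)
        return q

    async def send(self, msg):
        # put() blocks while a listener's queue is full, so the
        # producer advances at the pace of the slowest consumer.
        for q in self._queues:
            await q.put(msg)
```

Calling `listen()` eagerly (before starting the consumer task) is the analogue of calling aiter() yourself and passing it in.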

# Rough draft of a Queue object that can be used simultaneously from
# sync threads + *multiple* trio threads, all at once.
#
# If you don't have multiple threads each doing their own separate calls to trio.run,
# then don't use this; there are simpler solutions. This was mostly an exercise to
# figure out if and how this could be done.
#
# Also note: completely untested, probably has bugs
from collections import OrderedDict
"""
Example of running an embedded IPython shell inside an already-running trio loop with working autoawait (it's handy
to be able to start an interactive REPL with your application environment fully initialized). This is a full solution
that works around https://github.com/ipython/ipython/issues/680 (see
https://gist.github.com/mikenerone/3640fdd450b4ca55ee8df4d4da5a7165 for how simple it *could* be). This bug should be
fixed IMO (using atexit is a questionable design choice in the first place given the embedding feature of IPython
IMO). As it is now, the entire IPythonAtExitContext context manager exists only to work around the problem,
otherwise it would result in an error on process exit when IPython's atexit-registered method calls fail to save the
input history.
import trio
import weakref
import cachetools
# Note: this assumes you only have one trio thread running at once...
# In the unlikely event that you have multiple, you should put the locks and cache
# into thread-local or run-local storage.
def trio_lru_cache(fn):
    cache = cachetools.LRUCache(maxsize=128)  # LRUCache requires an explicit maxsize
    # Trick: this contains a lock for each unique cache key in use,
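The gist's preview cuts off, but the per-key-lock trick it names (so that concurrent calls with the same arguments compute the value only once) can be sketched with stdlib asyncio in place of trio; `async_lru_cache` is a hypothetical name, and the weakref dictionary is an assumed reading of how the gist keeps one lock per in-flight key:

```python
import asyncio
import functools
import weakref


def async_lru_cache(fn):
    cache = {}
    # One lock per cache key currently being computed. Weak values mean
    # a lock disappears as soon as no call is holding or awaiting it.
    locks = weakref.WeakValueDictionary()

    @functools.wraps(fn)
    async def wrapper(*args):
        key = args
        try:
            return cache[key]
        except KeyError:
            pass
        lock = locks.get(key)
        if lock is None:
            lock = asyncio.Lock()
            locks[key] = lock
        async with lock:
            # Re-check: another task may have filled the cache while
            # we were waiting on the lock.
            if key not in cache:
                cache[key] = await fn(*args)
            return cache[key]

    return wrapper
```

The double check inside the lock is what prevents a thundering herd of identical calls from each running the wrapped coroutine.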
from trio import fail_after


class EnterTimeout:
    def __init__(self, async_cm, timeout):
        self._async_cm = async_cm
        self._timeout = timeout

    async def __aenter__(self):
        with fail_after(self._timeout):
            return await self._async_cm.__aenter__()

    async def __aexit__(self, *args):
        return await self._async_cm.__aexit__(*args)
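The same idea (bound only the time spent *entering* a wrapped async context manager, not the body or exit) translates directly to stdlib asyncio via `asyncio.wait_for`; `AsyncEnterTimeout` is a hypothetical name for this sketch, not part of the gist:

```python
import asyncio


class AsyncEnterTimeout:
    """Sketch: asyncio analogue of EnterTimeout above. Only __aenter__
    is under the deadline; the with-body and __aexit__ run untimed."""

    def __init__(self, async_cm, timeout):
        self._async_cm = async_cm
        self._timeout = timeout

    async def __aenter__(self):
        # wait_for cancels the enter and raises TimeoutError if it
        # takes longer than the deadline.
        return await asyncio.wait_for(
            self._async_cm.__aenter__(), self._timeout
        )

    async def __aexit__(self, *args):
        return await self._async_cm.__aexit__(*args)
```

This is useful for things like connection pools, where acquiring the resource may hang but releasing it should always be allowed to finish.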