View async-does-not-mix.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import httpx | |
import time | |
async def do_work_async(): | |
http = httpx.AsyncClient() | |
while True: | |
resp = await http.get('https://httpbin.org/status/202') |
View example_asyncio_signal_shutdown.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import functools | |
import signal | |
import sys | |
import time | |
async def worker(shutdown_event): | |
try: | |
while True: |
View aiobotocore-config-example.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiobotocore.session | |
import aiobotocore.config | |
async def example(): | |
# Equivalent to boto3.client('s3', config=botocore.client.Config(**kwargs)) | |
# See also for available options: | |
# * https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html |
View boto3async.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ============================================================================= | |
# Copyright 2022 by Rafid Al-Humaimidi. All rights reserved. | |
# Licensed via Apache 2.0 (https://www.apache.org/licenses/LICENSE-2.0) | |
# | |
# Forked from: https://github.com/rafidka/boto3async | |
# ============================================================================= | |
"""Adds simple async wrappers around boto3 client methods. | |
This module adds async methods to the stock boto3 clients. The async versions of |
View example.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
redis-server --save "" --appendonly no |
View listening.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://stackoverflow.com/a/30029855/21784 | |
listening() { | |
if [ $# -eq 0 ]; then | |
sudo lsof -iTCP -sTCP:LISTEN -n -P | |
elif [ $# -eq 1 ]; then | |
sudo lsof -iTCP -sTCP:LISTEN -n -P | grep -i --color $1 | |
else | |
echo "Usage: listening [pattern]" | |
fi | |
} |
View recipe.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import falcon | |
import falcon.asgi | |
import falcon.util | |
class HelloResource: | |
async def on_get(self): | |
pass |
View httpx_multiprocessing.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import multiprocessing | |
import httpx | |
class Publisher: | |
def __init__(self): | |
# NOTE(kgriffs): Explicitly manage the client so we can use the | |
# connection pool. This class could be made into a singleton |
View lru_cache_ttl.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Grant Jenks' LRU Cache with TTL for Python | |
# | |
# https://stackoverflow.com/questions/31771286/python-in-memory-cache-with-time-to-live/71634221#71634221 | |
from functools import lru_cache, wraps | |
from time import monotonic | |
def lru_cache_with_ttl(maxsize=128, typed=False, ttl=60): | |
"""Least-recently used cache with time-to-live (ttl) limit.""" |
View compressed-multipart-python-requests.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
In [16]: data = {'some': 'data'} | |
In [17]: data_compressed = gzip.compress(json.dumps(data, ensure_ascii=False).encode()) | |
In [18]: requests.post("https://httpbin.org/anything", files={"msg": ('message.json.gz', body_compressed)}).json() | |
Out[18]: | |
{'args': {}, | |
'data': '', | |
'files': {'msg': 'data:application/octet-stream;base64,H4sIAGeLyGIC/6tWKs7PTVWyUlBKSSxJVKoFABmPLCMQAAAA'}, |
NewerOlder