@harrisont
harrisont / example.py
Created February 25, 2017 22:03
Python asyncio exception, cancellation, and KeyboardInterrupt handling
import asyncio
import logging
import time
async def do_work(i):
    print('start', i)
    # Work
    time.sleep(1)
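
The preview cuts off above. A minimal sketch of the pattern the title names, assuming the pre-3.7 asyncio API that matches the gist's date; the main coroutine and everything below it are illustrative, not the gist's actual code:

# Hypothetical sketch (not the gist's full code): run several tasks,
# cancel them cleanly on Ctrl-C, and log exceptions instead of crashing.
async def main():
    tasks = [asyncio.ensure_future(do_work(i)) for i in range(3)]
    # return_exceptions=True keeps one failure from hiding the others.
    for result in await asyncio.gather(*tasks, return_exceptions=True):
        if isinstance(result, Exception):
            logging.error('task failed: %r', result)

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
except KeyboardInterrupt:
    # Cancel anything still pending, then let the cancellations finish.
    pending = asyncio.Task.all_tasks(loop)
    for task in pending:
        task.cancel()
    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
finally:
    loop.close()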
@harrisont
harrisont / file.py
Last active March 23, 2022 00:32
Python: live output from subprocess command
# From http://stackoverflow.com/a/18422264
import subprocess
import sys
with open('test.log', 'w') as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # With Python 3, the sentinel passed to iter needs to be a bytes
    # literal, iter(process.stdout.readline, b''), since b'' != ''.
    for line in iter(process.stdout.readline, ''):
        sys.stdout.write(line)
        f.write(line)
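
A Python 3 variant along the lines the inline comment suggests, with a bytes sentinel and each line decoded before being echoed and logged; the command shown is a stand-in, not from the original:

# Hypothetical Python 3 version of the same idea.
import subprocess
import sys

with open('test.log', 'w') as f:
    process = subprocess.Popen(['ping', '-c', '3', 'localhost'],
                               stdout=subprocess.PIPE)
    for raw_line in iter(process.stdout.readline, b''):
        line = raw_line.decode()
        sys.stdout.write(line)
        f.write(line)
    process.wait()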
@harrisont
harrisont / example.py
Created June 7, 2016 07:24
Limit the number of running futures with asyncio
import asyncio
from contextlib import closing
import random
async def async_index_printer(index: int):
    print('start', index)
    await asyncio.sleep(random.uniform(1, 3))
    return index
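
The preview stops before the limiting logic. One common way to cap concurrency, shown here as a sketch rather than necessarily the approach the gist takes, is to wrap each coroutine in an asyncio.Semaphore; run_limited and limited are illustrative names:

# Hypothetical sketch: cap concurrently running coroutines with a semaphore.
async def run_limited(max_concurrent=2):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(index):
        # Only max_concurrent of these bodies can run at any one time.
        async with semaphore:
            return await async_index_printer(index)

    return await asyncio.gather(*(limited(i) for i in range(10)))

with closing(asyncio.get_event_loop()) as loop:
    print(loop.run_until_complete(run_limited()))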
@harrisont
harrisont / as_completed_with_max_concurrent.py
Last active April 13, 2020 12:10
as_completed_with_max_concurrent: similar to asyncio.as_completed, but limits the concurrently running futures
import asyncio
import itertools
def as_completed_with_max_concurrent(futures, max_concurrent, loop=None, timeout=None):
    """Tweaked version of `asyncio.as_completed` with the addition of the `max_concurrent` param.

    The main change is to only queue (`_queue_future`) the first `max_concurrent` futures
    initially. The rest will be queued in `_on_completion`.
    """
@harrisont
harrisont / download_multiple.py
Last active March 19, 2023 05:19
Use asyncio and aiohttp to asynchronously download multiple files at once and handle the responses as they finish
import asyncio
from contextlib import closing
import aiohttp
async def download_file(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        assert response.status == 200
        # For large files use response.content.read(chunk_size) instead.
        # (Preview truncated; a return of the URL and body is one likely shape.)
        return url, await response.read()
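
A sketch of the driver the description implies, downloading several URLs and handling each response as it completes; download_all, the URL list, and the returned (url, data) pairs are assumptions, not confirmed by the preview:

# Hypothetical driver: fire off all downloads, then handle each response
# in completion order rather than submission order.
async def download_all(urls):
    async with aiohttp.ClientSession() as session:
        downloads = [download_file(session, url) for url in urls]
        for next_done in asyncio.as_completed(downloads):
            url, data = await next_done
            print('downloaded {} ({} bytes)'.format(url, len(data)))

urls = ['http://example.com/', 'http://example.org/']
with closing(asyncio.get_event_loop()) as loop:
    loop.run_until_complete(download_all(urls))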
@harrisont
harrisont / coroutine_helper.py
Last active June 6, 2016 09:20
Coroutine helpers and example
def coroutine(func):
    def start(*args, **kwargs):
        # Advance the new coroutine to its first yield so it can receive send().
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

@coroutine
def broadcast(*sinks):
    # Preview truncated here; a typical broadcast forwards each item to every sink.
    while True:
        item = yield
        for sink in sinks:
            sink.send(item)
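
A short usage sketch; printer() is a hypothetical sink for illustration, not part of the gist preview:

# Hypothetical sink to demonstrate broadcast().
@coroutine
def printer():
    while True:
        item = yield
        print(item)

pipe = broadcast(printer(), printer())
pipe.send('hello')  # printed twice, once per sink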
@harrisont
harrisont / parse_working_dir.py
Created June 5, 2016 20:36
Parse a working directory from the arguments with argparse, validating that the directory exists
import argparse
import os
def directory(raw_path):
    if not os.path.isdir(raw_path):
        raise argparse.ArgumentTypeError('"{}" is not an existing directory'.format(raw_path))
    return os.path.abspath(raw_path)
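
Wiring the validator into a parser would presumably look like this; the argument name and help text are illustrative:

# Hypothetical parser setup using directory() as the argparse type, so bad
# paths are rejected with a clean error before the program runs.
parser = argparse.ArgumentParser()
parser.add_argument('working_dir', type=directory,
                    help='existing directory to operate in')
args = parser.parse_args()
print(args.working_dir)  # absolute path, guaranteed to exist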