Skip to content

Instantly share code, notes, and snippets.

View thehesiod's full-sized avatar
🎯
Focusing

Alexander Mohr thehesiod

🎯
Focusing
View GitHub Profile
@thehesiod
thehesiod / test_script.py
Created January 10, 2018 06:50
test_script.py
#!/usr/local/bin/python3
from aiohttp import (ClientSession, TCPConnector, BasicAuth)
from asyncio import get_event_loop
from async_timeout import timeout as aio_timeout
from ssl import create_default_context
from yarl import URL
# allows us to customize the session with cert files
def setup_session(self, path_to_cert=None, custom_headers=None, login=None, close=False):
@thehesiod
thehesiod / gather_cancel_children_on_exception.py
Last active September 27, 2022 08:55
asyncio cancel all tasks on first task's exception
import asyncio
import logging
from typing import List
def _ignore_task_exception(task: asyncio.Future, logger: logging.Logger):
# noinspection PyBroadException
try:
task.result()
except BaseException:
@thehesiod
thehesiod / MemTracer.py
Last active May 15, 2023 22:55
Memory Tracer
import tracemalloc
import os
import linecache
import wrapt
_TRACE_FILTERS = (
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, tracemalloc.__file__, all_frames=True), # needed because tracemalloc calls fnmatch
tracemalloc.Filter(False, linecache.__file__),
tracemalloc.Filter(False, os.path.abspath(__file__), all_frames=True), # since we call weakref
@thehesiod
thehesiod / msgpack.py
Created March 30, 2018 02:25
MessagePack Utilities
import msgpack
import gc
import datetime
from datetime import timezone, timedelta
empty_dict_bytes = msgpack.packb({})
_datetime_ExtType = 42
# NOTE: if we were to store the timestamp instead of the extended information, for naive datetimes we'd have to convert
@thehesiod
thehesiod / s3_relay.py
Last active April 4, 2018 07:08
S3 Latency Limiter, parallel get_object wrapper
import asyncio
import logging
from typing import List
def _ignore_task_exception(task: asyncio.Future, logger: logging.Logger):
# noinspection PyBroadException
try:
task.result()
except BaseException:
@thehesiod
thehesiod / test_leak.py
Last active September 19, 2018 22:52
aiobotocore / botocore leak test script
#!/usr/bin/env python3
import botocore.exceptions
import botocore.session
import tracemalloc
import aiobotocore.session
import gc
import os
import time
import sys
import asyncio
@thehesiod
thehesiod / asyncio_generator_issue.py
Created August 1, 2018 21:21
asyncio generator issue
import asyncio
async def generator_fn():
print(f" enter generator {id(asyncio.Task.current_task())}")
try:
while True:
yield
except:
@thehesiod
thehesiod / upvote_client.py
Created August 28, 2018 17:00
google upvote client
import argparse
from collections import defaultdict
from datetime import datetime, timedelta
import sys
import os
import logging
from google.cloud import datastore
from google.appengine.ext import ndb
@thehesiod
thehesiod / upvote_aiohttp.py
Created August 29, 2018 12:42
upvote aiohttp client
import asyncio
import argparse
import logging
import functools
# Third Party
from google.oauth2 import service_account
from google.oauth2 import _client
from google.auth import transport
from google.auth.transport import requests as gauth_requests
@thehesiod
thehesiod / 1pass_dups.py
Last active October 5, 2023 17:09
1password duplicate remover (alpha, only run in debugger with breakpoints everywhere *g*)
#!/usr/bin/env python3
import json
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
import html
import dictdiffer
import iso8601