Skip to content

Instantly share code, notes, and snippets.

View thehesiod's full-sized avatar
🎯
Focusing

Alexander Mohr thehesiod

🎯
Focusing
View GitHub Profile
@thehesiod
thehesiod / request_https_leak.py
Last active March 21, 2017 19:59
Python Requests HTTPS leak
import requests
import multiprocessing
import setproctitle
import os
import threading
import tracemalloc
import linecache
import traceback
import gc
import logging
#!/usr/bin/env python3.5
import zmq.asyncio
import zmq
import asyncio
import pickle
import logging
import tempfile
import traceback
import multiprocessing
@thehesiod
thehesiod / libzmq_bug.cpp
Last active May 14, 2017 18:22
libzmq bug
#include <unistd.h>
#include <sys/types.h>
#include <errno.h>
#include "zmq.h"
#include <stdio.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdexcept>
#include <time.h>
#include <string>
@thehesiod
thehesiod / ahps_formats.py
Last active June 6, 2017 22:49
Comparison of new and old AHPS file formats
import numpy as np
import netCDF4
def reversed_list_enumerate(value: list):
    """Yield ``(negative_index, element)`` pairs walking *value* back to front.

    The first pair is ``(-1, value[-1])`` and the last is
    ``(-len(value), value[-len(value)])``; an empty sequence yields nothing.
    The length is read once when iteration starts, and each element is
    looked up by its negative index only when its pair is requested.
    """
    # distance_from_end counts 1..len(value); negating it gives the index.
    yield from (
        (-distance_from_end, value[-distance_from_end])
        for distance_from_end in range(1, len(value) + 1)
    )
def find_first(dataset, var_name, reverse: bool, ahps_to_inches: bool=False):
@thehesiod
thehesiod / aiohttp_timeout.py
Created June 7, 2017 22:04
asyncio timeout testcase
import asyncio
import aiohttp.web
async def stream_handler(request):
# Without the Content-Type header, most (all?) browsers will not render
# partially downloaded content. Note: the response type is
# StreamResponse, not Response.
resp = aiohttp.web.StreamResponse(status=200,
reason='OK',
headers={'Content-Type': 'text/html'})
@thehesiod
thehesiod / async_exit_stack.py
Last active July 23, 2017 19:35
Async ExitStack
from inspect import iscoroutinefunction, isawaitable
import sys
from collections import deque
# NOTE: this follows the contextlib.ExitStack implementation
class _BaseExitStack:
def __init__(self):
@thehesiod
thehesiod / xray_httplib_patch.py
Last active August 11, 2017 19:15
AWS X-Ray http.client patch
import wrapt
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.models import http
from aws_xray_sdk.ext.util import inject_trace_header
import ssl
_XRAY_PROP = '_xray_prop'
@thehesiod
thehesiod / test_script.py
Created January 10, 2018 06:50
test_script.py
#!/usr/local/bin/python3
from aiohttp import (ClientSession, TCPConnector, BasicAuth)
from asyncio import get_event_loop
from async_timeout import timeout as aio_timeout
from ssl import create_default_context
from yarl import URL
# allows us to customize the session with cert files
def setup_session(self, path_to_cert=None, custom_headers=None, login=None, close=False):
@thehesiod
thehesiod / msgpack.py
Created March 30, 2018 02:25
MessagePack Utilities
import msgpack
import gc
import datetime
from datetime import timezone, timedelta
empty_dict_bytes = msgpack.packb({})
_datetime_ExtType = 42
# NOTE: if we were to store the timestamp instead of the extended information, for naive datetimes we'd have to convert
@thehesiod
thehesiod / s3_relay.py
Last active April 4, 2018 07:08
S3 Latency Limiter, parallel get_object wrapper
import asyncio
import logging
from typing import List
def _ignore_task_exception(task: asyncio.Future, logger: logging.Logger):
# noinspection PyBroadException
try:
task.result()
except BaseException: