View aws_sig_v4_headers.py
import datetime | |
import hashlib | |
import hmac | |
import urllib.parse | |
def aws_sig_v4_headers(access_key_id, secret_access_key, pre_auth_headers, | |
service, region, host, method, path, query, payload): | |
algorithm = 'AWS4-HMAC-SHA256' |
View normalise_environment.py
def normalise_environment(key_values): | |
''' Converts denormalised dict of (string -> string) pairs, where the first string | |
is treated as a path into a nested list/dictionary structure | |
{ | |
"FOO__1__BAR": "setting-1", | |
"FOO__1__BAZ": "setting-2", | |
"FOO__2__FOO": "setting-3", | |
"FOO__2__BAR": "setting-4", | |
"FIZZ": "setting-5", |
View table_csv_view.py
import csv | |
import logging | |
import gevent | |
from psycopg2 import ( | |
connect, | |
sql, | |
) | |
from django.conf import ( |
View getaddrinfo_via_libc.py
from ctypes import ( | |
byref, | |
c_char_p, c_uint32, c_uint16, c_ubyte, | |
CDLL, | |
POINTER, | |
pointer, | |
Structure, Union, | |
sizeof, | |
) | |
import platform |
View path_lock_global_read_write.py
import asyncio | |
import contextlib | |
from fifolock import FifoLock | |
class Read(asyncio.Future): | |
@staticmethod | |
def is_compatible(holds): | |
return not holds[Write] |
View path_lock_local_read_write.py
import asyncio | |
import contextlib | |
import weakref | |
from fifolock import FifoLock | |
class Read(asyncio.Future): | |
@staticmethod | |
def is_compatible(holds): | |
return not holds[Write] |
View path_lock_local_read_write_ancestor.py
import asyncio | |
import contextlib | |
import weakref | |
from fifolock import FifoLock | |
class ReadAncestor(asyncio.Future): | |
@staticmethod | |
def is_compatible(holds): | |
return not holds[Write] |
View asyncio_read_write_lock.py
import asyncio | |
import collections | |
import contextlib | |
class Read(asyncio.Future): | |
@staticmethod | |
def is_compatible(holds): | |
return not holds[Write] | |
class Write(asyncio.Future): |
View path_lock_global_exclusive.py
import asyncio | |
import contextlib | |
class PathLock(): | |
def __init__(self): | |
self.lock = asyncio.Lock() | |
@contextlib.asynccontextmanager | |
async def __call__(self, read, write): |
View aws_sig_v4_headers_aiohttp_s3_put.py
import asyncio | |
import os | |
import aiohttp | |
from aws_sig_v4_headers import aws_sig_v4_headers | |
async def main(): | |
method = 'PUT' | |
host = 's3-eu-west-1.amazonaws.com' |
NewerOlder