This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3 | |
from aiohttp import (ClientSession, TCPConnector, BasicAuth) | |
from asyncio import get_event_loop | |
from async_timeout import timeout as aio_timeout | |
from ssl import create_default_context | |
from yarl import URL | |
# allows us to customize the session with cert files | |
def setup_session(self, path_to_cert=None, custom_headers=None, login=None, close=False): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
from typing import List | |
def _ignore_task_exception(task: asyncio.Future, logger: logging.Logger): | |
# noinspection PyBroadException | |
try: | |
task.result() | |
except BaseException: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tracemalloc | |
import os | |
import linecache | |
import wrapt | |
_TRACE_FILTERS = ( | |
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"), | |
tracemalloc.Filter(False, tracemalloc.__file__, all_frames=True), # needed because tracemalloc calls fnmatch | |
tracemalloc.Filter(False, linecache.__file__), | |
tracemalloc.Filter(False, os.path.abspath(__file__), all_frames=True), # since we call weakref |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import msgpack | |
import gc | |
import datetime | |
from datetime import timezone, timedelta | |
empty_dict_bytes = msgpack.packb({}) | |
_datetime_ExtType = 42 | |
# NOTE: if we were to store the timestamp instead of the extended information, for naive datetimes we'd have to convert |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
from typing import List | |
def _ignore_task_exception(task: asyncio.Future, logger: logging.Logger): | |
# noinspection PyBroadException | |
try: | |
task.result() | |
except BaseException: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import botocore.exceptions | |
import botocore.session | |
import tracemalloc | |
import aiobotocore.session | |
import gc | |
import os | |
import time | |
import sys | |
import asyncio |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def generator_fn(): | |
print(f" enter generator {id(asyncio.Task.current_task())}") | |
try: | |
while True: | |
yield | |
except: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from collections import defaultdict | |
from datetime import datetime, timedelta | |
import sys | |
import os | |
import logging | |
from google.cloud import datastore | |
from google.appengine.ext import ndb |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import argparse | |
import logging | |
import functools | |
# Third Party | |
from google.oauth2 import service_account | |
from google.oauth2 import _client | |
from google.auth import transport | |
from google.auth.transport import requests as gauth_requests |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import json | |
import subprocess | |
import sys | |
from concurrent.futures import ThreadPoolExecutor | |
import html | |
import dictdiffer | |
import iso8601 |