This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import json | |
import subprocess | |
import sys | |
from concurrent.futures import ThreadPoolExecutor | |
import html | |
import dictdiffer | |
import iso8601 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from datetime import datetime, timezone | |
import os | |
def utc_now():
    """Return the current moment as a timezone-aware ``datetime`` in UTC.

    ``datetime.utcnow()`` produces a naive value (and is deprecated since
    Python 3.12), so the aware value is requested directly from
    ``datetime.now`` rather than patching ``tzinfo`` on afterwards.

    Returns:
        A ``datetime`` whose ``tzinfo`` is ``timezone.utc``.
    """
    return datetime.now(timezone.utc)
class Terminator:
    """Empty marker type: it defines no attributes or methods of its own.

    NOTE(review): presumably used as a sentinel object by code outside this
    excerpt -- confirm against the callers.
    """
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tracemalloc | |
import os | |
import linecache | |
import wrapt | |
_TRACE_FILTERS = ( | |
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"), | |
tracemalloc.Filter(False, tracemalloc.__file__, all_frames=True), # needed because tracemalloc calls fnmatch | |
tracemalloc.Filter(False, linecache.__file__), | |
tracemalloc.Filter(False, os.path.abspath(__file__), all_frames=True), # since we call weakref |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Inspiration: https://dba.stackexchange.com/questions/69988/how-can-i-fake-inet-client-addr-for-unit-tests-in-postgresql/70009#70009

-- Schema that holds the override objects; IF NOT EXISTS makes the script re-runnable.
CREATE SCHEMA IF NOT EXISTS override;

-- Lookup table of the parameter type names seeded below.
CREATE TABLE IF NOT EXISTS override.freeze_time_param_type
(
    param_type text NOT NULL PRIMARY KEY
);

-- Seed the three type names. The target column is named explicitly so the
-- statement does not depend on the table's column order, and
-- ON CONFLICT DO NOTHING keeps a re-run from failing on the primary key.
INSERT INTO override.freeze_time_param_type (param_type)
VALUES ('enabled'), ('timestamp'), ('tick')
ON CONFLICT DO NOTHING;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import faulthandler | |
import warnings | |
from pathlib import Path | |
import io | |
from typing import Optional | |
# This is how often we'll trigger our callback to measure blockage | |
_MIN_RESOLUTION = 0.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import argparse | |
from collections import defaultdict | |
from datetime import date, timedelta | |
import logging | |
from typing import Dict | |
from pprint import pprint | |
from pyeconet import EcoNetApiInterface | |
from pyeconet.equipment import EquipmentType |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
from typing import List | |
def _ignore_task_exception(task: asyncio.Future, logger: logging.Logger): | |
# noinspection PyBroadException | |
try: | |
task.result() | |
except BaseException: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
from pathlib import Path | |
from typing import Dict, Optional | |
import logging | |
import shutil | |
from functools import partial | |
import hashlib | |
import dataclasses |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
import time | |
import logging | |
from multiprocessing import Process, Queue | |
from queue import Empty | |
import botocore.session | |
from botocore.credentials import Credentials, CredentialResolver, CredentialProvider, AssumeRoleProvider |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# requires webdav connection (see WebDAV Server app, and ensure port is not 8080) | |
from pathlib import Path | |
import shutil | |
def main(): | |
cam_path = Path('//192.168.1.186@8081/DavWWWRoot/DCIM/Camera') | |
for idx, item in enumerate(cam_path.iterdir()): | |
if idx % 100 == 0: | |
print(f"Processing folder {idx}") |
NewerOlder