Skip to content

Instantly share code, notes, and snippets.

@pirogoeth
pirogoeth / apt_collector.py
Created January 2, 2021 22:06
apt metric collector for prometheus exposition
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim: set ai et ts=4 sts=4 sw=4:
import apt
import argparse
import os
import pathlib
import shutil
import sys
@pirogoeth
pirogoeth / filebeat_sd_notify_wrapper.sh
Created December 7, 2020 23:08
systemd notify-compatible wrapper script to launch and monitor filebeat
#!/usr/bin/env bash
CONNECT_TIMEOUT=${HTTP_CONNECT_TIMEOUT:-1}
HTTP_ADDR=${FILEBEAT_HTTP_ADDR:-http://localhost:5066/}
REPORT_TIME=$(($WATCHDOG_USEC / 2000000))
SD_NOTIFY=${SD_NOTIFY_PATH:-/bin/systemd-notify}
set -euo pipefail
function watchdog() {
#!/usr/bin/env zsh
# -*- coding: utf-8 -*-
# vim: set ai et ts=4 sts=4 sw=4 syntax=zsh:
set -o pipefail
function __ensure_envdir() {
[[ ! -d "${HOME}/.envmgr.d/" ]] && mkdir -p "${HOME}/.envmgr.d/"
return 0
}
# -*- coding: utf-8 -*-
import functools
import operator
from typing import Optional
ANY = object()
class regex:
@respond_to(regex=r"^scale\s+(?P<params>.*)", flags=re.I)
@abac.allow_access("nomad.scale", defer=True)
async def scale(self, msg: Message, authz: abac.Authorizer, params: str):
params: dict = kvinline.parse(params)
service_name: Optional[str] = None
regions: Optional[str] = None
scale_groups: Mapping[str, int] = {}
for key, value in params.items():
import typing
from functools import partial
from typing import Optional, Tuple, Type, Union
K = typing.TypeVar("K")
V = typing.TypeVar("V")
class TypedDict(typing.Dict[K, V]):
pass
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import json
import os
import re
import shutil
import sys
from datetime import datetime
# -*- coding: utf-8 -*-
import inspect
def get_caller():
""" Inspects the call stack to determine the path of a caller.
Returns a string representing the full path to the caller.
"""
from raygun import json
from raygun.params import parse_str
class Account(mongoengine.Document, raygun.params.CustomKind):
""" Account object for database storage AND request body schema
"""
name = StringField(required=True)
@classmethod
In [1]: import timeit
In [4]: class A(object):
...: pass
...:
In [5]: item = object()
In [6]: item_a = A()