pip install python-consul
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import requests | |
| import json | |
| import time | |
| from peewee import * | |
| class RqliteCursor: | |
| """Minimal cursor-like wrapper for rqlite query responses.""" | |
| def __init__(self, rows=None, columns=None, lastrowid=None, rowcount=None): | |
| self._rows = rows or [] | |
| self._columns = columns or [] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import base64 | |
| import json | |
| import typing | |
| from datetime import datetime | |
| def parse_jwt_token(token_part: str) -> dict[str, typing.Any]: | |
| token_part_with_padding = f"{token_part}{'=' * (len(token_part) % 4)}" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import subprocess | |
| import statistics | |
| from collections import defaultdict | |
| from typing import Union | |
| def run_kubectl_top( | |
| all_namespaces: bool = False, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from celery.task.control import revoke | |
| from celery.task.control import inspect | |
| def revoke_tasks_by_name(task_name, worker_prefix=''): | |
| """ | |
| Revoke all tasks by the name of the celery task | |
| :param task_name: Name of the celery task | |
| :param worker_prefix: Prefix for the worker |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import os | |
| from aiohttp import web | |
| from aiohttp import streamer | |
| @streamer | |
| async def file_sender(writer, file_path=None): | |
| """ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| for i in $(redis-cli keys celery-task-meta-*); do redis-cli EXPIRE "$i" 3600; done; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import datetime | |
| from datetime import timedelta | |
| from uuid import UUID | |
| def time_from_uuid1(u: str) -> datetime: | |
| return datetime(1582, 10, 15) + timedelta(microseconds=UUID(u).time // 10) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import datetime | |
| import time | |
| import kombu.five | |
| from celery.task.control import inspect | |
| def get_stuck_celery_tasks(threshold=600): | |
| i = inspect() | |
| for worker, tasks in i.active().items(): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fastapi==0.70.0 | |
| gino==1.0.1 | |
| pytest==6.2.5 | |
| pytest-asyncio==0.16.0 | |
| requests==2.26.0 |
NewerOlder