Andrey Sorokin (andgineer), engineer
@andgineer
andgineer / tasks.py
Created April 5, 2024 04:21
Register `invoke` tasks from all Python files in the directory.
import os
import importlib
from invoke import Collection

tasks_dir = os.path.dirname(os.path.abspath(__file__))
ns = Collection()
for filename in os.listdir(tasks_dir):
    if filename.endswith(".py"):
        module_name = os.path.splitext(filename)[0]
        # Completion sketch (the gist preview is truncated): skip this file itself,
        # import each sibling module, and register its tasks as a sub-collection.
        if module_name == "tasks":
            continue
        module = importlib.import_module(module_name)
        ns.add_collection(Collection.from_module(module), name=module_name)
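For context, a sibling module that this loop would pick up might look like the following (a hypothetical example, not part of the gist; the module and task names are made up):

# deploy.py - hypothetical sibling module discovered by the loop above
from invoke import task

@task
def build(c):
    """Build the project image."""
    c.run("docker build -t myapp .")

With the root collection above, such a task would be available as `invoke deploy.build`.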
@andgineer
andgineer / gist:913e7d6b85a1c34db7c8eeca64a8574f
Created March 1, 2024 08:07
Send CloudWatch logs to OpenSearch with Kinesis Firehose
const firehoseRole = new iam.Role(this, 'FirehoseDeliveryRole', {
  assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
});
domain.grantWrite(firehoseRole);

// Create a Kinesis Data Firehose delivery stream
const deliveryStream = new firehose.CfnDeliveryStream(this, 'LogsDeliveryStream', {
  deliveryStreamType: 'DirectPut',
  openSearchDestinationConfiguration: {
    // The gist preview is truncated here; the destination block also needs the
    // OpenSearch domain ARN, the delivery role ARN, an index name, and S3 backup settings.
  },
});
@andgineer
andgineer / testcontainers_utils.py
Created February 19, 2024 07:40
Dynamically allocate an available port. Just an exercise: better to use DockerContainer's built-in ability to do this itself.
import socket
from testcontainers.core.container import DockerContainer

class DynamicPortDockerContainer(DockerContainer):
    def bind_available_port(self, container_port: int) -> int:
        """Find an available port on the host machine and bind it to the `container_port`.
        Return bound host port number.
        """
        # Completion sketch (the gist preview is truncated): let the OS pick a free port.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(("", 0))
            host_port = sock.getsockname()[1]
        self.with_bind_ports(container_port, host_port)
        return host_port
@andgineer
andgineer / pytest_xdist_scheduler_longest_first.py
Last active February 14, 2024 10:24
pytest-xdist custom scheduler: Executes tests from the priority list first. Placing the longest tests in this list could significantly reduce test suite run time.
def pytest_xdist_make_scheduler(config, log):
    """Create a custom scheduler for pytest-xdist."""
    return PriorityListScheduling(config, log)


"""Custom scheduler implementation that runs the tests listed in tests/resources/priority_tests.txt first.
This enables us, for example, to run long tests first and on separate nodes.
To get the test list sorted by duration, use `pytest --durations=200`.
"""
Get the ID of an AWS Cloud Map (servicediscovery) service by name within a namespace:
aws servicediscovery list-services --filters Name=NAMESPACE_ID,Values=${NS_ID} --query "Services[?Name=='${SERVICE_NAME}'].Id" --output text
@andgineer
andgineer / s3du.sh
Last active December 20, 2022 05:52
Equivalent of the "du" command for an Amazon S3 bucket "folder". Shows the sizes of the "subfolders" in this "folder".
function s3du {
  readonly folder_to_scan=${1:?"The argument 's3://bucket/folder_to_scan/' must be specified."}
  for subfolder in $(aws s3 ls "${folder_to_scan}" | grep PRE | awk '{print $2}'); do
    echo "${folder_to_scan}${subfolder}:"
    aws s3 ls "${folder_to_scan}${subfolder}" --recursive \
      --human-readable \
      --summarize \
      | tail -n2
  done
}
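Usage takes a single argument, for example `s3du s3://my-bucket/backups/`; for every first-level "subfolder" it prints the total object count and total size reported by `aws s3 ls --summarize`.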
@andgineer
andgineer / inputs-script_filter.rb
Created February 11, 2022 12:11
Alfred dictionary workflow: create a Blank workflow, add an Inputs/Script Filter, set the language to Ruby, and choose "with input as {query}".
require 'json'
dictionary_list = [
  {"Яндекс переводчик": "https://translate.yandex.ru/?lang=en-ru&text={query}"},
  {"Oxford Dictionary": "https://www.oxfordlearnersdictionaries.com/definition/english/{query}?q={query}"},
  {"Cambridge Dictionary": "https://dictionary.cambridge.org/dictionary/english-russian/{query}"}
]
script_filter_items = []
# Completion sketch (the gist preview is truncated): Alfred substitutes {query}
# in the script text, so each URL becomes a ready-to-open Script Filter item.
dictionary_list.each do |dict|
  dict.each { |name, url| script_filter_items << {title: name, arg: url} }
end
puts({items: script_filter_items}.to_json)
Synchronous variant: use an existing s3 client if one is passed, otherwise create a temporary one, with contextlib.nullcontext standing in as the no-op context manager:

def create_object(s3=None):
    session = get_session() if s3 is None else None
    with session.create_client(
        "s3"
    ) if s3 is None else contextlib.nullcontext() as s3_temp:
        return (s3 or s3_temp).get_object(Bucket=bucket, Key=key)
@andgineer
andgineer / aiobojtocore_client_context.py
Last active October 15, 2021 05:20
aiobotocore client context
from aiobotocore.session import get_session

session = get_session()
async with session.create_client("s3") as s3_temp:
    s3_object = await s3_temp.get_object(Bucket=bucket, Key=key)
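Not shown in the preview, but worth remembering with this pattern: the response body is an async stream and should be read while the client is still open. A small sketch (bucket and key as in the snippet above):

session = get_session()
async with session.create_client("s3") as s3:
    response = await s3.get_object(Bucket=bucket, Key=key)
    # Read the streaming body before the client context closes.
    data = await response["Body"].read()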
@andgineer
andgineer / async_nullcontext.py
Created October 15, 2021 04:59
Async version of nullcontext
session = get_session() if s3 is None else None
async with session.create_client(
    "s3"
) if s3 is None else contextlib.AsyncExitStack() as s3_temp:
    s3_object = await (s3 or s3_temp).get_object(Bucket=bucket, Key=key)
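The snippet uses an empty contextlib.AsyncExitStack as the async no-op; on Python 3.10+ contextlib.nullcontext itself supports `async with`. For older versions, a minimal async nullcontext can also be written by hand (a sketch, not part of the gist):

import contextlib

@contextlib.asynccontextmanager
async def async_nullcontext(enter_result=None):
    """No-op async context manager, analogous to contextlib.nullcontext."""
    yield enter_result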