Skip to content

Instantly share code, notes, and snippets.

View ikonst's full-sized avatar

Ilya Priven ikonst

  • New York, NY
  • 05:07 (UTC -04:00)
View GitHub Profile
@ikonst
ikonst / pgPoolTracing.ts
Created September 13, 2023 14:55
Patches pg-pool to trace with dd-trace
import tracer, { Span } from "dd-trace";
import pg from "pg";
/**
* Patches pg.Pool to add tracing.
*/
export function patchPgPool(serviceName: string) {
const name = "pg-pool.connect";
const service = `${serviceName}-postgres`;
@ikonst
ikonst / LogStream.test.ts
Last active September 16, 2023 01:47
Bunyan log size limiting stream
import bunyan from "bunyan";
import fs from "fs";
import { LogStream, trimStringToJsonLength } from "LogStream";
describe("LogStream", () => {
const outputStream = fs.createWriteStream("/dev/null");
const logger = bunyan.createLogger({
name: "test",
stream: new LogStream({ stream: outputStream, maxLength: 1000 }),
});
from typing import Any
from typing import Dict
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from pynamodb.attributes import MapAttribute
MapT = TypeVar('MapT', bound=MapAttribute)
@ikonst
ikonst / mocker.py
Created November 11, 2021 02:45
pytest_mock with patch.method
import gc
import sys
import types
import unittest.mock
from typing import Any
from typing import Callable
from typing import Generator
from typing import Optional
from typing import TYPE_CHECKING
class Foo():
    """Small example class with a class-level message and one printing method."""

    # Class attribute; readable as ``self.message`` from any instance.
    message = 'foo'

    def bar(self):
        """Print the literal string ``bar`` to standard output."""
        print('bar')
from unittest.mock import patch
def baz(self):
    """Print the ``message`` attribute of *self* to standard output.

    Takes ``self`` explicitly, so it works with any object that exposes a
    ``message`` attribute.
    NOTE(review): presumably intended as a patched-in replacement for a
    method on another class -- confirm against the rest of the gist.
    """
    print(self.message)
from datetime import datetime
import pytz
import dateutil.tz
from pynamodb.models import Model
from pynamodb.attributes import NumberAttribute, UTCDateTimeAttribute
class MyModel(Model):
class Meta:
table_name = 'my_model'
<?php
use DateTime;
use DrSlump\Protobuf;
use google\protobuf\Timestamp;
class PhpArrayModernCodec extends \DrSlump\Protobuf\Codec\PhpArray {
protected function encodeMessage(Protobuf\Message $message)
{
if ($message instanceof Timestamp) {
from contextlib import contextmanager
from functools import wraps
from socket import socket as socket_type, MSG_PEEK
from typing import Set
import flask
import gevent
def _abort_on_eof_watchdog(socket: socket_type, greenlets: Set[gevent.Greenlet]) -> bool:
@ikonst
ikonst / abort_on_dc.py
Last active May 14, 2019 19:29
Example of a gevent-based Flask server that aborts when downstream disconnects
"""
Implements a behavior similar to:
- Go server's cancellation of the request context on downstream disconnect
- PHP's script abort on client disconnect (unless `ignore_user_abort` is true)
"""
import gevent.monkey
gevent.monkey.patch_all()
from contextlib import contextmanager
from flask import Flask
@ikonst
ikonst / mongoengine_allow_inheritance_be_careful.py
Created December 19, 2017 20:09
One doesn't just add "allow_inheritance=True"
from mongoengine import Document, StringField, ListField, EmbeddedDocument, EmbeddedDocumentField
# ******** before remodel: **********
class FailedChargeAttempt_Old(EmbeddedDocument):
id = StringField()
failed_reason = StringField()