Skip to content

Instantly share code, notes, and snippets.

View ods's full-sized avatar
🇺🇦
Stand with Ukraine

Denis Otkidach ods

🇺🇦
Stand with Ukraine
View GitHub Profile
@ods
ods / maybe_raises.py
Created August 11, 2022 11:54
pytest.raises() variation accepting None for cases when no exception is expected
from contextlib import nullcontext
from typing import ContextManager, Optional, Union
import pytest
def maybe_raises(
exception: Union[type[BaseException], tuple[type[BaseException], ...]]
) -> ContextManager[Optional[pytest.ExceptionInfo[BaseException]]]:
if exception is None:
import asyncio
from contextlib import AsyncExitStack, ExitStack
import contextvars
import functools
import inspect
import sys
__all__ = ['exit_stack']
import time, asyncio, hashlib
def sync_cpu_bound():
s = bytes(range(256))
for i in range(100000):
hashlib.sha256(s)
start = time.time()
sync_cpu_bound()
@ods
ods / bench_asyncio_lock.py
Created September 27, 2018 12:43
Benchmark for PR to serialize execution of SQL in asyncpg: https://github.com/MagicStack/asyncpg/pull/367
"""
Benchmark for PR to serialize execution of SQL in asyncpg.
See: https://github.com/MagicStack/asyncpg/pull/367
"""
import asyncio
from asyncio import Lock
class _Atomic:
@ods
ods / test2.py
Last active January 1, 2016 07:19 — forked from anonymous/test2.py
registry = Registry()
ReplicatedModel = registry.multidb(
admin=AdminBase,
front=FrontBase,
)
@registry.model(ReplicatedModel)
def Media(models):
@ods
ods / sa_merge_m2m_backref.py
Last active December 24, 2015 12:09
This schema fails as well as relationship definitions on both side with back_populates. Relationship definitions on both side without back_populates work fine. Exception is: sqlalchemy.orm.exc.FlushError: New instance <B at 0x2135e90> with identity key (<class '__main__.B'>, (1,)) conflicts with persistent instance <B at 0x212d850> Commenting th…
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, ForeignKey, create_engine
from sqlalchemy.orm import sessionmaker, Session, relationship
Base = declarative_base()
class AB(Base):
__tablename__ = 'AB'
a_id = Column(ForeignKey('A.id'), primary_key=True)
@ods
ods / sa_dict_collection.py
Created August 28, 2013 15:59
Custom collection replication prove of concept
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.orm.collections import attribute_mapped_collection
Base = declarative_base()
class Parent(Base):
__tablename__ = 'parents'
id = Column(Integer, primary_key=True)
@ods
ods / replicate_auto_relations.py
Created August 21, 2013 16:44
Automatically discover relationships to replicate
#!/usr/bin/python2.7
import path_config
from app import db_maker
db = db_maker()
from models import admin
from models.admin.base import AdminFrontModel
from sqlalchemy.orm.attributes import manager_of_class
@ods
ods / path_config.py
Created June 20, 2013 13:46
A possible implementation of path_config.py to quickly get path configuration in all subsites of our typical iktomi-based project. This module is placed in code root and symlinked to all subsite roots. A script in subsite root requires just the following single line: import path_config
import os, sys, warnings
def _get_root():
# Only .py file is symbolic link, while compiled module can be placed
# locally
name, ext = os.path.splitext(__file__)
fn = os.path.abspath(name +'.py')
if os.path.islink(fn):
# Symlinks are relative to target, not working dir
fn = os.path.join(os.path.dirname(fn), os.readlink(fn))
@ods
ods / gist:5481419
Created April 29, 2013 12:54
Test case for implicit merge in SQLAlchemy when PK is set in transient object
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, ForeignKey, create_engine
from sqlalchemy.orm import sessionmaker, relationship
Base = declarative_base()
class Parent(Base):
__tablename__ = 'parents'
id = Column(Integer, primary_key=True)
#children = relationship('Child', cascade='all') # doesn't merge