Skip to content

Instantly share code, notes, and snippets.

View LowerDeez's full-sized avatar
🇺🇦
Focusing

Oleg Kleshchunov LowerDeez

🇺🇦
Focusing
  • Kyiv, Ukraine
  • 08:30 (UTC +03:00)
View GitHub Profile
@omarryhan
omarryhan / Sanic-Gino-Alembic migrations Setup
Last active May 30, 2020 15:49
Sanic/Gino/Alembic migrations
# Do the first 6 steps only once.
1. pip install --user alembic
2. bash: ``cd {{my_project}} && alembic init alembic``
3. bash: ``text_editor {{my_project}}/alembic.ini``
4. Change: "sqlalchemy.url = postgres://{{username}}:{{password}}@{{address}}/{{db_name}}"
5. bash: ``text_editor {{my_project}}/alembic/env.py``
6. Now, import your metadata/db object from your app.:
# {{my_project}}/{{my_project_dir}}/app.py
# HANDLER
from django.utils.translation import ugettext_lazy as _
from rest_framework.exceptions import ErrorDetail
from rest_framework.views import exception_handler
non_field_errors = 'non_field_errors'
def set_custome_errors(errors):
@adamchainz
adamchainz / double_checked_lock_iterator.py
Created February 28, 2020 17:18
double_checked_lock_iterator.py
# refactor of https://lukeplant.me.uk/blog/posts/double-checked-locking-with-django-orm/
# untested
def double_checked_lock_iterator(queryset):
for item_pk in queryset.values_list("pk", flat=True):
with transaction.atomic():
try:
yield queryset.select_for_update(skip_locked=True).get(id=item_pk)
except queryset.model.DoesNotExist:
pass
@spookylukey
spookylukey / after_fetch_queryset_mixin.py
Created September 2, 2021 20:31
AfterFetchQuerySetMixin for Django
class AfterFetchQuerySetMixin:
"""
QuerySet mixin to enable functions to run immediately
after records have been fetched from the DB.
"""
# This is most useful for registering 'prefetch_related' like operations
# or complex aggregations that need to be run after fetching, but while
# still allowing chaining of other QuerySet methods.
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@horodchukanton
horodchukanton / simple_tasks_queue.py
Last active September 24, 2022 11:13
FastAPI Background tasks queue
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta, datetime
from typing import Any, Dict
class SimpleStorageEntry:
_counter: int = 0
id: int