Skip to content

Instantly share code, notes, and snippets.

View amitripshtos's full-sized avatar

Amit Ripshtos amitripshtos

View GitHub Profile
@amitripshtos
amitripshtos / exception_middleware.py
Created July 18, 2017 08:56
Exception handling middleware for aiohttp (python 3.5)
def json_error(status_code: int, exception: Exception) -> web.Response:
"""
Returns a Response from an exception.
Used for error middleware.
:param status_code:
:param exception:
:return:
"""
return web.Response(
status=status_code,
@amitripshtos
amitripshtos / alternative-scheduler-for-celery.py
Created August 22, 2018 09:53
Alternative to celery beat for people who got burned hard
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.job import Job
import json
import logging
from apscheduler.triggers.cron import CronTrigger
import time
from celery import Celery
from typing import List
@amitripshtos
amitripshtos / elasticsearch-py-async-bulk.py
Created October 28, 2018 14:13
Asyncio bulk helper methods for elasticsearch-py-async (python 3.6+) , use it like a regular helper, with AsyncElasticsearch
from __future__ import unicode_literals
import logging
from operator import methodcaller
import asyncio
from elasticsearch.exceptions import TransportError
from elasticsearch.helpers import BulkIndexError, expand_action, _chunk_actions
from elasticsearch.compat import map