Diego Navarro dnmellen

  • Madrid
@dnmellen
dnmellen / function_tools.py
Created September 20, 2013 10:42
Calls a function and retries up to N times if it raises an exception or returns *value_to_retry*
def retry(callable, callable_args, num_retries=5, sleep_time=0, value_to_retry='f9e40fdf355643dd82cea787ca2e8d3c04966855'):
    """
    Calls a function and retries N times if there is an exception or the returned value by the function is *value_to_retry*
    :param callable: What you want to execute
    :type callable: python callable obj
    :param callable_args: Callable's arguments
    :type callable_args: tuple
    :param num_retries: Number of retries
    :type num_retries: int
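The preview above stops inside the docstring, so the body of the function is not shown. The following is a minimal sketch of how such a helper might work, not the author's code: the parameter names `func` and `func_args` (chosen to avoid shadowing the built-in `callable`), the `_SENTINEL` default, and the behaviour after the last attempt (re-raise the last exception, or return `value_to_retry`) are all assumptions.

```python
import time

# Hypothetical default: a unique object that no function returns by accident.
# The gist uses a fixed hash string for the same purpose.
_SENTINEL = object()


def retry(func, func_args=(), num_retries=5, sleep_time=0, value_to_retry=_SENTINEL):
    """Call func(*func_args); retry on an exception or when it returns value_to_retry."""
    last_error = None
    # One initial call plus up to num_retries retries.
    for attempt in range(num_retries + 1):
        try:
            result = func(*func_args)
        except Exception as exc:
            last_error = exc
        else:
            if result != value_to_retry:
                return result
            last_error = None
        # Sleep only between attempts, never after the last one.
        if attempt < num_retries and sleep_time:
            time.sleep(sleep_time)
    if last_error is not None:
        raise last_error
    return value_to_retry
```

With this sketch, a function that fails twice and then succeeds is called three times in total, and its result is returned from the third call.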
dnmellen / inverval_dates.lua
Last active December 24, 2015 07:19
Returns an array of dates between two given dates. You can select the "step" (year, month, day)
-- Returns an array of dates between `date1` and `date2`
-- The 'step' between dates can take one of 3 possible values:
-- (year, month, day)
--
-- e.g.: get_interval_dates(20130101, 20130601, "month")
-- -> (201301, 201302, 201303, 201304, 201305, 201306)
--
-- Diego Navarro (dnmellen)
function get_interval_dates(date1, date2, step)
    local result = {}
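The Lua preview is cut off after the first line of the body. As an illustration of the behaviour the comment describes, here is a rough Python port rather than the original Lua. Only the "month" output format is confirmed by the example in the comment; the formats used for "year" (plain years) and "day" (YYYYMMDD integers) are assumptions.

```python
from datetime import date, timedelta


def get_interval_dates(date1, date2, step):
    """Return the dates between two YYYYMMDD integers, truncated to the given step."""
    start = date(date1 // 10000, date1 // 100 % 100, date1 % 100)
    end = date(date2 // 10000, date2 // 100 % 100, date2 % 100)
    result = []
    if step == "day":
        current = start
        while current <= end:
            # Assumed format: YYYYMMDD
            result.append(current.year * 10000 + current.month * 100 + current.day)
            current += timedelta(days=1)
    elif step == "month":
        year, month = start.year, start.month
        while (year, month) <= (end.year, end.month):
            # Format from the gist's example: YYYYMM
            result.append(year * 100 + month)
            year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    elif step == "year":
        # Assumed format: YYYY
        result = list(range(start.year, end.year + 1))
    else:
        raise ValueError("step must be 'year', 'month' or 'day'")
    return result


print(get_interval_dates(20130101, 20130601, "month"))
# -> [201301, 201302, 201303, 201304, 201305, 201306]
```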
dnmellen / flatten.py
Last active December 29, 2015 04:59
My attempt at "flatten" using yield from
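# Iterable lives in collections.abc; the collections alias was removed in Python 3.10
from collections.abc import Iterable
def flatten(iterable):
    for item in iterable:
        # Treat strings as leaves: a one-character string contains itself and would recurse forever
        if isinstance(item, Iterable) and not isinstance(item, (str, bytes)):
            yield from flatten(item)
        else:
            yield item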
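A short usage sketch, repeating the generator so the block runs on its own. It imports `Iterable` from `collections.abc` (its location on current Python 3) and treats strings as leaves so that single characters do not recurse forever; the sample data is made up for illustration.

```python
from collections.abc import Iterable


def flatten(iterable):
    for item in iterable:
        # Recurse into nested containers, but keep str/bytes whole.
        if isinstance(item, Iterable) and not isinstance(item, (str, bytes)):
            yield from flatten(item)
        else:
            yield item


nested = [1, [2, (3, 4)], "ab", [[5]]]
print(list(flatten(nested)))
# -> [1, 2, 3, 4, 'ab', 5]
```

Because `flatten` is a generator, nothing is computed until the result is iterated, which is why the example wraps it in `list(...)`.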
dnmellen / postinstall.md
Last active December 30, 2015 08:49
PostgreSQL post-installation

Useful things to do after installing PostgreSQL

# su - postgres
$ createuser youruser
$ psql
postgres=# ALTER USER youruser WITH PASSWORD 'yoursecretpassword';
postgres=# CREATE DATABASE yourdb OWNER youruser;
dnmellen / redis_listener.py
Created January 15, 2014 15:37
Listen to Redis pub/sub messages with Python 3 and asyncio
import asyncio
import redis
@asyncio.coroutine
def listener(redis_conn, channels):
    pubsub = redis_conn.pubsub()
    pubsub.subscribe(channels)
    print('Listening redis...')
dnmellen / timeout_decorator.py
Created January 23, 2014 15:39
Threaded timeout Python decorator
import threading
import logging
from functools import wraps
logger = logging.getLogger(__name__)
def timeout(secs=None):
    def my_decorator(target, *args, **kwargs):
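The preview ends before the decorator body. One common way to build a threaded timeout, sketched here under stated assumptions and not taken from the gist, is to run the target in a worker thread and `join` it with a time limit. The names `decorator`, `wrapper`, and `outcome`, and the choice to raise the built-in `TimeoutError`, are assumptions.

```python
import threading
from functools import wraps


def timeout(secs=None):
    """Raise TimeoutError if the decorated function takes longer than secs seconds."""
    def decorator(target):
        @wraps(target)
        def wrapper(*args, **kwargs):
            outcome = {}

            def run():
                # Capture the result or the exception so the caller's thread can see it.
                try:
                    outcome["value"] = target(*args, **kwargs)
                except Exception as exc:
                    outcome["error"] = exc

            worker = threading.Thread(target=run, daemon=True)
            worker.start()
            worker.join(secs)  # secs=None waits until the worker finishes
            if worker.is_alive():
                raise TimeoutError(f"{target.__name__} exceeded {secs}s")
            if "error" in outcome:
                raise outcome["error"]
            return outcome["value"]
        return wrapper
    return decorator
```

One limitation of this approach: Python threads cannot be killed from outside, so after a timeout the worker keeps running in the background until the function returns. Marking it as a daemon thread only ensures it does not block interpreter exit.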
dnmellen / seconds.py
Created January 15, 2015 12:03
Seconds until the next day in Python
from datetime import datetime, timedelta
print((datetime(*(datetime.now() + timedelta(days=1)).timetuple()[:3]) - datetime.now()).seconds)
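The one-liner is compact but hard to read and to test. A possible restatement as a function follows; the name `seconds_until_midnight` and the optional `now` parameter are introduced here for illustration. It uses naive local datetimes, so days shortened or lengthened by a DST change are not accounted for.

```python
from datetime import datetime, timedelta


def seconds_until_midnight(now=None):
    """Return the whole seconds from now until the start of the next day."""
    now = now or datetime.now()
    # Midnight at the start of tomorrow, built from tomorrow's date.
    next_midnight = datetime.combine(now.date() + timedelta(days=1), datetime.min.time())
    return int((next_midnight - now).total_seconds())


print(seconds_until_midnight(datetime(2015, 1, 15, 12, 3)))
# -> 43020 (11 hours and 57 minutes)
```

Passing `now` explicitly makes the result reproducible; calling the function with no argument uses the current local time.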
dnmellen / worker_function.py
Created January 31, 2017 15:07
AWS Lambda function that runs an SSH command on another server through a bastion server. The function is triggered by a CloudWatch alarm
import json
import boto3
import paramiko
def worker_handler(event, context):
    ALLOWED_HOSTS = [
        'host1',
        'host2',
dnmellen / models.py
Last active October 3, 2021 04:14
DynamoDB mixin for Django models: Mix Django fields and DynamoDB fields in your models!
import uuid
import boto3
from decimal import Decimal
from functools import partial
from django.db import models
from django.conf import settings
class UUIDModel(models.Model):
    """
dnmellen / brokers.py
Created October 20, 2020 10:33
Improved EagerBroker for dramatiq
from dramatiq.brokers.stub import StubBroker
class EagerBroker(StubBroker):
    """Used by tests to simulate CELERY_ALWAYS_EAGER behavior.
    https://github.com/Bogdanp/dramatiq/issues/195
    Modified by @dnmellen to support pipelines and groups
    """
    def process_message(self, message):