Skip to content

Instantly share code, notes, and snippets.

View fabiocerqueira's full-sized avatar

Fabio Cerqueira fabiocerqueira

View GitHub Profile
from functools import wraps
from twisted.internet import defer, reactor
from twisted.python import failure
class DeferTimeoutError(Exception):
pass
@fabiocerqueira
fabiocerqueira / example.py
Created October 7, 2020 13:17
Difference between blocking and non-blocking tasks in python
import time
import asyncio
async def my_blocking_task(task_id):
print(f"running blocking {task_id}")
time.sleep(2) # faking cpu task
async def my_nonblocking_task(task_id):
@fabiocerqueira
fabiocerqueira / bug.py
Created September 18, 2020 16:12
never raise or return on finally blocks (in Python also continue will break things)
from twisted.internet import defer, reactor, threads
@defer.inlineCallbacks
def problem():
1 / 0
yield 1
@defer.inlineCallbacks
@fabiocerqueira
fabiocerqueira / freeze_time_threads.py
Last active August 24, 2020 13:03
This example shows the problem using twisted, threads and freezetime
from datetime import datetime
from functools import wraps
from freezegun import freeze_time
from twisted.internet import defer, reactor, threads
def tx_freeze_time(*ft_args, **ft_kwargs):
def decorator(func):
@fabiocerqueira
fabiocerqueira / callbacks.py
Last active October 30, 2019 11:53
twisted example
from twisted.internet import reactor, defer
from twisted.web.client import Agent
def ping_google():
agent = Agent(reactor)
d = agent.request("GET", "https://www.google.com/")
def check(response):
print("finish request")
@fabiocerqueira
fabiocerqueira / async_executor_process.py
Last active October 20, 2019 12:23
sync running in a process pool executor
import asyncio
from pprint import pprint
from concurrent.futures import ProcessPoolExecutor
import requests
def io_block_socket_call(url):
resp = url, requests.get(url).status_code == 200
return resp
@fabiocerqueira
fabiocerqueira / async_executor.py
Last active October 19, 2019 14:12
Example sync function in a executor
import asyncio
from pprint import pprint
import requests
def io_block_socket_call(url):
resp = url, requests.get(url).status_code == 200
return resp
@fabiocerqueira
fabiocerqueira / app.py
Last active October 15, 2019 19:35
Flask exemplo simples de autenticacao
from functools import wraps
from flask import Flask, escape, request, make_response
app = Flask(__name__)
USERS = {"fabio": "12345", "italo": "abcde"}
def login_required(func):
@wraps(func)
@fabiocerqueira
fabiocerqueira / bloomfilter.py
Last active October 14, 2019 20:30
Example BloomFilter implementation in Python
from hashlib import md5
import redis
class BloomFilterBaseBackend:
def initialize(self, size):
self.size = size
def validate(self, position):
if position < 0 or position >= self.size:
In [82]: stdlib = set(open("stdlib").read().split("\n"))
In [83]: result = set()
In [84]: p = Path('.')