Skip to content

Instantly share code, notes, and snippets.

# tested on Python3.9 with just injector installed (pip install injector==0.18.4)
from dataclasses import dataclass
from typing import TypeVar, Generic
from injector import Injector, Module, provider
TCommand = TypeVar("TCommand")
redis_connection = StrictRedis(host='redis')
# source: https://redis.io/topics/distlock
# This will be used to release lock IF AND ONLY IF we have acquired it
# Lua script: deletes the key KEYS[1] only when its current value equals ARGV[1];
# otherwise it leaves the key alone and returns 0.
REMOVE_ONLY_IF_OWNER_SCRIPT = \
"""if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
@app.task(soft_time_limit=5, time_limit=10)
def time_limited_task():
    """
    This task has time limits to ensure it will never run for
    longer than anticipated.

    soft_time_limit is the number of seconds after which SoftTimeLimitExceeded
    is raised, to give the task a moment to clean up.
    time_limit is the number of seconds after which the task is terminated
    unconditionally.
    """
def test_dashboard_requires_login(client):
    """Check that GET /dashboard redirects to /accounts/login."""
    user = User.objects.create(...)  # 1
    response = client.get('/dashboard')  # 2
    # NOTE(review): `self` is not defined in this function-style test; confirm
    # which assertRedirects helper is meant to be used here.
    self.assertRedirects(response, '/accounts/login')  # 3
    # Comparison, not assignment: `=` is a syntax error inside an assert statement.
    assert user.last_access_attempt == ...  # 4
defmodule Router do
def get do
end
@doc """
Builds the atom `:"wallet-<owner_id>-<currency_id>"` from the two identifiers.

Both arguments are interpolated into the name, so they must implement `String.Chars`.
"""
@spec owner_id_currency_to_name(String.Chars.t(), String.Chars.t()) :: atom()
def owner_id_currency_to_name(owner_id, currency_id) do
  # `String.to_atom/1` returns the already-existing atom when there is one, so the
  # previous `String.to_existing_atom/1` + `rescue ArgumentError` fallback was
  # redundant control flow with the same result.
  # NOTE(review): atoms are never garbage collected; if owner/currency ids can come
  # from unbounded external input, consider a Registry with string keys instead.
  String.to_atom("wallet-#{owner_id}-#{currency_id}")
end
defmodule StackAgent do
use Agent
@doc """
Starts an agent registered under this module's name with `initial_value` as its state.

Returns the result of `Agent.start/2`.
"""
def start(initial_value) do
  init_fun = fn -> initial_value end
  Agent.start(init_fun, name: __MODULE__)
end
@doc """
Returns the current state of the agent registered under this module's name.
"""
def value do
  Agent.get(__MODULE__, & &1)
end
defmodule IntegrationTest do
use ExUnit.Case, async: true
use Plug.Test
alias Wallets.Router
@opts Router.init([])
test "getting balance creates new wallet with balance 0" do
conn = conn(:get, "/balance")
|> Router.call(@opts)
defmodule Wallets.Router do
use Plug.Router
plug(:match)
plug(:fetch_query_params)
plug(:dispatch)
# Handler for GET /balance: replies with status 200 and a fixed placeholder body.
get "/balance" do
  conn
  |> send_resp(200, "Not implemented")
end
import asyncio
class EchoClientProtocol(asyncio.Protocol):
    """Protocol that sends a single message as soon as the connection is made."""

    def __init__(self, message, loop):
        """Store the message to send and the event loop passed by the caller."""
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        """Write the encoded message to the transport and log what was sent."""
        transport.write(self.message.encode())
        print(f'Data sent: {self.message!r}')
from app import router
@router('/index')
def fun(request):
    """Split each entry of the request's 'lines' value on whitespace.

    Returns the string form of the resulting list of token lists.
    """
    lines = request.get('lines')
    # NOTE(review): if 'lines' is absent and request.get returns None, the
    # comprehension below raises TypeError — confirm callers always supply it.
    tokens_per_line = [line.split() for line in lines]
    return str(tokens_per_line)