This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Tested on Python 3.9 with only injector installed (pip install injector==0.18.4)
from dataclasses import dataclass | |
from typing import TypeVar, Generic | |
from injector import Injector, Module, provider | |
TCommand = TypeVar("TCommand") | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
redis_connection = StrictRedis(host='redis') | |
# source: https://redis.io/topics/distlock | |
# Used to release the lock IF AND ONLY IF we are the ones who acquired it
REMOVE_ONLY_IF_OWNER_SCRIPT = \ | |
"""if redis.call("get",KEYS[1]) == ARGV[1] then | |
return redis.call("del",KEYS[1]) | |
else | |
return 0 | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@app.task(soft_time_limit=5, time_limit=10) | |
def time_limited_task(): | |
""" | |
This task has time limits to ensure it will never work for | |
longer than anticipated. | |
soft_time_limit is the number of seconds after which SoftTimeLimitExceeded
is raised, giving the task a moment to clean up.
time_limit is the number of seconds after which the task is terminated unconditionally.
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_dashboard_requires_login(client):
    """Check that an unauthenticated GET of ``/dashboard`` redirects to login.

    The ``...`` placeholders are kept from the original sketch and must be
    filled in before this test can actually run.

    Args:
        client: test client fixture used to issue the request.
    """
    user = User.objects.create(...)  # 1
    response = client.get('/dashboard')  # 2
    # 3: this is a plain function, so there is no ``self`` to call
    # ``assertRedirects`` on; check the redirect with bare asserts instead.
    # NOTE(review): unlike ``assertRedirects`` this does not fetch the
    # redirect target -- confirm that is acceptable.
    assert response.status_code == 302
    assert response.url == '/accounts/login'
    # 4: comparison (``==``), not assignment -- ``assert x = y`` does not parse.
    assert user.last_access_attempt == ...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Router do | |
# Placeholder with an empty body; calling it returns nil.
def get, do: nil
def owner_id_currency_to_name(owner_id, currency_id) do | |
try do | |
String.to_existing_atom("wallet-#{owner_id}-#{currency_id}") | |
rescue | |
ArgumentError -> String.to_atom("wallet-#{owner_id}-#{currency_id}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule StackAgent do | |
use Agent | |
@doc """
Starts an agent registered under this module's name whose state is
`initial_value`. Returns whatever `Agent.start/2` returns.
"""
def start(initial_value) do
  init_fun = fn -> initial_value end
  Agent.start(init_fun, name: __MODULE__)
end
@doc """
Returns the current state of the agent registered under this module's name.
"""
def value do
  Agent.get(__MODULE__, & &1)
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule IntegrationTest do | |
use ExUnit.Case, async: true | |
use Plug.Test | |
alias Wallets.Router | |
@opts Router.init([]) | |
test "getting balance creates new wallet with balance 0" do | |
conn = conn(:get, "/balance") | |
|> Router.call(@opts) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Wallets.Router do | |
use Plug.Router | |
plug(:match) | |
plug(:fetch_query_params) | |
plug(:dispatch) | |
# Placeholder handler: replies with status 200 and the literal body
# "Not implemented".
get "/balance" do
  conn
  |> send_resp(200, "Not implemented")
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
class EchoClientProtocol(asyncio.Protocol):
    """Protocol that writes one message as soon as the connection is made.

    Args:
        message: text to send; it is encoded with ``str.encode()`` defaults
            before being written to the transport.
        loop: stored on the instance as ``self.loop``; the methods shown
            here do not use it.
    """

    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        """Write the encoded message to ``transport`` and print what was sent."""
        transport.write(self.message.encode())
        print(f'Data sent: {self.message!r}')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from app import router | |
@router('/index')
def fun(request):
    """Split each entry of the request's ``lines`` value on whitespace.

    Registered through ``router('/index')``.

    Args:
        request: object exposing ``get``; ``request.get('lines')`` must be
            iterable and yield objects with a ``split()`` method.

    Returns:
        The ``str()`` of the list of per-entry token lists.
    """
    tokens_per_line = []
    for line in request.get('lines'):
        tokens_per_line.append(line.split())
    return str(tokens_per_line)