| """ | |
| env LINE_PROFILE=1 python 00_tricky.py | |
| python -m line_profiler -rtmz profile_output.lprof | |
| """ | |
| import time | |
| from enum import StrEnum | |
| from line_profiler import profile | |
| class State(StrEnum): |
| # Tested on Python 3.9 with only injector installed (pip install injector==0.18.4) | |
| from dataclasses import dataclass | |
| from typing import TypeVar, Generic | |
| from injector import Injector, Module, provider | |
| TCommand = TypeVar("TCommand") | |
| redis_connection = StrictRedis(host='redis') | |
| # source: https://redis.io/topics/distlock | |
| # This will be used to release the lock IF AND ONLY IF we have acquired it | |
| REMOVE_ONLY_IF_OWNER_SCRIPT = \ | |
| """if redis.call("get",KEYS[1]) == ARGV[1] then | |
| return redis.call("del",KEYS[1]) | |
| else | |
| return 0 | |
| end |
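The script above deletes the key only when its stored value matches the caller's token, so one client cannot release a lock held by another. The following is a minimal sketch of that ownership check. It assumes an in-memory dictionary standing in for Redis, and the names `InMemoryStore`, `acquire`, and `release` are hypothetical and not part of the original snippet. Key expiry, which a real lock also needs, is left out, and in Redis the compare-and-delete runs atomically inside the script.

```python
import uuid


class InMemoryStore:
    """Hypothetical stand-in for a Redis connection (plain dict, no expiry)."""

    def __init__(self):
        self._data = {}

    def set_if_absent(self, key, value):
        # Mimics SET key value NX: store the value only when the key is free.
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def delete_if_equal(self, key, expected):
        # Mimics the Lua script: delete only when the stored value matches.
        if self._data.get(key) == expected:
            del self._data[key]
            return 1
        return 0


def acquire(store, name):
    """Try to take the lock; return a unique owner token, or None if taken."""
    token = uuid.uuid4().hex
    return token if store.set_if_absent(name, token) else None


def release(store, name, token):
    """Release the lock only if `token` is the one that acquired it."""
    return store.delete_if_equal(name, token) == 1
```

A caller keeps the token returned by `acquire` and passes it back to `release`. A release attempt with any other token leaves the lock in place.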
| @app.task(soft_time_limit=5, time_limit=10) | |
| def time_limited_task(): | |
| """ | |
| This task has time limits to ensure it never runs for | |
| longer than anticipated. | |
| soft_time_limit is the number of seconds after which SoftTimeLimitExceeded | |
| is raised inside the task, giving it a moment to clean up. | |
| time_limit is the number of seconds after which the task is terminated unconditionally. | |
| """ |
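The docstring above describes a soft limit as an exception raised inside the running task so that it can clean up. The sketch below imitates that behaviour with the standard library only, using a `SIGALRM` timer instead of Celery. It assumes a Unix system and the main thread. `run_with_soft_limit` and `slow_task` are hypothetical names, and the local `SoftTimeLimitExceeded` class is a stand-in for `celery.exceptions.SoftTimeLimitExceeded`.

```python
import signal
import time


class SoftTimeLimitExceeded(Exception):
    """Stand-in for celery.exceptions.SoftTimeLimitExceeded (assumption)."""


def run_with_soft_limit(func, soft_seconds):
    """Call func(); raise SoftTimeLimitExceeded inside it after soft_seconds."""

    def _on_alarm(signum, frame):
        raise SoftTimeLimitExceeded()

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, soft_seconds)
    try:
        return func()
    finally:
        # Cancel the timer and restore the previous handler in every case.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def slow_task():
    """Hypothetical task body that outlives its soft limit."""
    try:
        time.sleep(1)  # simulated long-running work
        return "finished"
    except SoftTimeLimitExceeded:
        # Release resources here before returning.
        return "cleaned up"
```

In a real Celery task the same `try`/`except` shape applies, with the exception imported from `celery.exceptions` and the limits set on the task decorator.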
| from pytest_django.asserts import assertRedirects  # assumes pytest-django is in use | |
| def test_dashboard_requires_login(client): | |
| user = User.objects.create(...) # 1 | |
| response = client.get('/dashboard') # 2 | |
| assertRedirects(response, '/accounts/login') # 3 | |
| assert user.last_access_attempt == ... # 4 |
| defmodule Router do | |
| def get do | |
| end | |
| def owner_id_currency_to_name(owner_id, currency_id) do | |
| try do | |
| String.to_existing_atom("wallet-#{owner_id}-#{currency_id}") | |
| rescue | |
| ArgumentError -> String.to_atom("wallet-#{owner_id}-#{currency_id}") |
| defmodule StackAgent do | |
| use Agent | |
| def start(initial_value) do | |
| Agent.start(fn -> initial_value end, name: __MODULE__) | |
| end | |
| def value() do | |
| Agent.get(__MODULE__, fn state -> state end) | |
| end |
| defmodule IntegrationTest do | |
| use ExUnit.Case, async: true | |
| use Plug.Test | |
| alias Wallets.Router | |
| @opts Router.init([]) | |
| test "getting balance creates new wallet with balance 0" do | |
| conn = conn(:get, "/balance") | |
| |> Router.call(@opts) |
| defmodule Wallets.Router do | |
| use Plug.Router | |
| plug(:match) | |
| plug(:fetch_query_params) | |
| plug(:dispatch) | |
| get "/balance" do | |
| send_resp(conn, 200, "Not implemented") | |
| end |
| import asyncio | |
| class EchoClientProtocol(asyncio.Protocol): | |
| def __init__(self, message, loop): | |
| self.message = message | |
| self.loop = loop | |
| def connection_made(self, transport): | |
| transport.write(self.message.encode()) | |
| print('Data sent: {!r}'.format(self.message)) |