-
-
Save hackaugusto/088c31ba95c74355989f5a42b9b0a6e0 to your computer and use it in GitHub Desktop.
"""Types used to represent transaction and its states. | |
Each state of a transaction is described by a different wrapper dataclass, this | |
is used to introduce new fields in a type safe manner, and to enforce correct | |
usage. | |
Transactions are by design immutable and don't have access in their field to | |
objects that do IO, that is why the RPC class is given as an argument to each | |
method, when necessary. | |
""" | |
import heapq | |
from abc import ABC, abstractmethod | |
from dataclasses import dataclass | |
from enum import Enum | |
from threading import Event, Lock, RLock | |
from typing import (Any, Dict, Generic, Iterable, List, NewType, Optional, | |
Tuple, TypeVar, Union) | |
from gevent import AsyncResult | |
from web3 import Web3 | |
# Type variables used by the generic state wrappers defined in this module.
# They are bound by name because the classes are declared further down.
T = TypeVar("T", bound="Transaction")
E = TypeVar("E", bound="GasErrors")
K = TypeVar("K", bound="ExecutionErrors")

# An ABI, represented as a string-keyed mapping.
ABI = Dict[str, Any]

# Nominal types: these are plain ints/bytes at runtime, the distinct names
# only exist so that a type checker can tell the values apart.
Gas = NewType("Gas", int)
TransactionToken = NewType("TransactionToken", bytes)
AccountETHBalance = NewType("AccountETHBalance", int)
GasPrice = NewType("GasPrice", int)
TransactionHash = NewType("TransactionHash", bytes)
Nonce = NewType("Nonce", int)
BlockNumber = NewType("BlockNumber", int)
BlocksToMineTransaction = NewType("BlocksToMineTransaction", int)
# NOTE(review): TransactionHash is bytes while BlockHash is int - confirm
# this difference is intended.
BlockHash = NewType("BlockHash", int)
Address = NewType("Address", bytes)
TokenNetworkAddress = NewType("TokenNetworkAddress", bytes)
TokenAmount = NewType("TokenAmount", int)
ChannelID = NewType("ChannelID", int)

# TBD: the concrete types for these have not been decided yet.
ConfirmedTransactionReceipt = Any
Web3Transaction = Any

# Values a receipt's `status` is compared against.
FAILURE = "FAILURE"
MINED = "MINED"

# Unique sentinel object.
EMPTY_RECEIPT = object()
def get_available_nonce(address_checksumed: Address, web3: Web3) -> Nonce:
    """Return the transaction count of `address_checksumed`, queried with the
    "pending" block identifier, wrapped as a `Nonce`.
    """
    pending_count = web3.eth.getTransactionCount(address_checksumed, "pending")
    return Nonce(pending_count)
def random_token() -> TransactionToken:
    """Return a fresh random token used to identify a queued transaction.

    Tokens are used as dictionary keys for the pending results, and the
    queueing code asserts that a token is not already in use, so every call
    has to return a different value.
    """
    # Imported locally so this helper does not depend on the module header.
    import secrets

    # 16 random bytes make accidental collisions practically impossible.
    return TransactionToken(secrets.token_bytes(16))
class PrunedBlock(Exception):
    """Marker exception, used just to mark the code that is handling pruned
    blocks.
    """
class InsufficientFunds(Exception):
    """Raised when the user's balance is not enough to send a batch of
    transactions.
    """
class UnrecoverableError(Exception):
    """Raised when an unexpected remote error happened."""
class PreconditionError(Exception):
    """Raised when a transaction was already invalid at creation time.

    This means the user code is not doing proper validation.
    """
class ConfirmedBlock:
    """Identifies a block by its number and hash.

    Only the attribute types are declared here, no values are assigned.
    """

    block_number: BlockNumber
    block_hash: BlockHash
class Signer:
    """Abstraction over signing.

    Signing may do IO when a remote signer is used, therefore exposure of
    this class has to be limited.
    """

    # Declared only; no value is assigned by this class.
    address: Address

    def sign(self, data):
        """Placeholder: the signing itself is not implemented here."""
class GasErrors(Enum):
    """Type to aggregate descriptions for a failed gas estimation. These
    reasons are always safe, if an unsafe problem is encountered
    UnrecoverableError should be used instead.

    This is a member-less base enum, each transaction type subclasses it to
    list its own reasons. It intentionally does not inherit from `ABC`:
    combining `ABC` and `Enum` raises a metaclass conflict as soon as the
    class statement is executed.
    """
class ExecutionErrors(Enum):
    """Type to aggregate descriptions for a mined transaction that failed.

    This is a member-less base enum, each transaction type subclasses it to
    list its own reasons. It intentionally does not inherit from `ABC`:
    combining `ABC` and `Enum` raises a metaclass conflict as soon as the
    class statement is executed.
    """
@dataclass(frozen=True)
class TransactionFailedConfirmed(Generic[T, K]):
    """State of a transaction that was mined and confirmed, but failed.

    Carries every field of the confirmed state plus the `reason` of the
    failure.
    """

    # The wrapped transaction.
    tx: T
    gas: Gas
    should_be_mined_in_blocks: BlocksToMineTransaction
    # Identifier given to the transaction when it was queued.
    token: TransactionToken
    target_block: BlockNumber
    tx_hash: TransactionHash
    nonce: Nonce
    receipt: ConfirmedTransactionReceipt
    # Value describing why the transaction failed.
    reason: K
@dataclass(frozen=True)
class TransactionSuccessfulConfirmed(Generic[T]):
    """State of a transaction that was mined, confirmed and successful."""

    # The wrapped transaction.
    tx: T
    gas: Gas
    should_be_mined_in_blocks: BlocksToMineTransaction
    # Identifier given to the transaction when it was queued.
    token: TransactionToken
    target_block: BlockNumber
    tx_hash: TransactionHash
    nonce: Nonce
    receipt: ConfirmedTransactionReceipt
@dataclass(frozen=True)
class TransactionConfirmed(Generic[T]):
    """State of a transaction that was mined and confirmed.

    At this point it is not yet known if the execution was successful.
    """

    # The wrapped transaction.
    tx: T
    gas: Gas
    should_be_mined_in_blocks: BlocksToMineTransaction
    # Identifier given to the transaction when it was queued.
    token: TransactionToken
    target_block: BlockNumber
    tx_hash: TransactionHash
    nonce: Nonce
    receipt: ConfirmedTransactionReceipt

    def transaction_failed(
        self, web3: Web3, receipt: ConfirmedTransactionReceipt
    ) -> TransactionFailedConfirmed[T, K]:
        """Ask the wrapped transaction why it failed and move to the failed
        state.

        Returns:
            A `TransactionFailedConfirmed` with the fields of this state, the
            given `receipt`, and the reason returned by
            `tx.transaction_failed`.

        Raises:
            Whatever `tx.transaction_failed` raises, the exception is not
            handled here.
        """
        failure_reason = self.tx.transaction_failed(web3, receipt)

        # Note: the receipt stored in the new state is the argument, not
        # `self.receipt`.
        return TransactionFailedConfirmed(
            tx=self.tx,
            gas=self.gas,
            should_be_mined_in_blocks=self.should_be_mined_in_blocks,
            token=self.token,
            target_block=self.target_block,
            tx_hash=self.tx_hash,
            nonce=self.nonce,
            receipt=receipt,
            reason=failure_reason,
        )
@dataclass(frozen=True)
class TransactionReplaced(Generic[T]):
    """State of a transaction that has been replaced."""

    # The wrapped transaction.
    tx: T
    gas: Gas
    should_be_mined_in_blocks: BlocksToMineTransaction
    # Identifier given to the transaction when it was queued.
    token: TransactionToken
    target_block: BlockNumber
    tx_hash: TransactionHash
    nonce: Nonce
@dataclass(frozen=True)
class TransactionSent(Generic[T]):
    """State of a transaction that was picked from the pending queue and sent
    to the network.

    There is no time guarantee on how long it will take for the transaction
    to be mined, and it is possible the transaction will be replaced.
    """

    # The wrapped transaction.
    tx: T
    gas: Gas
    should_be_mined_in_blocks: BlocksToMineTransaction
    # Identifier given to the transaction when it was queued.
    token: TransactionToken
    target_block: BlockNumber
    tx_hash: TransactionHash
    nonce: Nonce
@dataclass(frozen=True)
class TransactionQueued(Generic[T]):
    """State of a transaction sitting in the local queue, waiting to be sent.

    At this point, this transaction has:

    - The preconditions checked
    - The gas successfully estimated
    - The account's balance checked

    There is no time guarantee on how long it will take for the transaction
    to be mined, and it is possible the transaction will be replaced.
    """

    # The wrapped transaction.
    tx: T
    gas: Gas
    should_be_mined_in_blocks: BlocksToMineTransaction
    # Identifier of this queue entry.
    token: TransactionToken
@dataclass(frozen=True)
class TransactionInvalidated(Generic[T, E]):
    """State of a transaction that was valid, but cannot be sent to the
    blockchain because the gas estimation failed.

    This must only be used for expected race conditions, if the error is
    unexpected an UnrecoverableError should be raised instead.
    """

    # The wrapped transaction.
    tx: T
    # Value describing why the gas estimation failed.
    reason: E
@dataclass(frozen=True)
class TransactionRacing(Generic[T]):
    """State of a transaction that may become invalid, because it is
    currently racing against another transaction in an unconfirmed block.

    This is a transient state, the caller has to wait a few blocks to
    determine what happened.
    """

    # The wrapped transaction.
    tx: T
@dataclass(frozen=True)
class TransactionGasEstimated(Generic[T]):
    """State of a valid transaction that has a gas estimation.

    This is ready to be sent.
    """

    # The wrapped transaction.
    tx: T
    # The gas to be used for the transaction.
    gas: Gas
@dataclass(frozen=True)
class TransactionValidated(Generic[T]):
    """State of a transaction that is valid with respect to its
    trigerring_block, and is ready to have the gas estimated.
    """

    # The wrapped transaction.
    tx: T

    def gas_estimation_failed(
        self, web3: Web3, block: ConfirmedBlock
    ) -> TransactionInvalidated[T, E]:
        """Ask the wrapped transaction why the gas estimation failed at
        `block` and move to the invalidated state.

        Returns:
            A `TransactionInvalidated` holding the transaction and the reason
            returned by `tx.gas_estimation_failed`.

        Raises:
            Whatever `tx.gas_estimation_failed` raises, the exception is not
            handled here.
        """
        failure_reason = self.tx.gas_estimation_failed(web3, block)
        return TransactionInvalidated(tx=self.tx, reason=failure_reason)
@dataclass(frozen=True)
class TransactionInitial(Generic[T]):
    """A brand new transaction, nothing is known about it yet.

    The only valid action at this point is to validate the transaction
    against the trigerring block.
    """

    # The wrapped transaction.
    tx: T

    def preconditions(self, web3: Web3) -> TransactionValidated[T]:
        """Run the preconditions check of the wrapped transaction and move to
        the validated state.

        Used to check if there are any logic bugs in the business logic which
        resulted in an impossible ContractSend*.

        Note:
            This does not care if the preconditions got invalidated on the
            latest block.

        Raises:
            Whatever `tx.preconditions` raises when the preconditions are not
            satisfied, the exception is not handled here.
        """
        # Only the side effect matters: the call raises on failure.
        self.tx.preconditions(web3)
        return TransactionValidated(tx=self.tx)
class Transaction(ABC, Generic[E, K]):
    """Interface implemented by every concrete transaction type.

    `E` is the type of the reasons for a failed gas estimation and `K` the
    type of the reasons for a failed execution.
    """

    # Declared only; subclasses are expected to provide the values.
    abi: ABI
    trigerring_block: ConfirmedBlock
    from_address: Address
    minimum_gas: Gas

    @abstractmethod
    def preconditions(self, web3: Web3) -> None:
        """Checks the preconditions of the transaction at block
        `trigerring_block`.

        Raises:
            PreconditionError: If any of the preconditions are not valid at
                `trigerring_block`.
        """

    @abstractmethod
    def gas_estimation_failed(self, web3: Web3, block: ConfirmedBlock) -> E:
        """Returns the reason why the gas estimation failed at `block`."""

    @abstractmethod
    def transaction_failed(self, web3: Web3, receipt: ConfirmedTransactionReceipt) -> K:
        """Returns the reason why the transaction with `receipt` failed."""
class OpenTransaction(Transaction):
    """Transaction holding a token network and a pair of participants.

    The checks below are placeholders, none of them is implemented yet.
    """

    class OpenTransactionGasErrors(GasErrors):
        CHANNEL_CLOSED = "CHANNEL_CLOSED"

    class OpenTransactionExecutionErrors(ExecutionErrors):
        CHANNEL_CLOSED = "CHANNEL_CLOSED"

    def __init__(
        self,
        trigerring_block: ConfirmedBlock,
        network: TokenNetworkAddress,
        participants: Tuple[Address, Address],
    ) -> None:
        self.participants = participants
        self.network = network
        self.trigerring_block = trigerring_block

    def preconditions(self, web3: Web3):
        """Placeholder for the preconditions check, currently a no-op."""
        try:
            # trigerring_state = self.get_state(self.trigerring_block)
            pass
        except PrunedBlock:
            # Pruned blocks are ignored
            return
        # check_preconditions at trigerring_state

    def gas_estimation_failed(
        self, web3: Web3, block: ConfirmedBlock
    ) -> "OpenTransactionGasErrors":
        """Placeholder, no reason is computed yet."""

    def transaction_failed(
        self, web3: Web3, receipt: ConfirmedTransactionReceipt
    ) -> "OpenTransactionExecutionErrors":
        """Placeholder, no reason is computed yet."""
class SetTotalDepositTransaction(Transaction):
    """Transaction that sets the total deposit of a channel.

    NOTE(review): `trigerring_block`, `from_address` and `abi` are read by the
    methods below but are not assigned by `__init__` - confirm where they are
    supposed to come from.
    """

    def __init__(
        self,
        channel_identifier: ChannelID,
        total_deposit: TokenAmount,
        our_address: Address,
        partner: Address,
        token_network: TokenNetworkAddress,
    ):
        if not isinstance(total_deposit, int):
            raise ValueError("total_deposit needs to be an integral number.")

        self.token_network = token_network
        self.partner = partner
        self.our_address = our_address
        self.total_deposit = total_deposit
        self.channel_identifier = channel_identifier

    def preconditions(self, web3: Web3):
        """Check the channel identifier, the previous deposit and the
        available balance at `trigerring_block`.

        Nothing is checked if the block has been pruned.

        Raises:
            PreconditionError: If the channel identifier is outdated, the
                requested total deposit is smaller than the current one, or
                the balance does not cover the amount to deposit.
        """
        proxy = web3.eth.proxy()

        try:
            onchain_identifier = proxy.get_channel_identifier(self.trigerring_block)
            available = proxy.balance_of(self.trigerring_block, self.our_address)
            deposited = proxy.participant_deposit(self.trigerring_block)
        except PrunedBlock:
            # The state at the trigerring block is gone, nothing to check.
            return

        if onchain_identifier != self.channel_identifier:
            raise PreconditionError(
                "Current channel identifier is outdated. "
                f"current={self.channel_identifier}, "
                f"new={onchain_identifier}"
            )

        if self.total_deposit < deposited:
            raise PreconditionError(
                f"Current total deposit ({deposited}) is already larger "
                f"than the requested total deposit amount ({self.total_deposit})"
            )

        missing = self.total_deposit - deposited
        if available < missing:
            raise PreconditionError(
                f"new_total_deposit - previous_total_deposit = {missing} can not "
                f"be larger than the available balance {available}, "
                f"for token at address {self.token_network}"
            )

    def gas_estimation_failed(self, web3: Web3, block: ConfirmedBlock):
        """Always raises `UnrecoverableError`."""
        raise UnrecoverableError()

    def transaction_failed(self, web3: Web3, receipt: ConfirmedTransactionReceipt):
        """Return a message describing why the deposit failed.

        Note that the return value is a plain string.
        """
        token = web3.proxy()
        proxy = web3.eth.proxy(self.abi)

        deposited = proxy.participant_deposit(receipt.block)
        missing = self.total_deposit - deposited
        allowance = token.allowance(self.from_address, self.token_network)

        if allowance < missing:
            return (
                "The allowance is insufficient, "
                "check concurrent deposits for the same token network "
                "but different proxies."
            )

        # The balance is only queried when the allowance was sufficient.
        if token.balance_of(self.from_address) < missing:
            return "The address doesn't have enough funds"

        if deposited < self.total_deposit:
            return "The tokens were not transferred"

        return "unknown"
class ApproveTransaction(Transaction):
    """Approve transaction, it has no preconditions to check."""

    class ApproveTransactionGasErrors(GasErrors):
        """There are no reasons for approve to fail."""

    class ApproveTransactionExecutionErrors(ExecutionErrors):
        """There are no reasons for approve to fail."""

    def preconditions(self, web3: Web3):
        """Nothing to check."""

    def gas_estimation_failed(
        self, web3: Web3, block: ConfirmedBlock
    ) -> "ApproveTransactionGasErrors":
        """Always raises `UnrecoverableError`, since no failure is expected."""
        raise UnrecoverableError()

    def transaction_failed(
        self, web3: Web3, receipt: ConfirmedTransactionReceipt
    ) -> "ApproveTransactionExecutionErrors":
        """Placeholder, no reason is computed."""
class RunOncePerSignal:
    """Use this to broadcast multiple signals to consumers, each signal will
    be delivered at-most-once.

    This primitive is intended to be used in loops, where the broadcaster
    allows the consumers to do at most one unit of work per signal.

    The consumers should use this in a hot loop:

        while event.ready():
            do_work_once()

    Then for every time the producer sets the event, the loop will run at
    most once.
    """

    def __init__(self) -> None:
        self._run_consumers = Event()
        self._wait_for_reset = Event()

        # This is a new instance, there is nothing to reset yet.
        self._wait_for_reset.set()

    def ready(self) -> bool:
        """Method used by consumers to wait for a new event.

        Returns:
            Always True, after the producer signalled. The value exists so
            that the `while event.ready():` loop from the class docstring
            actually executes its body.
        """
        # Stop fast consumers until the next iteration is ready.
        self._wait_for_reset.wait()

        # Run one iteration
        self._run_consumers.wait()

        return True

    def signal(self) -> None:
        """Method used by producer to allow all consumer to do one unit of
        work.
        """
        # Reset the wait signal, this blocks the faster consumers so that they
        # don't run more than once.
        self._wait_for_reset.clear()

        # This should allow all consumers to run. I.e. it should context switch
        # to all consumers, if it does not, then the signal may not be
        # processed.
        self._run_consumers.set()

        # End the previous iteration. If a task was not awaken yet, then it is
        # possible it will not run.
        self._run_consumers.clear()

        # Allow all consumers to wait for the next iteration.
        self._wait_for_reset.set()
class TransactionManager:
    """Validates, estimates, queues and sends transactions for one account,
    and tracks the sent transactions until they are confirmed.
    """

    # Available ether in the users account after all the pending transactions
    # are mined
    # NOTE(review): this is only declared, `queue` reads it, so it has to be
    # assigned before transactions are queued.
    eth_available: AccountETHBalance

    def __init__(self, web3: Web3, signer: Signer) -> None:
        self.web3 = web3
        self.signer = signer
        self.transaction_queue: List[TransactionQueued] = list()
        self.concurrent_transactions = 3

        self._available_nonce = get_available_nonce(signer.address, web3)

        # This lock protects the account's nonce and estimated balance
        self._send_lock = RLock()

        # This lock is used to make updates to block and confirmed block
        # atomic. In some circumstances it is fine to read _latest_block_number
        # without the acquiring lock, since this value is strictly monotonic.
        self._block_lock = Lock()

        # NOTE(review): the attributes below are only declared. The block
        # number/hash are set by `new_block_callback`, nothing in this class
        # sets `_confirmation_blocks`.
        self._latest_block_number: BlockNumber
        self._latest_block_hash: BlockHash
        self._confirmation_blocks: BlockNumber

        # Result for polling threads
        self._token_to_asyncresult: Dict[TransactionToken, AsyncResult] = dict()

        # Min-heap of the transactions in flight. The entries are
        # `(nonce, transaction)` pairs because `TransactionSent` itself is not
        # orderable, the nonce makes the oldest transaction come out first.
        self._heap: List[Tuple[Nonce, TransactionSent]] = list()

    def new_block_callback(self, block_number: BlockNumber, block_hash: BlockHash):
        """ When a new block is mined, upgrade local state and check if a
        transaction has been mined.

        This should be installed as a callback to the AlarmTask.
        """
        with self._block_lock:
            self._latest_block_number = block_number
            self._latest_block_hash = block_hash

        # Resolve the transactions in nonce order, stopping at the first one
        # that is not confirmed yet. The loop is guarded by the heap itself,
        # so popping the last entry does not lead to an access to an empty
        # heap.
        while self._heap:
            _, transaction = self._heap[0]
            receipt = self.get_receipt_if_confirmed(transaction)

            if receipt is None:
                break

            confirmed = TransactionConfirmed(
                transaction.tx,
                transaction.gas,
                transaction.should_be_mined_in_blocks,
                transaction.token,
                transaction.target_block,
                transaction.tx_hash,
                transaction.nonce,
                receipt,
            )

            # The result stays registered until it is consumed by
            # `wait_for_next_confirmed`, otherwise a waiter that starts after
            # the confirmation would not find it.
            self._token_to_asyncresult[transaction.token].set(confirmed)
            heapq.heappop(self._heap)

        self.maybe_send_few()

    def validate(
        self, transactions: List[TransactionInitial]
    ) -> List[TransactionValidated]:
        """Checks the preconditions for collection `transaction` at their
        specific trigerring blocks.

        Raises:
            PreconditionError: If any of the preconditions are not valid.
        """
        result: List[TransactionValidated] = [
            tx.preconditions(self.web3) for tx in transactions
        ]
        return result

    def estimate_gas(
        self, transactions: List[TransactionValidated]
    ) -> Tuple[
        List[TransactionInvalidated],
        List[TransactionRacing],
        List[TransactionGasEstimated],
    ]:
        """Estimate gas for the transaction against the latest known
        *unconfirmed* block number.

        Estimating gas of a non-validated transaction is not useful, therefore
        this only works with `TransactionValidated`.

        The gas estimation is done against the unconfirmed block number to
        make sure the transaction can be mined on top of latest. However,
        because of reorgs, there is a window for race conditions and
        undetermined behavior. The problem is that an invalidating transaction
        may not be confirmed yet, and because of reorgs it can be removed from
        the canonical chain. In this case the gas estimation would be only
        temporarily invalid. The only solution for this is to wait until the
        invalidating transaction is confirmed.

        Returns:
            A tuple `(invalidated, racing, estimated)`.
        """
        invalidated: List[TransactionInvalidated] = list()
        estimated: List[TransactionGasEstimated] = list()
        racing: List[TransactionRacing] = list()

        for transaction in transactions:
            # It is okay to run estimate gas for different transactions against
            # different blocks. But the gas estimation and check of one
            # transaction must be against the same block.
            with self._block_lock:
                block_number = self._latest_block_number
                confirmed_block = block_number - self._confirmation_blocks

            estimated_gas = self.web3.estimate_gas(block_number, transaction)

            if estimated_gas:
                start_gas = max(estimated_gas, transaction.tx.minimum_gas)
                # The new state wraps the transaction itself, not the
                # previous state.
                estimated.append(TransactionGasEstimated(transaction.tx, start_gas))
                continue

            confirmed_estimated_gas = self.web3.estimate_gas(
                confirmed_block, transaction
            )

            # The transaction is valid against the confirmed block, so
            # there must be a transaction in an unconfirmed block that
            # invalidates it. At this point it is not know if the
            # invalidating transaction will be on the canonical chain or
            # not, so waiting is necessary.
            if confirmed_estimated_gas:
                racing.append(TransactionRacing(transaction.tx))
            else:
                # `gas_estimation_failed` already returns the invalidated
                # state. The estimation also failed on the confirmed block,
                # so that is the block used to look for the reason.
                invalidated.append(
                    transaction.gas_estimation_failed(self.web3, confirmed_block)
                )

        return (invalidated, racing, estimated)

    def queue(
        self, transactions: List[TransactionGasEstimated]
    ) -> List[TransactionQueued]:
        """Queue a transaction to be sent, the transaction may or may not be
        sent right away, this depends on how many are already in flight.

        Queueing transactions is necessary to allow for reordering of high
        priority transactions, and for transaction batching.

        Every transactions in the queue already has its necessary gas
        allocated for, this however does not guarantee the transaction will
        not run out of gas, because it is possible for the state of the smart
        contract to change because of transactions from another users, and
        change the gas profile of the function call.

        Raises:
            InsufficientFunds: If the available ether does not cover the gas
                of the whole batch, in this case nothing is queued.
        """
        with self._send_lock:
            should_be_mined_in_blocks, gas_price = self.gas_price()

            eth_required = sum(tx.gas * gas_price for tx in transactions)

            if eth_required > self.eth_available:
                raise InsufficientFunds()

            result: List[TransactionQueued] = [
                TransactionQueued(
                    tx.tx, tx.gas, should_be_mined_in_blocks, random_token()
                )
                for tx in transactions
            ]

            for queued_tx in result:
                assert queued_tx.token not in self._token_to_asyncresult
                self._token_to_asyncresult[queued_tx.token] = AsyncResult()
                self.transaction_queue.append(queued_tx)

            self.eth_available = AccountETHBalance(self.eth_available - eth_required)
            self.maybe_send_few()

        return result

    def maybe_send_few(self) -> None:
        """Send queued transactions until the queue is empty or the limit of
        `concurrent_transactions` in flight is reached.
        """
        with self._send_lock:
            while (
                self.transaction_queue
                and len(self._heap) < self.concurrent_transactions
            ):
                tx = self.transaction_queue.pop(0)

                nonce = self._available_nonce
                self._available_nonce = Nonce(self._available_nonce + 1)

                tx_packed = tx.tx.build_transaction()
                tx_signed = self.signer.sign(tx_packed)
                tx_hash = self.web3.send(tx_signed)

                sent = TransactionSent(
                    tx.tx,
                    tx.gas,
                    tx.should_be_mined_in_blocks,
                    tx.token,
                    BlockNumber(
                        self._latest_block_number + tx.should_be_mined_in_blocks
                    ),
                    tx_hash,
                    nonce,
                )
                heapq.heappush(self._heap, (nonce, sent))

    def get_receipt_if_confirmed(
        self, tx: TransactionSent
    ) -> Optional[ConfirmedTransactionReceipt]:
        """Return the receipt of `tx` if the transaction is mined and
        confirmed, None otherwise.
        """
        # What does eth_getTransactionReceipt return when a transaction is
        # replaced?
        receipt = self.web3.getTransactionReceipt(tx, self._confirmation_blocks)

        # presumably a falsy value means there is no receipt yet - TODO confirm
        if not receipt:
            return None

        # The receipt is confirmed only once the chain has grown by the
        # required number of blocks on top of the block that mined it.
        confirmed_at = receipt.block_number + self._confirmation_blocks
        if confirmed_at > self._latest_block_number:
            return None

        return receipt

    def wait_for_next_confirmed(
        self, transactions: List[TransactionQueued]
    ) -> Iterable[Union[TransactionSuccessfulConfirmed, TransactionFailedConfirmed]]:
        """Function to wait for a transaction to be mined and confirmed.

        This function will not return a replaced transaction. When a
        transaction is replaced it should be sent again with a higher nonce,
        and eventually it will be successful or not.
        """
        for tx in transactions:
            tx_mined: TransactionConfirmed = self._token_to_asyncresult[tx.token].get()

            # The result has been consumed, forget about the token.
            self._token_to_asyncresult.pop(tx.token, None)

            # Compared by value, identity of strings is not reliable.
            if tx_mined.receipt.status == FAILURE:
                yield tx_mined.transaction_failed(self.web3, tx_mined.receipt)
            else:
                yield TransactionSuccessfulConfirmed(
                    tx_mined.tx,
                    tx_mined.gas,
                    tx_mined.should_be_mined_in_blocks,
                    tx_mined.token,
                    tx_mined.target_block,
                    tx_mined.tx_hash,
                    tx_mined.nonce,
                    tx_mined.receipt,
                )

    def gas_price(  # pylint: disable=no-self-use
        self
    ) -> Tuple[BlocksToMineTransaction, GasPrice]:
        """Return the number of blocks in which a transaction should be mined
        and the gas price to use, currently hard-coded values.
        """
        # The gas price should be computed with a strategy, the target_block
        # should be computed based on how long it should take for the
        # transaction to be mined under the given strategy
        return (BlocksToMineTransaction(5), GasPrice(1_000))
def example():
    """Sketch of the intended usage, every transaction goes through all the
    states in order: initial, validated, gas estimated, queued, confirmed.

    The argument values are placeholders.
    """
    transactions: List[Transaction] = [
        OpenTransaction("", "", ""),
        ApproveTransaction(),
    ]
    manager = TransactionManager(Web3(), Signer())

    # The manager works on the state wrappers, so the raw transactions have to
    # be wrapped and validated before the gas can be estimated.
    initial_transactions = [TransactionInitial(tx) for tx in transactions]
    validated_transactions = manager.validate(initial_transactions)

    _, _, gas_estimated_transactions = manager.estimate_gas(validated_transactions)
    queued_transactions = manager.queue(gas_estimated_transactions)

    for _ in manager.wait_for_next_confirmed(queued_transactions):
        pass
I made a second iteration: instead of using `Literal[state]`, which is an experimental feature and probably unknown to most people, I changed it to use just plain old data structures. The idea here is roughly to have a "tagged union" of the states of a transaction. Each dataclass represents one such state and adds the fields that are available in it. This removes the need for a single class with lots of None values. Each class also works as a representation of the current state of the transaction and can be used for type checking, making sure that all the steps are followed accordingly.
This iteration also takes raiden-network/raiden#3890 into account. This design seems good enough.
The poller has to be improved: it should be merged with the transaction manager, because that is the one class that knows about invalidation. On top of that, after a transaction is mined its state has to be checked, and `validate_after_failure` has to be incorporated (in other words, it needs the ideas from raiden-network/raiden#3268).
This last iteration added a queue and transaction tokens. These can be used to remove the hot loop used for polling, and possibly to cancel transactions if necessary.
This design has to be fleshed out, especially from the typing perspective. This is pushing the features in mypy, and it may not be possible to bring it to the level necessary. Maybe a simpler approach should be used: instead of using phantom types, just wrap the transactions in dataclasses or tuples.