Skip to content

Instantly share code, notes, and snippets.

@hackaugusto
Last active October 13, 2020 11:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save hackaugusto/088c31ba95c74355989f5a42b9b0a6e0 to your computer and use it in GitHub Desktop.
Save hackaugusto/088c31ba95c74355989f5a42b9b0a6e0 to your computer and use it in GitHub Desktop.
"""Types used to represent transaction and its states.
Each state of a transaction is described by a different wrapper dataclass, this
is used to introduce new fields in a type safe manner, and to enforce correct
usage.
Transactions are by design immutable and don't have access in their field to
objects that do IO, that is why the RPC class is given as an argument to each
method, when necessary.
"""
import heapq
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from threading import Event, Lock, RLock
from typing import (Any, Dict, Generic, Iterable, List, NewType, Optional,
Tuple, TypeVar, Union)
from gevent import AsyncResult
from web3 import Web3
T = TypeVar("T", bound="Transaction")
E = TypeVar("E", bound="GasErrors")
K = TypeVar("K", bound="ExecutionErrors")
ABI = Dict[str, Any]
Gas = NewType("Gas", int)
TransactionToken = NewType("TransactionToken", bytes)
AccountETHBalance = NewType("AccountETHBalance", int)
GasPrice = NewType("GasPrice", int)
TransactionHash = NewType("TransactionHash", bytes)
Nonce = NewType("Nonce", int)
BlockNumber = NewType("BlockNumber", int)
BlocksToMineTransaction = NewType("BlocksToMineTransaction", int)
BlockHash = NewType("BlockHash", int)
Address = NewType("Address", bytes)
TokenNetworkAddress = NewType("TokenNetworkAddress", bytes)
TokenAmount = NewType("TokenAmount", int)
ChannelID = NewType("ChannelID", int)
# TBD
ConfirmedTransactionReceipt = Any
Web3Transaction = Any
FAILURE = "FAILURE"
MINED = "MINED"
EMPTY_RECEIPT = object()
def get_available_nonce(address_checksumed: Address, web3: Web3) -> Nonce:
return Nonce(web3.eth.getTransactionCount(address_checksumed, "pending"))
def random_token() -> TransactionToken:
# should produce an unique token to identify a queue transaction
return TransactionToken(b"")
# Just to mark the code that is handling pruned blocks
class PrunedBlock(Exception):
pass
class InsufficientFunds(Exception):
"""Exception raised if the user doesn't have enough balance to send a batch
of transactions.
"""
class UnrecoverableError(Exception):
"""Exception raised if an unexpected remote error happened."""
class PreconditionError(Exception):
"""Exception raised if the transaction was invalid at creation time, this
means the user code is not doing proper validation.
"""
class ConfirmedBlock:
block_number: BlockNumber
block_hash: BlockHash
class Signer:
"""Class used to abstract the signing.
This will potentially do IO if remote signing is used, so exposure of this
class has to be limited.
"""
address: Address
def sign(self, data):
pass
class GasErrors(ABC, Enum):
"""Type to agreggate descriptions for a failed estimate gas. These reasons
are always safe, if an unsafe problem is encoutered UnrecoverableError
should be used instead.
"""
class ExecutionErrors(ABC, Enum):
pass
@dataclass(frozen=True)
class TransactionFailedConfirmed(Generic[T, K]):
"""Represents a failed transaction that has been mined and confirmed."""
tx: T
gas: Gas
should_be_mined_in_blocks: BlocksToMineTransaction
token: TransactionToken
target_block: BlockNumber
tx_hash: TransactionHash
nonce: Nonce
receipt: ConfirmedTransactionReceipt
reason: K
@dataclass(frozen=True)
class TransactionSuccessfulConfirmed(Generic[T]):
"""Represents a successful transaction that has been mined and confirmed."""
tx: T
gas: Gas
should_be_mined_in_blocks: BlocksToMineTransaction
token: TransactionToken
target_block: BlockNumber
tx_hash: TransactionHash
nonce: Nonce
receipt: ConfirmedTransactionReceipt
@dataclass(frozen=True)
class TransactionConfirmed(Generic[T]):
"""Represents a transaction that has been mined and confirmed."""
tx: T
gas: Gas
should_be_mined_in_blocks: BlocksToMineTransaction
token: TransactionToken
target_block: BlockNumber
tx_hash: TransactionHash
nonce: Nonce
receipt: ConfirmedTransactionReceipt
def transaction_failed(
self, web3: Web3, receipt: ConfirmedTransactionReceipt
) -> TransactionFailedConfirmed[T, K]:
"""Checks why the mined transaction failed.
Returns:
K: An enum describing the race condition.
Raises:
UnrecoverableError: If the gas estimation failed for something but a race condition.
"""
reason = self.tx.transaction_failed(web3, receipt)
return TransactionFailedConfirmed(
self.tx,
self.gas,
self.should_be_mined_in_blocks,
self.token,
self.target_block,
self.tx_hash,
self.nonce,
receipt,
reason,
)
@dataclass(frozen=True)
class TransactionReplaced(Generic[T]):
"""Represents a transaction that has been replaced."""
tx: T
gas: Gas
should_be_mined_in_blocks: BlocksToMineTransaction
token: TransactionToken
target_block: BlockNumber
tx_hash: TransactionHash
nonce: Nonce
@dataclass(frozen=True)
class TransactionSent(Generic[T]):
"""Represents a transaction that has been chosen from the pending queue to
be sent to the network.
There is no time guarantee on how long it will take for the transaction to
be mined, and it is possible the transaction will be replaced.
"""
tx: T
gas: Gas
should_be_mined_in_blocks: BlocksToMineTransaction
token: TransactionToken
target_block: BlockNumber
tx_hash: TransactionHash
nonce: Nonce
@dataclass(frozen=True)
class TransactionQueued(Generic[T]):
"""Represents a transaction that is in the local queue, waiting for it to
be sent.
At this point, this transaction has:
- The preconditions checked
- The gas successfully estimated
- The account's balance checked
There is no time guarantee on how long it will take for the transaction to
be mined, and it is possible the transaction will be replaced.
"""
tx: T
gas: Gas
should_be_mined_in_blocks: BlocksToMineTransaction
token: TransactionToken
@dataclass(frozen=True)
class TransactionInvalidated(Generic[T, E]):
"""Represents a transaction that was valid but cannot be sent to the
blockchain because gas estimation failed.
This must only be used for expected race conditions, if the error is
unexpected a UnrecoverableError should be raised instead.
"""
tx: T
reason: E
@dataclass(frozen=True)
class TransactionRacing(Generic[T]):
"""Represents a transaction that may become invalid, and currently it is
racing against another transaction in an unconfirmed block.
This is a transient state, and the caller has to wait a few blocks to know
determine what happened.
"""
tx: T
@dataclass(frozen=True)
class TransactionGasEstimated(Generic[T]):
"""Represents a transaction that is valid and has a gas estimation.
This is ready to be sent.
"""
tx: T
gas: Gas
@dataclass(frozen=True)
class TransactionValidated(Generic[T]):
"""Represents a transaction that is valid in respect to the
trigerring_block, and is ready to have the gas estimated.
"""
tx: T
def gas_estimation_failed(
self, web3: Web3, block: ConfirmedBlock
) -> TransactionInvalidated[T, E]:
"""Checks why the gas estimation failed at `block`.
Returns:
E: An enum describing the race condition.
Raises:
UnrecoverableError: If the gas estimation failed for something but a race condition.
"""
reason = self.tx.gas_estimation_failed(web3, block)
return TransactionInvalidated(self.tx, reason)
@dataclass(frozen=True)
class TransactionInitial(Generic[T]):
"""This is just a new transaction, nothing is known about it yet. The only
valid action at this point is to validate the transaction against the
trigerring block.
"""
tx: T
def preconditions(self, web3: Web3) -> TransactionValidated[T]:
""" Raises if the preconditions are not satisfied at the given
`trigerring_block`.
Used to check if there are any logic bugs in the business logic
which resulted in an impossible ContractSend*.
Note:
This does not care if the preconditions got invalidated on the
latest block.
raises:
UnrecoverableError: If the preconditions are not satisfied.
"""
self.tx.preconditions(web3)
return TransactionValidated(self.tx)
class Transaction(ABC, Generic[E, K]):
abi: ABI
trigerring_block: ConfirmedBlock
from_address: Address
minimum_gas: Gas
@abstractmethod
def preconditions(self, web3: Web3) -> None:
"""Checks the preconditions of the transaction at block `trigerring_block`.
Raises:
PreconditionError: If any of the preconditions are not valid at
`trigerring_block`.
"""
@abstractmethod
def gas_estimation_failed(self, web3: Web3, block: ConfirmedBlock) -> E:
pass
@abstractmethod
def transaction_failed(self, web3: Web3, receipt: ConfirmedTransactionReceipt) -> K:
pass
class OpenTransaction(Transaction):
class OpenTransactionGasErrors(GasErrors):
CHANNEL_CLOSED = "CHANNEL_CLOSED"
class OpenTransactionExecutionErrors(ExecutionErrors):
CHANNEL_CLOSED = "CHANNEL_CLOSED"
def __init__(
self,
trigerring_block: ConfirmedBlock,
network: TokenNetworkAddress,
participants: Tuple[Address, Address],
) -> None:
self.trigerring_block = trigerring_block
self.network = network
self.participants = participants
def preconditions(self, web3: Web3):
try:
# trigerring_state = self.get_state(self.trigerring_block)
pass
except PrunedBlock:
# ignore pruned blocks
pass
else:
# check_preconditions at trigerring_state
pass
def gas_estimation_failed(
self, web3: Web3, block: ConfirmedBlock
) -> "OpenTransactionGasErrors":
pass
def transaction_failed(
self, web3: Web3, receipt: ConfirmedTransactionReceipt
) -> "OpenTransactionExecutionErrors":
pass
class SetTotalDepositTransaction(Transaction):
def __init__(
self,
channel_identifier: ChannelID,
total_deposit: TokenAmount,
our_address: Address,
partner: Address,
token_network: TokenNetworkAddress,
):
if not isinstance(total_deposit, int):
raise ValueError("total_deposit needs to be an integral number.")
self.channel_identifier = channel_identifier
self.total_deposit = total_deposit
self.our_address = our_address
self.partner = partner
self.token_network = token_network
def preconditions(self, web3: Web3):
proxy = web3.eth.proxy()
try:
onchain_channel_identifier = proxy.get_channel_identifier(
self.trigerring_block
)
current_balance = proxy.balance_of(self.trigerring_block, self.our_address)
previous_total_deposit = proxy.participant_deposit(self.trigerring_block)
except PrunedBlock:
pass
else:
if onchain_channel_identifier != self.channel_identifier:
raise PreconditionError(
"Current channel identifier is outdated. "
f"current={self.channel_identifier}, "
f"new={onchain_channel_identifier}"
)
if self.total_deposit < previous_total_deposit:
msg = (
f"Current total deposit ({previous_total_deposit}) is already larger "
f"than the requested total deposit amount ({self.total_deposit})"
)
raise PreconditionError(msg)
amount_to_deposit = self.total_deposit - previous_total_deposit
if current_balance < amount_to_deposit:
msg = (
f"new_total_deposit - previous_total_deposit = {amount_to_deposit} can not "
f"be larger than the available balance {current_balance}, "
f"for token at address {self.token_network}"
)
raise PreconditionError(msg)
def gas_estimation_failed(self, web3: Web3, block: ConfirmedBlock):
raise UnrecoverableError()
def transaction_failed(self, web3: Web3, receipt: ConfirmedTransactionReceipt):
token = web3.proxy()
proxy = web3.eth.proxy(self.abi)
previous_total_deposit = proxy.participant_deposit(receipt.block)
amount_to_deposit = self.total_deposit - previous_total_deposit
allowance = token.allowance(self.from_address, self.token_network)
if allowance < amount_to_deposit:
log_msg = (
"The allowance is insufficient, "
"check concurrent deposits for the same token network "
"but different proxies."
)
elif token.balance_of(self.from_address) < amount_to_deposit:
log_msg = "The address doesn't have enough funds"
elif previous_total_deposit < self.total_deposit:
log_msg = "The tokens were not transferred"
else:
log_msg = "unknown"
return log_msg
class ApproveTransaction(Transaction):
class ApproveTransactionGasErrors(GasErrors):
"""There are no reasons for approve to fail."""
class ApproveTransactionExecutionErrors(ExecutionErrors):
"""There are no reasons for approve to fail."""
def preconditions(self, web3: Web3):
pass
def gas_estimation_failed(
self, web3: Web3, block: ConfirmedBlock
) -> "ApproveTransactionGasErrors":
raise UnrecoverableError()
def transaction_failed(
self, web3: Web3, receipt: ConfirmedTransactionReceipt
) -> "ApproveTransactionExecutionErrors":
pass
class RunOncePerSignal:
"""Use this to broadcast multiple signals to consumers, each signal will be
delivered at-most-once.
This primitive is intended to be used in loops, where the broadcaster
allows the consumers to do at most one unit of work per signal.
The consumers should use this in a hot loop:
while event.ready():
do_work_once()
Then for every time the producer sets the event, the loop will run at most
once.
"""
def __init__(self) -> None:
self._run_consumers = Event()
self._wait_for_reset = Event()
# This is a new instance, there is nothing to reset yet.
self._wait_for_reset.set()
def ready(self) -> None:
"""Method used by consumers to wait for a new event."""
# Stop fast consumers until the next iteration is ready.
self._wait_for_reset.wait()
# Run one iteration
self._run_consumers.wait()
def signal(self) -> None:
"""Method used by producer to allow all consumer to do one unit of work."""
# Reset the wait signal, this blocks the faster consumers so that they
# don't run more than once.
self._wait_for_reset.clear()
# This should allow all consumers to run. I.e. it should context switch
# to all consumers, if it does not, then the signal may not be
# processed.
self._run_consumers.set()
# End the previous iteration. If a task was not awaken yet, then it is
# possible it will not run.
self._run_consumers.clear()
# Allow all consumers to wait for the next iteration.
self._wait_for_reset.set()
class TransactionManager:
# Available ether in the users account after all the pending transactions
# are mined
eth_available: AccountETHBalance
def __init__(self, web3: Web3, signer: Signer) -> None:
self.web3 = web3
self.signer = signer
self.transaction_queue: List[TransactionQueued] = list()
self.concurrent_transactions = 3
# This lock protects the account's nonce and estimated balance
self._available_nonce = get_available_nonce(signer.address, web3)
self._send_lock = RLock()
# This lock is used to make updates to block and confirmed block
# atomic. In some circumstances it is fine to read _latest_block_number
# without the acquiring lock, since this value is strictly monotonic.
self._block_lock = Lock()
self._latest_block_number: BlockNumber
self._latest_block_hash: BlockHash
self._confirmation_blocks: BlockNumber
# Result for polling threads
self._token_to_asyncresult: Dict[TransactionToken, AsyncResult] = dict()
self._heap: List[TransactionSent] = list()
def new_block_callback(self, block_number: BlockNumber, block_hash: BlockHash):
""" When a new block is mined, upgrade local state and check if a
transaction has been mined.
This should be installed as a callback to the AlarmTask.
"""
with self._block_lock:
self._latest_block_number = block_number
self._latest_block_hash = block_hash
if self._heap:
transaction_is_mined = True
while transaction_is_mined:
transaction = self._heap[0]
receipt = self.get_receipt_if_confirmed(transaction)
transaction_is_mined = bool(receipt)
if transaction_is_mined:
confirmed = TransactionConfirmed(
transaction.tx,
transaction.gas,
transaction.should_be_mined_in_blocks,
transaction.token,
transaction.target_block,
transaction.tx_hash,
transaction.nonce,
receipt,
)
self._token_to_asyncresult.pop(transaction.token).set(confirmed)
heapq.heappop(self._heap)
self.maybe_send_few()
def validate(
self, transactions: List[TransactionInitial]
) -> List[TransactionValidated]:
"""Checks the preconditions for collection `transaction` at their
specific trigerring blocks.
Raises:
PreconditionError: If any of the preconditions are not valid.
"""
result: List[TransactionValidated] = [
tx.preconditions(self.web3) for tx in transactions
]
return result
def estimate_gas(
self, transactions: List[TransactionValidated]
) -> Tuple[
List[TransactionInvalidated],
List[TransactionRacing],
List[TransactionGasEstimated],
]:
"""Estimate gas for the transaction against the latest known
*unconfirmed* block number.
Estimating gas of a non-validated transaction is not useful, therefore
this only works `TransactionValidated`.
The gas estimation is done against the unconfirmed block number to make
sure the transaction can be mined on top of latest. However, because of
reorgs, there is a window for race conditions and undetermined
behavior. The problem is that a invalidating transction may not be
confirmed yet, and it can because of reorgs it can be removed from the
canonical chain. In this case gas estimation would only temporarily
invalid. The only solution for this is to wait until the invalidating
transaction is comfirmed.
"""
invalidated: List[TransactionInvalidated] = list()
estimated: List[TransactionGasEstimated] = list()
racing: List[TransactionRacing] = list()
for transaction in transactions:
# It is okay to run estimate gas for different transactions against
# different blocks. But the gas estimation and check of one
# transaction must be against the same block.
with self._block_lock:
block_number = self._latest_block_number
confirmed_block = block_number - self._confirmation_blocks
estimated_gas = self.web3.estimate_gas(block_number, transaction)
if estimated_gas:
start_gas = max(estimated_gas, transaction.tx.minimum_gas)
estimated.append(TransactionGasEstimated(transaction, start_gas))
else:
confirmed_estimated_gas = self.web3.estimate_gas(
confirmed_block, transaction
)
# The transaction is valid against the confirmed block, so
# there must be a transaction in an unconfirmed block that
# invalidates it. At this point it is not know if the
# invalidating transaction will be on the canonical chain or
# not, so waiting is necessary.
if confirmed_estimated_gas:
racing.append(TransactionRacing(transaction))
else:
invalidated.append(
TransactionInvalidated(
transaction,
transaction.gas_estimation_failed(block_number, self.web3),
)
)
return (invalidated, racing, estimated)
def queue(
self, transactions: List[TransactionGasEstimated]
) -> List[TransactionQueued]:
"""Queue a transaction to be sent, the transaction may or may not be
sent right away, this depends on how many are already in flight.
Queueing transactions is necessary to allow for reordering of high
priority transactions, and for transaction batching.
Every transactions in the queue already has its necessary gas allocated
for, this however does not guarantee the transaction will not run out
of gas, because it is possible for the state of the smart contract to
change because of transactions from another users, and change the gas
profile of the function call.
"""
with self._send_lock:
should_be_mined_in_blocks, gas_price = self.gas_price()
eth_required = 0
for tx in transactions:
eth_required += tx.gas * gas_price
if eth_required > self.eth_available:
raise InsufficientFunds()
result: List[TransactionQueued] = [
TransactionQueued(
tx.tx, tx.gas, should_be_mined_in_blocks, random_token()
)
for tx in transactions
]
for queued_tx in result:
assert queued_tx.token not in self._token_to_asyncresult
self._token_to_asyncresult[queued_tx.token] = AsyncResult()
self.transaction_queue.append(queued_tx)
self.eth_available = AccountETHBalance(self.eth_available - eth_required)
self.maybe_send_few()
return result
def maybe_send_few(self) -> None:
with self._send_lock:
should_send_another_transction = (
self.transaction_queue
and len(self._heap) < self.concurrent_transactions
)
if should_send_another_transction:
tx = self.transaction_queue.pop(0)
nonce = self._available_nonce
self._available_nonce = Nonce(self._available_nonce + 1)
tx_packed = tx.tx.build_transaction()
tx_signed = self.signer.sign(tx_packed)
tx_hash = self.web3.send(tx_signed)
sent = TransactionSent(
tx.tx,
tx.gas,
tx.should_be_mined_in_blocks,
tx.token,
BlockNumber(
self._latest_block_number + tx.should_be_mined_in_blocks
),
tx_hash,
nonce,
)
heapq.heappush(self._heap, sent)
def get_receipt_if_confirmed(
self, tx: TransactionSent
) -> Optional[ConfirmedTransactionReceipt]:
# What does eth_getTransactionReceipt return when a transaction is
# replaced?
receipt = self.web3.getTransactionReceipt(tx, self._confirmation_blocks)
return (
receipt.status is MINED
or receipt.block_number + self._confirmation_blocks
>= self._latest_block_number
)
def wait_for_next_confirmed(
self, transactions: List[TransactionQueued]
) -> Iterable[Union[TransactionSuccessfulConfirmed, TransactionFailedConfirmed]]:
"""Function to wait for a transaction to be mined and confirmed.
This function will not return a replaced transaction. When a
transaction is replaced it should be sent again with a higher nonce,
and eventually it will be successful or not.
"""
for tx in transactions:
tx_mined: TransactionConfirmed = self._token_to_asyncresult[tx.token].get()
if tx_mined.receipt.status is FAILURE:
yield tx_mined.tx.transaction_failed(self.web3)
else:
yield TransactionSuccessfulConfirmed(
tx_mined.tx,
tx_mined.gas,
tx_mined.should_be_mined_in_blocks,
tx_mined.token,
tx_mined.target_block,
tx_mined.tx_hash,
tx_mined.nonce,
tx_mined.receipt,
)
def gas_price( # pylint: disable=no-self-use
self
) -> Tuple[BlocksToMineTransaction, GasPrice]:
# The gas price should be computed with a strategy, the target_block
# should be computed based on how long it should take for the
# transaction to be mined under the given strategy
return (BlocksToMineTransaction(5), GasPrice(1_000))
def example():
transactions: List[Transaction] = [
OpenTransaction("", "", ""),
ApproveTransaction(),
]
manager = TransactionManager(Web3(), Signer())
_, _, gas_estimated_transactions = manager.estimate_gas(transactions)
sent_transactions = manager.queue(gas_estimated_transactions)
transactions_iter = iter(sent_transactions)
for _ in manager.wait_for_next_confirmed(transactions_iter):
pass
@hackaugusto
Copy link
Author

hackaugusto commented Sep 18, 2019

This design has to be fleshed out, specially from the typing perspective. This is pushing the features in mypy and may not be possible to bring it to the level necessary. Maybe a simpler approach that instead of using phantom types, just wrap the transactions on dataclasses or tuples should be used.

@hackaugusto
Copy link
Author

hackaugusto commented Sep 19, 2019

I made a second iteration, instead of using Literal[state], which is an experimental feature and probably unknown to most people, I changed it to use just plain old data structures. The idea here is roughly to have a "tagged union" of the states of a transaction. Each dataclass represents one such state and adds the fields that are available. This removes the need for having a single class with lots of None values. Each class also works as a representation of the current state of the transaction, and can be used for the type checking, making sure that all the steps are followed accordingly.

@hackaugusto
Copy link
Author

hackaugusto commented Sep 19, 2019

This iteration also takes raiden-network/raiden#3890 into account. This design seems good enough.

The poller has to be improved, it should be merged with the transaction manager, because it is the one class that knows about invalidation, and on top of that, after a transaction is mined, its state have to be checked and the validate_after_failure have to be incorporated (IOW, it needs the ideas from raiden-network/raiden#3268 )

@hackaugusto
Copy link
Author

This last iteration added a queue and transaction tokens, this can be used to remove the hot loop used for polling, and possible it can be used to cancel transactions if necessary.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment