Skip to content

Instantly share code, notes, and snippets.

@miohtama
Created August 19, 2021 08:08
  • Star 4 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save miohtama/e298f6ecd2c5c9c59b5bdc9e5818ec65 to your computer and use it in GitHub Desktop.
Fast event fetcher and decoder for web3.py
"""Below is an example of a faster JSON-RPC event fetcher.
It skips a lot of steps, like converting raw binary values to the corresponding numbers (floats, ints),
looking up ABI labels and building an `AttributedDict` object.
The resulting event dictionary is generated faster, but it is harder to use and you need to know what you are doing.
"""
def _fetch_events_for_all_contracts(
web3,
event,
argument_filters: dict,
from_block: int,
to_block: int) -> Iterable:
"""Get events using eth_getLogs API.
This method is detached from any contract instance.
This is a stateless method, as opposite to createFilter.
It can be safely called against nodes which do not provide eth_newFilter API, like Infura.
"""
if from_block is None:
raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock")
# Currently no way to poke this using a public Web3.py API.
# This will return raw underlying ABI JSON object for the event
abi = event._get_event_abi()
# Depending on the used Solidity version used to compile
# the contract that uses the ABI,
# it might have Solidity ABI encoding v1 or v2.
# We just assume the default that you set on Web3 object here.
# More information here https://eth-abi.readthedocs.io/en/latest/index.html
codec: ABICodec = web3.codec
# Here we need to poke a bit into Web3 internals, as this
# functionality is not exposed by default.
# Construct JSON-RPC raw filter presentation based on human readable Python descriptions
# Namely, convert event names to their keccak signatures
# More information here:
# https://github.com/ethereum/web3.py/blob/e176ce0793dafdd0573acc8d4b76425b6eb604ca/web3/_utils/filters.py#L71
data_filter_set, event_filter_params = construct_event_filter_params(
abi,
codec,
address=argument_filters.get("address"),
argument_filters=argument_filters,
fromBlock=from_block,
toBlock=to_block
)
logger.debug("Querying eth_getLogs with the following parameters: %s", event_filter_params)
# Prepare fast ABI processing
event_abi = abi
log_topics_abi = get_indexed_event_inputs(event_abi)
log_topic_normalized_inputs = normalize_event_input_types(log_topics_abi)
log_topic_types = get_event_abi_types_for_decoding(log_topic_normalized_inputs)
log_topic_names = get_abi_input_names(ABIEvent({'inputs': log_topics_abi}))
log_data_abi = exclude_indexed_event_inputs(event_abi)
log_data_normalized_inputs = normalize_event_input_types(log_data_abi)
log_data_types = get_event_abi_types_for_decoding(log_data_normalized_inputs)
log_data_names = get_abi_input_names(ABIEvent({'inputs': log_data_abi}))
# Call JSON-RPC API on your Ethereum node.
# getLogs() returns raw AttributedDict entries
logs = web3.eth.getLogs(event_filter_params)
def _get_event_data_fast(abi_codec: ABICodec, event_abi: ABIEvent, log_entry: LogReceipt) -> dict:
"""
Given an event ABI and a log entry for that event, return the decoded
event data
"""
if event_abi['anonymous']:
log_topics = log_entry['topics']
elif not log_entry['topics']:
raise MismatchedABI("Expected non-anonymous event to have 1 or more topics")
elif event_abi_to_log_topic(event_abi) != log_entry['topics'][0]:
raise MismatchedABI("The event signature did not match the provided ABI")
else:
log_topics = log_entry['topics'][1:]
if len(log_topics) != len(log_topic_types):
raise LogTopicError("Expected {0} log topics. Got {1}".format(
len(log_topic_types),
len(log_topics),
))
log_data = hexstr_if_str(to_bytes, log_entry['data'])
decoded_log_data = abi_codec.decode_abi(log_data_types, log_data)
# normalized_log_data = map_abi_data(
# BASE_RETURN_NORMALIZERS,
# log_data_types,
# decoded_log_data
#)
decoded_topic_data = [
abi_codec.decode_single(topic_type, topic_data)
for topic_type, topic_data
in zip(log_topic_types, log_topics)
]
#normalized_topic_data = map_abi_data(
# BASE_RETURN_NORMALIZERS,
# log_topic_types,
# decoded_topic_data
#)
#event_args = dict(itertools.chain(
# zip(log_topic_names, normalized_topic_data),
# zip(log_data_names, normalized_log_data),
#))
event_data = {
# 'args': event_args,
'log_data': decoded_log_data,
'topic_data': decoded_topic_data,
'event': event_abi['name'],
'logIndex': log_entry['logIndex'],
'transactionIndex': log_entry['transactionIndex'],
'transactionHash': log_entry['transactionHash'],
'address': log_entry['address'],
'blockHash': log_entry['blockHash'],
'blockNumber': log_entry['blockNumber'],
}
return event_data
# Convert raw binary data to Python proxy objects as described by ABI
all_events = []
for log in logs:
# Convert raw JSON-RPC log result to human readable event by using ABI data
# More information how processLog works here
# https://github.com/ethereum/web3.py/blob/fbaf1ad11b0c7fac09ba34baff2c256cffe0a148/web3/_utils/events.py#L200
try:
evt = _get_event_data_fast(codec, abi, log)
# evt = get_event_data(codec, abi, log)
except LogTopicError as e:
# Example of a bad Mint event https://etherscan.io/tx/0x8605158f617602523f8bdf7569980bf4d15b4fd88f9e3512c7bffe0ada162f76#eventlog
# Cannot separate Compound Mint and Uniswap Mint events nicely
logger.debug("Mismatched event %s %s", e, log)
continue
# Note: This was originally yield,
# but deferring the timeout exception caused the throttle logic not to work
all_events.append(evt)
return all_events
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment