Skip to content

Instantly share code, notes, and snippets.

@Evan-Kim2028
Last active January 9, 2024 21:50
Show Gist options
  • Save Evan-Kim2028/50fb8e220a7293c406423b789fd0e8f9 to your computer and use it in GitHub Desktop.
Save Evan-Kim2028/50fb8e220a7293c406423b789fd0e8f9 to your computer and use it in GitHub Desktop.
polars_query.py
# Per-transaction calldata size plus the per-block total, keyed by
# transaction hash so it can be joined back onto the transaction table.
# NOTE(review): the size is the byte length of the `input` string minus 1.
# If `input` is a 0x-prefixed hex string, the decoded payload would be
# (len - 2) / 2 bytes instead -- confirm the intended unit against the data.
txs_calldata: pl.LazyFrame = (
    pl.scan_parquet("data/raw/transactions/*.parquet")
    .select(
        "transaction_hash",
        "block_number",
        (pl.col("input").str.len_bytes() - 1).alias("calldata_size"),
    )
    .with_columns(
        # Window sum: every row carries the calldata total of its block.
        total_calldata_block_size=pl.col("calldata_size")
        .sum()
        .over("block_number")
    )
)
# cryo txs
# Transaction-level columns read from the raw transaction parquet files.
txs: pl.LazyFrame = pl.scan_parquet("data/raw/transactions/*.parquet").select(
    [
        "block_number",
        "transaction_index",
        "transaction_hash",
        "from_address",
        "gas_used",
        "gas_price",
        "max_priority_fee_per_gas",
        "max_fee_per_gas",
    ]
)
# Attach the calldata metrics by transaction hash. Both sides carry
# `block_number`, so the joined frame also gains a `block_number_right`
# column (polars' default suffix for clashing right-hand columns).
txs = txs.join(txs_calldata, how="left", on="transaction_hash")
# Use pattern matching to handle the presence or absence of mempool data
match mempool:
case _ if isinstance(mempool, pl.LazyFrame):
# Join transactions with blocks and mempool data if mempool is provided
combined_df = txs.join(
blocks, on="block_number", how="left", suffix="_block"
).join(
mempool,
right_on="hash",
left_on="transaction_hash",
how="left",
suffix="_mempool",
)
case None:
# Join only transactions with blocks if mempool is not provided
combined_df = txs.join(
blocks, on="block_number", how="left", suffix="_block"
)
# Common processing steps for both scenarios
return (
combined_df.with_columns(
[
# Calculate the transaction gas cost
(pl.col("gas_used") * pl.col("gas_price") / 10**18).alias(
"tx_gas_cost"
),
# Convert epoch timestamp to datetime
pl.from_epoch("timestamp").alias("block_datetime"),
# Calculate the max transaction index per block
pl.col("transaction_index")
.max()
.over(pl.col("block_number"))
.name.suffix("_max"),
# Calculate the gas price premium over the base fee per gas
(pl.col("gas_price") / pl.col("base_fee_per_gas")).alias(
"block_gas_premium"
),
]
)
.with_columns(
# Calculate the transaction index percentile within its block
(pl.col("transaction_index") / pl.col("transaction_index_max") * 100).alias(
"blockspace_percentile"
)
)
.with_columns(
# Round the block space percentile for easier interpretation
(pl.col("blockspace_percentile").round()).alias(
"rounded_blockspace_percentile"
)
)
# unit conversions
.with_columns(
# convert gas to gwei
(pl.col("gas_price") / 10**9),
(pl.col("max_priority_fee_per_gas") / 10**9),
(pl.col("max_fee_per_gas") / 10**9),
(pl.col("base_fee_per_gas") / 10**9),
# convert bytes to kilobytes
(pl.col("calldata_size") / 10**3),
(pl.col("total_calldata_block_size") / 10**3),
)
.fill_nan(0) # Fill NaN values with 0
.unique() # Ensure all rows are unique
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment