Skip to content

Instantly share code, notes, and snippets.

@ribasushi
Last active January 29, 2023 23:04
Show Gist options
  • Save ribasushi/e841e87a35ca4da61c7580d3e9501ba1 to your computer and use it in GitHub Desktop.
Save ribasushi/e841e87a35ca4da61c7580d3e9501ba1 to your computer and use it in GitHub Desktop.
Simple Filecoin StorageMarkets ETL
#!/bin/bash
#
# Optional environment (each falls back to the default assigned below):
#   LOTUS_PATH         directory whose 'api' file holds the node multiaddr
#   FULLNODE_API_INFO  "[token:]multiaddr" of the node to query
#   FIL_GENESIS_UNIX   unix timestamp of epoch 0
#   FIL_EPOCH_SECONDS  duration of one epoch in seconds
set -eu
set -o pipefail

# derive API_INFO from implied defaults if necessary
LOTUS_PATH="${LOTUS_PATH:-$HOME/.lotus}"
# Best effort on purpose: an absent/unreadable api file just means we fall
# through to the next default
LOTUS_CFG_MADDR="$( cat -- "$LOTUS_PATH/api" 2>/dev/null || true )"
FULLNODE_API_INFO="${FULLNODE_API_INFO:-${LOTUS_CFG_MADDR:-/ip4/127.0.0.1/tcp/1234/http}}"
FIL_GENESIS_UNIX="${FIL_GENESIS_UNIX:-1598306400}"
FIL_EPOCH_SECONDS="${FIL_EPOCH_SECONDS:-30}"

#######################################
# Derive token + maddr, then host/port, and build the JSON-RPC endpoint URL.
# Globals:   writes API_PART1, API_PART2, IGNORE, API_NPROTO, API_HOST,
#            API_TPROTO, API_PORT, API_APROTO and API_URL
# Arguments: $1 - API info string, either "multiaddr" or "token:multiaddr"
#######################################
derive_api_url() {
  local api_info="$1"

  if [[ "$api_info" == /* ]]; then
    # Bare multiaddr without a token: it must not be split on ':' because an
    # IPv6 address (e.g. /ip6/::1/tcp/1234/http) contains colons itself
    API_PART1="$api_info"
    API_PART2=""
  else
    # "token:multiaddr": everything after the first ':' lands in API_PART2
    IFS=':' read -r API_PART1 API_PART2 <<<"$api_info"
  fi

  # The multiaddr starts with '/', so the first field is empty (IGNORE)
  IFS='/' read -r IGNORE API_NPROTO API_HOST API_TPROTO API_PORT API_APROTO <<<"${API_PART2:-$API_PART1}"

  if [[ "$API_NPROTO" == "ip6" ]]; then
    # The backslashes are kept literally in the URL -- presumably so that
    # curl's URL globbing does not treat the brackets as a range expression
    API_HOST="\[$API_HOST\]"
  fi

  API_URL="http://$API_HOST:$API_PORT/rpc/v0"
}

derive_api_url "$FULLNODE_API_INFO"
export API_URL
# Abort with an error: all arguments are handed to echo unchanged (so a leading
# option such as -e is honoured), the text goes to stderr, exit status is 1.
die() {
  echo "$@" >&2
  exit 1
}
#######################################
# POST one JSON-RPC request to the node and print the raw response body.
# The response is spooled to a temporary file rather than held in a variable.
# Globals:   API_URL (read)
# Arguments: none; the JSON request body is read from stdin
# Outputs:   the unmodified response on stdout, diagnostics on stderr
# Returns:   status 1 (through die) when the request pipeline fails, the
#            response is not valid JSON, it carries a JSON-RPC "error" member,
#            or it is empty
#######################################
lotus_apicall() {
  # The body runs in a subshell so that the EXIT trap below (and the exit
  # performed by die) stay scoped to this one call instead of leaking into the
  # calling shell, where the trap would reference a no-longer-existing local.
  (
    # Set here explicitly: `bash -c` workers that receive this function via
    # `export -f` do not inherit the parent's shell options
    set -o pipefail

    local request response_file rpc_error detail=""
    local status=0

    request="$( cat )"

    # Declared and assigned separately so that a mktemp failure is not masked
    response_file="$( mktemp -t dealstate-etl.XXXXXX )" \
      || die "Unable to create a temporary file for the API response"
    trap 'rm -f -- "$response_file"' EXIT

    # tee keeps the verbatim response while jq extracts a possible error member
    rpc_error="$(
      curl -m240 -s "$API_URL" -XPOST -H 'Content-Type: application/json' --data "$request" \
        | tee "$response_file" \
        | jq -rc '.error // empty'
    )" || status=$?

    if (( status != 0 )); then
      # curl failed, or jq could not parse what came back
      detail="${rpc_error:-request failed or response was not valid JSON (exit status $status)}"
    elif [[ -n "$rpc_error" ]]; then
      detail="$rpc_error"
    elif [[ ! -s "$response_file" ]]; then
      detail="no result from API call"
    fi

    # A literal newline is used instead of `echo -e` so that backslashes inside
    # the request or the error text are printed as they are
    [[ -z "$detail" ]] || die "Error executing '$request'"$'\n'"$detail"

    cat "$response_file"
  )
}
# Exported so that the `bash -c` workers started through xargs can call them
export -f lotus_apicall die

# Epoch to snapshot: first command-line argument, otherwise the epoch that
# contains the current wall-clock time
target_epoch="${1:-$(( ( $( printf "%(%s)T" -1 ) - FIL_GENESIS_UNIX ) / FIL_EPOCH_SECONDS ))}"

# The value ends up inside arithmetic expansions, so insist on plain digits and
# force base 10 (a leading zero would otherwise be read as octal)
[[ "$target_epoch" =~ ^[0-9]+$ ]] \
  || die "Invalid epoch '$target_epoch': expected a non-negative integer"
target_epoch="$(( 10#$target_epoch ))"

# Output file name carries the epoch and its wall-clock time
dbfile="$(
  printf "./fil_market_actor_state_%d_%(%Y-%m-%d_%H-%M-%S)T.sqlite" \
    "$target_epoch" "$(( target_epoch * FIL_EPOCH_SECONDS + FIL_GENESIS_UNIX ))"
)"
export dbfile

# Assigned separately from `export` so that a failed lookup is not masked, and
# checked for null: a null tipset key would make later calls run against a
# different tipset than the one named in $dbfile
target_tipset="$(
  printf '{ "jsonrpc": "2.0", "id":1, "method": "Filecoin.ChainGetTipSetByHeight", "params": [ %d, null ] }' "$target_epoch" \
    | lotus_apicall \
    | jq -rc .result.Cids
)" || die "Unable to look up the tipset at epoch $target_epoch"
[[ -n "$target_tipset" && "$target_tipset" != "null" ]] \
  || die "No tipset CIDs returned for epoch $target_epoch"
export target_tipset
# Create the two empty tables in the output database. The time columns of
# 'deals' are nullable; the rest of the deal proposal columns are NOT NULL.
printf '%s\n' "Initializing fresh tables 'deals' and 'clients' in database '$dbfile'... "

schema_sql='
CREATE TABLE deals (
deal_id BIGINT NOT NULL PRIMARY KEY,
client_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
piece_cid TEXT NOT NULL,
label TEXT NOT NULL,
piece_size BIGINT NOT NULL,
is_filplus BOOLEAN NOT NULL,
price_per_epoch BIGINT NOT NULL,
provider_collateral BIGINT NOT NULL,
client_collateral BIGINT NOT NULL,
start_epoch INTEGER NOT NULL,
end_epoch INTEGER NOT NULL,
sector_activation_epoch INTEGER,
sector_slash_epoch INTEGER,
start_time DATETIME,
end_time DATETIME,
sector_activation_time DATETIME
);
CREATE TABLE clients (
client_id TEXT UNIQUE NOT NULL,
client_address TEXT UNIQUE NOT NULL
);
'

sqlite3 "$dbfile" <<<"$schema_sql"
# Report which snapshot is about to be exported
printf '%s\n' \
  "ETL of StorageMarkets state:" \
  "Epoch: $target_epoch" \
  "Tipset: $target_tipset" \
  "API: $API_URL" \
  "... "

# jq program turning every entry of .result into one CSV row. The 17 fields
# follow the column order of the 'deals' table; the three trailing nulls are
# placeholders for the *_time columns.
deals_to_csv='.result | to_entries | .[] | [
.key,
.value.Proposal.Client,
.value.Proposal.Provider,
.value.Proposal.PieceCID."/",
.value.Proposal.Label,
.value.Proposal.PieceSize,
( if .value.Proposal.VerifiedDeal then 1 else 0 end ),
.value.Proposal.StoragePricePerEpoch,
.value.Proposal.ProviderCollateral,
.value.Proposal.ClientCollateral,
.value.Proposal.StartEpoch,
.value.Proposal.EndEpoch,
.value.State.SectorStartEpoch,
.value.State.SlashEpoch,
null,
null,
null
] | @csv'

market_deals_request="$(
  printf '{ "jsonrpc": "2.0", "id":1, "method": "Filecoin.StateMarketDeals", "params": [ %s ] }' "$target_tipset"
)"

# Fetch -> flatten to CSV -> bulk-load; sqlite3 reads the rows from its own
# stdin through the '|cat -' pseudo-file
lotus_apicall <<<"$market_deals_request" \
  | jq -r "$deals_to_csv" \
  | sqlite3 -csv "$dbfile" ".import '|cat -' deals"
# Post-process the imported rows:
#   * -1 in the sector epoch columns is replaced by NULL
#   * the *_time columns are filled in from the corresponding epochs
# 'unixepoch' already yields a UTC timestamp. The former trailing 'utc'
# modifier is dropped on purpose: SQLite documents it as a local-time -> UTC
# conversion whose result is undefined when the value is not local time.
echo "Converting epochs... "
sqlite3 "$dbfile" <<<"
PRAGMA synchronous = OFF;
UPDATE deals SET sector_activation_epoch = NULL WHERE sector_activation_epoch = -1;
UPDATE deals SET sector_slash_epoch = NULL WHERE sector_slash_epoch = -1;
UPDATE deals SET
  start_time = DATETIME( ( start_epoch * $FIL_EPOCH_SECONDS + $FIL_GENESIS_UNIX ), 'unixepoch' ),
  end_time = DATETIME( ( end_epoch * $FIL_EPOCH_SECONDS + $FIL_GENESIS_UNIX ), 'unixepoch' ),
  sector_activation_time = DATETIME( ( sector_activation_epoch * $FIL_EPOCH_SECONDS + $FIL_GENESIS_UNIX ), 'unixepoch' )
;
"
# Index the 'deals' columns client_id, provider_id, piece_cid and is_filplus
# once the bulk load is complete
printf '%s\n' "Adding indexes... "

index_sql='
PRAGMA synchronous = OFF;
CREATE INDEX idx_client ON deals ( client_id );
CREATE INDEX idx_provider ON deals ( provider_id );
CREATE INDEX idx_piece ON deals ( piece_cid );
CREATE INDEX idx_filp ON deals ( is_filplus );
'

sqlite3 "$dbfile" <<<"$index_sql"
# Resolve every distinct client id found in 'deals' through
# Filecoin.StateAccountKey and load the "id,address" pairs into 'clients'.
clients_to_resolve="$(
  sqlite3 "$dbfile" <<<'SELECT DISTINCT( client_id ) FROM deals'
)"

# A here-string always ends in a newline, so wc -l would report 1 for an
# empty list
client_count=0
[[ -z "$clients_to_resolve" ]] || client_count="$( wc -l <<<"$clients_to_resolve" )"
echo "Resolving $client_count client addresses... "

# Up to 64 lookups run in parallel. -I already implies one input line per
# invocation (combining it with -n1 is rejected as mutually exclusive by GNU
# xargs). The id is handed to the worker as "$1" instead of being pasted into
# the script text, so it is never parsed as shell code.
xargs -P64 -I{} -- bash -c '
  client_id="$1"
  printf "%s,%s\n" "$client_id" "$(
    printf "{ \"jsonrpc\": \"2.0\", \"id\":1, \"method\": \"Filecoin.StateAccountKey\", \"params\": [ \"%s\", %s ] }" "$client_id" "$target_tipset" \
      | lotus_apicall \
      | jq -rc .result
  )"
' _ {} <<<"$clients_to_resolve" \
  | sqlite3 -csv "$dbfile" ".import '|cat -' clients"
# Final report: elapsed time (bash's SECONDS counter) and a ready-to-paste
# command for opening the resulting database
printf 'done, took %s seconds. You can now run:\nsqlite3 -header -column %s\n\n' "${SECONDS}" "$dbfile"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment