Skip to content

Instantly share code, notes, and snippets.

@zomglings
Last active November 11, 2021 21:01
Show Gist options
  • Save zomglings/95e45a1bdf84d0131bf499b4a45acf35 to your computer and use it in GitHub Desktop.
Save zomglings/95e45a1bdf84d0131bf499b4a45acf35 to your computer and use it in GitHub Desktop.
Iterate through past 24 hours of Ethereum blockchain subscription data using Moonstream Python client.
import argparse
import json
import sys
import time
from moonstream.client import Moonstream
ETHEREUM_BLOCKCHAIN_QUERY = "type:ethereum_blockchain"
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Query all ethereum_blockchain events involving an address in your stream for the past 24 hours"
)
parser.add_argument(
"-t",
"--token",
required=True,
help="Access token for Moonstream API (create one at https://moonstream.to/account/tokens)",
)
parser.add_argument(
"-a",
"--address",
required=True,
help="Optional address that you are interested in. Only events involving this address will be shown.",
)
parser.add_argument(
"-o",
"--outfile",
type=argparse.FileType("a"),
default=sys.stdout,
help="File to write output to in JSON lines format",
)
args = parser.parse_args()
now = int(time.time())
# 24 * 60 * 60 is one day's worth of seconds
window_start = now - 24 * 60 * 60
time_step = 5 * 60
query = f"{ETHEREUM_BLOCKCHAIN_QUERY} sub:ethereum_blockchain:{args.address}"
client = Moonstream()
client.authorize(args.token)
for _ in range(3):
try:
latest_events = client.latest_events(query)
break
except:
pass
if not latest_events:
print("No events in your stream!")
sys.exit(0)
latest_event_timestamp = max(
event.get("event_timestamp", -1) for event in latest_events
)
if latest_event_timestamp < window_start:
print("No events in your stream within the past 24 hours!")
sys.exit(0)
with args.outfile:
current_end_time = latest_event_timestamp
while current_end_time > window_start:
start_time = current_end_time - time_step
events = None
for _ in range(3):
try:
events = client.events(
start_time,
current_end_time,
include_start=False,
include_end=True,
q=query,
)
break
except:
pass
if events is None:
raise Exception(
f"Error retrieving events from {start_time} to {current_end_time}"
)
for event in events.get("events", []):
json.dump(event, args.outfile)
current_end_time = max(start_time, window_start)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment