import argparse
import logging
import os
import shutil
import time
from pathlib import Path

import boto3
import botocore.exceptions  # needed for the BotoCoreError handler below (missing in the original)
import toml
from boto3.exceptions import S3UploadFailedError
from solana.rpc.api import Client
from solana.rpc.core import RPCException


def configure_logging(logfile):
    """
    Configure logging to write to the specified log file.
    """
    # Ensure the log directory exists
    # os.makedirs(os.path.dirname(logfile), exist_ok=True)
    logging.basicConfig(
        filename=logfile,
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )


def copy_directory(src_dir: str, dest_dir: str) -> None:
    """
    Copies all files from src_dir to dest_dir.
    Creates dest_dir if it does not exist.

    Args:
        src_dir (str): The source directory path.
        dest_dir (str): The destination directory path.
    """
    # Create the destination directory if it doesn't exist
    os.makedirs(dest_dir, exist_ok=True)
    # Iterate over all files in the source directory
    for item in os.listdir(src_dir):
        src_path = os.path.join(src_dir, item)
        dest_path = os.path.join(dest_dir, item)
        # If it's a file, copy it to the destination directory
        if os.path.isfile(src_path):
            shutil.copy2(src_path, dest_path)
    logging.info(f"All files copied from {src_dir} to {dest_dir}")


def clear_directory(directory: str) -> None:
    """
    Removes all files and subdirectories within the specified directory.

    Args:
        directory (str): The path to the directory to clear.
    """
    # Check if the directory exists
    if not os.path.exists(directory):
        logging.info(f"The directory '{directory}' does not exist.")
        return
    # Iterate over all items in the directory
    for item in os.listdir(directory):
        item_path = os.path.join(directory, item)
        # If it's a file or symlink, remove it
        if os.path.isfile(item_path) or os.path.islink(item_path):
            os.remove(item_path)
        # If it's a directory, remove it and its contents
        elif os.path.isdir(item_path):
            shutil.rmtree(item_path)
    logging.info(f"All contents of '{directory}' have been removed.")


def upload_files_to_s3(local_path: str, bucket_name: str, s3_prefix: str,
                       aws_access_key_id: str, aws_secret_access_key: str):
    """
    Uploads a file or folder to an AWS S3 bucket.

    :param local_path: Path to the file or directory to upload.
    :param bucket_name: Target S3 bucket name.
    :param s3_prefix: Optional prefix (folder path) in S3.
    :param aws_access_key_id: AWS access key ID.
    :param aws_secret_access_key: AWS secret access key.
    """
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )
    s3_client = session.client('s3')
    local_path = Path(local_path)
    if not local_path.exists():
        logging.error(f"File path does not exist: {local_path}")
        return

    def upload_file(file_path, s3_key):
        """Helper to upload a single file, logging any failure."""
        try:
            logging.info(f"Uploading {file_path} to s3://{bucket_name}/{s3_key}")
            s3_client.upload_file(str(file_path), bucket_name, s3_key)
            logging.info(f"Successfully uploaded {file_path} to s3://{bucket_name}/{s3_key}")
        except (botocore.exceptions.BotoCoreError, S3UploadFailedError) as e:
            # upload_file() wraps client errors in S3UploadFailedError, so catch it too
            logging.error(f"Upload failed for {file_path}: {e}")

    if local_path.is_file():
        # Upload a single file
        s3_key = f"{s3_prefix}/{local_path.name}" if s3_prefix else local_path.name
        upload_file(local_path, s3_key)
    else:
        # Upload a folder, preserving paths relative to local_path as S3 keys
        for root, _, files in os.walk(local_path):
            for file in files:
                file_path = Path(root) / file
                relative_path = file_path.relative_to(local_path)
                s3_key = f"{s3_prefix}/{relative_path}" if s3_prefix else str(relative_path)
                upload_file(file_path, s3_key)


def get_current_slot(rpc_url):
    """Return the current slot, retrying indefinitely on transient RPC errors."""
    client = Client(rpc_url)
    while True:
        try:
            response = client.get_slot()
            return response.value
        except (RPCException, KeyError, TypeError) as e:
            logging.warning(f"Failed to fetch current slot. Retrying... Error: {e}")
            time.sleep(5)  # Wait before retrying


def monitor_slots(rpc_url, leader_slots, traces_dir_path, traces_temp_copy_path,
                  bucket_name, aws_access_key_id, aws_secret_access_key):
    """Poll the chain and upload traces for each leader slot as it is reached."""
    slot_time_seconds = 0.4  # Estimated slot time in seconds
    started_slots = set()  # Track slots whose traces have already been uploaded
    while True:
        current_slot = get_current_slot(rpc_url)
        logging.info(f"Current slot: {current_slot}")
        all_handled = True  # Assume all slots are handled unless we find a pending one
        for slot in leader_slots:
            if slot in started_slots:
                continue  # Skip slots whose traces are already uploaded
            slots_remaining = slot - current_slot
            logging.info(f"Target slot={slot}, Current slot={current_slot}, Slots remaining={slots_remaining}.")
            if -100 <= slots_remaining <= 0:  # A slot more than 100 slots in the past is too old to upload traces for
                logging.info(f"Reached slot {slot}. Uploading traces...")
                # clear_directory(traces_temp_copy_path)  # No need to clear: files are overwritten due to identical names
                copy_directory(traces_dir_path, traces_temp_copy_path)  # Copy so traces are not rotated away mid-upload
                upload_files_to_s3(traces_temp_copy_path, bucket_name, str(slot),
                                   aws_access_key_id, aws_secret_access_key)
                logging.info(f"Upload finished for slot {slot}")
                started_slots.add(slot)  # Mark this slot as handled
            elif slots_remaining < -100:
                logging.info(f"Slot {slot} is older than 100 slots, skipping...")
                started_slots.add(slot)  # Mark this slot as handled
            elif slots_remaining > 0:
                all_handled = False  # There's at least one upcoming slot
                sleep_time = max(slots_remaining * slot_time_seconds / 2, 3)
                logging.info(f"Sleeping for {sleep_time:.2f} seconds until closer to the slot.")
                time.sleep(sleep_time)
                break  # Recheck slots after waking up
        if all_handled:
            logging.info("All leader slots handled. Terminating the process...")
            os._exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor Solana leader slots and upload traces to AWS.")
    parser.add_argument("--config", type=str, default="config.toml",
                        help="Path to the configuration file (.toml). Defaults to 'config.toml'.")
    args = parser.parse_args()
    # Load the config file
    config = toml.load(args.config)
    app_config = config['app_config']
    # Configure logging
    configure_logging(app_config['logfile'])
    leader_slots = sorted(int(slot) for slot in app_config['slots'])
    logging.info(f"Leader slots: {leader_slots}")
    # Start monitoring
    monitor_slots(
        app_config['rpc_url'],
        leader_slots,
        app_config['traces_dir_path'],
        app_config['traces_temp_copy_path'],
        app_config['bucket_name'],
        app_config['aws_access_key_id'],
        app_config['aws_secret_access_key'],
    )
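
# The script reads its settings from a TOML file. Below is a sketch of the
# expected layout, inferred from the keys accessed above; every value is a
# placeholder (the RPC endpoint, paths, bucket name, and credentials are
# assumptions for illustration, not real settings):
#
# [app_config]
# logfile = "slot_monitor.log"
# rpc_url = "https://api.mainnet-beta.solana.com"
# slots = [342000000, 342000004]
# traces_dir_path = "/var/validator/traces"
# traces_temp_copy_path = "/tmp/traces_copy"
# bucket_name = "my-traces-bucket"
# aws_access_key_id = "AKIA..."
# aws_secret_access_key = "..."
#
# Then run (script filename assumed):
#   python slot_monitor.py --config config.toml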