Created
May 7, 2025 13:17
-
-
Save ahmednazir85/578a65ebb60c8235d65dc8a538621843 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
from solana.rpc.api import Client | |
import time | |
import os | |
import boto3 | |
from pathlib import Path | |
from solana.rpc.core import RPCException | |
import toml | |
import shutil | |
def configure_logging(logfile):
    """
    Configure logging to write to the specified log file.

    The parent directory of the log file is created when it is missing, so a
    fresh deployment does not fail before the first message is written.

    Args:
        logfile: Path of the log file passed to ``logging.basicConfig``. A
            falsy value is passed through unchanged and no directory is
            created.
    """
    # A bare file name has no directory component; only create a directory
    # when the path actually names one.
    log_dir = os.path.dirname(logfile) if logfile else ""
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    logging.basicConfig(
        filename=logfile,
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
def copy_directory(src_dir: str, dest_dir: str) -> None:
    """
    Copy the regular files found directly inside src_dir into dest_dir.

    dest_dir is created first when it does not exist. Subdirectories of
    src_dir are not descended into and are not copied.

    Args:
        src_dir (str): The source directory path.
        dest_dir (str): The destination directory path.
    """
    os.makedirs(dest_dir, exist_ok=True)

    # Lazy generator: each entry is checked immediately before it is copied.
    file_names = (
        name
        for name in os.listdir(src_dir)
        if os.path.isfile(os.path.join(src_dir, name))
    )
    for name in file_names:
        # copy2 also carries over file metadata such as timestamps.
        shutil.copy2(os.path.join(src_dir, name), os.path.join(dest_dir, name))

    logging.info(f"All files copied from {src_dir} to {dest_dir}")
def clear_directory(directory: str) -> None:
    """
    Delete everything inside the given directory, keeping the directory itself.

    Files and symbolic links are unlinked; subdirectories are removed together
    with their contents. A missing directory is only logged.

    Args:
        directory (str): The path to the directory to clear.
    """
    if os.path.exists(directory):
        for name in os.listdir(directory):
            target = os.path.join(directory, name)
            # Links are unlinked rather than followed, so a link to a
            # directory never triggers a recursive delete of its target.
            unlinkable = os.path.islink(target) or os.path.isfile(target)
            if unlinkable:
                os.remove(target)
            elif os.path.isdir(target):
                shutil.rmtree(target)
        logging.info(f"All contents of '{directory}' have been removed.")
    else:
        logging.info(f"The directory '{directory}' does not exist.")
def upload_files_to_s3(local_path: str, bucket_name: str, s3_prefix: str,
                       aws_access_key_id: str, aws_secret_access_key: str,
                       aws_session_token=None):
    """
    Uploads a file or folder to an AWS S3 bucket.

    Upload failures are logged per file and do not abort the remaining
    uploads; nothing is returned or raised for them.

    :param local_path: Path to the file or directory to upload.
    :param bucket_name: Target S3 bucket name.
    :param s3_prefix: Optional prefix (folder path) in S3. When empty, keys
        are placed at the bucket root.
    :param aws_access_key_id: AWS access key ID.
    :param aws_secret_access_key: AWS secret access key.
    :param aws_session_token: Optional AWS session token (for temporary
        credentials). Defaults to None.
    """
    # Imported locally: the module header imports boto3 only, so the bare
    # name `botocore` is not bound at module level.
    from boto3.exceptions import S3UploadFailedError
    from botocore.exceptions import BotoCoreError, ClientError

    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
    )
    s3_client = session.client('s3')

    local_path = Path(local_path)
    if not local_path.exists():
        logging.error(f"File path does not exist: {local_path}")
        return

    def upload_file(file_path, s3_key):
        """Helper function to upload a single file, logging any failure."""
        logging.info(f"Uploading {file_path} to s3://{bucket_name}/{s3_key}")
        try:
            s3_client.upload_file(str(file_path), bucket_name, s3_key)
        except (BotoCoreError, ClientError, S3UploadFailedError) as e:
            # Best effort: record the failure and continue with the next file.
            logging.error(f"Upload attempt failed for {file_path}: {e}")
        else:
            logging.info(f"Successfully uploaded {file_path} to s3://{bucket_name}/{s3_key}")

    if local_path.is_file():
        # Upload a single file
        s3_key = f"{s3_prefix}/{local_path.name}" if s3_prefix else local_path.name
        upload_file(local_path, s3_key)
    else:
        # Upload a folder, mirroring its layout below the prefix
        for root, _, files in os.walk(local_path):
            for file in files:
                file_path = Path(root) / file
                # S3 keys use '/' as the separator on every platform.
                relative_key = file_path.relative_to(local_path).as_posix()
                s3_key = f"{s3_prefix}/{relative_key}" if s3_prefix else relative_key
                upload_file(file_path, s3_key)
def get_current_slot(rpc_url):
    """
    Return the current slot reported by the RPC endpoint at rpc_url.

    The request is repeated until it succeeds. After an RPCException,
    KeyError or TypeError a warning is logged and the call is retried
    after a five second pause.
    """
    rpc = Client(rpc_url)
    while True:
        try:
            current = rpc.get_slot().value
        except (RPCException, KeyError, TypeError) as e:
            logging.warning(f"Failed to fetch current slot. Retrying... Error: {e}")
            time.sleep(5)  # Back off before asking again
        else:
            return current
def monitor_slots(rpc_url, leader_slots, traces_dir_path, traces_temp_copy_path, bucket_name,
                  aws_access_key_id, aws_secret_access_key,
                  max_slot_age=100, slot_time_seconds=0.4):
    """
    Wait for each leader slot to be reached and upload the traces for it.

    For every target slot, the trace directory is copied to a temporary
    location and uploaded to S3 under a prefix equal to the slot number once
    the current slot has reached it. Slots more than ``max_slot_age`` slots
    in the past are skipped. When every slot has been uploaded or skipped,
    the process is terminated with ``os._exit(0)``; this function does not
    return.

    Args:
        rpc_url: RPC endpoint passed to ``get_current_slot``.
        leader_slots: Iterable of integer slot numbers, in any order.
        traces_dir_path: Directory holding the live trace files.
        traces_temp_copy_path: Directory the traces are copied to before
            being uploaded.
        bucket_name: Target S3 bucket name.
        aws_access_key_id: AWS access key ID.
        aws_secret_access_key: AWS secret access key.
        max_slot_age: Slots further than this behind the current slot are
            considered too old to upload. Defaults to 100.
        slot_time_seconds: Estimated duration of one slot, used to size the
            sleep before the next check. Defaults to 0.4.
    """
    # Ascending order matters: the loop sleeps on the first future slot it
    # meets, so a nearer slot listed after a farther one could age out
    # during that sleep. Sorting here removes the reliance on the caller.
    pending_slots = sorted(leader_slots)
    started_slots = set()  # Slots already uploaded or skipped

    while True:
        current_slot = get_current_slot(rpc_url)
        logging.info(f"Current slot: {current_slot}")
        all_handled = True  # Assume all slots are handled unless we find a pending one

        for slot in pending_slots:
            if slot in started_slots:
                continue  # Already uploaded or skipped

            slots_remaining = slot - current_slot
            logging.info(f"Target slot={slot}, Current slot={current_slot}, Slots remaining={slots_remaining}.")

            if slots_remaining > 0:
                all_handled = False  # There's at least one upcoming slot
                # Sleep for half the estimated wait, but never less than 3 seconds.
                sleep_time = max(slots_remaining * slot_time_seconds / 2, 3)
                logging.info(f"Sleeping for {sleep_time:.2f} seconds until closer to slot.")
                time.sleep(sleep_time)
                break  # Recheck slots after waking up

            if slots_remaining < -max_slot_age:
                logging.info(f"{slot} is older then {max_slot_age} slots skipping ...")
            else:
                logging.info(f"Slot reached slot {slot}. Uploading traces...")
                # Upload from a copy so that traces do not get rotated while uploading.
                # The temp directory is not cleared first; files with the same names are overwritten.
                copy_directory(traces_dir_path, traces_temp_copy_path)
                upload_files_to_s3(traces_temp_copy_path, bucket_name, str(slot), aws_access_key_id, aws_secret_access_key)
                logging.info(f"upload finished for slot {slot}")
            started_slots.add(slot)  # Mark this slot as handled

        if all_handled:
            logging.info("All leader slots handled. Terminating the process...")
            # NOTE(review): os._exit ends the process immediately without normal
            # interpreter cleanup — presumably deliberate; confirm before changing.
            os._exit(0)
if __name__ == "__main__":
    # Command line: one optional flag pointing at the TOML settings file.
    cli = argparse.ArgumentParser(description="Monitor Solana leader slots and upload traces to AWS.")
    cli.add_argument(
        "--config",
        type=str,
        default="config.toml",
        help="Path to the configuration file (.toml). Defaults to 'config.toml'.",
    )
    config_path = cli.parse_args().config

    # All settings live in the [app_config] table of the config file.
    app_config = toml.load(config_path)['app_config']

    # Logging is set up before anything is reported.
    configure_logging(app_config['logfile'])

    # Slot values are converted to int and ordered ascending.
    leader_slots = sorted(int(slot) for slot in app_config['slots'])
    logging.info(f"leader slots {leader_slots}")

    # Hand over to the monitoring loop.
    monitor_slots(
        rpc_url=app_config['rpc_url'],
        leader_slots=leader_slots,
        traces_dir_path=app_config['traces_dir_path'],
        traces_temp_copy_path=app_config['traces_temp_copy_path'],
        bucket_name=app_config['bucket_name'],
        aws_access_key_id=app_config['aws_access_key_id'],
        aws_secret_access_key=app_config['aws_secret_access_key'],
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment