Last active
June 11, 2024 07:48
-
-
Save polius/e6e705692e83ea194856252384cc1654 to your computer and use it in GitHub Desktop.
Retrieve all error logs from an Amazon RDS instance.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import argparse | |
class main:
    """Download the error logs of an Amazon RDS instance into a local text file.

    Instantiating the class runs the whole job: it parses the command-line
    arguments, creates an RDS client and calls :meth:`compute`, which writes
    ``<instance>.txt`` in the current working directory.
    """

    def __init__(self):
        """Parse the CLI arguments, build the RDS client and run the download."""
        # Grab parameters
        parser = argparse.ArgumentParser()
        parser.add_argument('--region', required=True, help='AWS region name (eu-west-1)')
        parser.add_argument('--instance', required=True, help='Database instance name')
        parser.add_argument('--profile', required=False, help='AWS profile name ($ aws configure --profile customer)')
        self._args = parser.parse_args()

        # Create a session with the specified profile (None when --profile is omitted)
        session = boto3.Session(profile_name=self._args.profile, region_name=self._args.region)

        # Create Boto3 clients
        self._rds = session.client('rds')

        # Compute
        self.compute()

    def compute(self):
        """Fetch every matching error log and write the non-empty ones to ``<instance>.txt``.

        Each non-empty log is preceded by a three-line banner carrying its
        name. The output file is always (re)written, even when no log
        contained any data.
        """
        # List all logs
        log_files = self.__list_logs()
        total = len(log_files)

        # Get logs content; sections are collected in a list and joined once
        # instead of growing one big string with repeated ``+=``.
        sections = []
        for position, log_file in enumerate(log_files, start=1):
            name = log_file['LogFileName']
            print(f"- Processing {position}/{total}: {name}")
            log_content = self.__get_log(name)
            if not log_content.strip():
                continue
            print("- Data found!")
            sections.append(
                "----------------------------------------\n"
                f"--> LOG_NAME: {name}\n"
                "----------------------------------------\n"
                f"{log_content}\n"
            )

        # Store logs. The encoding is explicit so the result does not depend
        # on the platform default, which may be unable to encode log text.
        with open(self._args.instance + '.txt', 'w', encoding='utf-8') as fopen:
            fopen.write(''.join(sections))

    def __list_logs(self):
        """
        List all error log files for the given RDS instance identifier.

        Returns the ``DescribeDBLogFiles`` entries whose ``LogFileName``
        contains ``mysql-error.log`` or ``mysql-error-running.log``, or starts
        with ``error/postgresql.log``.
        """
        # The listing is paginated; a single describe_db_log_files call only
        # returns the first page, so walk every page through the paginator.
        paginator = self._rds.get_paginator('describe_db_log_files')
        pages = paginator.paginate(DBInstanceIdentifier=self._args.instance)

        matches = []
        for page in pages:
            for log in page['DescribeDBLogFiles']:
                name = log['LogFileName']
                if (
                    'mysql-error.log' in name
                    or 'mysql-error-running.log' in name
                    or name.startswith('error/postgresql.log')
                ):
                    matches.append(log)
        return matches

    def __get_log(self, log_file_name):
        """
        Retrieve a specific log file from the RDS instance.

        The file is downloaded portion by portion, starting at marker ``'0'``
        and following the ``Marker`` returned by each response until
        ``AdditionalDataPending`` is false. Returns the full text of the log
        (an empty string when the log has no data).
        """
        portions = []
        marker = '0'
        # Handle large logs by downloading in portions
        while True:
            response = self._rds.download_db_log_file_portion(
                DBInstanceIdentifier=self._args.instance,
                LogFileName=log_file_name,
                Marker=marker,
            )
            # Portions are concatenated verbatim: appending a newline after
            # each one would inject line breaks that are not in the real log.
            # A missing or None payload counts as an empty portion.
            portions.append(response.get('LogFileData') or '')
            if not response.get('AdditionalDataPending'):
                break
            marker = response['Marker']
        return ''.join(portions)
# Script entry point: constructing ``main`` parses the CLI arguments and runs
# the whole download, so nothing else is needed here.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment