Created
May 31, 2024 07:40
-
-
Save marcobazzani/28d926ea40aa1e9b8c6e34c99adfc013 to your computer and use it in GitHub Desktop.
Athena and PaperTrail Datasource
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime

import boto3
def round_minute(date: datetime = None, round_to: int = 1):
    """
    Truncate a datetime down to a multiple of ``round_to`` minutes.

    Seconds and microseconds are zeroed, then the minute field is lowered
    by its remainder modulo ``round_to``. When ``date`` is not supplied,
    the current time from ``datetime.now()`` is used.
    """
    moment = date if date else datetime.now()
    truncated = moment.replace(second=0, microsecond=0)
    remainder = truncated.minute % round_to
    return truncated.replace(minute=truncated.minute - remainder)
def query_string():
    """
    Return the Athena SQL that extracts SPEEDTEST rows from the
    ``papertrail_logs`` table.

    The message column is split on commas; fields 1, 6, 7 and 10 are
    exposed as server_name, download, upload and link. Rows whose message
    contains 'server name' are excluded, and results are ordered by
    generated_at, newest first.
    """
    sql_lines = (
        "SELECT",
        "-- id, received_at, source_ip, hostname, facility, severity, program,",
        "received_at, source_ip,",
        "TRIM('\"' FROM SPLIT_PART(message, ',', 1)) as server_name,",
        "TRIM('\"' FROM SPLIT_PART(message, ',', 6)) as download,",
        "TRIM('\"' FROM SPLIT_PART(message, ',', 7)) as upload,",
        "TRIM('\"' FROM SPLIT_PART(message, ',', 10)) as link",
        "FROM \"default\".\"papertrail_logs\"",
        "where program = 'SPEEDTEST'",
        "AND message not like '%server name%'",
        "ORDER BY generated_at DESC;",
    )
    # Leading and trailing newline mirror the original triple-quoted literal.
    return "\n" + "\n".join(sql_lines) + "\n"
def query_athena():
    """
    Run the SPEEDTEST query on Athena and return its results.

    Starts the query produced by ``query_string()`` against the ``default``
    database, writing output under ``s3://net-bazzani-papertrail/for_cloudwatch/``,
    then polls once per second until Athena reports a terminal state.

    Returns:
        The ``get_query_results`` response dict; rows are under
        ``['ResultSet']['Rows']``.

    Raises:
        Exception: if the query ends in the FAILED or CANCELLED state.
    """
    bucket = 'net-bazzani-papertrail'
    path = 'for_cloudwatch'
    database = 'default'
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query_string(),
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': 's3://{}/{}/'.format(bucket, path)
        }
    )
    execution_id = response['QueryExecutionId']
    while True:
        execution = client.get_query_execution(QueryExecutionId=execution_id)
        status = execution['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            break
        # CANCELLED is terminal too; without this check the loop would
        # poll forever on a cancelled query.
        if state in ('FAILED', 'CANCELLED'):
            reason = status.get('StateChangeReason', 'no reason reported')
            raise Exception(
                'Query failed to run: {} ({})'.format(state, reason)
            )
        time.sleep(1)
    # NOTE(review): this returns a single get_query_results page; if the
    # response carries a NextToken the remaining rows are not fetched.
    return client.get_query_results(QueryExecutionId=execution_id)
def fetch_papertrail(min_time, index):
    """
    Fetch SPEEDTEST events from the Papertrail search API as data points.

    Args:
        min_time: Lower time bound passed to the API as ``min_time``
            (converted with ``int()``).
        index: Column offset; the value is read from comma-separated
            message field ``2 + index``.

    Returns:
        A list of ``{'timestamp': int, 'value': float, 'source': 'Papertrail'}``
        dicts, one per event whose selected field is numeric. Timestamps are
        the event's ``received_at`` truncated to the minute. An empty list is
        returned when the HTTP request fails, so callers can always iterate
        the result.
    """
    url = 'https://papertrailapp.com/api/v1/events/search.json'
    headers = {
        # SECURITY: an API token is committed in source. Set the
        # PAPERTRAIL_TOKEN environment variable to override it; the embedded
        # fallback is kept only for backward compatibility and should be
        # rotated and removed.
        'X-Papertrail-Token': os.environ.get(
            'PAPERTRAIL_TOKEN', 'MpklkFUKQqW09GCrkAnO'
        )
    }
    params = {
        'q': 'program:SPEEDTEST',
        'min_time': int(min_time)
    }
    request_url = f"{url}?{urllib.parse.urlencode(params)}"

    try:
        req = urllib.request.Request(request_url, headers=headers)
        with urllib.request.urlopen(req) as response:
            payload = json.loads(response.read().decode('utf-8'))
    except urllib.error.HTTPError as e:
        print('Error:', e.code)
        return []
    except urllib.error.URLError as e:
        print('Error:', e.reason)
        return []

    column = 2 + index
    result = []
    for event in payload.get('events', []):
        message_parts = event['message'].split(',')
        # Guard the column actually read, not just a fixed minimum length.
        if not 0 <= column < len(message_parts):
            continue
        value = message_parts[column].strip('"')
        # Keep only plain non-negative decimals (digits with at most one dot).
        if not value.replace('.', '', 1).isdigit():
            continue
        received = datetime.strptime(
            event['received_at'], '%Y-%m-%dT%H:%M:%S%z'
        )
        timestamp = round_minute(received).timestamp()
        result.append({
            'timestamp': int(timestamp),
            'value': float(value),
            'source': 'Papertrail',
        })
    return result
def fetch_athena(start_time, end_time, index):
    """
    Query Athena and convert the rows into timestamp/value data points.

    Args:
        start_time: Inclusive lower bound, compared against the row's epoch
            timestamp after truncation to the minute.
        end_time: Inclusive upper bound, compared the same way.
        index: Position of the value column within each result row.

    Returns:
        A ``(last_timestamp, athena_data)`` tuple. ``athena_data`` is a list
        of ``{'timestamp': int, 'value': float, 'source': 'Athena'}`` dicts.
        ``last_timestamp`` is the newest timestamp among them, or 0 when no
        row qualified.
    """
    result_data = query_athena()
    rows = result_data['ResultSet']['Rows'][1:]  # Exclude header row

    athena_data = []
    last_timestamp = 0
    for row in rows:
        cells = row['Data']
        # Skip rows that lack the requested column or a received_at value.
        if not -len(cells) <= index < len(cells):
            continue
        if 'VarCharValue' not in cells[index] or 'VarCharValue' not in cells[0]:
            continue
        # NOTE(review): the parsed datetime is naive, so .timestamp() uses
        # the process's local timezone - confirm the runtime is UTC.
        received = datetime.strptime(
            cells[0]['VarCharValue'], "%Y-%m-%d %H:%M:%S.%f"
        )
        # int() keeps the timestamp type consistent with the Papertrail path.
        timestamp = int(round_minute(received).timestamp())
        if timestamp < start_time or timestamp > end_time:
            print("{} excluded".format(row))
            continue
        try:
            value = float(cells[index]['VarCharValue'])
        except ValueError:
            # Non-numeric cell (e.g. a text column was selected): skip the
            # row rather than failing the whole request.
            continue
        athena_data.append(
            {'timestamp': timestamp, 'value': value, 'source': 'Athena'}
        )
        # The query returns newest rows first, so track the maximum instead
        # of the last row seen (which would be the oldest).
        last_timestamp = max(last_timestamp, timestamp)
    return last_timestamp, athena_data
def validation_error(message):
    """Wrap ``message`` in an error response with code 'Validation'."""
    error_body = dict(Code='Validation', Value=message)
    return dict(Error=error_body)
def validate_get_metric_data_event(event):
    """
    Validate the arguments of a GetMetricData event.

    Exactly three arguments are required: a string label, a numeric column
    index, and a source name ('ALL', 'Athena' or 'Papertrail').

    Returns:
        ``None`` when the arguments are valid, otherwise the error dict
        produced by ``validation_error``.
    """
    # A request with no 'Arguments' key is reported as a validation error
    # rather than raising KeyError.
    arguments = event['GetMetricDataRequest'].get('Arguments', [])
    if len(arguments) != 3:
        return validation_error('Argument count must be 3')
    if not isinstance(arguments[0], str):
        return validation_error('Argument 1 must be a string')
    # bool is a subclass of int; reject it explicitly.
    if isinstance(arguments[1], bool) or not isinstance(arguments[1], (int, float)):
        return validation_error('Argument 2 must be a number')
    if arguments[2] not in ('ALL', 'Athena', 'Papertrail'):
        return validation_error(
            "Argument 3 must be one of 'ALL', 'Athena', 'Papertrail'"
        )
    return None
def get_metric_data_event_handler(event):
    """
    Handle a GetMetricData event and build the metric data response.

    Reads StartTime, EndTime, Period and the three arguments (label, column
    index, source) from ``event['GetMetricDataRequest']``, fetches data from
    the selected source(s), and returns two series labelled
    '<label> Athena' and '<label> Papertrail'. A series whose source was not
    queried is returned empty.

    Returns:
        A dict with a 'MetricDataResults' list, or a validation error dict
        when the arguments are invalid or the source is unknown.
    """
    # Named 'error' so the module-level validation_error() helper is not
    # shadowed inside this function.
    error = validate_get_metric_data_event(event)
    if error:
        return error

    request = event['GetMetricDataRequest']
    start_time = request['StartTime']
    end_time = request['EndTime']
    period = request['Period']
    label = request['Arguments'][0]
    index = int(request['Arguments'][1])
    source = request['Arguments'][2]
    print("{} {} {} {} {} {}".format(start_time, end_time, period, label, index, source))

    # Default both series to empty so a single-source request does not hit
    # an unbound name when the response is built.
    athena_data = []
    papertrail_data = []
    if source == "ALL":
        last_timestamp, athena_data = fetch_athena(start_time, end_time, index)
        # 'or []' tolerates a None result from a failed Papertrail request.
        papertrail_data = fetch_papertrail(last_timestamp, index) or []
    elif source == "Athena":
        _, athena_data = fetch_athena(start_time, end_time, index)
    elif source == "Papertrail":
        papertrail_data = fetch_papertrail(0, index) or []
    else:
        return validation_error(
            "Argument 3 must be one of 'ALL', 'Athena', 'Papertrail'"
        )

    return {
        'MetricDataResults': [
            {
                'StatusCode': 'Complete',
                'Label': "{} Athena".format(label),
                'Timestamps': [val['timestamp'] for val in athena_data],
                'Values': [val['value'] for val in athena_data],
            },
            {
                'StatusCode': 'Complete',
                'Label': "{} Papertrail".format(label),
                'Timestamps': [val['timestamp'] for val in papertrail_data],
                'Values': [val['value'] for val in papertrail_data],
            }
        ]
    }
def describe_get_metric_data_event_handler():
    """
    Handle a DescribeGetMetricData event.

    Returns:
        A dict with 'ArgumentDefaults' (label 'Download', column index 3,
        source 'ALL') and a Markdown 'Description' of the connector's three
        query arguments. The example expression embeds the value of the
        AWS_LAMBDA_FUNCTION_NAME environment variable.
    """
    description = f"""## Athena and Papertrail data source connector

Returns SPEEDTEST measurements read from Athena and/or the Papertrail API

### Query arguments

\\# | Type | Description
---|---|---
1 | String | The label of the time series
2 | Number | The index of the value column to return
3 | String | The source to query: ALL, Athena or Papertrail

### Example Expression

```
LAMBDA('{os.environ.get('AWS_LAMBDA_FUNCTION_NAME')}', 'Download', 3, 'ALL')
```
"""
    return {
        # Every default is a {'Value': ...} dict; the third entry was
        # previously a set literal, which cannot be serialized to JSON.
        'ArgumentDefaults': [{'Value': 'Download'}, {'Value': 3}, {'Value': 'ALL'}],
        'Description': description
    }
def lambda_handler(event, context):
    """
    Lambda entry point: dispatch on ``event['EventType']``.

    'GetMetricData' and 'DescribeGetMetricData' are routed to their
    handlers; any other event type yields a validation error response.
    """
    event_type = event['EventType']
    # Lambdas keep handler lookup lazy, as in a plain if-chain.
    dispatch = {
        'GetMetricData': lambda: get_metric_data_event_handler(event),
        'DescribeGetMetricData': lambda: describe_get_metric_data_event_handler(),
    }
    handler = dispatch.get(event_type)
    if handler is None:
        return validation_error('Invalid event type: ' + event_type)
    return handler()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment