Skip to content

Instantly share code, notes, and snippets.

@marcobazzani
Created May 31, 2024 07:40
Show Gist options
  • Save marcobazzani/28d926ea40aa1e9b8c6e34c99adfc013 to your computer and use it in GitHub Desktop.
Save marcobazzani/28d926ea40aa1e9b8c6e34c99adfc013 to your computer and use it in GitHub Desktop.
Athena and PaperTrail Datasource
import os
import boto3
import time
from datetime import datetime
import urllib.request
import json
def round_minute(date: datetime = None, round_to: int = 1):
    """Truncate a datetime down to a multiple of ``round_to`` minutes.

    Despite the name, this floors rather than rounds to nearest: seconds and
    microseconds are zeroed and the minute is lowered to the closest multiple
    of ``round_to`` at or below it.

    Args:
        date: The datetime to truncate. Defaults to ``datetime.now()`` when
            ``None``.
        round_to: Bucket size in minutes; must be at least 1.

    Returns:
        A new datetime with the minute floored and seconds/microseconds set
        to zero. All other fields are left as they were.

    Raises:
        ValueError: If ``round_to`` is smaller than 1.
    """
    # A zero bucket would divide by zero and a negative one can push the
    # minute past 59, so reject both with a clear message.
    if round_to < 1:
        raise ValueError("round_to must be >= 1, got {}".format(round_to))
    if date is None:
        date = datetime.now()
    floored_minute = date.minute - date.minute % round_to
    return date.replace(minute=floored_minute, second=0, microsecond=0)
def query_string():
    """Return the Athena SQL used to read SPEEDTEST rows from the logs table.

    The statement selects ``received_at`` and ``source_ip`` plus four
    comma-separated fields of ``message`` (positions 1, 6, 7 and 10, with
    double quotes trimmed) aliased as ``server_name``, ``download``,
    ``upload`` and ``link``. Rows are limited to ``program = 'SPEEDTEST'``,
    messages containing ``server name`` are excluded (presumably the CSV
    header line -- confirm), and results are ordered by ``generated_at``
    descending.
    """
    # The literal is kept flush-left so the text sent to Athena is unchanged.
    sql = """
SELECT
-- id, received_at, source_ip, hostname, facility, severity, program,
received_at, source_ip,
TRIM('"' FROM SPLIT_PART(message, ',', 1)) as server_name,
TRIM('"' FROM SPLIT_PART(message, ',', 6)) as download,
TRIM('"' FROM SPLIT_PART(message, ',', 7)) as upload,
TRIM('"' FROM SPLIT_PART(message, ',', 10)) as link
FROM "default"."papertrail_logs"
where program = 'SPEEDTEST'
AND message not like '%server name%'
ORDER BY generated_at DESC;
"""
    return sql
def query_athena():
    """Run the SPEEDTEST query on Athena and return its result set.

    Starts the query produced by ``query_string()`` against the ``default``
    database, writing results under
    ``s3://net-bazzani-papertrail/for_cloudwatch/``, then polls once per
    second until the execution reaches a terminal state.

    Returns:
        The response of ``get_query_results`` for the finished execution.
        NOTE(review): only a single ``get_query_results`` call is made, so
        if the API paginates, only the first page is returned -- confirm
        this is enough for the expected row count.

    Raises:
        Exception: If the query ends in the ``FAILED`` or ``CANCELLED``
            state.
    """
    bucket = 'net-bazzani-papertrail'
    path = 'for_cloudwatch'
    database = 'default'
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query_string(),
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': 's3://{}/{}/'.format(bucket, path)
        }
    )
    execution_id = response['QueryExecutionId']
    while True:
        execution = client.get_query_execution(QueryExecutionId=execution_id)
        status = execution['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            break
        # CANCELLED is terminal too; without this check the loop would poll
        # forever (until the Lambda itself times out).
        if state in ('FAILED', 'CANCELLED'):
            raise Exception('Query failed to run: state={}, reason={}'.format(
                state, status.get('StateChangeReason', 'unknown')))
        time.sleep(1)
    return client.get_query_results(QueryExecutionId=execution_id)
def fetch_papertrail(min_time, index):
    """Fetch SPEEDTEST events from the Papertrail search API as data points.

    The API token is read from the ``PAPERTRAIL_API_TOKEN`` environment
    variable instead of being embedded in the source; the variable must be
    set in the function's configuration.

    Args:
        min_time: Value sent as the ``min_time`` query parameter (converted
            with ``int``).
        index: Column selector shared with ``fetch_athena``; the value is
            read from comma-separated field ``2 + index`` of each event
            message.

    Returns:
        A list of ``{'timestamp', 'value', 'source'}`` dicts, one per event
        whose selected field is numeric. ``timestamp`` is the event's
        ``received_at`` floored to the minute, as integer epoch seconds.
        An empty list is returned when the token is missing or the request
        fails, so callers can always iterate the result.
    """
    url = 'https://papertrailapp.com/api/v1/events/search.json'
    # Secrets do not belong in source control; take the token from the
    # environment.
    token = os.environ.get('PAPERTRAIL_API_TOKEN', '')
    if not token:
        print('Error: PAPERTRAIL_API_TOKEN environment variable is not set')
        return []
    headers = {
        'X-Papertrail-Token': token
    }
    params = {
        'q': 'program:SPEEDTEST',
        'min_time': int(min_time)
    }
    request_url = f"{url}?{urllib.parse.urlencode(params)}"
    try:
        req = urllib.request.Request(request_url, headers=headers)
        with urllib.request.urlopen(req) as response:
            data = json.loads(response.read().decode('utf-8'))
    except urllib.error.HTTPError as e:
        print('Error:', e.code)
        return []
    except urllib.error.URLError as e:
        print('Error:', e.reason)
        return []

    column = 2 + index
    result = []
    for event in data.get('events', []):
        message_parts = event['message'].split(',')
        # Keep the original minimum of three fields and also make sure the
        # requested column exists, so short messages are skipped instead of
        # raising IndexError.
        if len(message_parts) <= max(2, column):
            continue
        value = message_parts[column].strip('"')
        if not value.replace('.', '', 1).isdigit():  # Skip non-numeric fields
            continue
        received_at = datetime.strptime(event['received_at'], '%Y-%m-%dT%H:%M:%S%z')
        timestamp = round_minute(received_at).timestamp()
        result.append({'timestamp': int(timestamp), 'value': float(value), 'source': 'Papertrail'})
    return result
def fetch_athena(start_time, end_time, index):
    """Fetch SPEEDTEST data points from Athena within a time window.

    Args:
        start_time: Lower bound (inclusive) compared against each row's
            epoch-seconds timestamp.
        end_time: Upper bound (inclusive) compared the same way.
        index: Position of the result column holding the value to plot.

    Returns:
        A tuple ``(last_timestamp, athena_data)``. ``athena_data`` is a list
        of ``{'timestamp', 'value', 'source'}`` dicts for the rows inside
        the window. ``last_timestamp`` is the most recent timestamp among
        them, or ``0`` when no row qualified.
    """
    result_data = query_athena()
    rows = result_data['ResultSet']['Rows'][1:]  # Exclude header row
    athena_data = []
    last_timestamp = 0
    for row in rows:
        cells = row['Data']
        # Cells without 'VarCharValue' carry no value; skip the row if either
        # the timestamp or the requested column is missing.
        if 'VarCharValue' not in cells[index] or 'VarCharValue' not in cells[0]:
            continue
        received_at = datetime.strptime(cells[0]['VarCharValue'], "%Y-%m-%d %H:%M:%S.%f")
        timestamp = round_minute(received_at).timestamp()
        if timestamp < start_time or timestamp > end_time:
            print("{} excluded".format(row))
            continue
        try:
            value = float(cells[index]['VarCharValue'])
        except ValueError:
            # Non-numeric field (e.g. empty string): skip it, as
            # fetch_papertrail does, rather than failing the whole request.
            continue
        athena_data.append({'timestamp': timestamp, 'value': value, 'source': 'Athena'})
        # Rows arrive newest-first (ORDER BY ... DESC), so "last row seen"
        # would be the oldest point; track the maximum to get the latest.
        last_timestamp = max(last_timestamp, timestamp)
    return last_timestamp, athena_data
def validation_error(message):
    """Build an error response with code ``Validation`` and the given message.

    Args:
        message: Text stored under the ``Value`` key of the error.

    Returns:
        A dict of the form ``{'Error': {'Code': 'Validation', 'Value': message}}``.
    """
    details = dict(Code='Validation', Value=message)
    return {'Error': details}
def validate_get_metric_data_event(event):
    """Validate the arguments of a GetMetricData event.

    Exactly three arguments are expected: a string label, a numeric column
    index, and a source name (``ALL``, ``Athena`` or ``Papertrail``).

    Args:
        event: The event dict; arguments are read from
            ``event['GetMetricDataRequest']['Arguments']``.

    Returns:
        ``None`` when the arguments are valid, otherwise the dict produced
        by ``validation_error`` describing the first problem found.
    """
    arguments = event['GetMetricDataRequest']['Arguments']
    if len(arguments) != 3:
        return validation_error('Argument count must be 3')
    if not isinstance(arguments[0], str):
        return validation_error('Argument 1 must be a string')
    if not isinstance(arguments[1], (int, float)):
        return validation_error('Argument 2 must be a number')
    # The handler only knows these three sources; reject anything else here
    # so it fails with a validation message instead of an exception later.
    if arguments[2] not in ('ALL', 'Athena', 'Papertrail'):
        return validation_error('Argument 3 must be one of: ALL, Athena, Papertrail')
    return None
def get_metric_data_event_handler(event):
    """Handle a GetMetricData event and return the Athena/Papertrail series.

    Args:
        event: The event dict. ``StartTime``, ``EndTime``, ``Period`` and
            ``Arguments`` (label, column index, source) are read from
            ``event['GetMetricDataRequest']``.

    Returns:
        A validation error dict when the arguments are invalid; otherwise a
        dict with ``MetricDataResults`` containing two series, labelled
        ``"<label> Athena"`` and ``"<label> Papertrail"``. A series that was
        not requested by the source argument is returned empty.
    """
    # Named `error` so the module-level validation_error() stays callable here.
    error = validate_get_metric_data_event(event)
    if error:
        return error
    request = event['GetMetricDataRequest']
    start_time = request['StartTime']
    end_time = request['EndTime']
    period = request['Period']
    label = request['Arguments'][0]
    index = int(request['Arguments'][1])
    source = request['Arguments'][2]
    print("{} {} {} {} {} {}".format(start_time, end_time, period, label, index, source))

    # Both series start empty so the response can be built whichever source
    # was selected.
    athena_data = []
    papertrail_data = []
    if source == "ALL":
        last_timestamp, athena_data = fetch_athena(start_time, end_time, index)
        # Papertrail is queried from the last Athena data point onwards.
        papertrail_data = fetch_papertrail(last_timestamp, index) or []
    elif source == "Athena":
        _, athena_data = fetch_athena(start_time, end_time, index)
    elif source == "Papertrail":
        papertrail_data = fetch_papertrail(0, index) or []
    else:
        return validation_error('Argument 3 must be one of: ALL, Athena, Papertrail')

    return {
        'MetricDataResults': [
            {
                'StatusCode': 'Complete',
                'Label': "{} Athena".format(label),
                'Timestamps': [val['timestamp'] for val in athena_data],
                'Values': [val['value'] for val in athena_data],
            },
            {
                'StatusCode': 'Complete',
                'Label': "{} Papertrail".format(label),
                'Timestamps': [val['timestamp'] for val in papertrail_data],
                'Values': [val['value'] for val in papertrail_data],
            }
        ]
    }
def describe_get_metric_data_event_handler():
    """Describe this data source for a DescribeGetMetricData event.

    Returns:
        A dict with ``ArgumentDefaults`` (one ``{'Value': ...}`` entry per
        argument: label, column index, source) and a Markdown
        ``Description`` whose example expression uses the function name
        taken from the ``AWS_LAMBDA_FUNCTION_NAME`` environment variable.
    """
    function_name = os.environ.get('AWS_LAMBDA_FUNCTION_NAME')
    # The Markdown is kept flush-left so no indentation leaks into the text.
    description = f"""## Athena and Papertrail speed test data source
Returns SPEEDTEST measurements parsed from Papertrail logs, read from Athena and/or the Papertrail search API
### Query arguments
\\# | Type | Description
---|---|---
1 | String | The label prefix of the returned time series
2 | Number | The result column to plot (3 = download, 4 = upload)
3 | String | The data source: ALL, Athena or Papertrail
### Example Expression
```
LAMBDA('{function_name}', 'Download', 3, 'ALL')
```
"""
    return {
        # Every default must be a {'Value': ...} dict; the third entry used
        # to be a set literal, which cannot be serialized to JSON.
        'ArgumentDefaults': [{'Value': 'Download'}, {'Value': 3}, {'Value': 'ALL'}],
        'Description': description
    }
def lambda_handler(event, context):
    """Lambda entry point: dispatch on the event's ``EventType``.

    ``GetMetricData`` events go to ``get_metric_data_event_handler`` and
    ``DescribeGetMetricData`` events to
    ``describe_get_metric_data_event_handler``. Any other type yields a
    validation error naming the unexpected type.

    Args:
        event: The incoming event dict; must contain ``EventType``.
        context: The Lambda context object (unused).

    Returns:
        The selected handler's response, or a validation error dict.
    """
    event_type = event['EventType']
    if event_type == 'GetMetricData':
        response = get_metric_data_event_handler(event)
    elif event_type == 'DescribeGetMetricData':
        response = describe_get_metric_data_event_handler()
    else:
        response = validation_error('Invalid event type: ' + event_type)
    return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment