Copy existing logs from CloudWatch to Elasticsearch
@aleskiontherun · last active July 23, 2020
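The script below replays a time window of existing CloudWatch log events through the standard CloudWatch-Logs-to-Elasticsearch streaming Lambda. It pages through the log streams with describe_log_streams and get_log_events, batches the events, and invokes the Lambda directly with a synthetic DATA_MESSAGE payload (gzip-compressed, base64-encoded JSON), the same envelope CloudWatch Logs uses for real subscription deliveries.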
import base64
import gzip
import json
import time

import boto3

# AWS account ID
AWS_ACCOUNT_ID = "001234567890"
# CloudWatch log group name
CW_GROUP = 'my-logs'
# CloudWatch log stream names; set to None to copy from all streams in the group
CW_STREAMS = ['my-stream']
# Name of the CloudWatch-to-Elasticsearch streaming Lambda, see
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_ES_Stream.html
LAMBDA_NAME = 'LogsToElasticsearch'
# Copy logs starting at this timestamp (milliseconds since epoch)
START_TIME = 1595030400000
# Copy logs up to this timestamp (milliseconds since epoch)
END_TIME = 1595116799999
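# The window bounds are epoch timestamps in milliseconds; the defaults above
# cover 2020-07-18 UTC. A quick sketch for deriving your own bounds:
#
#     from datetime import datetime, timezone
#     START_TIME = int(datetime(2020, 7, 18, tzinfo=timezone.utc).timestamp() * 1000)
#     END_TIME = int(datetime(2020, 7, 19, tzinfo=timezone.utc).timestamp() * 1000) - 1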
# Number of messages to copy in one batch
BATCH_SIZE = 1000
# Number of seconds to wait between batches
BATCH_INTERVAL = 1.0
# Execute without actually sending events to ES
DRY_RUN = False

logs_client = boto3.client('logs')
lambda_client = boto3.client('lambda')
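# boto3 picks up credentials and region from the usual environment/config
# chain; the caller needs logs:DescribeLogStreams, logs:GetLogEvents and
# lambda:InvokeFunction permissions for the calls made below.
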
def get_streams():
    """Yield every log stream name in the group, following pagination tokens."""
    token = None
    while True:
        response = get_streams_batch(token)
        for row in response['logStreams']:
            yield row['logStreamName']
        if 'nextToken' in response:
            token = response['nextToken']
        else:
            break


def get_streams_batch(next_token=None):
    kwargs = {
        'logGroupName': CW_GROUP,
        'orderBy': 'LogStreamName',
    }
    if next_token is not None:
        kwargs['nextToken'] = next_token
    return logs_client.describe_log_streams(**kwargs)

def get_events(stream_name):
    """Yield every event in a stream between START_TIME and END_TIME."""
    token = None
    while True:
        response = get_events_batch(stream_name, token)
        if len(response['events']) == 0:
            # get_log_events returns the same nextForwardToken once the end of
            # the stream is reached, so an empty page is the termination signal.
            break
        for row in response['events']:
            yield row
        token = response['nextForwardToken']


def get_events_batch(stream_name, next_token=None):
    # Pass the time window on every call, not just the first one: the
    # pagination token is only a cursor, and omitting endTime on follow-up
    # calls would read past the end of the window.
    kwargs = {
        'logGroupName': CW_GROUP,
        'logStreamName': stream_name,
        'startTime': START_TIME,
        'endTime': END_TIME,
        'startFromHead': True,
    }
    if next_token is not None:
        kwargs['nextToken'] = next_token
    return logs_client.get_log_events(**kwargs)

def send_batch(stream_name, events):
    """Wrap a batch of events in the CloudWatch Logs subscription envelope
    and invoke the Elasticsearch streaming Lambda with it."""
    payload = {
        "awslogs": {
            # Subscription deliveries are gzipped JSON, base64-encoded.
            "data": base64.b64encode(gzip.compress(json.dumps({
                "messageType": "DATA_MESSAGE",
                "owner": AWS_ACCOUNT_ID,
                "logGroup": CW_GROUP,
                "logStream": stream_name,
                "subscriptionFilters": [],
                "logEvents": events,
            }).encode('utf-8'))).decode('utf-8')
        }
    }
    if not DRY_RUN:
        lambda_client.invoke(
            FunctionName=LAMBDA_NAME,
            InvocationType='Event',
            LogType='None',
            Payload=json.dumps(payload).encode('utf-8'),
        )
    print('sent ' + events[-1]['id'])
    # print(json.dumps(events, indent=4))
    if BATCH_INTERVAL > 0:
        time.sleep(BATCH_INTERVAL)


if CW_STREAMS is None:
    CW_STREAMS = get_streams()

i = 0
for stream_name in CW_STREAMS:
    events = []
    for event in get_events(stream_name):
        # Build a unique, sortable document ID from the event timestamp plus
        # a zero-padded running counter.
        event_id = str(event['timestamp']) + str(i).zfill(20)
        events.append({
            'id': event_id,
            'timestamp': event['timestamp'],
            'message': event['message'],
        })
        i += 1
        if len(events) == BATCH_SIZE:
            send_batch(stream_name, events)
            events = []
    # Flush the final, partial batch for this stream.
    if events:
        send_batch(stream_name, events)
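
For reference, this is roughly how the receiving Lambda unwraps such a payload. A minimal sketch: the blueprint from the AWS docs linked above performs the same base64 + gzip decoding, but the handler below is only illustrative, not the blueprint's actual code.

import base64
import gzip
import json

def handler(event, context):
    # Subscription payloads arrive as base64-encoded, gzipped JSON.
    raw = base64.b64decode(event['awslogs']['data'])
    data = json.loads(gzip.decompress(raw))
    for log_event in data['logEvents']:
        print(log_event['id'], log_event['timestamp'], log_event['message'])

Set DRY_RUN = True for a first pass to walk the streams and batches without actually invoking the Lambda.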