CloudWatch Logs --> subscription filter --> Kinesis stream --> boto3.client('kinesis').get_records()
import base64
import datetime
import gzip
import json
import sys
import time
from pprint import pprint
from typing import Any, Dict, List

import boto3


def lambda_handler(event: Dict[str, Any], context: Dict[str, Any]) -> List[Dict[str, Any]]:
    '''
    A Lambda function can receive records through an event source mapping in near real time
    (Blueprint: kinesis-process-record-python).
    CloudWatch Logs --> subscription filter --> Kinesis stream --> event source mapping --> Lambda function
    '''
    logs_per_record = []
    for record in event['Records']:
        # Records delivered through an event source mapping arrive base64-encoded and gzip-compressed.
        logs_per_record.append(json.loads(gzip.decompress(base64.b64decode(record['kinesis']['data']))))
    return logs_per_record
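
# For reference, each decoded payload is a CloudWatch Logs subscription message. Per the
# validation flow documented at
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/ValidateLogEventFlow.html
# it looks roughly like the following (all values here are illustrative):
#
# {
#     "messageType": "DATA_MESSAGE",
#     "owner": "123456789012",
#     "logGroup": "/example/log-group",
#     "logStream": "example-log-stream",
#     "subscriptionFilters": ["example-filter"],
#     "logEvents": [
#         {"id": "...", "timestamp": 1569445020000, "message": "..."}
#     ]
# }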


def get_cloudwatch_logs_records_from_kinesis_stream(stream_name: str, record_limit: int, data_window: int) -> List[Dict[str, Any]]:
    '''
    Instead of an event source mapping that delivers records in near real time, use a
    scheduled CloudWatch Event to pull Kinesis stream records from a Lambda function,
    for example every 15 minutes.
    This works anywhere that has the proper Kinesis stream permissions.
    CloudWatch Logs --> subscription filter --> Kinesis stream --> boto3.client('kinesis').get_records()
    get_records() with the 'AT_TIMESTAMP' iterator type reads the records written to the
    stream during the last 'data_window' minutes.

    Arguments:
        stream_name: Target stream name.
        record_limit: Maximum number of records per get_records() call. Choose this limit
            wisely based on your average record size, because each data record can be up
            to 1 MB in size and each shard can be read at up to 2 MB per second.
        data_window: Time window, in minutes, that determines the starting read position.
    '''
    kinesis_client = boto3.client('kinesis')
    response = kinesis_client.describe_stream(StreamName=stream_name)
    # NOTE: This reads only the first shard; see get_records_from_all_shards() below for a multi-shard sketch.
    my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']
    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=my_shard_id,
        ShardIteratorType='AT_TIMESTAMP',
        Timestamp=datetime.datetime.utcnow() - datetime.timedelta(minutes=data_window)
    )['ShardIterator']
    logs_per_record = []
    while True:
        record_response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=record_limit)
        logs_per_record = logs_per_record + retrieve_logs_from_record(record_response)
        pprint(record_response)  # Debug output: inspect the raw get_records() response.
        # NOTE: The presence of NextShardIterator alone does not guarantee that there is
        # more data in the stream. See the meaning of 'MillisBehindLatest' in the document.
        # Ref. https://boto3.readthedocs.io/en/latest/reference/services/kinesis.html#Kinesis.Client.get_records
        if (record_response['MillisBehindLatest'] > 0) and ('NextShardIterator' in record_response):
            shard_iterator = record_response['NextShardIterator']
        else:
            break
        time.sleep(0.5)  # Throttle the polling loop to avoid a 'Rate exceeded' error.
    return logs_per_record
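
# NOTE: get_cloudwatch_logs_records_from_kinesis_stream() above reads only the first shard.
# If your stream has more than one shard, something along the following lines would be
# needed. This is a minimal sketch (not part of the original flow); it reuses the polling
# logic above and assumes the same permissions. list_shards() and get_shard_iterator() are
# standard boto3 Kinesis client calls; pagination of list_shards() is not handled here.
def get_records_from_all_shards(stream_name: str, record_limit: int, data_window: int) -> List[Dict[str, Any]]:
    kinesis_client = boto3.client('kinesis')
    start = datetime.datetime.utcnow() - datetime.timedelta(minutes=data_window)
    logs_per_record = []
    for shard in kinesis_client.list_shards(StreamName=stream_name)['Shards']:
        shard_iterator = kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard['ShardId'],
            ShardIteratorType='AT_TIMESTAMP',
            Timestamp=start
        )['ShardIterator']
        while True:
            record_response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=record_limit)
            logs_per_record += retrieve_logs_from_record(record_response)
            if (record_response['MillisBehindLatest'] > 0) and ('NextShardIterator' in record_response):
                shard_iterator = record_response['NextShardIterator']
            else:
                break
            time.sleep(0.5)  # Throttle to stay under the per-shard read limits.
    return logs_per_record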


def retrieve_logs_from_record(record_response: Dict[str, Any]) -> List[Dict[str, Any]]:
    logs = []
    for record in record_response['Records']:
        # NOTE: When records are read with boto3 instead of an event source mapping, no
        # b64decode is needed: boto3 already returns the record data as raw bytes.
        # Ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/ValidateLogEventFlow.html
        try:
            logs.append(json.loads(gzip.decompress(record['Data']).decode('utf-8')))
        except OSError:
            # Not gzip-compressed; try to parse the raw bytes as JSON.
            try:
                logs.append(json.loads(record['Data'].decode('utf-8')))
            except Exception:
                # Not JSON either; keep the decoded string as-is.
                logs.append(record['Data'].decode('utf-8'))
    return logs
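
# NOTE: For completeness, the CloudWatch Logs --> Kinesis wiring itself can be created with
# the boto3 CloudWatch Logs client. A minimal sketch; the log group, filter name, stream
# ARN, and IAM role below are placeholders, and the role must allow CloudWatch Logs to
# write to the stream:
def create_subscription_filter() -> None:
    logs_client = boto3.client('logs')
    logs_client.put_subscription_filter(
        logGroupName='/example/log-group',  # hypothetical
        filterName='example-filter',  # hypothetical
        filterPattern='',  # An empty pattern forwards every log event.
        destinationArn='arn:aws:kinesis:us-east-1:123456789012:stream/example-stream',  # hypothetical
        roleArn='arn:aws:iam::123456789012:role/CWLtoKinesisRole'  # hypothetical
    )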


if __name__ == '__main__':
    argc = len(sys.argv)
    if argc != 4:
        print('Usage: %s [stream_name] [record_limit] [data_window (minutes)]' % sys.argv[0])
        sys.exit(1)
    stream_name = sys.argv[1]
    record_limit = int(sys.argv[2])
    data_window = int(sys.argv[3])
    logs_per_record = get_cloudwatch_logs_records_from_kinesis_stream(stream_name, record_limit, data_window)
    print(json.dumps(logs_per_record, indent=4))
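
# Example invocation (the stream name and values are illustrative):
#   python <this_script>.py my-log-stream 100 15
# This prints, as a JSON array, every CloudWatch Logs subscription message written to the
# stream during the last 15 minutes, pulling at most 100 records per get_records() call.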