Last active
September 25, 2019 21:17
-
-
Save doi-t/58d5a25c11a798e999b5cd4487287b28 to your computer and use it in GitHub Desktop.
CloudWatch Logs --> subscription filter --> Kinesis stream --> boto3.client('kinesis').get_records()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import datetime | |
import gzip | |
import json | |
import sys | |
import time | |
from typing import Any, Dict, List | |
import boto3 | |
def lambda_handler(event: Dict[str, Any], context: Dict[str, Any]) -> List[Dict[str, Any]]:
    '''
    Decode CloudWatch Logs payloads delivered through a Kinesis event source
    mapping in near real time (Blueprint: kinesis-process-record-python).

    CloudWatch Logs --> subscription filter --> Kinesis stream --> event source mapping --> Lambda function

    Each record's 'kinesis.data' field is base64-encoded, gzip-compressed JSON;
    one decoded JSON object is returned per incoming record.
    '''
    return [
        json.loads(gzip.decompress(base64.b64decode(rec['kinesis']['data'])))
        for rec in event['Records']
    ]
def get_cloudwatch_logs_records_from_kinesis_stream(stream_name: str, record_limit: int, data_window: int) -> List[Dict[str, Any]]:
    '''
    Fetch CloudWatch Logs records delivered to a Kinesis stream within the
    last 'data_window' minutes by polling (e.g. from a scheduled CloudWatch
    Event every 15 minutes) instead of using an event source mapping.

    CloudWatch Logs --> subscription filter --> Kinesis stream --> boto3.client('kinesis').get_records()

    get_records() with the 'AT_TIMESTAMP' iterator type starts reading records
    from 'data_window' minutes ago.

    Arguments:
        stream_name: target stream name
        record_limit: choose wisely considering your average record size,
            because each data record can be up to 1 MB in size, and each shard
            can read up to 2 MB per second.
        data_window: time length (minutes) that determines the start reading point.

    NOTE(review): only the first shard of the stream is read — assumes a
    single-shard stream; TODO confirm, otherwise records on other shards are missed.
    '''
    kinesis_client = boto3.client('kinesis')
    response = kinesis_client.describe_stream(StreamName=stream_name)
    my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']
    # Use a timezone-aware UTC timestamp: a naive datetime is ambiguous and can
    # be serialized with the wrong offset depending on the host's local timezone.
    start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=data_window)
    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=my_shard_id,
        ShardIteratorType='AT_TIMESTAMP',
        Timestamp=start_time,
    )['ShardIterator']
    logs_per_record = []
    while True:
        record_response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=record_limit)
        logs_per_record += retrieve_logs_from_record(record_response)
        # NOTE: The presence of NextShardIterator does not guarantee no more data
        # in the stream; MillisBehindLatest == 0 means we have caught up to the tip.
        # Ref. https://boto3.readthedocs.io/en/latest/reference/services/kinesis.html#Kinesis.Client.get_records
        if (record_response['MillisBehindLatest'] > 0) and ('NextShardIterator' in record_response):
            shard_iterator = record_response['NextShardIterator']
        else:
            break
        time.sleep(0.5)  # To avoid a 'Rate exceeded' throttling error
    return logs_per_record
def retrieve_logs_from_record(record_response: Dict) -> List[Dict[str, Any]]:
    '''
    Decode every record in a get_records() response, best-effort.

    Each record's 'Data' payload is tried in order as:
      1. gzip-compressed JSON (how CloudWatch Logs delivers via subscription filter),
      2. plain JSON text,
      3. raw UTF-8 text (kept as a plain string when JSON parsing fails).

    NOTE: When records are fetched with boto3 instead of an event source
    mapping, the payload is not base64-encoded (for some reasons).
    Ref. https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/ValidateLogEventFlow.html
    '''
    decoded = []
    for item in record_response['Records']:
        payload = item['Data']
        try:
            decoded.append(json.loads(gzip.decompress(payload).decode('utf-8')))
        except OSError:
            # Not gzipped; fall back to plain JSON, then to raw text.
            try:
                decoded.append(json.loads(payload.decode('utf-8')))
            except Exception:
                decoded.append(payload.decode('utf-8'))
    return decoded
if __name__ == '__main__':
    # CLI entry point: poll the stream and dump all decoded records as JSON.
    args = sys.argv
    if len(args) != 4:
        print('Usage: %s [stream_name] [record_limit] [data_window (minutes)]' % args[0])
        sys.exit(1)
    logs_per_record = get_cloudwatch_logs_records_from_kinesis_stream(
        args[1], int(args[2]), int(args[3]))
    print(json.dumps(logs_per_record, indent=4))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment