Skip to content

Instantly share code, notes, and snippets.

@leemengtaiwan

leemengtaiwan/producer.py Secret

Last active Apr 17, 2018
Embed
What would you like to do?
Simple demo for writing messages to Kinesis Stream
import time
import json
import boto3
import random
import calendar
from datetime import datetime
from pprint import pprint
# Name of the Kinesis stream the demo writes to; replace the placeholder with your own stream.
KINESIS_STREAM_NAME = 'YOUR_OWN_KINESIS_STREAM_NAME'
# AWS region used when creating the Kinesis client; replace the placeholder, e.g. "ap-northeast-1".
REGION_NAME = 'YOUR_AWS_REGION'
# Kinesis clients already built, keyed by region name, so repeated writes
# reuse one client instead of constructing a new one for every record.
_KINESIS_CLIENTS = {}


def _get_kinesis_client(region_name):
    """Return the cached Kinesis client for `region_name`, creating it on first use."""
    if region_name not in _KINESIS_CLIENTS:
        _KINESIS_CLIENTS[region_name] = boto3.client(
            'kinesis', region_name=region_name)
    return _KINESIS_CLIENTS[region_name]


def write_to_stream(event_id, event, region_name, stream_name, client=None):
    """Write streaming event to specified Kinesis Stream within specified region.

    The event is serialized as JSON followed by a newline (presumably so
    that concatenated records read as one JSON document per line).

    Parameters
    ----------
    event_id: str
        The unique identifier for the event, used as the partition key.
        Non-string values are converted with `str()`.
    event: dict
        The actual payload including all the details of the event.
        Must be JSON-serializable.
    region_name: str
        AWS region identifier, e.g., "ap-northeast-1". Only used when
        `client` is not given.
    stream_name: str
        Kinesis Stream name to write.
    client: optional
        An object exposing `put_record(StreamName=..., Data=..., PartitionKey=...)`,
        e.g. an existing boto3 Kinesis client. When omitted, a client for
        `region_name` is created once and reused on later calls.

    Returns
    -------
    res: Response returned by `put_record` func defined in boto3.client('kinesis')
    """
    if client is None:
        client = _get_kinesis_client(region_name)

    return client.put_record(
        StreamName=stream_name,
        Data=json.dumps(event) + '\n',
        # Coerce to str so a numeric id does not fail parameter validation.
        PartitionKey=str(event_id),
    )
def _generate_event():
    """Build one random demo event.

    Returns a dict with a random string `event_id` (1..100000), a random
    `event_type`, and `timestamp` as whole seconds since the Unix epoch.
    """
    return {
        "event_id": str(random.randint(1, 100000)),
        "event_type": random.choice(['read_post', 'write_post', 'make_comments']),
        # Same value as calendar.timegm(datetime.utcnow().timetuple()),
        # without the deprecated datetime.utcnow().
        "timestamp": int(time.time()),
    }


def main():
    """Simulate streaming data generation: send one random event every 5 seconds, forever."""
    while True:
        event = _generate_event()
        pprint(event)

        # send to Kinesis Stream, partitioned by the event's own id
        write_to_stream(event['event_id'], event, REGION_NAME, KINESIS_STREAM_NAME)
        time.sleep(5)


# Guarded so importing this module does not start the endless producer loop.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.