Skip to content

Instantly share code, notes, and snippets.

@omrihar
Last active September 20, 2018 14:22
Show Gist options
  • Save omrihar/a832964f6af19c3f51a8e1fd83af40a7 to your computer and use it in GitHub Desktop.
Save omrihar/a832964f6af19c3f51a8e1fd83af40a7 to your computer and use it in GitHub Desktop.
An object that can send events to AWS Kinesis Firehose
import os
import logging
import boto3
class FirehoseStream(object):
"""Allows sending of events to Kinesis Firehose"""
def __init__(self, stream=None, region=None,
project=None, logger=None):
"""Create the stream client and wait for events.
Parameters
----------
stream : str, optional
The Firehose stream to use
region : str, optional
AWS Region to use
project : str, optional
Name of the project sending the events
"""
self._stream = stream or os.getenv('AWS_KINESIS_STREAM')
self._region = region or os.getenv('AWS_REGION_NAME')
self._project = project or os.getenv('PROJECT_ID')
self.logger = logger or logging.getLogger(__name__)
self.logger.info("Created a FirehoseStream")
self.client = boto3.client('firehose', region_name=self._region)
self.logger.info(f"Connected to Firehose stream {self._stream} in region "
f"{self._region}")
def _get_timestamp(self, the_date=None):
"""Converts datetimes into time since the unix epoch
Parameters
----------
the_date : datetime
Returns
-------
time_since_epoch : int
Number of seconds since the unix epoch
"""
if not the_date:
the_date = datetime.datetime.utcnow()
return calendar.timegm(the_date.timetuple())
def send_event(self, event_name, data, timestamp=None):
"""Sends an event to record to the Kinesis Firehose client using the
put_record API.
Parameters
----------
event_name : str
One of the event types defined for the project
data : dict
A JSON-serializable object with data to describe the event.
timestamp : datetime, optional
The timestamp associated with the event. If not provided, will be
generated by send_event.
Returns
-------
RecordId : str
The record id of the sent record
"""
payload = {
'event_name': event_name,
'event_timestamp': self._get_timestamp(timestamp),
'project': self._project,
'data': data
}
self.logger.debug(f'Sending to Firehose: {json.dumps(payload)}')
result = self.client.put_record(
DeliveryStreamName=self._stream,
Record={
'Data': json.dumps(payload) + "\n"
}
)
self.logger.debug(f'Sent to Firehose. Response: {result}')
if result and 'RecordId' in result:
return result.get('RecordId')
self.logger.warning('Did not receive a record_id!')
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment