Skip to content

Instantly share code, notes, and snippets.

@Doerge
Created October 23, 2015 16:13
Show Gist options
  • Save Doerge/59f3890b887e62e33e2f to your computer and use it in GitHub Desktop.
Save Doerge/59f3890b887e62e33e2f to your computer and use it in GitHub Desktop.
Kinesis Firehose put_record() for boto
import base64
import boto
from boto.connection import AWSQueryConnection
from boto.regioninfo import RegionInfo
from boto.exception import JSONResponseError
from boto.kinesis import exceptions
from boto.compat import json
from boto.compat import six
class FirehoseConnection(AWSQueryConnection):
    """
    Minimal Amazon Kinesis Firehose client built on boto's
    ``AWSQueryConnection``.

    Only the ``PutRecord`` action is implemented. Requests are POSTed as
    JSON (``application/x-amz-json-1.1``) with an ``X-Amz-Target`` header
    of the form ``Firehose_20150804.<Action>`` and are signed with
    ``hmac-v4``. Non-200 responses are raised as one of the exception
    classes in ``_faults``, or ``JSONResponseError`` for unknown errors.
    """
    APIVersion = "2015-08-04"
    DefaultRegionName = "us-east-1"
    DefaultRegionEndpoint = "firehose.us-east-1.amazonaws.com"
    ServiceName = "Firehose"
    TargetPrefix = "Firehose_20150804"
    ResponseError = JSONResponseError

    # Maps the error name from a response's ``__type`` field to the
    # exception class raised. Reuses boto's Kinesis exception classes.
    # NOTE(review): several entries (e.g. ExpiredIteratorException,
    # ProvisionedThroughputExceededException) look Kinesis-Streams specific;
    # confirm against the Firehose error list.
    _faults = {
        "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
        "LimitExceededException": exceptions.LimitExceededException,
        "ExpiredIteratorException": exceptions.ExpiredIteratorException,
        "ResourceInUseException": exceptions.ResourceInUseException,
        "ResourceNotFoundException": exceptions.ResourceNotFoundException,
        "InvalidArgumentException": exceptions.InvalidArgumentException,
        "SubscriptionRequiredException": exceptions.SubscriptionRequiredException,
    }

    def __init__(self, **kwargs):
        """
        Create the connection.

        :param region: Optional ``RegionInfo``. If omitted, the default
            us-east-1 Firehose endpoint is used.
        :param host: Optional explicit host. Defaults to the region's
            endpoint.
        All other keyword arguments are passed to ``AWSQueryConnection``.
        """
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        if 'host' not in kwargs:
            kwargs['host'] = region.endpoint
        super(FirehoseConnection, self).__init__(**kwargs)
        self.region = region

    def _required_auth_capability(self):
        # Requests are signed with AWS Signature Version 4.
        return ['hmac-v4']

    def put_record(self, stream_name, data, partition_key=None,
                   explicit_hash_key=None,
                   sequence_number_for_ordering=None,
                   exclusive_minimum_sequence_number=None,
                   b64_encode=True):
        """
        Put a single data record into an Amazon Kinesis Firehose delivery
        stream.

        Sends a ``PutRecord`` request whose JSON body is
        ``{"DeliveryStreamName": stream_name, "Record": {"Data": <data>}}``.

        :type stream_name: string
        :param stream_name: Name of the delivery stream to write to.

        :type data: blob
        :param data: The record payload. By default it is Base64-encoded
            before being placed in the JSON body; text is encoded as UTF-8
            first. Set `b64_encode` to ``False`` to disable automatic
            Base64 encoding.

        :type partition_key: string
        :param partition_key: Ignored. Firehose ``PutRecord`` has no
            partition key. The parameter is kept so callers written against
            the Kinesis-style signature keep working.

        :type explicit_hash_key: string
        :param explicit_hash_key: Ignored; not part of the Firehose
            ``PutRecord`` request.

        :type sequence_number_for_ordering: string
        :param sequence_number_for_ordering: Ignored; not part of the
            Firehose ``PutRecord`` request.

        :type exclusive_minimum_sequence_number: string
        :param exclusive_minimum_sequence_number: Ignored.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode `data`. Can be set to
            ``False`` if `data` is already encoded, to prevent double
            encoding. Already-encoded ``bytes`` are decoded as UTF-8 so they
            can be serialized to JSON.

        :return: The decoded JSON response dict, or ``None`` if the service
            returned an empty body.
        :raises: An exception class from ``_faults`` (or
            ``JSONResponseError``) when the service returns a non-200 status.
        """
        if b64_encode:
            if not isinstance(data, six.binary_type):
                data = data.encode('utf-8')
            data = base64.b64encode(data).decode('utf-8')
        elif isinstance(data, six.binary_type):
            # Pre-encoded payload supplied as bytes: json.dumps cannot
            # serialize bytes on Python 3, so convert it to text.
            data = data.decode('utf-8')

        params = {
            'DeliveryStreamName': stream_name,
            'Record': {
                'Data': data,
            },
        }
        return self.make_request(action='PutRecord',
                                 body=json.dumps(params))

    def make_request(self, action, body):
        """
        POST a signed JSON request for `action` and decode the response.

        :type action: string
        :param action: API action name (e.g. ``'PutRecord'``). It is
            combined with ``TargetPrefix`` to form the ``X-Amz-Target``
            header.

        :type body: string
        :param body: JSON-encoded request body.

        :return: The decoded JSON body on HTTP 200, or ``None`` if that body
            is empty.
        :raises: The ``_faults`` class matching the error's ``__type``, or
            ``self.ResponseError``, on any non-200 status.
        """
        headers = {
            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            # NOTE(review): len() counts characters; this equals the byte
            # length only for ASCII bodies (json.dumps' default output).
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read().decode('utf-8')
        boto.log.debug(response.getheaders())
        boto.log.debug(response_body)

        if response.status == 200:
            if response_body:
                return json.loads(response_body)
            return None

        # An error body may be empty or not JSON (e.g. some 5xx responses).
        # Don't let json.loads raise ValueError and hide the HTTP error.
        try:
            json_body = json.loads(response_body)
        except ValueError:
            json_body = None
        if not isinstance(json_body, dict):
            # Keep the body dict-shaped, as in the JSON case, while
            # preserving the raw text for diagnostics.
            json_body = {'message': response_body}

        fault_name = json_body.get('__type', None)
        if isinstance(fault_name, six.string_types):
            # Error names may carry a namespace prefix ("ns#Name"); match
            # the bare name against _faults.
            fault_name = fault_name.split('#')[-1]
        exception_class = self._faults.get(fault_name, self.ResponseError)
        raise exception_class(response.status, response.reason,
                              body=json_body)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment