# Copyright 2013-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not
# use this file except in compliance with the License. A copy of the License
# is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
from __future__ import print_function
import base64
import boto3
import botocore.exceptions
import argparse
import json
import threading
import time
import datetime
import uuid
from argparse import RawTextHelpFormatter
from random import choice
# To keep AWS keys out of this code, you may temporarily add your AWS
# credentials to the shared credentials file that boto3 reads by default:
# ~/.aws/credentials
# as follows:
# [default]
# aws_access_key_id = <your access key>
# aws_secret_access_key = <your secret key>
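# Alternatively, a named profile can be used via an explicit session instead of
# the plain boto3.client() call in __main__ below (the profile name here is
# purely illustrative):
#   session = boto3.session.Session(profile_name="load-test")
#   kinesis = session.client("kinesis", region_name="us-east-1")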
contexts = {
"schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1",
"data": [
{
"schema": "iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0",
"data": {"useragentFamily": "Other", "useragentMajor": None, "useragentMinor": None, "useragentPatch": None, "useragentVersion": "Other", "osFamily": "Other", "osMajor": None, "osMinor": None, "osPatch": None, "osPatchMinor": None, "osVersion": "Other", "deviceFamily": "Other"}
},
{
"schema": "iglu:nl.basjes/yauaa_context/jsonschema/1-0-2",
"data": {"deviceBrand": "Unknown", "deviceName": "Unknown", "operatingSystemVersionMajor": "??", "layoutEngineNameVersion": "Unknown ??", "operatingSystemNameVersion": "Unknown ??", "layoutEngineNameVersionMajor": "Unknown ??", "operatingSystemName": "Unknown", "agentVersionMajor": "3", "layoutEngineVersionMajor": "??", "deviceClass": "Unknown", "agentNameVersionMajor": "Snowplow-Nodejs-Tracker 3", "operatingSystemNameVersionMajor": "Unknown ??", "operatingSystemClass": "Unknown", "layoutEngineName": "Unknown", "agentName": "Snowplow-Nodejs-Tracker", "agentVersion": "3.1.6", "layoutEngineClass": "Unknown", "agentNameVersion": "Snowplow-Nodejs-Tracker 3.1.6", "operatingSystemVersion": "??", "agentClass": "Special", "layoutEngineVersion": "??"}
}
]
}
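# The dict above is a self-describing Snowplow "contexts" envelope holding a
# ua_parser context and a YAUAA context; make_string() serializes it to JSON
# and substitutes it into the {contexts} column of the event template below.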
event_with_placeholders = [
"{app_id}",
"srv",
"{timestamp}",
"{timestamp}",
"{timestamp}",
"struct",
"{event_id}",
"",
"python",
"1.0",
# 10: v_collector
"ssc-2.3.2-rc1-kinesis",
"streamCommon-2.0.2-common-2.0.2",
"",
"46.114.141.x",
"",
"",
"",
"{network_userid}",
"DE",
"NI",
# 20: geo_city
"Garbsen",
"30823",
"52.4164",
"9.5963",
"Lower Saxony",
"",
"",
"",
"",
"",
# 30: page_title
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
# 40: refr_urlport
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
# 50: mkt_content
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
# 60: tr_affiliation
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
# 70: ti_category
"",
"",
"",
"",
"",
"",
"",
"snowplow-nodejs-tracker/3.1.6",
"",
"",
# 80: br_version
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
# 90: br_features_windowsmedia
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
# 100: os_timezone
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
# 110: tr_tax_base
"",
"",
"",
"",
"",
"Europe/Berlin",
"",
"",
"",
"{timestamp}",
# 120: refr_domain_userid
"",
"",
"{contexts}",
"",
"{timestamp}",
"{event_vendor}",
"{event_name}",
"jsonschema",
"1-0-1",
"",
"{timestamp}"
]
event = "\t".join(event_with_placeholders)
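# "event" is now a single tab-separated Snowplow enriched-event line in which
# fields such as {app_id}, {event_id} and {timestamp} remain as str.format()
# placeholders; make_string() below fills them per record.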
def make_string(x):
    # The size argument x is currently unused; every call yields one
    # placeholder-filled enriched-event line with fresh UUIDs and timestamps.
    return event.format(
        app_id="load-test",
        contexts=json.dumps(contexts, separators=(',', ':')),
        event_id=uuid.uuid4(),
        event_vendor="com.axelspringer.ott",
        event_name="hls_manifest_requested",
        network_userid=uuid.uuid4(),
        timestamp=datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds"))
def get_or_create_stream(stream_name, shard_count):
    stream = None
    try:
        stream = kinesis.describe_stream(StreamName=stream_name)
        # print(json.dumps(stream, sort_keys=True, indent=2,
        #                  separators=(',', ': ')))
    except kinesis.exceptions.ResourceNotFoundException:
        while (stream is None) or (stream['StreamDescription']['StreamStatus'] != 'ACTIVE'):
            if stream is None:
                print('Could not find ACTIVE stream:{0} trying to create.'.format(
                    stream_name))
                # boto3 client methods take keyword arguments
                kinesis.create_stream(StreamName=stream_name, ShardCount=shard_count)
            else:
                print("Stream status: %s" % stream['StreamDescription']['StreamStatus'])
            time.sleep(1)
            stream = kinesis.describe_stream(StreamName=stream_name)
    return stream
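# Note: boto3 also ships a built-in waiter that polls until the stream is
# ACTIVE, which could replace the polling loop above:
#   kinesis.get_waiter('stream_exists').wait(StreamName=stream_name)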
def sum_posts(kinesis_actors):
    """Sum all posts across an array of KinesisPosters."""
    total_records = 0
    for actor in kinesis_actors:
        total_records += actor.total_records
    return total_records
class KinesisPoster(threading.Thread):
    """The Poster thread that repeatedly posts records to shards in a given
    Kinesis stream.
    """
    def __init__(self, stream_name, partition_key, poster_time=30, quiet=False,
                 name=None, group=None, filename=None, args=(), kwargs={}):
        super(KinesisPoster, self).__init__(name=name, group=group,
                                            args=args, kwargs=kwargs)
        self._pending_records = []
        self.stream_name = stream_name
        self.partition_key = partition_key
        self.quiet = quiet
        self.default_records = [
            make_string(100), make_string(1000), make_string(500),
            make_string(5000), make_string(10), make_string(750),
            make_string(10), make_string(2000), make_string(500)
        ]
        self.poster_time = poster_time
        self.total_records = 0
        self.file_contents = None
        if filename is not None:
            print('~> opening file:{0}'.format(filename))
            with open(filename, 'r') as content_file:
                self.file_contents = content_file.read(40000)
    def add_records(self, records):
        """Add the given records to the Poster's pending records list."""
        # print('~> adding records:{0}'.format(records))
        if len(records) == 1 and isinstance(records[0], list):
            # flatten a single nested list; extending with a lone string
            # would append its individual characters instead
            self._pending_records.extend(records[0])
        else:
            self._pending_records.extend(records)
    def put_all_records(self):
        """Put all pending records in the Kinesis stream."""
        precs = self._pending_records
        self._pending_records = []
        self.put_records(precs)
        self.total_records += len(precs)
        return len(precs)
    def put_file_contents(self):
        """Put the file contents (if any) in the Kinesis stream as one record."""
        if self.file_contents:
            response = kinesis.put_record(
                StreamName=self.stream_name,
                Data=self.file_contents, PartitionKey=self.partition_key)
            self.total_records += 1
            if self.quiet is False:
                print("-= put seqNum:", response['SequenceNumber'])
    def put_records(self, records):
        """Put the given records in the Kinesis stream as a single batch."""
        def wrap(line):
            return {
                'Data': bytes(line, 'utf-8'),
                'PartitionKey': self.partition_key
            }
        # PutRecords accepts at most 500 records per request; run() adds 50
        # records per batch, so a single call per batch is sufficient here.
        kinesis_records = list(map(wrap, records))
        response = kinesis.put_records(
            StreamName=self.stream_name,
            Records=kinesis_records)
        if self.quiet is False:
            failed = response.get('FailedRecordCount', 0)
            print("-= put successfulRecords:", len(response['Records']) - failed)
    def run(self):
        start = datetime.datetime.now()
        finish = start + datetime.timedelta(seconds=self.poster_time)
        while finish > datetime.datetime.now():
            if self.file_contents:
                self.put_file_contents()
            else:
                self.add_records([make_string(x) for x in range(50)])
                records_put = self.put_all_records()
        if self.quiet is False:
            print(' Total Records Put:', self.total_records)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='''Create or attach to a Kinesis stream and put records in the stream''',
        formatter_class=RawTextHelpFormatter)
    parser.add_argument('stream_name',
                        help='''the name of the Kinesis stream to either connect to or create''')
    parser.add_argument('--region', type=str, default='us-east-1',
                        help='''the name of the Kinesis region to connect with [default: us-east-1]''')
    parser.add_argument('--shard_count', type=int, default=1,
                        help='''the number of shards to create in the stream, if creating [default: 1]''')
    parser.add_argument('--partition_key', default='PyKinesisExample',
                        help='''the partition key to use when communicating records to the
stream [default: 'PyKinesisExample-##']''')
    parser.add_argument('--poster_count', type=int, default=2,
                        help='''the number of poster threads [default: 2]''')
    parser.add_argument('--poster_time', type=int, default=30,
                        help='''how many seconds the poster threads should put records into
the stream [default: 30]''')
    parser.add_argument('--record_file', type=str, default=None,
                        help='''the file whose contents to use as a record''')
    parser.add_argument('--quiet', action='store_true', default=False,
                        help='''reduce console output to just initialization info''')
    parser.add_argument('--delete_stream', action='store_true', default=False,
                        help='''delete the Kinesis stream matching the given stream_name''')
    parser.add_argument('--describe_only', action='store_true', default=False,
                        help='''only describe the Kinesis stream matching the given stream_name''')
    threads = []
    args = parser.parse_args()
    kinesis = boto3.client('kinesis', region_name=args.region)
    if args.delete_stream:
        # delete the given Kinesis stream name
        kinesis.delete_stream(StreamName=args.stream_name)
    else:
        start_time = datetime.datetime.now()
        if args.describe_only is True:
            # describe the given Kinesis stream name
            stream = kinesis.describe_stream(StreamName=args.stream_name)
            print(json.dumps(stream, sort_keys=True, indent=2,
                             separators=(',', ': ')))
        else:
            stream = get_or_create_stream(args.stream_name, args.shard_count)
            # Create a KinesisPoster thread up to the poster_count value
            for pid in range(args.poster_count):
                # create poster name per poster thread
                poster_name = 'shard_poster:%s' % pid
                # create partition key per poster thread
                part_key = args.partition_key + '-' + str(pid)
                poster = KinesisPoster(
                    stream_name=args.stream_name,
                    partition_key=part_key,      # poster's partition key
                    poster_time=args.poster_time,
                    name=poster_name,            # thread name
                    filename=args.record_file,
                    quiet=args.quiet)
                poster.daemon = True
                threads.append(poster)
                print('starting: ', poster_name)
                poster.start()
            # Wait for all threads to complete
            for t in threads:
                t.join()
        finish_time = datetime.datetime.now()
        duration = (finish_time - start_time).total_seconds()
        total_records = sum_posts(threads)
        print("-=> Exiting Poster Main <=-")
        print(" Total Records:", total_records)
        print(" Total Time:", duration)
        print(" Records / sec:", total_records / duration)
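# Example invocation (the script file name and stream name below are purely
# illustrative; adjust region, shards and thread count as needed):
#   python kinesis_poster.py my-load-test-stream --region eu-central-1 \
#       --shard_count 2 --poster_count 4 --poster_time 60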