Skip to content

Instantly share code, notes, and snippets.

@snehamehrin
Last active August 3, 2020 13:49
Show Gist options
  • Save snehamehrin/b1b2db14eb420b7bc398010e31a4e07b to your computer and use it in GitHub Desktop.
Save snehamehrin/b1b2db14eb420b7bc398010e31a4e07b to your computer and use it in GitHub Desktop.
Kinesis Stack Ingestion
from stackapi import StackAPI
import subprocess
import setup
import boto3
import json
class Kinesis(object):
    """Fetch recent Stack Overflow questions and push them to a Kinesis Data Firehose stream.

    Each question is serialized as one JSON object and sent with its own
    ``put_record`` call to the delivery stream named ``StreamName``.
    """

    def __init__(self, StreamName=None, profile_name='bigdata_demo',
                 region_name='us-east-1'):
        """Create a boto3 Firehose client for the given delivery stream.

        Args:
            StreamName: Name of the Firehose delivery stream records are sent
                to. Must be set before calling ``get_kinesis_records``.
            profile_name: AWS credentials profile for the boto3 session
                (defaults to the previously hard-coded ``'bigdata_demo'``).
            region_name: AWS region of the delivery stream
                (defaults to the previously hard-coded ``'us-east-1'``).
        """
        self.StreamName = StreamName
        self.session = boto3.Session(profile_name=profile_name)
        self.client = self.session.client('firehose', region_name=region_name)

    def get_kinesis_records(self, max_pages=50, page_size=100):
        """Fetch the newest Stack Overflow questions and send each one to Firehose.

        Args:
            max_pages: Maximum number of Stack Exchange API pages to fetch.
            page_size: Number of questions requested per page.

        Returns:
            The ``put_record`` response for the last question sent, or
            ``None`` when the API returned no questions.

        Raises:
            ValueError: If ``StreamName`` is not set.
        """
        # Fail fast: otherwise up to ``max_pages`` API requests are spent
        # before botocore rejects the missing stream name.
        if not self.StreamName:
            raise ValueError("StreamName must be set before sending records")

        site = StackAPI('stackoverflow')
        site.max_pages = max_pages
        site.page_size = page_size
        questions = site.fetch('questions', sort='creation')

        # Stays None when no questions come back (previously UnboundLocalError).
        response = None
        for question in questions['items']:
            payload = {
                'question_id': str(question['question_id']),
                'title': str(question['title']),
                'is_answered': str(question['is_answered']),
                'view_count': question['view_count'],
                'answer_count': question['answer_count'],
                'score': question['score'],
                'last_activity_date': question['last_activity_date'],
                'creation_date': question['creation_date'],
            }
            # Firehose concatenates record blobs at the destination; a
            # trailing newline keeps the delivered objects separable (JSON Lines).
            data = (json.dumps(payload) + '\n').encode('utf-8')
            # NOTE(review): one API call per question; put_record_batch could
            # cut calls ~500x but needs partial-failure (FailedPutCount) handling.
            response = self.client.put_record(
                DeliveryStreamName=self.StreamName,
                Record={'Data': data},
            )
        return response
def main():
    """Send recent Stack Overflow questions to the demo Firehose stream and print the last AWS response."""
    helper = Kinesis(StreamName='stack-firehose-stream')
    print(helper.get_kinesis_records())


if __name__ == "__main__":
    # Run the ingestion only when executed as a script, not on import.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment