Getting started with AWS serverless architecture: Tutorial on Kinesis and DynamoDB using Twitter data

Introduction

In this post, we will discuss serverless architecture and give simple examples of getting started with serverless tools, namely Kinesis and DynamoDB, to process Twitter data.

Many companies are now shifting towards a serverless model. There are currently three main contenders in this space, namely Amazon Web Services (AWS), Google Cloud Platform (GCP) and Microsoft Azure. In this introductory post, we focus on setting up serverless architecture on AWS.

Introduction to serverless architecture

There have been many paradigms for organizations to collect, store and process large amounts of data effectively.

Before cloud computing, organizations had to build out their own computing infrastructure and maintain servers in house. The rise of cloud infrastructure brought a paradigm shift: companies could suddenly spin up hundreds of EC2 instances on AWS and instantly have a place to set up their infrastructure.

Open source tools like Hadoop, Cassandra, HBase, Hive, Storm, etc. took off. These tools were well suited to the model of spinning up N servers and setting up a back-end platform on them.

To minimize the cost of having a team manage a collection of servers to store and process data, large tech companies like Amazon and Google have developed managed serverless tools that take the place of tools commonly set up on individual servers. While these tools generally run on multiple servers, that fact is abstracted away from the user.

Kinesis and DynamoDB

Intro to Kinesis Streams

Amazon Kinesis is a tool for working with streaming data. It has a few components, namely Kinesis Firehose, Kinesis Analytics and Kinesis Streams; we will focus on creating and using a Kinesis stream. Kinesis Streams shares standard concepts, such as consumer and producer, with other queueing and pub/sub systems.

The fundamental concepts of Kinesis Streams are

  • A stream: A queue for incoming data to reside in. Streams are labeled by a string. For example, Amazon might have an "Orders" stream, a "Customer-Review" stream, and so on.

  • A shard: A stream can be composed of one or more shards. One shard can support reads of up to 2 MB/sec and writes of up to 1,000 records/sec, up to a maximum of 1 MB/sec. A user should specify a number of shards that matches the amount of data expected in their system. Note that Kinesis Streams pricing is on a per-shard basis.

  • Partition Key: The Partition Key is a string that is hashed to determine which shard the record goes to. For instance, given the record r = {name: 'Jane', city: 'New York'}, one can specify the Partition Key as r["city"], which will send all records with the same city to the same shard (see the sketch after this list).

  • Producer: A producer is a source of data, typically generated outside your system in real-world applications (e.g. user click data).

  • Consumer: Once the data is placed in a stream, it can be processed and stored somewhere (e.g. on HDFS or a database). Anything that reads in data from a stream is said to be a consumer.
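
To make the Partition Key concrete, here is a minimal sketch, using boto3 (installed below) and a hypothetical stream name and record, of putting a record into a stream keyed by the record's city so that records from the same city land on the same shard:

import json
import boto3

kinesis = boto3.client("kinesis")

# hypothetical record and stream name, for illustration only
r = {"name": "Jane", "city": "New York"}
kinesis.put_record(
    StreamName="example-stream",  # assumed to already exist
    Data=json.dumps(r),
    PartitionKey=r["city"]
)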

Intro to DynamoDB

DynamoDB is Amazon's distributed key-value store, optimized for fast reads and writes of data. Like many other distributed key-value stores, its query language does not support joins; instead it is optimized for fast reading and writing of data, allowing for a more flexible table structure than traditional relational models.

Some key concepts include

  • Partition key: Like in all key-value stores, the partition key is a unique identifier for an entry. This allows for O(1) access to a row by its partition key in DynamoDB (see the sketch after this list).

  • Sort key: Each row can be broken up to have some additional structure. Each row can be thought of as a hashmap ordered by its keys. In the language of Java, I like to think of DynamoDB as a

HashMap < String, TreeMap <String, T> >

where T is a generic type that can be different for each "column". Note that a TreeMap is essentially a HashMap where the keys are sorted (e.g. an "email" key comes before a "name" key).

  • Provisioned throughput: This is the number of reads and writes you expect your table to incur. DynamoDB pricing is based on how many reads/writes you provision.
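
As a concrete preview of that access pattern, here is a minimal sketch, assuming the "hashtags" table created later in this tutorial (partition key "hashtag", numeric attribute "htCount"), of fetching a single row by its partition key with boto3:

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('hashtags')  # table created later in this tutorial

# get_item is a lookup by partition key
response = table.get_item(Key={'hashtag': 'aws'})
item = response.get('Item')  # None if this hashtag has not been stored yet
print(item)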

Example using Twitter data

Cost of running this tutorial

The following sections show how one can ingest Twitter data into Kinesis and store hashtags in DynamoDB. We will use the minimal amount of resources necessary in this tutorial, namely 1 stream and 1 shard in Kinesis, which costs less than $0.02 per hour (pricing depends on the region you choose). DynamoDB will also cost less than $0.02 per hour.

prerequisites: AWS and boto

If you do not already have one, you can set up an AWS account here for free. We will connect to the AWS ecosystem using the boto3 library in Python. Using pip, one can easily install the latest version of boto3, namely

pip install boto3

See here for installing pip on your operating system.

You can specify the physical region in which your data pipeline resides via a config file (for boto3, this is typically ~/.aws/config), which looks like

[default]
region=us-east-1
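
Alternatively (not covered in the original setup), boto3 lets you pass the region directly when creating a client, which can be handy for a quick test:

import boto3

# region passed explicitly instead of relying on the config file
kinesis = boto3.client('kinesis', region_name='us-east-1')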

Export your AWS keys in terminal, namely

$ nano ~/.bash_profile 

and copy

export AWS_ACCESS_KEY_ID=XXXXXXX
export AWS_SECRET_ACCESS_KEY=XXXxxxXXXxxxXX

into bash_profile and be sure to run

$ source ~/.bash_profile

to make sure that environment variables have been set.

prerequisites: Twitter credentials

To get started, you will need credentials from Twitter to make calls to its public API. To this end, go to apps.twitter.com, click to create a new app, and fill out the form to get your unique credentials for requesting data from Twitter.

Once you have your Twitter credentials you can put them in a config-like file called twitterCreds.py, which would look like

## template of what twitterCreds.py should look like
## the actual twitterCreds file should contain
## your actual Twitter credentials that one can
## obtain/create here: https://apps.twitter.com/

consumer_key = "XXXxxXXX"
consumer_secret = "xxXXxXX"
access_token_key = "XXX-XXxxXXX"
access_token_secret = "XXxxXXX"

If you are using GitHub to work on this tutorial, please be sure to add twitterCreds.py to your .gitignore file to avoid publishing your credentials on GitHub.

Feeding data from Twitter to a Kinesis stream

To put data from Twitter into a Kinesis stream, we use the boto3 library in Python to create the stream and put records into it.

We split the code that creates a stream from the code that puts data into it, to avoid confusion. One could do this in a single script, but the naive way would not run: stream creation is asynchronous, so a few extra lines of code are needed, as shown below.

# Simple script to create a Kinesis stream
# create-stream.py
import boto3

kinesis = boto3.client("kinesis")
kinesis.create_stream(StreamName="twitter", ShardCount=1)
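
Stream creation is asynchronous: create_stream returns while the stream is still in the CREATING state, and put_record calls will fail until it becomes ACTIVE. If you want to create the stream and write to it in a single script, the extra lines are, roughly, a waiter like the following sketch:

# wait until the newly created stream is ACTIVE before writing to it
kinesis.get_waiter("stream_exists").wait(StreamName="twitter")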

One can check on the AWS console that the stream has in fact been created. Once you have verified that, you can start ingesting live Twitter data! Doing so is quite straightforward using the Kinesis API, as seen in the following twitter-kinesis.py script:

# Example to use twitter api and feed data into kinesis

from TwitterAPI import TwitterAPI
import boto3
import json
import twitterCreds

## twitter credentials

consumer_key = twitterCreds.consumer_key
consumer_secret = twitterCreds.consumer_secret
access_token_key = twitterCreds.access_token_key
access_token_secret = twitterCreds.access_token_secret

api = TwitterAPI(consumer_key, consumer_secret, access_token_key, access_token_secret)

kinesis = boto3.client('kinesis')

request = api.request('statuses/filter', {'locations':'-90,-90,90,90'})
for item in request:
        kinesis.put_record(StreamName="twitter", Data=json.dumps(item), PartitionKey="filler")

Since we only have one shard, we used a uniform partition key called "filler" that is the same for each item in the request. With more shards, one should specify a partition key that is a function of the data. For example,

kinesis.put_record(StreamName="twitter",
                   Data=json.dumps(item),
                   PartitionKey=str(item["zip_code"]))

would partition the records across shards based on the zip code associated with the tweet, if we had more shards.

Simple consumer

After running python twitter-kinesis.py, you can verify that data is being written to Kinesis by checking the AWS console.

Another way to test that data is being written to Kinesis is to write a simple consumer. In the following section we will write a more complex consumer that parses the data, extracts hashtags and updates DynamoDB, but the following simple-consumer.py script is a quick way to see the most recent data being written to Kinesis.

import boto3
import time
import json

## aws creds are stored in ~/.boto
kinesis = boto3.client("kinesis")
shard_id = "shardId-000000000000" # only one shard!
pre_shard_it = kinesis.get_shard_iterator(StreamName="twitter", ShardId=shard_id, ShardIteratorType="LATEST")

shard_it = pre_shard_it["ShardIterator"]
while True:
    out = kinesis.get_records(ShardIterator=shard_it, Limit=1)
    shard_it = out["NextShardIterator"]
    print(out)
    time.sleep(1.0)

Running python simple-consumer.py should yield output to your terminal, assuming that the Kinesis producer (twitter-kinesis.py) is also running. With two terminal tabs open for the producer and the consumer, your terminal should look something like the following diagram.

Consuming data from the Twitter stream and storing it in DynamoDB

The boto library also makes it easy to read data from a Kinesis stream, and write to a DynamoDB table. First, let's see how one creates a DynamoDB table.

## create a table to store twitter hashtags in DynamoDB

import boto3

dynamodb = boto3.resource('dynamodb')

table = dynamodb.create_table(
    TableName='hashtags',
    KeySchema=[
        {
            'AttributeName': 'hashtag',
            'KeyType': 'HASH'
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'hashtag',
            'AttributeType': 'S'
        }

    ],
    # pricing determined by ProvisionedThroughput
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)

table.meta.client.get_waiter('table_exists').wait(TableName='hashtags')

One can go to the AWS console and verify that the table has been created. For the example above, clicking on "DynamoDB", then "Tables", and then "hashtags" should yield the following on the AWS console.

Once that is done, we can write a simple script that reads the data from the Kinesis stream, extracts the hashtags and updates the counters in DynamoDB:

## script to read data from Kinesis, extract hashtags and store them in
## DynamoDB

import boto3
import json
import decimal
from botocore.exceptions import ClientError

## aws creds are stored in ~/.boto

## Connect to the Kinesis stream
kinesis = boto3.client("kinesis")
shard_id = 'shardId-000000000000' # only one shard
shard_it = kinesis.get_shard_iterator(StreamName="twitter", ShardId=shard_id, ShardIteratorType="LATEST")["ShardIterator"]

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('hashtags')

while True:
    out = kinesis.get_records(ShardIterator=shard_it, Limit=100)
    for record in out['Records']:
        tweet = json.loads(record['Data'])
        if 'entities' in tweet:
            htags = tweet['entities']['hashtags']
            if htags:
                for ht in htags:
                    htag = ht['text']
                    try:
                        # hashtag already seen: increment its counter
                        response = table.update_item(
                            Key={
                                'hashtag': htag
                            },
                            UpdateExpression="set htCount = htCount + :val",
                            ConditionExpression="htCount > 0",
                            ExpressionAttributeValues={
                                ':val': decimal.Decimal(1)
                            },
                            ReturnValues="UPDATED_NEW"
                        )
                    except ClientError:
                        # first time we see this hashtag: initialize its count to 1
                        response = table.update_item(
                            Key={
                                'hashtag': htag
                            },
                            UpdateExpression="set htCount = :val",
                            ExpressionAttributeValues={
                                ':val': decimal.Decimal(1)
                            },
                            ReturnValues="UPDATED_NEW"
                        )
    shard_it = out["NextShardIterator"]

One can verify that hashtags are being stored in DynamoDB by going to the AWS console and clicking on "Items", which should look something like the following screenshot.
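
If you prefer to verify from code rather than the console, here is a minimal sketch, not part of the original scripts, that scans the table and prints the most common hashtags; note that scan reads the entire table, which is fine for a small demo but expensive for large tables:

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('hashtags')

# scan the whole table (fine for a small demo table)
items = table.scan()['Items']
for item in sorted(items, key=lambda x: x['htCount'], reverse=True)[:10]:
    print(item['hashtag'], item['htCount'])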

Concluding remarks

Setting up infrastructure to collect and process large amounts of data in real time can be quite simple when using the serverless products currently available. This tutorial gave an introduction to using AWS managed services, namely Kinesis and DynamoDB, to ingest and store Twitter data.

Note that we never spun up a single server or set up a cluster to install and manage, yet tools like Kinesis and DynamoDB can scale to read and write GBs of data per second.
