@maxhoheiser
Created March 4, 2024 20:16
CDK Python app that deploys a Kinesis Data Firehose delivery stream with a Kinesis data stream as source and a Redshift cluster as target. A short sketch for pushing test data through the deployed pipeline follows the stack code.

import random
import string
import aws_cdk as cdk
import aws_cdk.aws_ec2 as ec2
import aws_cdk.aws_iam as iam
import aws_cdk.aws_kinesis as kinesis
import aws_cdk.aws_kinesisfirehose as firehose
import aws_cdk.aws_logs as logs
import aws_cdk.aws_redshift as redshift
import aws_cdk.aws_s3 as s3
from constructs import Construct


def short_uid():
    return "".join(random.choices(string.ascii_lowercase, k=5))


REDSHIFT_DB_NAME = "streaming_db"
REDSHIFT_MASTER_USER = "dwh_user"
REDSHIFT_MASTER_PASSWORD = "123456789Test"
REDSHIFT_DEFAULT_PORT = 5439  # default Redshift port
TABLE_NAME = "firehose_test_table"  # target table for the Firehose COPY command (placeholder; create it in the cluster before streaming)


class TestKinesisFirehoseScenario(cdk.Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # kinesis stream
        kinesis_stream = kinesis.Stream(
            self,
            "KinesisStream",
            stream_name="kinesis-stream",
            shard_count=1,
            stream_mode=kinesis.StreamMode.PROVISIONED,
        )

        # s3 bucket
        bucket = s3.Bucket(
            self,
            "S3Bucket",
            bucket_name="firehose-raw-data",
            removal_policy=cdk.RemovalPolicy.DESTROY,  # required since the default is RETAIN
            # auto_delete_objects=True,  # required to delete a non-empty bucket;
            # auto_delete requires a Lambda and is therefore not currently supported by LocalStack
        )

        # redshift s3 access role
        role_redshift_cluster = iam.Role(
            self,
            "RedshiftClusterRole",
            role_name="redshift-cluster-role",
            assumed_by=iam.ServicePrincipal("redshift.amazonaws.com"),
            # managed_policies=[
            #     iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3ReadOnlyAccess")
            # ],
        )
        policy_redshift_cluster = iam.Policy(
            self,
            "RedshiftPolicy",
            policy_name="redshift-policy",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=[
                        "s3:GetBucketLocation",
                        "s3:GetObject",
                        "s3:ListBucket",
                        "s3:ListBucketMultipartUploads",
                    ],
                    resources=[bucket.bucket_arn, f"{bucket.bucket_arn}/*"],
                ),
            ],
        )
        role_redshift_cluster.attach_inline_policy(policy_redshift_cluster)

        # create vpc for redshift cluster
        redshift_vpc = ec2.Vpc(
            self,
            "RedshiftVpc",
            vpc_name="redshift-vpc",
            ip_addresses=ec2.IpAddresses.cidr("10.10.0.0/16"),
            max_azs=1,
            nat_gateways=1,
            enable_dns_support=True,
            enable_dns_hostnames=True,
            subnet_configuration=[
                ec2.SubnetConfiguration(
                    name="public", cidr_mask=24, subnet_type=ec2.SubnetType.PUBLIC
                ),
            ],
        )
        redshift_vpc_subnet_ids = redshift_vpc.select_subnets(
            subnet_type=ec2.SubnetType.PUBLIC
        ).subnet_ids

        # create subnet group for redshift cluster
        redshift_cluster_subnet_group = redshift.CfnClusterSubnetGroup(
            self,
            "RedshiftClusterSubnetGroup",
            subnet_ids=redshift_vpc_subnet_ids,
            description="Redshift Cluster Subnet Group",
        )

        # create security group to allow inbound traffic
        redshift_security_group = ec2.SecurityGroup(
            self,
            "RedshiftSecurityGroup",
            vpc=redshift_vpc,
            security_group_name="redshift-security-group",
            description="Security group for redshift cluster",
            allow_all_outbound=True,
        )
        redshift_security_group.add_ingress_rule(
            ec2.Peer.any_ipv4(),
            ec2.Port.tcp(REDSHIFT_DEFAULT_PORT),  # allow redshift port
        )
        redshift_security_group.add_ingress_rule(
            ec2.Peer.any_ipv4(),
            ec2.Port.tcp(22),  # allow ssh
        )

        # create redshift cluster
        redshift_cluster_name = "redshift-cluster"
        ec2_instance_type = "dc2.large"
        redshift_cluster = redshift.CfnCluster(
            self,
            "RedshiftCluster",
            cluster_identifier=redshift_cluster_name,
            cluster_type="single-node",
            number_of_nodes=1,
            db_name=REDSHIFT_DB_NAME,
            master_username=REDSHIFT_MASTER_USER,
            master_user_password=REDSHIFT_MASTER_PASSWORD,
            iam_roles=[role_redshift_cluster.role_arn],
            node_type=ec2_instance_type,
            cluster_subnet_group_name=redshift_cluster_subnet_group.ref,
            vpc_security_group_ids=[redshift_security_group.security_group_id],
            publicly_accessible=True,
            # port=REDSHIFT_DEFAULT_PORT,
        )
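
        # note: a publicly accessible cluster, 0.0.0.0/0 ingress, and a hard-coded
        # master password are acceptable only for local testing (e.g. LocalStack),
        # not for a production deployment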

        # firehose delivery stream kinesis access role
        role_firehose_kinesis = iam.Role(
            self,
            "FirehoseKinesisRole",
            role_name="firehose-kinesis-role",
            assumed_by=iam.ServicePrincipal("firehose.amazonaws.com"),
        )
        policy_firehose_kinesis = iam.Policy(
            self,
            "FirehoseKinesisPolicy",
            policy_name="firehose-kinesis-policy",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=[
                        "kinesis:DescribeStream",
                        "kinesis:GetShardIterator",
                        "kinesis:GetRecords",
                        "kinesis:ListShards",
                    ],
                    resources=[kinesis_stream.stream_arn],
                ),
            ],
        )
        role_firehose_kinesis.attach_inline_policy(policy_firehose_kinesis)

        # firehose kinesis stream source configuration
        kinesis_stream_source_configuration = (
            firehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
                kinesis_stream_arn=kinesis_stream.stream_arn,
                role_arn=role_firehose_kinesis.role_arn,
            )
        )

        # cloudwatch log group and stream for firehose s3 error logging
        firehose_s3_log_group_name = "firehose-s3-log-group"
        firehose_s3_log_stream_name = "firehose-s3-log-stream"
        firehose_s3_log_group = logs.LogGroup(
            self,
            "FirehoseLogGroup",
            log_group_name=firehose_s3_log_group_name,
            removal_policy=cdk.RemovalPolicy.DESTROY,  # required since the default is RETAIN
        )
        firehose_s3_log_group.add_stream(
            "FirehoseLogStream", log_stream_name=firehose_s3_log_stream_name
        )

        # firehose s3 access role
        role_firehose_s3 = iam.Role(
            self,
            "FirehoseS3Role",
            role_name="firehose-s3-role",
            assumed_by=iam.ServicePrincipal("firehose.amazonaws.com"),
        )
        policy_firehose_s3 = iam.Policy(
            self,
            "FirehoseS3Policy",
            policy_name="firehose-s3-policy",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=[
                        "s3:AbortMultipartUpload",
                        "s3:GetBucketLocation",
                        "s3:GetObject",
                        "s3:ListBucket",
                        "s3:ListBucketMultipartUploads",
                        "s3:PutObject",
                    ],
                    resources=[bucket.bucket_arn, f"{bucket.bucket_arn}/*"],
                ),
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=["logs:PutLogEvents", "logs:CreateLogStream"],
                    resources=[firehose_s3_log_group.log_group_arn],
                ),
            ],
        )
        role_firehose_s3.attach_inline_policy(policy_firehose_s3)

        # firehose redshift destination configuration
        redshift_s3_destination_configuration = (
            firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
                bucket_arn=bucket.bucket_arn,
                role_arn=role_firehose_s3.role_arn,
                prefix="redshift-raw-data/",
                compression_format="UNCOMPRESSED",
            )
        )
        redshift_cluster_address = redshift_cluster.attr_endpoint_address
        redshift_destination_configuration = firehose.CfnDeliveryStream.RedshiftDestinationConfigurationProperty(
            cluster_jdbcurl=f"jdbc:redshift://{redshift_cluster_address}:{REDSHIFT_DEFAULT_PORT}/{REDSHIFT_DB_NAME}",
            copy_command=firehose.CfnDeliveryStream.CopyCommandProperty(
                data_table_name=TABLE_NAME
            ),
            password=REDSHIFT_MASTER_PASSWORD,
            username=REDSHIFT_MASTER_USER,
            role_arn=role_firehose_s3.role_arn,
            s3_configuration=redshift_s3_destination_configuration,
            cloud_watch_logging_options=firehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
                enabled=True,
                log_group_name=firehose_s3_log_group_name,
                log_stream_name=firehose_s3_log_stream_name,
            ),
        )
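
        # the COPY above loads into TABLE_NAME; Firehose stages records in the S3
        # bucket and issues the COPY itself, but the table must already exist in
        # REDSHIFT_DB_NAME (see the sketch after the stack code)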
        firehose_stream = firehose.CfnDeliveryStream(
            self,
            "FirehoseDeliveryStream",
            delivery_stream_name="firehose-deliverystream",
            delivery_stream_type="KinesisStreamAsSource",
            kinesis_stream_source_configuration=kinesis_stream_source_configuration,
            redshift_destination_configuration=redshift_destination_configuration,
        )


app = cdk.App()
stack = TestKinesisFirehoseScenario(app, "TestKinesisFirehoseScenario")
app.synth()
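
The stack configures the Redshift destination but never creates the table that the COPY command targets, and Firehose will not create it either. A minimal sketch for exercising the deployed pipeline, assuming the resource names above, an illustrative table layout, and credentials for AWS (or a LocalStack endpoint) already configured:

import boto3

# pass endpoint_url="http://localhost:4566" to boto3.client(...) when targeting LocalStack
redshift_data = boto3.client("redshift-data")
kinesis_client = boto3.client("kinesis")

# 1. Create the target table; the column layout here is illustrative.
redshift_data.execute_statement(
    ClusterIdentifier="redshift-cluster",
    Database="streaming_db",
    DbUser="dwh_user",
    Sql="CREATE TABLE IF NOT EXISTS firehose_test_table (id VARCHAR(64), value VARCHAR(256))",
)

# 2. Put a record on the Kinesis stream. Without copy_options the COPY command
#    expects pipe-delimited text, so the payload matches that format.
kinesis_client.put_record(
    StreamName="kinesis-stream",
    Data=b"1|hello\n",
    PartitionKey="partition-1",
)

# Firehose buffers the record, stages it under redshift-raw-data/ in the S3
# bucket, then issues a COPY into firehose_test_table.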