CDK Python app deploying firehose data stream with kinesis as source and redshift as target
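The stack below wires the pipeline together end to end: a provisioned Kinesis stream as the source, a Firehose delivery stream that stages records in an S3 bucket, and a single-node Redshift cluster in its own VPC that Firehose loads via a COPY command. IAM roles scope Firehose's access to the stream, the bucket, and the CloudWatch error logs.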
import random
import string

import aws_cdk as cdk
import aws_cdk.aws_ec2 as ec2
import aws_cdk.aws_iam as iam
import aws_cdk.aws_kinesis as kinesis
import aws_cdk.aws_kinesisfirehose as firehose
import aws_cdk.aws_logs as logs
import aws_cdk.aws_redshift as redshift
import aws_cdk.aws_s3 as s3
from constructs import Construct


# helper for generating a random 5-character suffix, e.g. to make
# globally unique resource names (such as S3 bucket names) collision-free
def short_uid():
    return "".join(random.choices(string.ascii_lowercase, k=5))
REDSHIFT_DB_NAME = "streaming_db"
REDSHIFT_MASTER_USER = "dwh_user"
REDSHIFT_MASTER_PASSWORD = "123456789Test"  # sample credentials for testing only
REDSHIFT_DEFAULT_PORT = 5439  # default redshift port
TABLE_NAME = "firehose_test_table"  # assumed COPY target table; must exist in the cluster


class TestKinesisFirehoseScenario(cdk.Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        # kinesis stream
        kinesis_stream = kinesis.Stream(
            self,
            "KinesisStream",
            stream_name="kinesis-stream",
            shard_count=1,
            stream_mode=kinesis.StreamMode.PROVISIONED,
        )
        # s3 bucket
        bucket = s3.Bucket(
            self,
            "S3Bucket",
            bucket_name="firehose-raw-data",
            removal_policy=cdk.RemovalPolicy.DESTROY,  # required since the default is RETAIN
            # auto_delete_objects=True,  # required to delete a non-empty bucket;
            # auto-delete needs a Lambda and is therefore not currently supported by LocalStack
        )
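        # firehose stages incoming records in this bucket (via the
        # S3DestinationConfigurationProperty below) before issuing the
        # redshift COPY command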
        # redshift s3 access role
        role_redshift_cluster = iam.Role(
            self,
            "RedshiftClusterRole",
            role_name="redshift-cluster-role",
            assumed_by=iam.ServicePrincipal("redshift.amazonaws.com"),
            # managed_policies=[
            #     iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3ReadOnlyAccess")
            # ],
        )
        policy_redshift_cluster = iam.Policy(
            self,
            "RedshiftPolicy",
            policy_name="redshift-policy",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=[
                        "s3:GetBucketLocation",
                        "s3:GetObject",
                        "s3:ListBucket",
                        "s3:ListBucketMultipartUploads",
                    ],
                    resources=[bucket.bucket_arn, f"{bucket.bucket_arn}/*"],
                ),
            ],
        )
        role_redshift_cluster.attach_inline_policy(policy_redshift_cluster)
        # create vpc for redshift cluster
        redshift_vpc = ec2.Vpc(
            self,
            "RedshiftVpc",
            vpc_name="redshift-vpc",
            ip_addresses=ec2.IpAddresses.cidr("10.10.0.0/16"),
            max_azs=1,
            nat_gateways=1,
            enable_dns_support=True,
            enable_dns_hostnames=True,
            subnet_configuration=[
                ec2.SubnetConfiguration(
                    name="public", cidr_mask=24, subnet_type=ec2.SubnetType.PUBLIC
                ),
            ],
        )
        redshift_vpc_subnet_ids = redshift_vpc.select_subnets(
            subnet_type=ec2.SubnetType.PUBLIC
        ).subnet_ids
        # create subnet group for redshift cluster
        redshift_cluster_subnet_group = redshift.CfnClusterSubnetGroup(
            self,
            "RedshiftClusterSubnetGroup",
            subnet_ids=redshift_vpc_subnet_ids,
            description="Redshift Cluster Subnet Group",
        )
        # create security group to allow inbound traffic
        redshift_security_group = ec2.SecurityGroup(
            self,
            "RedshiftSecurityGroup",
            vpc=redshift_vpc,
            security_group_name="redshift-security-group",
            description="Security group for redshift cluster",
            allow_all_outbound=True,
        )
        redshift_security_group.add_ingress_rule(
            ec2.Peer.any_ipv4(),
            ec2.Port.tcp(REDSHIFT_DEFAULT_PORT),  # allow redshift port
        )
        redshift_security_group.add_ingress_rule(
            ec2.Peer.any_ipv4(),
            ec2.Port.tcp(22),  # allow ssh
        )
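        # NOTE: the 0.0.0.0/0 ingress rules above keep local testing simple
        # but are far too permissive for a production deployment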
        # create redshift cluster
        redshift_cluster_name = "redshift-cluster"
        ec2_instance_type = "dc2.large"
        redshift_cluster = redshift.CfnCluster(
            self,
            "RedshiftCluster",
            cluster_identifier=redshift_cluster_name,
            cluster_type="single-node",
            number_of_nodes=1,
            db_name=REDSHIFT_DB_NAME,
            master_username=REDSHIFT_MASTER_USER,
            master_user_password=REDSHIFT_MASTER_PASSWORD,
            iam_roles=[role_redshift_cluster.role_arn],
            node_type=ec2_instance_type,
            cluster_subnet_group_name=redshift_cluster_subnet_group.ref,
            vpc_security_group_ids=[redshift_security_group.security_group_id],
            publicly_accessible=True,
            # port=REDSHIFT_DEFAULT_PORT,
        )
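        # NOTE: the COPY target table (TABLE_NAME) is not created by this
        # stack and must be created in the cluster after deployment; an
        # assumed matching schema:
        #   CREATE TABLE firehose_test_table (id INT, value VARCHAR(256));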
        # firehose delivery stream kinesis access role
        role_firehose_kinesis = iam.Role(
            self,
            "FirehoseKinesisRole",
            role_name="firehose-kinesis-role",
            assumed_by=iam.ServicePrincipal("firehose.amazonaws.com"),
        )
        policy_firehose_kinesis = iam.Policy(
            self,
            "FirehoseKinesisPolicy",
            policy_name="firehose-kinesis-policy",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=[
                        "kinesis:DescribeStream",
                        "kinesis:GetShardIterator",
                        "kinesis:GetRecords",
                        "kinesis:ListShards",
                    ],
                    resources=[kinesis_stream.stream_arn],
                ),
            ],
        )
        role_firehose_kinesis.attach_inline_policy(policy_firehose_kinesis)
        # firehose kinesis stream source configuration
        kinesis_stream_source_configuration = (
            firehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
                kinesis_stream_arn=kinesis_stream.stream_arn,
                role_arn=role_firehose_kinesis.role_arn,
            )
        )
        # CloudWatch log group and stream for firehose s3 error logging
        firehose_s3_log_group_name = "firehose-s3-log-group"
        firehose_s3_log_stream_name = "firehose-s3-log-stream"
        firehose_s3_log_group = logs.LogGroup(
            self,
            "FirehoseLogGroup",
            log_group_name=firehose_s3_log_group_name,
            removal_policy=cdk.RemovalPolicy.DESTROY,  # required since the default is RETAIN
        )
        firehose_s3_log_group.add_stream(
            "FirehoseLogStream", log_stream_name=firehose_s3_log_stream_name
        )
        # firehose s3 access role
        role_firehose_s3 = iam.Role(
            self,
            "FirehoseS3Role",
            role_name="firehose-s3-role",
            assumed_by=iam.ServicePrincipal("firehose.amazonaws.com"),
        )
        policy_firehose_s3 = iam.Policy(
            self,
            "FirehoseS3Policy",
            policy_name="firehose-s3-policy",
            statements=[
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=[
                        "s3:AbortMultipartUpload",
                        "s3:GetBucketLocation",
                        "s3:GetObject",
                        "s3:ListBucket",
                        "s3:ListBucketMultipartUploads",
                        "s3:PutObject",
                    ],
                    resources=[bucket.bucket_arn, f"{bucket.bucket_arn}/*"],
                ),
                iam.PolicyStatement(
                    effect=iam.Effect.ALLOW,
                    actions=["logs:PutLogEvents", "logs:CreateLogStream"],
                    resources=[firehose_s3_log_group.log_group_arn],
                ),
            ],
        )
        role_firehose_s3.attach_inline_policy(policy_firehose_s3)
        # intermediate s3 destination configuration for the redshift COPY
        redshift_s3_destination_configuration = (
            firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
                bucket_arn=bucket.bucket_arn,
                role_arn=role_firehose_s3.role_arn,
                prefix="redshift-raw-data/",
                compression_format="UNCOMPRESSED",
            )
        )
        # firehose redshift destination configuration
        redshift_cluster_address = redshift_cluster.attr_endpoint_address
        redshift_destination_configuration = firehose.CfnDeliveryStream.RedshiftDestinationConfigurationProperty(
            cluster_jdbcurl=f"jdbc:redshift://{redshift_cluster_address}:{REDSHIFT_DEFAULT_PORT}/{REDSHIFT_DB_NAME}",
            copy_command=firehose.CfnDeliveryStream.CopyCommandProperty(
                # no copy_options set, so COPY expects its default format:
                # pipe-delimited text
                data_table_name=TABLE_NAME
            ),
            password=REDSHIFT_MASTER_PASSWORD,
            username=REDSHIFT_MASTER_USER,
            role_arn=role_firehose_s3.role_arn,
            s3_configuration=redshift_s3_destination_configuration,
            cloud_watch_logging_options=firehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
                enabled=True,
                log_group_name=firehose_s3_log_group_name,
                log_stream_name=firehose_s3_log_stream_name,
            ),
        )
        firehose_stream = firehose.CfnDeliveryStream(
            self,
            "FirehoseDeliveryStream",
            delivery_stream_name="firehose-deliverystream",
            delivery_stream_type="KinesisStreamAsSource",
            kinesis_stream_source_configuration=kinesis_stream_source_configuration,
            redshift_destination_configuration=redshift_destination_configuration,
        )


app = cdk.App()
stack = TestKinesisFirehoseScenario(app, "TestKinesisFirehoseScenario")
app.synth()
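Once the stack is deployed (for example with cdk deploy, or with cdklocal deploy against LocalStack, which the comments above target) and the target table exists in the cluster, the pipeline can be smoke-tested by putting a record on the source stream. A minimal sketch using boto3, assuming the table schema suggested above; the payload is pipe-delimited because the COPY command uses default options:

import boto3

kinesis_client = boto3.client("kinesis")

# firehose buffers the record under s3://firehose-raw-data/redshift-raw-data/
# and then COPYs it into the redshift table
kinesis_client.put_record(
    StreamName="kinesis-stream",
    Data=b"1|hello\n",
    PartitionKey="pk-1",
)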