Created
May 23, 2024 22:58
-
-
Save dleavitt/f3684b23dc3344010ad4b2563d292150 to your computer and use it in GitHub Desktop.
Stream a CloudWatch Log Group to a HTTP Destination via Kinesis Firehose
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Firehose -> HTTP | |
import * as firehose from "@aws-cdk/aws-kinesisfirehose-alpha"; | |
import { | |
CommonDestinationProps, | |
DestinationS3BackupProps, | |
} from "@aws-cdk/aws-kinesisfirehose-destinations-alpha"; | |
import * as cdk from "aws-cdk-lib"; | |
import * as iam from "aws-cdk-lib/aws-iam"; | |
import * as s3 from "aws-cdk-lib/aws-s3"; | |
import { Construct } from "constructs"; | |
// private helpers probably won't survive version upgrades | |
import { | |
DestinationBackupConfig, | |
createEncryptionConfig, | |
createLoggingOptions, | |
createProcessingConfig, | |
} from "@aws-cdk/aws-kinesisfirehose-destinations-alpha/lib/private/helpers"; | |
import { CfnDeliveryStream } from "aws-cdk-lib/aws-kinesisfirehose"; | |
/**
 * Optional buffering settings for a destination.
 *
 * Both values are handed to `createBufferingHints`, which converts them with
 * `toSeconds()` / `toMebibytes()` and range-checks the results.
 */
export interface BufferingHints {
  /**
   * How long to buffer before delivering.
   *
   * @default - 60 seconds when only `bufferingSize` is given; no hints at all when both are omitted
   */
  readonly bufferingInterval?: cdk.Duration;

  /**
   * How much data to buffer before delivering.
   *
   * @default - 1 MiB when only `bufferingInterval` is given; no hints at all when both are omitted
   */
  readonly bufferingSize?: cdk.Size;
}
/**
 * Settings accepted by `FirehoseHttpEndpointDestination`.
 *
 * Combines the library's common destination props and the buffering hints
 * with the options that are specific to an HTTP endpoint.
 */
export interface FirehoseHttpEndpointDestinationProps
  extends CommonDestinationProps,
    BufferingHints {
  /**
   * Name used for the endpoint configuration.
   *
   * @default - a unique resource name derived from the scope, suffixed with "HttpEndpointDestination"
   */
  readonly name?: string;

  /**
   * Passed through as the request configuration's `contentEncoding`.
   *
   * @default - not set
   */
  readonly contentEncoding?: ContentEncoding;

  /**
   * Amazon Data Firehose includes these key-value pairs in each HTTP call.
   *
   * Each entry becomes one `{ attributeName, attributeValue }` common attribute.
   */
  readonly parameters?: Record<string, string>; // TODO: allow passing refs

  /**
   * Passed, in seconds, as the retry options' duration.
   *
   * NOTE(review): this was documented as defaulting to 60s, but `bind` applies
   * no default of its own; confirm the effective service default.
   *
   * @default - not set by this construct
   */
  readonly retryDuration?: cdk.Duration;

  /**
   * Passed through as `s3BackupMode`.
   *
   * NOTE(review): `bind` leaves this unset when omitted, so the documented
   * default relies on the service; confirm.
   *
   * @default BackupMode.FAILED
   */
  readonly backupMode?: BackupMode;

  /**
   * Settings for the S3 backup configuration.
   *
   * @default - a new bucket is created and all other settings are left unset
   */
  readonly s3Backup?: DestinationHttpEndpointS3BackupProps;
}
/**
 * S3 backup settings for the HTTP endpoint destination.
 *
 * The library's `DestinationS3BackupProps` are meant to be generic and almost
 * are; only `mode` isn't quite applicable here, so it is omitted.
 */
export interface DestinationHttpEndpointS3BackupProps
  extends Omit<DestinationS3BackupProps, "mode"> {}
/**
 * Values accepted for the request configuration's `contentEncoding`.
 */
export enum ContentEncoding {
  /** Emitted as the string `"NONE"`. */
  NONE = "NONE",

  /** Emitted as the string `"GZIP"`. */
  GZIP = "GZIP",
}
/**
 * Values accepted for the destination's `s3BackupMode`.
 */
export enum BackupMode {
  /** Emitted as the string `"FailedDataOnly"`. */
  FAILED = "FailedDataOnly",

  /** Emitted as the string `"AllData"`. */
  ALL = "AllData",
}
// Module augmentation: the alpha package's DestinationConfig currently only
// supports S3, so an optional slot for the HTTP endpoint configuration is
// merged into it here. May not actually need to get this fancy.
declare module "@aws-cdk/aws-kinesisfirehose-alpha" {
  interface DestinationConfig {
    /**
     * Configuration emitted by `FirehoseHttpEndpointDestination.bind`.
     *
     * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-httpendpointdestinationconfiguration.html
     */
    httpEndpointDestinationConfiguration?: CfnDeliveryStream.HttpEndpointDestinationConfigurationProperty;
  }
}
/**
 * Destination to allow an Amazon Data Firehose to point at a
 * HttpEndpointDestination.
 *
 * Adapted from:
 * https://github.com/aws/aws-cdk/blob/main/packages/@aws-cdk/aws-kinesisfirehose-destinations-alpha/lib/s3-bucket.ts
 *
 * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-httpendpointdestinationconfiguration.html
 */
export class FirehoseHttpEndpointDestination implements firehose.IDestination {
  /**
   * @param url URL placed in the endpoint configuration.
   * @param props Optional settings; every field is optional, so this
   *   defaults to an empty object.
   */
  constructor(
    private readonly url: string,
    private readonly props: FirehoseHttpEndpointDestinationProps = {},
  ) {}

  /**
   * Builds the `httpEndpointDestinationConfiguration` for a delivery stream.
   *
   * Creates an IAM role assumable by `firehose.amazonaws.com` when no role was
   * supplied, then derives logging, backup, buffering and processing settings
   * from the props.
   *
   * @param scope Construct under which the role, log resources and backup
   *   bucket are created.
   * @param _options Bind options; unused.
   * @returns The destination config plus the dependables collected from
   *   logging and backup setup.
   * @throws Error if the buffering hints are out of range (see
   *   `createBufferingHints`).
   */
  bind(
    scope: Construct,
    _options: firehose.DestinationBindOptions,
  ): firehose.DestinationConfig {
    const role =
      this.props.role ??
      new iam.Role(scope, "HTTP Endpoint Destination Role", {
        assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
      });

    const { loggingOptions, dependables: loggingDependables } =
      createLoggingOptions(scope, {
        logging: this.props.logging,
        logGroup: this.props.logGroup,
        role,
        streamId: "HttpEndpointDestination",
      }) ?? {};

    // Turn the key/value record into the list-of-pairs shape used by
    // `requestConfiguration.commonAttributes`.
    const commonAttributes = this.props.parameters
      ? Object.entries(this.props.parameters).map(
          ([attributeName, attributeValue]) => ({
            attributeName,
            attributeValue,
          }),
        )
      : undefined;

    const { backupConfig, dependables: backupDependables } = createBackupConfig(
      scope,
      role,
      this.props.s3Backup,
    );

    // The endpoint name may be at most 256 characters. uniqueResourceName
    // defaults to that same maximum, so reserve room for the suffix up front
    // rather than appending it to an already full-length name.
    const maxNameLength = 256;
    const nameSuffix = "HttpEndpointDestination";
    const endpointName =
      this.props.name ??
      cdk.Names.uniqueResourceName(scope, {
        maxLength: maxNameLength - nameSuffix.length,
      }) + nameSuffix;

    return {
      httpEndpointDestinationConfiguration: {
        endpointConfiguration: {
          name: endpointName,
          url: this.url,
        },
        s3Configuration: backupConfig,
        s3BackupMode: this.props.backupMode,
        bufferingHints: createBufferingHints(
          this.props.bufferingInterval,
          this.props.bufferingSize,
        ),
        cloudWatchLoggingOptions: loggingOptions,
        processingConfiguration: createProcessingConfig(
          scope,
          role,
          this.props.processor,
        ),
        requestConfiguration: {
          contentEncoding: this.props.contentEncoding,
          commonAttributes,
        },
        roleArn: role.roleArn,
        retryOptions: {
          durationInSeconds: this.props.retryDuration?.toSeconds(),
        },
      },
      dependables: [
        ...(loggingDependables ?? []),
        ...(backupDependables ?? []),
      ],
    };
  }
}
/**
 * Defaults and bounds applied by `createBufferingHints`. Any field left out
 * falls back to the value used for the HTTP endpoint destination.
 */
interface BufferingHintLimits {
  /** Interval used when only a size is supplied. Defaults to 60. */
  readonly defaultIntervalInSeconds?: number;
  /** Largest accepted interval, inclusive. Defaults to 900. */
  readonly maxIntervalInSeconds?: number;
  /** Size used when only an interval is supplied. Defaults to 1. */
  readonly defaultSizeInMBs?: number;
  /** Smallest accepted size, inclusive. Defaults to 1. */
  readonly minSizeInMBs?: number;
  /** Largest accepted size, inclusive. Defaults to 64. */
  readonly maxSizeInMBs?: number;
}

/**
 * Builds and validates the buffering hints for a destination.
 *
 * Has different limits and defaults than the lib one, so they can be
 * overridden through `limits`; without it the HTTP endpoint values apply.
 *
 * @param interval Buffering interval, read via `toSeconds()`.
 * @param size Buffering size, read via `toMebibytes()`.
 * @param limits Optional overrides for the defaults and bounds.
 * @returns `undefined` when neither `interval` nor `size` is given; otherwise
 *   both values, with the missing one filled from the defaults.
 * @throws Error when the interval exceeds the maximum or the size falls
 *   outside the allowed range.
 */
function createBufferingHints(
  interval?: cdk.Duration,
  size?: cdk.Size,
  limits: BufferingHintLimits = {},
): CfnDeliveryStream.BufferingHintsProperty | undefined {
  if (!interval && !size) {
    return undefined;
  }

  const {
    defaultIntervalInSeconds = 60,
    maxIntervalInSeconds = 900,
    defaultSizeInMBs = 1,
    minSizeInMBs = 1,
    maxSizeInMBs = 64,
  } = limits;

  const intervalInSeconds = interval?.toSeconds() ?? defaultIntervalInSeconds;
  // The bound is inclusive, so the message says "at most" rather than
  // "less than".
  if (intervalInSeconds > maxIntervalInSeconds) {
    throw new Error(
      `Buffering interval must be at most ${maxIntervalInSeconds} seconds. Buffering interval provided was ${intervalInSeconds} seconds.`,
    );
  }

  const sizeInMBs = size?.toMebibytes() ?? defaultSizeInMBs;
  if (sizeInMBs < minSizeInMBs || sizeInMBs > maxSizeInMBs) {
    throw new Error(
      `Buffering size must be between ${minSizeInMBs} and ${maxSizeInMBs} MiBs. Buffering size provided was ${sizeInMBs} MiBs.`,
    );
  }

  return { intervalInSeconds, sizeInMBs };
}
/**
 * Builds the S3 backup configuration for the HTTP endpoint destination.
 *
 * This is mandatory for http endpoint (optional for s3), so unlike the lib
 * helper it always returns a config, creating a bucket when none is supplied.
 *
 * @param scope Construct under which the bucket and log resources are created.
 * @param role Role that is granted read/write on the bucket and referenced by
 *   the configuration.
 * @param props Backup settings; defaults to an empty object.
 * @returns The backup configuration together with the bucket grant and any
 *   logging dependables.
 */
function createBackupConfig(
  scope: Construct,
  role: iam.IRole,
  props: DestinationHttpEndpointS3BackupProps = {},
): DestinationBackupConfig {
  const backupBucket = props.bucket ?? new s3.Bucket(scope, "BackupBucket");
  const readWriteGrant = backupBucket.grantReadWrite(role);

  const logging = createLoggingOptions(scope, {
    logging: props.logging,
    logGroup: props.logGroup,
    role,
    streamId: "S3Backup",
  });

  // Evaluated in the same order as the fields appear in the config below:
  // buffering validation first, then the encryption setup.
  const bufferingHints = createBufferingHints(
    props.bufferingInterval,
    props.bufferingSize,
  );
  const encryptionConfiguration = createEncryptionConfig(
    role,
    props.encryptionKey,
  );

  return {
    backupConfig: {
      bucketArn: backupBucket.bucketArn,
      roleArn: role.roleArn,
      prefix: props.dataOutputPrefix,
      errorOutputPrefix: props.errorOutputPrefix,
      bufferingHints,
      compressionFormat: props.compression?.value,
      encryptionConfiguration,
      cloudWatchLoggingOptions: logging?.loggingOptions,
    },
    dependables: [readWriteGrant, ...(logging?.dependables ?? [])],
  };
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// CloudWatch -> Firehose | |
import * as firehose from "@aws-cdk/aws-kinesisfirehose-alpha"; | |
import * as iam from "aws-cdk-lib/aws-iam"; | |
import * as logs from "aws-cdk-lib/aws-logs"; | |
import { Construct } from "constructs"; | |
/**
 * Options for `FirehoseLogDestination`.
 */
export interface FirehoseLogDestinationProps {
  /**
   * Role that is assumed in order to write log events to the destination.
   *
   * @default - an existing "CloudWatchLogsCanPutRecords" role in the scope is
   *   reused, otherwise a new Role is created
   */
  readonly role?: iam.IRole;
}
/**
 * Destination to allow a log's SubscriptionFilter to point at a firehose
 * DeliveryStream.
 *
 * Taken from:
 * https://github.com/aws/aws-cdk/blob/main/packages/aws-cdk-lib/aws-logs-destinations/lib/kinesis.ts#L21
 */
export class FirehoseLogDestination
  implements logs.ILogSubscriptionDestination
{
  /**
   * @param stream Delivery stream that receives the log events.
   * @param props Optional settings; defaults to an empty object.
   */
  constructor(
    private readonly stream: firehose.IDeliveryStream,
    private readonly props: FirehoseLogDestinationProps = {},
  ) {}

  /**
   * Wires the delivery stream up as a subscription destination.
   *
   * Following example from https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
   *
   * @param scope Construct in which the role is looked up or created.
   * @param _sourceLogGroup Source log group; unused.
   * @returns The delivery stream ARN and the role used to write to it.
   */
  public bind(
    scope: Construct,
    _sourceLogGroup: logs.ILogGroup,
  ): logs.LogSubscriptionDestinationConfig {
    const role = this.resolveRole(scope);

    // The role must be able to write to this stream and pass itself.
    this.stream.grantPutRecords(role);
    role.grantPassRole(role);

    const defaultPolicy = role.node.tryFindChild("DefaultPolicy");
    if (defaultPolicy !== undefined) {
      // Remove circular dependency
      (role.node.defaultChild as iam.CfnRole).addOverride(
        "DependsOn",
        undefined,
      );
      // Ensure policy is created before subscription filter
      scope.node.addDependency(defaultPolicy);
    }

    return { arn: this.stream.deliveryStreamArn, role };
  }

  /**
   * Picks the role to use: the one from props, else one already present in
   * `scope` under the well-known id, else a new role assumable by
   * `logs.amazonaws.com`.
   */
  private resolveRole(scope: Construct): iam.IRole {
    const roleId = "CloudWatchLogsCanPutRecords";

    if (this.props.role != null) {
      return this.props.role;
    }

    const existing = scope.node.tryFindChild(roleId) as iam.IRole | undefined;
    if (existing != null) {
      return existing;
    }

    return new iam.Role(scope, roleId, {
      assumedBy: new iam.ServicePrincipal("logs.amazonaws.com"),
    });
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// L3 Construct demonstrating usage | |
import * as firehose from "@aws-cdk/aws-kinesisfirehose-alpha"; | |
import * as logs from "aws-cdk-lib/aws-logs"; | |
import { Construct } from "constructs"; | |
import { FirehoseHttpEndpointDestination } from "./FirehoseHttpEndpointDestination"; | |
import { FirehoseLogDestination } from "./FirehoseLogDestination"; | |
/**
 * Options for the `SumoLogicFirehose` construct.
 */
export interface SumoLogicFirehoseProps {
  /** URL handed to the HTTP endpoint destination. */
  url: string;

  /** Log group the subscription filter is attached to. */
  logGroup: logs.LogGroup;

  /**
   * Pattern selecting which log events are forwarded.
   *
   * @default logs.FilterPattern.allEvents()
   */
  filterPattern?: logs.IFilterPattern;

  /**
   * Passed as the destination's `logging` flag.
   *
   * @default true
   */
  logging?: boolean;
}
/**
 * Exports a CloudWatch Log Group to SumoLogic via an Amazon Data Firehose
 * (AKA Kinesis Firehose) stream.
 *
 * It relies on a combination of alpha-level AWS constructs and custom stuff
 * built on top of them, so probably not very stable.
 *
 * - TODO: parameterize destination stuff
 *   - S3 backup bucket
 *   - processor
 *   - logging
 */
export class SumoLogicFirehose extends Construct {
  /** Subscription filter that forwards the log group's events to the stream. */
  subscription: logs.SubscriptionFilter;

  /** Delivery stream that sends the events to the HTTP endpoint. */
  firehoseStream: firehose.DeliveryStream;

  /**
   * @param scope Parent construct.
   * @param id Construct id.
   * @param props Endpoint URL, source log group and optional filter/logging
   *   settings (destructured in the signature).
   */
  constructor(
    scope: Construct,
    id: string,
    { url, logGroup, filterPattern, logging }: SumoLogicFirehoseProps,
  ) {
    super(scope, id);

    const sumoDestination = new FirehoseHttpEndpointDestination(url, {
      name: "Sumo Logic",
      logging: logging ?? true,
    });

    // https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html
    this.firehoseStream = new firehose.DeliveryStream(this, "dstream", {
      destinations: [sumoDestination],
      encryption: firehose.StreamEncryption.AWS_OWNED,
    });

    const streamDestination = new FirehoseLogDestination(this.firehoseStream);

    this.subscription = new logs.SubscriptionFilter(this, "subscription", {
      logGroup,
      filterPattern: filterPattern ?? logs.FilterPattern.allEvents(),
      destination: streamDestination,
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment