Skip to content

Instantly share code, notes, and snippets.

@dleavitt
Created May 23, 2024 22:58
Show Gist options
  • Save dleavitt/f3684b23dc3344010ad4b2563d292150 to your computer and use it in GitHub Desktop.
Save dleavitt/f3684b23dc3344010ad4b2563d292150 to your computer and use it in GitHub Desktop.
Stream a CloudWatch Log Group to a HTTP Destination via Kinesis Firehose
// Firehose -> HTTP
import * as firehose from "@aws-cdk/aws-kinesisfirehose-alpha";
import {
CommonDestinationProps,
DestinationS3BackupProps,
} from "@aws-cdk/aws-kinesisfirehose-destinations-alpha";
import * as cdk from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as s3 from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";
// private helpers probably won't survive version upgrades
import {
DestinationBackupConfig,
createEncryptionConfig,
createLoggingOptions,
createProcessingConfig,
} from "@aws-cdk/aws-kinesisfirehose-destinations-alpha/lib/private/helpers";
import { CfnDeliveryStream } from "aws-cdk-lib/aws-kinesisfirehose";
export interface BufferingHints {
readonly bufferingInterval?: cdk.Duration;
readonly bufferingSize?: cdk.Size;
}
export interface FirehoseHttpEndpointDestinationProps
extends CommonDestinationProps,
BufferingHints {
readonly name?: string;
readonly contentEncoding?: ContentEncoding;
/**
* Amazon Data Firehose includes these key-value pairs in each HTTP call.
*/
readonly parameters?: Record<string, string>; // TODO: allow passing refs
/**
* @default 60s
*/
readonly retryDuration?: cdk.Duration;
/**
* @default BackupMode.FAILED
*/
readonly backupMode?: BackupMode;
readonly s3Backup?: DestinationHttpEndpointS3BackupProps;
}
// these are supposed to be generic and almost are, except for mode which isn't
// quite applicable
export interface DestinationHttpEndpointS3BackupProps
extends Omit<DestinationS3BackupProps, "mode"> {}
export enum ContentEncoding {
NONE = "NONE",
GZIP = "GZIP",
}
export enum BackupMode {
FAILED = "FailedDataOnly",
ALL = "AllData",
}
// Augment destination config, which currently only supports S3
// may not actually need to get this fancy
declare module "@aws-cdk/aws-kinesisfirehose-alpha" {
interface DestinationConfig {
/**
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-httpendpointdestinationconfiguration.html
*/
httpEndpointDestinationConfiguration?: CfnDeliveryStream.HttpEndpointDestinationConfigurationProperty;
}
}
/**
* Destination to allow an Amazon Data Firehose to point at a
* HttpEndpointDestination.
*
* Adapted from:
* https://github.com/aws/aws-cdk/blob/main/packages/@aws-cdk/aws-kinesisfirehose-destinations-alpha/lib/s3-bucket.ts
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-kinesisfirehose-deliverystream-httpendpointdestinationconfiguration.html
*/
export class FirehoseHttpEndpointDestination implements firehose.IDestination {
constructor(
private readonly url: string,
private readonly props: FirehoseHttpEndpointDestinationProps,
) {}
bind(
scope: Construct,
_options: firehose.DestinationBindOptions,
): firehose.DestinationConfig {
const role =
this.props.role ??
new iam.Role(scope, "HTTP Endpoint Destination Role", {
assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
});
const { loggingOptions, dependables: loggingDependables } =
createLoggingOptions(scope, {
logging: this.props.logging,
logGroup: this.props.logGroup,
role,
streamId: "HttpEndpointDestination",
}) ?? {};
const commonAttributes = this.props.parameters
? Object.entries(this.props.parameters).map(
([attributeName, attributeValue]) => ({
attributeName,
attributeValue,
}),
)
: undefined;
const { backupConfig, dependables: backupDependables } = createBackupConfig(
scope,
role,
this.props.s3Backup,
)!;
return {
httpEndpointDestinationConfiguration: {
endpointConfiguration: {
name:
this.props.name ??
cdk.Names.uniqueResourceName(scope, {}) + "HttpEndpointDestination",
url: this.url,
},
s3Configuration: backupConfig,
s3BackupMode: this.props.backupMode,
bufferingHints: createBufferingHints(
this.props.bufferingInterval,
this.props.bufferingSize,
),
cloudWatchLoggingOptions: loggingOptions,
processingConfiguration: createProcessingConfig(
scope,
role,
this.props.processor,
),
requestConfiguration: {
contentEncoding: this.props.contentEncoding,
commonAttributes,
},
roleArn: role.roleArn,
retryOptions: {
durationInSeconds: this.props.retryDuration?.toSeconds(),
},
},
dependables: [
...(loggingDependables ?? []),
...(backupDependables ?? []),
],
};
}
}
/**
* Has different limits and defaults than the lib one (they should parameterize
* those)
*/
function createBufferingHints(
interval?: cdk.Duration,
size?: cdk.Size,
): CfnDeliveryStream.BufferingHintsProperty | undefined {
if (!interval && !size) {
return undefined;
}
const intervalInSeconds = interval?.toSeconds() ?? 60;
if (intervalInSeconds > 900) {
throw new Error(
`Buffering interval must be less than 900 seconds. Buffering interval provided was ${intervalInSeconds} seconds.`,
);
}
const sizeInMBs = size?.toMebibytes() ?? 1;
if (sizeInMBs < 1 || sizeInMBs > 64) {
throw new Error(
`Buffering size must be between 1 and 64 MiBs. Buffering size provided was ${sizeInMBs} MiBs.`,
);
}
return { intervalInSeconds, sizeInMBs };
}
/**
* This is mandatory for http endpoint (optional for s3) so impl is different
*/
function createBackupConfig(
scope: Construct,
role: iam.IRole,
props: DestinationHttpEndpointS3BackupProps = {},
): DestinationBackupConfig {
const bucket = props.bucket ?? new s3.Bucket(scope, "BackupBucket");
const bucketGrant = bucket.grantReadWrite(role);
const { loggingOptions, dependables: loggingDependables } =
createLoggingOptions(scope, {
logging: props.logging,
logGroup: props.logGroup,
role,
streamId: "S3Backup",
}) ?? {};
return {
backupConfig: {
bucketArn: bucket.bucketArn,
roleArn: role.roleArn,
prefix: props.dataOutputPrefix,
errorOutputPrefix: props.errorOutputPrefix,
bufferingHints: createBufferingHints(
props.bufferingInterval,
props.bufferingSize,
),
compressionFormat: props.compression?.value,
encryptionConfiguration: createEncryptionConfig(
role,
props.encryptionKey,
),
cloudWatchLoggingOptions: loggingOptions,
},
dependables: [bucketGrant, ...(loggingDependables ?? [])],
};
}
// Cloudwatch -> Firehose
import * as firehose from "@aws-cdk/aws-kinesisfirehose-alpha";
import * as iam from "aws-cdk-lib/aws-iam";
import * as logs from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";
export interface FirehoseLogDestinationProps {
/**
* The role to assume to write log events to the destination
*
* @default - A new Role is created
*/
readonly role?: iam.IRole;
}
/**
* Destination to allow a log's SubscriptionFilter to point at a firehose
* DeliveryStream.
*
* Taken from:
* https://github.com/aws/aws-cdk/blob/main/packages/aws-cdk-lib/aws-logs-destinations/lib/kinesis.ts#L21
*/
export class FirehoseLogDestination
implements logs.ILogSubscriptionDestination
{
constructor(
private readonly stream: firehose.IDeliveryStream,
private readonly props: FirehoseLogDestinationProps = {},
) {}
public bind(
scope: Construct,
_sourceLogGroup: logs.ILogGroup,
): logs.LogSubscriptionDestinationConfig {
// Following example from https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
// Create a role to be assumed by CWL that can write to this stream and pass itself.
const id = "CloudWatchLogsCanPutRecords";
const role =
this.props.role ??
(scope.node.tryFindChild(id) as iam.IRole) ??
new iam.Role(scope, id, {
assumedBy: new iam.ServicePrincipal("logs.amazonaws.com"),
});
this.stream.grantPutRecords(role);
role.grantPassRole(role);
const policy = role.node.tryFindChild("DefaultPolicy") as iam.CfnPolicy;
if (policy) {
// Remove circular dependency
const cfnRole = role.node.defaultChild as iam.CfnRole;
cfnRole.addOverride("DependsOn", undefined);
// Ensure policy is created before subscription filter
scope.node.addDependency(policy);
}
return { arn: this.stream.deliveryStreamArn, role };
}
}
// L3 Construct demonstrating usage
import * as firehose from "@aws-cdk/aws-kinesisfirehose-alpha";
import * as logs from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";
import { FirehoseHttpEndpointDestination } from "./FirehoseHttpEndpointDestination";
import { FirehoseLogDestination } from "./FirehoseLogDestination";
export interface SumoLogicFirehoseProps {
url: string;
logGroup: logs.LogGroup;
filterPattern?: logs.IFilterPattern;
logging?: boolean;
}
/**
* Exports a Cloudwatch Log Group to SumoLogic via a Amazon Data Firehose AKA
* Kinesis Firehose stream.
*
* It relies on a combination of alpha-level AWS contructs and custom stuff
* built on top of them, so probably not very stable.
*
* - TODO: parameterize destination stuff
* - S3 backup bucket
* - processor
* - logging
*/
export class SumoLogicFirehose extends Construct {
subscription: logs.SubscriptionFilter;
firehoseStream: firehose.DeliveryStream;
constructor(
scope: Construct,
id: string,
{ logGroup, url, filterPattern, logging }: SumoLogicFirehoseProps,
) {
super(scope, id);
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html
this.firehoseStream = new firehose.DeliveryStream(this, "dstream", {
destinations: [
new FirehoseHttpEndpointDestination(url, {
name: "Sumo Logic",
logging: logging ?? true,
}),
],
encryption: firehose.StreamEncryption.AWS_OWNED,
});
this.subscription = new logs.SubscriptionFilter(this, "subscription", {
logGroup,
filterPattern: filterPattern ?? logs.FilterPattern.allEvents(),
destination: new FirehoseLogDestination(this.firehoseStream),
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment