Skip to content

Instantly share code, notes, and snippets.

@ImDevinC
Created May 11, 2023 06:49
Show Gist options
  • Save ImDevinC/69a2c5e952d0aac7e8b0b1179723414c to your computer and use it in GitHub Desktop.
Save ImDevinC/69a2c5e952d0aac7e8b0b1179723414c to your computer and use it in GitHub Desktop.
Lambda Otel Wrapper
# Coordinates of the consumer Lambda's deployment bundle.
locals {
  # File name of the zipped bundle; also used as the last segment of the S3 key.
  consumer_package_name = "message-consumer.zip"

  # Where the build places the bundle on disk, relative to this module.
  consumer_package_path = "${path.module}/dist/${local.consumer_package_name}"
}
# Lambda function that consumes messages from the publisher SQS queue.
# The code bundle is taken from the S3 object uploaded by this module.
resource "aws_lambda_function" "message_consumer" {
  function_name = "apm-message-consumer"
  role          = aws_iam_role.consumer.arn
  handler       = "src/simple.handler"
  runtime       = "nodejs16.x"
  timeout       = 30

  # Deployment package location; pinning the object version ties the function
  # to the exact upload performed by aws_s3_bucket_object.consumer_s3_source.
  s3_bucket         = aws_s3_bucket_object.consumer_s3_source.bucket
  s3_key            = aws_s3_bucket_object.consumer_s3_source.id
  s3_object_version = aws_s3_bucket_object.consumer_s3_source.version_id

  # Must be the base64-encoded SHA-256 of the package: that is the format the
  # provider compares against. The hex digest produced by filesha256() never
  # matches, which shows up as a change on every plan.
  source_code_hash = filebase64sha256(local.consumer_package_path)

  environment {
    variables = {
      # Preloads src/lambda-wrapper into the Node.js process before the handler
      # module (presumably the OpenTelemetry bootstrap — confirm against the bundle).
      NODE_OPTIONS = "--require src/lambda-wrapper"
      SQS_URL      = aws_sqs_queue.publisher.url
    }
  }

  vpc_config {
    subnet_ids         = local.subnet_ids
    security_group_ids = local.security_group_ids
  }
}
# Execution role for the consumer Lambda; the trust policy comes from the
# shared assume_role policy document.
resource "aws_iam_role" "consumer" {
  assume_role_policy = data.aws_iam_policy_document.assume_role.json
  name               = "apm-message-consumer-role"
}
# Customer-managed policy wrapping the consumer_policy document (SQS read access).
resource "aws_iam_policy" "consumer" {
  policy = data.aws_iam_policy_document.consumer_policy.json
  name   = "apm-message-consumer-policy"
}
# Attaches the consumer's own SQS policy to its execution role.
resource "aws_iam_role_policy_attachment" "consumer_attach" {
  policy_arn = aws_iam_policy.consumer.arn
  role       = aws_iam_role.consumer.name
}
# Attaches the AWS-managed AWSLambdaBasicExecutionRole policy to the execution role.
resource "aws_iam_role_policy_attachment" "consumer_AWSLambdaBasicExecutionRole" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  role       = aws_iam_role.consumer.name
}
# Attaches the AWS-managed AWSLambdaVPCAccessExecutionRole policy; the function
# is configured with a vpc_config block, so it runs attached to a VPC.
resource "aws_iam_role_policy_attachment" "consumer_AWSLambdaVPCAccessExecutionRole" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
  role       = aws_iam_role.consumer.name
}
# Permissions the consumer needs on the publisher queue: receive messages,
# delete them, and read queue attributes. Scoped to that single queue.
data "aws_iam_policy_document" "consumer_policy" {
  statement {
    effect    = "Allow"
    resources = [aws_sqs_queue.publisher.arn]

    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes",
    ]
  }
}
# Uploads the consumer bundle to the "<environment>-lambdas" bucket under the
# project prefix. The Lambda function reads bucket, key and version from here.
# NOTE(review): version_id is consumed by the Lambda, so the bucket presumably
# has versioning enabled — confirm.
resource "aws_s3_bucket_object" "consumer_s3_source" {
  bucket = "${var.deploy_config.environment}-lambdas"
  key    = "${var.deploy_config.project}/${local.consumer_package_name}"
  source = local.consumer_package_path

  # Hash the file CONTENTS so a rebuilt bundle triggers a re-upload. md5() of
  # the path only hashes the path string, which never changes between builds.
  etag = filemd5(local.consumer_package_path)
}
# Wires the publisher SQS queue to the consumer Lambda as an event source.
resource "aws_lambda_event_source_mapping" "consumer_mapping" {
  function_name    = aws_lambda_function.message_consumer.arn
  event_source_arn = aws_sqs_queue.publisher.arn
  enabled          = true
}
import { diag, DiagConsoleLogger, DiagLogLevel } from "@opentelemetry/api";
import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc";
import { registerInstrumentations } from "@opentelemetry/instrumentation";
import { awsLambdaDetector } from "@opentelemetry/resource-detector-aws";
import {
detectResourcesSync,
envDetector,
processDetector,
} from "@opentelemetry/resources";
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
/**
 * Bootstraps OpenTelemetry tracing for this process.
 *
 * Sets up console diagnostics at DEBUG level, builds a tracer provider whose
 * resource is detected from the Lambda, environment and process detectors,
 * exports spans in batches over OTLP/gRPC to the collector URL below, enables
 * the Node auto-instrumentations, and finally registers the provider globally.
 */
const initTracing = () => {
  diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

  const collectorUrl = "https://o11y.ops.internal.dev:4317";

  const tracerProvider = new NodeTracerProvider({
    resource: detectResourcesSync({
      detectors: [awsLambdaDetector, envDetector, processDetector],
    }),
  });

  // Spans are buffered by the batch processor and shipped via the OTLP exporter.
  tracerProvider.addSpanProcessor(
    new BatchSpanProcessor(new OTLPTraceExporter({ url: collectorUrl })),
  );

  const autoInstrumentations = getNodeAutoInstrumentations({
    "@opentelemetry/instrumentation-aws-lambda": {
      disableAwsContextPropagation: true,
    },
    "@opentelemetry/instrumentation-fs": {
      // This is very noisy, and at least on lambda isn't very helpful
      enabled: false,
    },
  });

  registerInstrumentations({
    tracerProvider,
    instrumentations: [autoInstrumentations],
  });

  tracerProvider.register();
};

// Executed as a side effect of loading this module.
initTracing();
import { isSpanContextValid, SpanOptions, trace } from "@opentelemetry/api";
import { parseTraceParent } from "@opentelemetry/core";
import { Context, SQSEvent } from "aws-lambda";
// Module-wide tracer, resolved from the globally registered tracer provider.
const tracer = trace.getTracer("message-consumer");
/**
 * Simulates processing one message inside its own "dowork" span.
 *
 * Logs the value, tags the span with it as `accountId`, waits five seconds,
 * then ends the span.
 *
 * @param accountId - Value recorded on the span and written to the log.
 */
const doWork = async (accountId: string) => {
  const workSpan = tracer.startSpan("dowork");
  console.log("doing event", accountId);
  workSpan.setAttribute("accountId", accountId);

  // Stand-in for real work: resolve after a fixed 5 s delay.
  await new Promise((done) => {
    setTimeout(done, 5000);
  });

  workSpan.end();
};
/**
 * SQS-triggered Lambda entry point.
 *
 * Records are processed one at a time. For each record the `traceparent`
 * message attribute (if any) is parsed, and when it yields a valid span
 * context it is attached to a new active "handler" span as a link (not as a
 * parent). The record body is then passed to `doWork`.
 *
 * If `doWork` rejects, the exception is recorded on the span, the span is
 * still ended, and the error is rethrown so the invocation fails.
 *
 * @param event - SQS batch delivered by the event source mapping.
 * @param context - Lambda invocation context (currently unused).
 */
export const handler = async (event: SQSEvent, context: Context) => {
  for (const record of event.Records) {
    // Missing attribute becomes "", which parseTraceParent cannot parse.
    const traceParent =
      record.messageAttributes["traceparent"]?.stringValue ?? "";
    console.log("traceparent", traceParent);

    const parentCtx = parseTraceParent(traceParent);
    console.log("parentctx", JSON.stringify(parentCtx));

    const options: SpanOptions = {};
    if (parentCtx && isSpanContextValid(parentCtx)) {
      options.links = [{ context: parentCtx }];
    }
    console.log("options", JSON.stringify(options));

    await tracer.startActiveSpan("handler", options, async (span) => {
      try {
        await doWork(record.body);
        console.log("spanContext", JSON.stringify(span.spanContext()));
      } catch (err: unknown) {
        span.recordException(err instanceof Error ? err : String(err));
        throw err;
      } finally {
        // Always end the span, otherwise a failed record leaves it open.
        span.end();
      }
    });
    console.log("done");
  }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment