// Source: GitHub Gist mariuszmikolajczak/ebdc957e67a3b204914aa26774c1ec72
"use strict";
const pulumi = require("@pulumi/pulumi");
const aws = require("@pulumi/aws");
const { destinationHttpHostname, destinationHttpPort, destinationHttpBasePath } = require("./config");
// SQS queue that holds incoming webhook payloads until the "processMessage"
// Lambda (subscribed further down) forwards them to the destination.
const queue = new aws.sqs.Queue("webhooksQueue", {
    visibilityTimeoutSeconds: 180
});
/**
 * Lambda that forwards each SQS record to the configured HTTPS destination.
 *
 * Per record it reads:
 *   - `body`: a JSON document, re-serialized and sent as the request body;
 *   - message attribute `httpMethod` (default "GET");
 *   - message attribute `httpPath` (default "/"), appended to the
 *     `destinationHttpBasePath` environment variable;
 *   - optional attributes `authorizationHeader` / `contentTypeHeader`, copied
 *     to the `Authorization` / `Content-Type` request headers.
 *
 * The handler returns a promise that settles only after every request has
 * finished. It rejects on a transport error, on a malformed record body, or
 * when the destination answers with a 5xx status, so the invocation fails and
 * the message is not acknowledged. Other status codes are logged and treated
 * as delivered.
 */
const processMessageFunc = new aws.lambda.CallbackFunction("processMessage", {
    callback: (event) => {
        const https = require('https');

        // Returns the string value of message attribute `name`, or `fallback` when it is absent.
        const readAttribute = (record, name, fallback) => {
            const attribute = record.messageAttributes && record.messageAttributes[name];
            return attribute ? attribute.stringValue : fallback;
        };

        // Sends one record to the destination; resolves on delivery, rejects on failure.
        const forwardRecord = (record) => new Promise((resolve, reject) => {
            // A throw inside the executor (e.g. invalid JSON) becomes a rejection.
            const requestBody = JSON.parse(record.body);
            const httpMethod = readAttribute(record, "httpMethod", "GET");
            const httpPath = readAttribute(record, "httpPath", "/");
            const httpBasePath = process.env.destinationHttpBasePath ? process.env.destinationHttpBasePath : "";

            const headers = {};
            const authorization = readAttribute(record, "authorizationHeader", undefined);
            if (authorization !== undefined) {
                headers['Authorization'] = authorization;
            }
            const contentType = readAttribute(record, "contentTypeHeader", undefined);
            if (contentType !== undefined) {
                headers['Content-Type'] = contentType;
            }

            const httpOptions = {
                hostname: process.env.destinationHttpHostname,
                port: process.env.destinationHttpPort,
                // Strip leading slashes so the default "/" does not produce "//".
                path: `${httpBasePath}/${httpPath.replace(/^\/+/, "")}`,
                method: httpMethod,
                headers
            };

            const req = https.request(httpOptions, (response) => {
                let responseBody = '';
                console.log('Status:', response.statusCode);
                console.log('Headers:', JSON.stringify(response.headers));
                response.setEncoding('utf8');
                response.on('data', (chunk) => { responseBody += chunk; });
                response.on('error', reject);
                response.on('end', () => {
                    // A server-side failure must not acknowledge the message.
                    if (response.statusCode >= 500) {
                        reject(new Error(`Destination responded with status ${response.statusCode}: ${responseBody}`));
                        return;
                    }
                    console.log('Successfully processed HTTPS response');
                    resolve();
                });
            });
            req.on("error", (error) => {
                console.log(error);
                // Reject instead of throwing: a throw inside an event listener
                // surfaces as an uncaught exception rather than a handler error.
                reject(error);
            });
            req.write(JSON.stringify(requestBody));
            req.end();
        });

        return Promise.all(event.Records.map(forwardRecord))
            .then(() => `Successfully processed ${event.Records.length} messages.`);
    },
    environment: {
        variables: {
            destinationHttpHostname,
            destinationHttpPort,
            destinationHttpBasePath
        },
    }
});
// Subscribe the Lambda to the queue; batchSize 1 hands it a single record per invocation.
queue.onEvent(
    "processMessage",
    processMessageFunc,
    { batchSize: 1 }
);
// IAM role whose trust policy lets the API Gateway service assume it.
// It is passed as `credentials` to the SQS integration below.
const apiRole = new aws.iam.Role("apiRole", {
    assumeRolePolicy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [
            {
                Action: "sts:AssumeRole",
                Effect: "Allow",
                Sid: "",
                Principal: { Service: "apigateway.amazonaws.com" }
            }
        ]
    })
});
// Permissions granted to the API Gateway role: CloudWatch Logs access on all
// resources, a set of SQS actions scoped to the webhooks queue, and
// account-wide sqs:ListQueues.
// NOTE(review): the only use of this role visible in this file is the SQS
// SendMessage integration — consider whether actions such as sqs:PurgeQueue,
// sqs:CreateQueue and sqs:SetQueueAttributes are really needed.
const apiRolePolicy = new aws.iam.Policy("apiRolePerms", {
    path: "/",
    description: "",
    policy: {
        Version: "2012-10-17",
        Statement: [
            // CloudWatch Logs
            {
                Effect: "Allow",
                Action: [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:DescribeLogGroups",
                    "logs:DescribeLogStreams",
                    "logs:PutLogEvents",
                    "logs:GetLogEvents",
                    "logs:FilterLogEvents"
                ],
                Resource: "*"
            },
            // Queue-scoped SQS actions
            {
                Effect: "Allow",
                Action: [
                    "sqs:GetQueueUrl",
                    "sqs:ChangeMessageVisibility",
                    "sqs:ListDeadLetterSourceQueues",
                    "sqs:SendMessageBatch",
                    "sqs:PurgeQueue",
                    "sqs:ReceiveMessage",
                    "sqs:SendMessage",
                    "sqs:GetQueueAttributes",
                    "sqs:CreateQueue",
                    "sqs:ListQueueTags",
                    "sqs:ChangeMessageVisibilityBatch",
                    "sqs:SetQueueAttributes"
                ],
                Resource: queue.arn
            },
            // Listing queues cannot be scoped to a single queue here
            {
                Effect: "Allow",
                Action: "sqs:ListQueues",
                Resource: "*"
            }
        ]
    }
});
// Attach the permissions policy to the API Gateway role.
// NOTE(review): aws.iam.PolicyAttachment is understood to manage a policy's
// attachments exclusively — confirm nothing else attaches "apiRolePerms",
// or consider aws.iam.RolePolicyAttachment.
const iamPolicyAttachment = new aws.iam.PolicyAttachment("sqsAttachment", {
    policyArn: apiRolePolicy.arn,
    roles: [apiRole.name]
});
// REST API that exposes the webhook endpoints; routes are added to it via createIntegrationRoute.
const restApi = new aws.apigateway.RestApi("webhooksRestApi");
/**
 * Builds the form-encoded SQS `SendMessage` request used as the API Gateway
 * request mapping template for a route.
 *
 * The message body is the URL-encoded incoming request body. Four String
 * message attributes are attached, numbered from 1 in this order:
 * `httpMethod`, `authorizationHeader`, `contentTypeHeader`, `httpPath`.
 * `httpMethod` and `httpPath` are fixed per route and URI-encoded here; the two
 * header attributes are template expressions evaluated by API Gateway per request.
 *
 * NOTE(review): when a caller omits the Authorization or Content-Type header
 * the rendered StringValue is presumably empty — confirm SQS accepts that.
 *
 * @param {{httpMethod: string, httpPath: string}} route - method and destination path to record on the message.
 * @returns the result of `pulumi.interpolate` over the assembled template string.
 */
const sqsRequestTemplate = ({ httpMethod, httpPath }) => {
    const action = "SendMessage";
    const messageBody = "$util.urlEncode($input.body)";
    // [attribute name, attribute value] pairs; position determines the attribute index.
    const messageAttributes = [
        ["httpMethod", encodeURIComponent(httpMethod)],
        ["authorizationHeader", '$util.urlEncode($input.params("Authorization"))'],
        ["contentTypeHeader", '$util.urlEncode($input.params("Content-Type"))'],
        ["httpPath", encodeURIComponent(httpPath)]
    ];
    const attributeQuery = messageAttributes
        .map(([name, value], index) => {
            const prefix = `MessageAttribute.${index + 1}`;
            return `${prefix}.Name=${name}&${prefix}.Value.StringValue=${value}&${prefix}.Value.DataType=String`;
        })
        .join("&");
    return pulumi.interpolate `Action=${action}&MessageBody=${messageBody}&${attributeQuery}`;
};
/**
 * Creates one webhook route on the REST API that enqueues the incoming request
 * into the webhooks SQS queue.
 *
 * Resources created: the API resource (`resourcePath` under the API root), its
 * unauthenticated method, the AWS-type SQS integration, a 200 method response
 * and the matching integration response.
 *
 * @param {object} args
 * @param args.restApi - REST API the route is added to (its `id` and `rootResourceId` are used).
 * @param {string} args.resourceName - prefix for the Pulumi resource names.
 * @param {string} args.resourcePath - path part exposed by API Gateway.
 * @param {string} args.destinationPath - path recorded on the message via sqsRequestTemplate.
 * @param {string} args.httpMethod - HTTP method accepted by the route and recorded on the message.
 * @returns {Array} [resource, method, integration, integrationResponse, methodResponse]
 */
const createIntegrationRoute = ({ restApi, resourceName, resourcePath, destinationPath, httpMethod }) => {
    const webhookResource = new aws.apigateway.Resource(`${resourceName}Resource`, {
        restApi: restApi.id,
        parentId: restApi.rootResourceId,
        pathPart: resourcePath,
    });
    const webhookMethod = new aws.apigateway.Method(`${resourceName}Method`, {
        restApi: restApi.id,
        resourceId: webhookResource.id,
        apiKeyRequired: false,
        httpMethod: httpMethod,
        authorization: "NONE",
    });
    // Take the region from the queue ARN (arn:aws:sqs:<region>:<account>:<name>)
    // so the integration targets the region the queue lives in.
    const queueRegion = queue.arn.apply((arn) => arn.split(":")[3]);
    const sqsIntegration = new aws.apigateway.Integration(`${resourceName}SqsIntegration`, {
        restApi: restApi.id,
        resourceId: webhookResource.id,
        // Must match the method this integration is attached to, whatever verb the route uses.
        httpMethod: webhookMethod.httpMethod,
        // The call made to SQS itself is always a POST.
        integrationHttpMethod: "POST",
        type: "AWS",
        passthroughBehavior: "NEVER",
        credentials: apiRole.arn,
        uri: pulumi.interpolate `arn:aws:apigateway:${queueRegion}:sqs:path/${queue.name}`,
        requestParameters: {
            "integration.request.header.Content-Type": "'application/x-www-form-urlencoded'"
        },
        requestTemplates: {
            "application/json": sqsRequestTemplate({ httpMethod, httpPath: destinationPath })
        }
    }, { dependsOn: [webhookResource, webhookMethod] });
    const apiResponseSuccess = new aws.apigateway.MethodResponse(`${resourceName}200`, {
        restApi: restApi.id,
        resourceId: webhookResource.id,
        httpMethod: webhookMethod.httpMethod,
        statusCode: 200,
        responseModels: {
            "application/json": "Empty"
        }
    });
    // The integration response refers to the method response's status code, so
    // the method response has to exist first.
    const integrationResponse = new aws.apigateway.IntegrationResponse(`${resourceName}200`, {
        restApi: restApi.id,
        resourceId: webhookResource.id,
        httpMethod: webhookMethod.httpMethod,
        statusCode: 200,
        selectionPattern: "^2[0-9][0-9]",
        responseTemplates: {
            "application/json": "{\"status\": \"ok\"}"
        }
    }, { dependsOn: [sqsIntegration, apiResponseSuccess] });
    return [webhookResource, webhookMethod, sqsIntegration, integrationResponse, apiResponseSuccess];
};
// Route: POST /example_path_webhook is queued and later forwarded to the
// destination path "example/path".
const examplePathWebhookSqsIntegration = createIntegrationRoute({
    restApi,
    resourceName: "examplePath",
    resourcePath: "example_path_webhook",
    destinationPath: "example/path",
    httpMethod: "POST"
});
// API deployment to stage "v1". It waits for every resource of the routes
// listed in dependsOn; spread further route arrays here as they are added.
const apiDeployment = new aws.apigateway.Deployment(
    "apiDeploy",
    { restApi: restApi.id, stageName: "v1" },
    { dependsOn: [...examplePathWebhookSqsIntegration] }
);
// Module exports: the webhooks queue and the API deployment
// (presumably surfaced as Pulumi stack outputs — confirm).
module.exports = { queue, apiDeployment };
// (GitHub Gist page footer: sign-up / sign-in prompt for commenting.)