Skip to content

Instantly share code, notes, and snippets.

@gbvanrenswoude
Last active December 19, 2022 11:04
Show Gist options
  • Save gbvanrenswoude/c430fb2514ce53a5d755d3ac33635091 to your computer and use it in GitHub Desktop.
Save gbvanrenswoude/c430fb2514ce53a5d755d3ac33635091 to your computer and use it in GitHub Desktop.
OpenSearch ISM handler in CDK - CR example
import { Sha256 } from '@aws-crypto/sha256-browser';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
import { NodeHttpHandler } from '@aws-sdk/node-http-handler';
import { HttpRequest } from '@aws-sdk/protocol-http';
import { SignatureV4 } from '@aws-sdk/signature-v4';
import { CdkCustomResourceEvent, CdkCustomResourceHandler } from 'aws-lambda';
/**
 * Signs an HTTP request with AWS Signature V4 for the `es` service.
 *
 * Credentials come from the default Node credential provider chain.
 *
 * @param request - The unsigned request to sign.
 * @param region - AWS region to sign for. Defaults to the `AWS_REGION`
 *   environment variable and falls back to `'eu-west-1'` when it is not set,
 *   so the handler is not tied to a single region.
 * @returns The signed request produced by the signer.
 */
export async function signRequest(
  request: HttpRequest,
  region = process.env.AWS_REGION ?? 'eu-west-1',
) {
  const signer = new SignatureV4({
    credentials: defaultProvider(),
    region,
    service: 'es',
    sha256: Sha256,
  });
  return signer.sign(request);
}
/**
 * Sends a request to OpenSearch and returns the response body as text.
 *
 * @param request - The request to send (callers in this file sign it first).
 * @returns The complete response body, concatenated from the streamed chunks.
 * @throws Error when sending fails, when the response stream emits an error,
 *   or when the status code is outside the 2xx range.
 */
export async function sendRequest(request: HttpRequest) {
  try {
    const client = new NodeHttpHandler();
    const { response } = await client.handle(request);
    console.log(
      `OpenSearch replied with: ${response.statusCode} ${response.body.statusMessage}`,
    );

    // The promise must settle: resolve once the stream ends and reject on a
    // stream error. Without that the await below never completes, and the
    // status check and return value are never reached.
    const responseBody = await new Promise<string>((resolve, reject) => {
      let collected = '';
      response.body.on('data', (chunk: unknown) => {
        collected += chunk;
      });
      response.body.on('end', () => {
        console.log('Response body: ' + collected);
        resolve(collected);
      });
      response.body.on('error', reject);
    });

    if (response.statusCode < 200 || response.statusCode > 299) {
      throw new Error(
        `OpenSearch replied with: ${response.statusCode} ${response.body.statusMessage}, so we're throwing an error.`,
      );
    }
    return responseBody;
  } catch (error) {
    // Single place where failures are logged and re-thrown as Error.
    console.log(`Error: ${error}`);
    throw new Error(`Error: ${error}`);
  }
}
/**
 * Builds, signs and sends an ISM policy request to an OpenSearch domain.
 *
 * @param domain - Domain endpoint; used as both the `host` header and the hostname.
 * @param body - Policy document, sent as the JSON request body.
 * @param method - HTTP method; `'PUT'` by default, `'DELETE'` to remove the policy.
 * @param policyId - Identifier of the policy under `/_plugins/_ism/policies/`.
 *   Defaults to `'policy_id'`, the value that was previously hard-coded, so
 *   existing callers address the same policy.
 * @returns The response body returned by `sendRequest`.
 */
export async function processIsm(
  domain: string,
  body: Record<string, unknown>,
  method = 'PUT',
  policyId = 'policy_id',
) {
  console.log(`Processing ISM ${JSON.stringify(body)} for ${domain}`);
  const request = new HttpRequest({
    body: JSON.stringify(body),
    headers: {
      'Content-Type': 'application/json',
      host: domain,
    },
    hostname: domain,
    method,
    // Encode the id so characters such as '/' cannot alter the request path.
    path: `/_plugins/_ism/policies/${encodeURIComponent(policyId)}`,
  });
  const signedRequest = await signRequest(request);
  // @ts-ignore property clone is missing in type HttpRequest
  return sendRequest(signedRequest);
}
/**
 * Lambda entry point for the custom resource.
 *
 * Logs the incoming event, routes it to the handler matching its
 * `RequestType`, and re-throws any failure after logging it.
 */
export const handler: CdkCustomResourceHandler = async (event) => {
  console.log('CR invoked Handler');
  console.log('Received CR Event: ', JSON.stringify(event, null, 2));
  try {
    if (event.RequestType === 'Create') {
      console.log('Create event processing');
      return await onCreate(event);
    }
    if (event.RequestType === 'Update') {
      console.log('Update event processing');
      return await onUpdate(event);
    }
    if (event.RequestType === 'Delete') {
      console.log('Delete event processing');
      return await onDelete(event);
    }
    // Anything else is not a request type this handler knows about.
    throw new Error('Unsupported request type!');
  } catch (failure) {
    console.error(failure);
    throw failure;
  }
};
/**
 * Computes a simple deterministic hash of a string.
 *
 * Folds over the UTF-16 code units, combining each one with the running
 * value as `code + ((hash << 5) - hash)`.
 *
 * @param str - Text to hash.
 * @returns The hash as a decimal string (`'0'` for the empty string).
 */
export function hashCode(str: string) {
  const folded = str
    .split('')
    .reduce((running, unit) => unit.charCodeAt(0) + ((running << 5) - running), 0);
  return folded.toString();
}
/**
 * Handles a Create event: sends the policy from the resource properties to
 * the domain, then reports a physical id derived from the policy's hash.
 */
const onCreate = async (event: CdkCustomResourceEvent) => {
  const properties = event.ResourceProperties;
  const policy = properties.ismPolicy;
  await processIsm(properties.domainEndpoint, policy);
  const physicalId = hashCode(JSON.stringify(policy));
  return {
    PhysicalResourceId: physicalId,
    Data: { ismPolicy: policy },
  };
};
/**
 * Returns the physical id carried by the event, falling back to the policy
 * hash when the event has none.
 */
const existingPhysicalResourceId = (
  event: CdkCustomResourceEvent,
  ismPolicy: unknown,
): string =>
  'PhysicalResourceId' in event && event.PhysicalResourceId
    ? event.PhysicalResourceId
    : hashCode(JSON.stringify(ismPolicy));

/**
 * Handles an Update event: sends the new policy to the domain.
 *
 * The physical id is kept stable. Returning a new id on update makes
 * CloudFormation treat the change as a replacement and send a Delete for the
 * old id, and that Delete would remove the policy that was just written.
 *
 * NOTE(review): OpenSearch may require `if_seq_no` / `if_primary_term` when
 * overwriting an existing policy — confirm against the ISM API documentation.
 */
const onUpdate = async (event: CdkCustomResourceEvent) => {
  const { ismPolicy, domainEndpoint } = event.ResourceProperties;
  await processIsm(domainEndpoint, ismPolicy);
  return {
    PhysicalResourceId: existingPhysicalResourceId(event, ismPolicy),
    Data: {
      ismPolicy,
    },
  };
};

/**
 * Handles a Delete event: removes the policy from the domain.
 *
 * Returns the physical id the event carries, so the id reported on deletion
 * always matches the one CloudFormation holds, even after the policy content
 * has changed since creation.
 */
const onDelete = async (event: CdkCustomResourceEvent) => {
  const { ismPolicy, domainEndpoint } = event.ResourceProperties;
  await processIsm(domainEndpoint, ismPolicy, 'DELETE');
  return {
    PhysicalResourceId: existingPhysicalResourceId(event, ismPolicy),
    Data: {
      ismPolicy,
    },
  };
};
// some imports
// ISM policy document passed to the custom resource as the `ismPolicy` property.
// State chain defined below: hot -> snapshot -> warm -> cold -> delete.
const ismPolicy = {
policy: {
description: "Move indexes between storage tiers to save money",
default_state: "hot",
states: [
{
// "hot": no actions; moves on to "snapshot" at an index age of 24h.
name: "hot",
actions: [],
transitions: [
{
state_name: "snapshot",
conditions: {
min_index_age: "24h",
},
},
],
},
{
// "snapshot": runs a snapshot action (with retries), then moves to "warm" at 2d.
name: "snapshot",
actions: [
{
retry: {
count: 5,
backoff: "exponential",
delay: "30m",
},
snapshot: {
repository: "snapshot-repo",
snapshot: "ism-snapshot",
},
},
],
transitions: [
{
state_name: "warm",
conditions: {
min_index_age: "2d",
},
},
],
},
{
// "warm": runs warm_migration (with retries), then moves to "cold" at 90d.
name: "warm",
actions: [
{
retry: {
count: 5,
backoff: "exponential",
delay: "1h",
},
warm_migration: {},
},
],
transitions: [
{
state_name: "cold",
conditions: {
min_index_age: "90d",
},
},
],
},
{
// "cold": runs cold_migration (with retries), then moves to "delete" at 120d.
name: "cold",
actions: [
{
retry: {
count: 5,
backoff: "exponential",
delay: "1h",
},
cold_migration: {
timestamp_field: "@timestamp",
ignore: "none",
},
},
],
transitions: [
{
state_name: "delete",
conditions: {
min_index_age: "120d",
},
},
],
},
{
// "delete": final state; runs cold_delete and has no further transitions.
name: "delete",
actions: [
{
cold_delete: {},
},
],
transitions: [],
},
],
// Template entry naming the index patterns ("cwl-*") and priority for this policy.
ism_template: [
{
index_patterns: ["cwl-*"],
priority: 100,
},
],
},
};
// Store the result of createDomain on this.domain; its domainEndpoint is passed to the custom resource below.
this.domain = this.createDomain(
securityGroupUserAccess,
);
// Lambda function bundled from ./handler.ts that serves as the custom resource's onEvent handler.
const ismHandlerFunction = new lambdaNodeJs.NodejsFunction(
this.scope,
"IsmHandlerFunction",
{
logRetention: logs.RetentionDays.ONE_WEEK,
entry: path.resolve(__dirname, "./handler.ts"),
timeout: Duration.minutes(2),
role: this.createCRHandlerIamRole(),
vpc: this.props.vpc,
// Two security groups: the existing this.toOpenSearchSG plus a new one created inline with all outbound traffic allowed.
securityGroups: [this.toOpenSearchSG, new ec2.SecurityGroup(this.scope, "IsmHandlerFunctionSecurityGroup", {
vpc: this.props.vpc,
allowAllOutbound: true,
description: "Security group for the ISM handler function - allow egress access to public OpenSearch domains",
})],
}
);
// Provider that uses the function above as its onEvent handler.
// NOTE(review): log retention here is ONE_DAY while the function uses ONE_WEEK — confirm this is intended.
const ismHandlerProvider = new cr.Provider(
this.scope,
"IsmHandlerProvider",
{
onEventHandler: ismHandlerFunction,
logRetention: logs.RetentionDays.ONE_DAY,
}
);
// Custom resource whose properties (domainEndpoint, ismPolicy) are the ones the handler reads from event.ResourceProperties.
new CustomResource(this.scope, "IsmHandlerCustomResource", {
serviceToken: ismHandlerProvider.serviceToken,
properties: {
domainEndpoint: this.domain.domainEndpoint,
ismPolicy,
},
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment