Skip to content

Instantly share code, notes, and snippets.

@andreigec
Created September 21, 2022 06:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andreigec/de7fd483b28dd4376bbefdee39982d80 to your computer and use it in GitHub Desktop.
Save andreigec/de7fd483b28dd4376bbefdee39982d80 to your computer and use it in GitHub Desktop.
push dynamo to opensearch
/* eslint-disable @typescript-eslint/no-var-requires */
const AWS = require('aws-sdk');
// DynamoDB client used to read the source table (region fixed to ap-southeast-2).
const db = new AWS.DynamoDB({ region: 'ap-southeast-2' });

// Target search domain settings:
//   region  - used for the request and its signature
//   domain  - host name of the domain (also sent as the Host header)
//   doctype - path segment placed between the index name and the document id
// NOTE(review): `domain` looks like a placeholder value - confirm before running.
const esDomain = {
  region: 'ap-southeast-2',
  domain: 'domain.ap-southeast-2.es.amazonaws.com',
  doctype: 'lambda-type',
};

// Name of the DynamoDB table to copy.
// NOTE(review): 'table-here' looks like a placeholder - confirm before running.
const dynamoTable = 'table-here';

// HTTPS endpoint of the search domain, wrapped in the SDK's Endpoint type.
const endpointurl = `https://${esDomain.domain}`;
const endpoint = new AWS.Endpoint(endpointurl);

// Credentials used to sign requests, read from environment variables with the
// 'AWS' prefix.
const creds = new AWS.EnvironmentCredentials('AWS');
// Alternative for local runs: supply static credentials instead.
// const creds = new AWS.Credentials({
//   accessKeyId: 'aaa',
//   secretAccessKey: 'bbb',
// });
/**
 * Index one DynamoDB item into the search domain with a SigV4-signed HTTP PUT
 * to `/<keyname lower-cased>/<esDomain.doctype>/<url-encoded keyvalue>`.
 *
 * @param {{NewImage: Object}} doc - Record whose `NewImage` holds the item in
 *   DynamoDB attribute-value form (e.g. `{ Title: { S: 'x' } }`).
 * @param {string} keyname - Key attribute name; lower-cased to form the first
 *   path segment.
 * @param {string} keyvalue - Key value; URL-encoded and used as the last path
 *   segment.
 * @returns {Promise<Object>} Resolves with the parsed JSON response when it
 *   reports `_shards.failed === 0`. Rejects with an `Error` when the response
 *   is not JSON, carries `message`/`error`, has no `_shards` section, or
 *   reports failed shards; rejects with the transport error when the request
 *   itself fails.
 * @throws {Error} Synchronously, when `keyname` or `keyvalue` is falsy.
 */
function postToES(doc, keyname, keyvalue) {
  if (!keyname) {
    throw new Error('no keyname');
  }
  if (!keyvalue) {
    throw new Error('no keyvalue');
  }

  return new Promise((resolve, reject) => {
    const req = new AWS.HttpRequest(endpoint, esDomain.region);
    req.method = 'PUT';
    req.path += `${keyname.toLowerCase()}/${esDomain.doctype}/${encodeURIComponent(keyvalue)}`;
    req.region = esDomain.region;
    req.headers['Content-Type'] = 'application/json';
    req.headers.Host = esDomain.domain;
    // The file defines no `parse` helper, so the SDK's own converter is used to
    // turn the attribute-value image into a plain JSON document.
    req.body = JSON.stringify(AWS.DynamoDB.Converter.unmarshall(doc.NewImage));
    req.headers['Content-Length'] = Buffer.byteLength(req.body);

    // Sign for the 'es' service before sending.
    new AWS.Signers.V4(req, 'es').addAuthorization(creds, new Date());

    new AWS.HttpClient().handleRequest(
      req,
      null,
      (httpResp) => {
        let respBody = '';
        httpResp.on('data', (chunk) => {
          respBody += chunk;
        });
        httpResp.on('end', () => {
          // Everything in this handler must end in resolve/reject: an exception
          // thrown here would escape the promise and leave it pending forever.
          let parsed;
          try {
            parsed = JSON.parse(respBody);
          } catch (parseErr) {
            reject(new Error(`non-JSON response from ES: ${respBody.slice(0, 200)}`));
            return;
          }

          const shards = parsed?._shards;
          if (!shards && parsed?.message) {
            reject(new Error(String(parsed.message)));
          } else if (parsed?.error) {
            const detail =
              typeof parsed.error === 'string' ? parsed.error : JSON.stringify(parsed.error);
            reject(new Error(detail));
          } else if (!shards) {
            reject(new Error(`unexpected response from ES: ${respBody.slice(0, 200)}`));
          } else if (shards.failed !== 0) {
            reject(new Error('has failures'));
          } else {
            resolve(parsed);
          }
        });
      },
      (err) => {
        console.log(`Error: ${JSON.stringify(err, null, 2)}`);
        reject(err);
      },
    );
  });
}
/**
 * Read every item of a DynamoDB table by following scan pagination.
 *
 * Pages are requested one after another, each continuing from the previous
 * page's `LastEvaluatedKey`, until a page comes back without one.
 *
 * @param {string} tableName - Table to scan.
 * @returns {Promise<Object[]>} All items from all pages, in page order.
 * @throws {Error} When a page's `$response.error` carries a `statusCode`.
 */
const scan = async (tableName) => {
  const collected = [];
  let startKey;
  do {
    // Pages depend on each other, so they have to be fetched sequentially.
    // eslint-disable-next-line no-await-in-loop
    const page = await db
      .scan({ TableName: tableName, ExclusiveStartKey: startKey })
      .promise();
    const failure = page.$response.error;
    if (failure?.statusCode) {
      throw new Error(failure.message);
    }
    if (page.Items) {
      collected.push(...page.Items);
    }
    startKey = page.LastEvaluatedKey;
  } while (startKey);
  console.log(`dynamo scan against ${tableName} ok, count=${collected.length}`);
  return collected;
};
/**
 * Copy every item of `dynamoTable` into the search domain.
 *
 * Scans the whole table, then PUTs the items one at a time, keyed by each
 * item's `UrlSafeTitle.S`. A failure on one item (including an item without a
 * `UrlSafeTitle` attribute) is logged and the run continues with the next item.
 *
 * @returns {Promise<void>} Settles once every scanned item has been attempted.
 *   Rejects only if the table scan itself fails.
 */
async function main() {
  const scanned = await scan(dynamoTable);
  // Iterate up to and including the last index so the final item is pushed too.
  for (let i = 0; i < scanned.length; i += 1) {
    if (i % 100 === 0) {
      console.log(`p=${i}`);
    }
    const item = scanned[i];
    try {
      // Items are sent sequentially on purpose: one request in flight at a time.
      // eslint-disable-next-line no-await-in-loop
      await postToES(
        { NewImage: item, Keys: { UrlSafeTitle: item.UrlSafeTitle } },
        'UrlSafeTitle',
        item.UrlSafeTitle.S,
      );
    } catch (e) {
      // Best effort: report the failure and keep going.
      console.log('ES err=', e);
    }
  }
  console.log('finished');
}
// To expose the job as a handler instead of running it on load, export it:
// exports.handler = main;

// Script entry point. main() returns a promise that rejects if the table scan
// fails; handle that explicitly so the failure is logged and the process exits
// with a non-zero code instead of leaving a floating promise.
main().catch((err) => {
  console.error('push to opensearch failed', err);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment