Skip to content

Instantly share code, notes, and snippets.

@tywalch
Last active April 6, 2023 14:36
Show Gist options
  • Save tywalch/8243afffac51eefd10a5db563d3913e3 to your computer and use it in GitHub Desktop.
Save tywalch/8243afffac51eefd10a5db563d3913e3 to your computer and use it in GitHub Desktop.
import { Kinesis } from 'aws-sdk';
import { Command } from 'commander';
import { decodeMessage } from '@jupiterone/platform-sdk-message-codec';
// Single Kinesis client shared by every describe/iterator/record call in this script.
const kinesis = new Kinesis({
  region: 'us-east-1',
  apiVersion: '2013-12-02',
});

// Do-nothing callback; used below to build a logger that discards all output.
const noop = (): void => undefined;
/**
 * Reads a Kinesis record's `Data` payload as UTF-8 text and parses it as JSON.
 *
 * `Data` may be a base64 string, which is decoded. If it is already a Buffer,
 * `Buffer.from` copies its bytes and ignores the encoding argument.
 *
 * @throws SyntaxError when the payload is not valid JSON. Callers rely on this
 *   to fall back to another decoder.
 */
function parseRecord(record: any) {
  const payloadText = Buffer.from(record.Data, 'base64').toString('utf-8');
  const parsed = JSON.parse(payloadText);
  return parsed;
}
/**
 * Decodes a record payload with the platform message codec.
 *
 * The codec is given a logger whose every level is a no-op, so decoding
 * produces no log output, plus an empty correlation id.
 *
 * @returns the decoded messages, or an empty array when the codec yields
 *   `null`/`undefined`.
 */
async function decodeRecord(record: any): Promise<any[]> {
  // Every level discards its input; child() hands back this same stub.
  const silentLogger = {
    debug: noop,
    trace: noop,
    info: noop,
    warn: noop,
    error: noop,
    fatal: noop,
    child() {
      return this;
    },
  };
  const codecContext = {
    logger: silentLogger as any,
    metadata: { correlationId: '' },
  };
  const messages = (await decodeMessage(codecContext, record.Data)) as any[];
  return messages ?? [];
}
/**
 * Turns one raw Kinesis record into zero or more JSON values.
 *
 * A payload that parses as plain JSON becomes a single-element array. Any
 * payload `parseRecord` rejects is handed to `decodeRecord` instead.
 */
async function toJSON(record: any): Promise<any[]> {
  let parsed: unknown;
  try {
    parsed = parseRecord(record);
  } catch {
    // Not plain JSON: let the message codec try.
    return decodeRecord(record);
  }
  return [parsed];
}
/** Input for maybeFilterAccountId. */
type MaybeFilterAccountIdOptions = {
  /** Account to keep. When omitted or empty, nothing is filtered. */
  accountId?: string;
  /** Decoded records to filter. */
  records: any[];
}

/**
 * Keeps only the records that belong to `accountId`.
 *
 * A record matches when any of these equals `accountId`:
 * - its top-level `accountId`
 * - `entity._accountId`
 * - `relationship._accountId`
 *
 * Without an `accountId` (or with an empty one), the same input array is
 * returned unchanged.
 */
function maybeFilterAccountId(options: MaybeFilterAccountIdOptions) {
  const { accountId, records } = options;
  if (accountId) {
    const belongsToAccount = (record: any) =>
      [record?.accountId, record?.entity?._accountId, record?.relationship?._accountId]
        .some(candidate => candidate === accountId);
    return records.filter(belongsToAccount);
  }
  return records;
}
/** Options accepted by consume. */
type ConsumeOptions = {
  /** Names of the Kinesis streams to read. */
  streams: string[];
  /** Maximum number of records requested per getRecords call. */
  batchSize: number;
  /** Where a shard starts reading the first time it is seen. */
  shardIteratorType: 'TRIM_HORIZON' | 'AT_TIMESTAMP';
  /** When set, only records belonging to this account are printed. */
  accountId?: string;
  /** Start position used with AT_TIMESTAMP; Kinesis requires it for that type. */
  timestamp?: Date;
  /** Pause between polling rounds, in milliseconds. Defaults to 1000. */
  pollIntervalMs?: number;
}

/**
 * Polls every shard of every stream forever. Each decoded record, optionally
 * filtered by account, is printed as pretty JSON.
 *
 * A shard iterator is requested only the first time a shard is seen. After
 * that it is advanced with the `NextShardIterator` returned by each
 * getRecords call, so reading moves forward through the shard instead of
 * re-reading the same first batch every round. A shard whose
 * `NextShardIterator` is missing is treated as closed and drained, and is not
 * re-opened. Shards that appear later are picked up on the next round.
 */
async function consume(options: ConsumeOptions) {
  const {
    streams,
    shardIteratorType,
    accountId,
    batchSize,
    timestamp,
    pollIntervalMs = 1000,
  } = options;
  // Current iterator per `${stream}/${shardId}`; null marks an exhausted shard.
  const iterators = new Map<string, string | null>();

  while (true) {
    for (const stream of streams) {
      const { StreamDescription } = await kinesis.describeStream({ StreamName: stream }).promise();
      for (const shard of StreamDescription.Shards) {
        const key = `${stream}/${shard.ShardId}`;
        if (!iterators.has(key)) {
          const { ShardIterator } = await kinesis.getShardIterator({
            StreamName: stream,
            ShardId: shard.ShardId,
            ShardIteratorType: shardIteratorType,
            // Timestamp only applies to (and is required by) AT_TIMESTAMP.
            ...(shardIteratorType === 'AT_TIMESTAMP' && timestamp ? { Timestamp: timestamp } : {}),
          }).promise();
          iterators.set(key, ShardIterator ?? null);
        }

        const shardIterator = iterators.get(key);
        if (!shardIterator) {
          continue;
        }

        const { Records, NextShardIterator } = await kinesis.getRecords({
          ShardIterator: shardIterator,
          Limit: batchSize,
        }).promise();
        // Advance this shard so the next round continues where this one stopped.
        iterators.set(key, NextShardIterator ?? null);

        for (const record of Records) {
          const decoded = await toJSON(record);
          const filtered = maybeFilterAccountId({ accountId, records: decoded });
          for (const item of filtered) {
            console.log(JSON.stringify(item, null, 2));
          }
        }
      }
    }
    // GetRecords is rate-limited per shard; wait instead of spinning.
    await new Promise<void>(resolve => setTimeout(resolve, pollIntervalMs));
  }
}
/**
 * CLI entry point. Registers the `consume <streams...>` command and runs it.
 *
 * Errors raised by the command are logged and turn into a non-zero exit code.
 * An invalid `--batchSize` is one such error; GetRecords accepts 1-10000.
 */
async function main(): Promise<void> {
  const program = new Command()
    .name('Kinesis Consumer')
    .description('Consumes the streams')
    .version('0.0.1');

  program
    .command('consume <streams...>')
    .description('consume provided streams')
    .option('-a, --accountId <accountId>', 'AccountId to filter on')
    .option('-b, --batchSize <batchSize>', 'Batch size to consume', '50')
    .option('-s, --shardIteratorType <shardIteratorType>', 'Shard iterator type', 'TRIM_HORIZON')
    .action(async (streams: string[], options: ConsumeOptions) => {
      try {
        const { batchSize, shardIteratorType, accountId } = options;
        // Commander hands option values over as strings.
        const parsedBatchSize = parseInt(`${batchSize}`, 10);
        if (!Number.isInteger(parsedBatchSize) || parsedBatchSize < 1 || parsedBatchSize > 10000) {
          throw new Error(`Invalid batch size "${batchSize}": expected an integer from 1 to 10000`);
        }
        await consume({
          streams,
          batchSize: parsedBatchSize,
          accountId,
          shardIteratorType,
        });
      } catch (err) {
        console.error(err);
        process.exitCode = 1;
      }
    });

  // parseAsync waits for the async action handler to settle.
  await program.parseAsync(process.argv);
}
// Run the CLI; report anything that escapes main and exit non-zero.
main().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment