Last active
April 6, 2023 14:36
-
-
Save tywalch/8243afffac51eefd10a5db563d3913e3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Kinesis } from 'aws-sdk'; | |
import { Command } from 'commander'; | |
import { decodeMessage } from '@jupiterone/platform-sdk-message-codec'; | |
/**
 * Module-wide Kinesis client used by every stream call in this script.
 * It is pinned to the 2013-12-02 API version and the us-east-1 region.
 */
const kinesis = new Kinesis({ region: 'us-east-1', apiVersion: '2013-12-02' });
/** Accepts no arguments, does nothing, and returns `undefined`; used to silence logger methods. */
const noop = (): void => undefined;
/**
 * Decodes a record's `Data` field and parses it as JSON.
 *
 * `Data` is handed to `Buffer.from(..., 'base64')`, so a string is treated as
 * base64 text; the resulting bytes are read as UTF-8 and parsed.
 *
 * @param record Object exposing a `Data` property.
 * @returns The value produced by `JSON.parse`.
 * @throws SyntaxError when the decoded text is not valid JSON.
 */
function parseRecord(record: any) {
  const payload = Buffer.from(record.Data, 'base64');
  const text = payload.toString('utf-8');
  return JSON.parse(text);
}
/**
 * Runs a record's `Data` through `decodeMessage` from the platform message codec.
 *
 * The codec is given a logger whose methods are all no-ops (and whose
 * `child()` returns the same logger) plus an empty correlation id, so decoding
 * produces no log output from this script.
 *
 * @param record Object exposing a `Data` property.
 * @returns The decoded value cast to an array, or an empty array when the
 *          codec yields `null`/`undefined`.
 */
async function decodeRecord(record: any): Promise<any[]> {
  const silentLogger = {
    debug: noop,
    trace: noop,
    info: noop,
    warn: noop,
    error: noop,
    fatal: noop,
    // Child loggers stay silent too: hand back this same object.
    child() {
      return this;
    },
  } as any;

  const context = {
    logger: silentLogger,
    metadata: {
      correlationId: '',
    },
  };

  const items = (await decodeMessage(context, record.Data)) as any[];
  if (items == null) {
    return [];
  }
  return items;
}
/**
 * Turns a record into a list of JSON values.
 *
 * Plain JSON parsing via `parseRecord` is attempted first and its result is
 * wrapped in a one-element array. If that throws for any reason, the record is
 * handed to `decodeRecord` instead and its result (or rejection) is returned.
 *
 * @param record Object exposing a `Data` property.
 */
async function toJSON(record: any): Promise<any[]> {
  let parsed: any;
  try {
    parsed = parseRecord(record);
  } catch {
    // Not plain JSON: fall back to the message codec.
    return decodeRecord(record);
  }
  return [parsed];
}
/** Input for {@link maybeFilterAccountId}. */
type MaybeFilterAccountIdOptions = {
  /** Account id to keep; when missing or empty no filtering happens. */
  accountId?: string;
  /** Records to inspect. */
  records: any[];
};

/**
 * Optionally narrows `records` to those belonging to `accountId`.
 *
 * Without an account id the input array itself is returned. Otherwise a new
 * array is returned holding the records where any of `accountId`,
 * `entity._accountId` or `relationship._accountId` strictly equals the
 * requested id. Null/undefined records never match.
 */
function maybeFilterAccountId(options: MaybeFilterAccountIdOptions) {
  const wanted = options.accountId;
  if (!wanted) {
    return options.records;
  }

  const belongsToAccount = (candidate: any): boolean => {
    // Account id directly on the record.
    if (candidate?.accountId === wanted) {
      return true;
    }
    // Account id nested under an entity or a relationship.
    if (candidate?.entity?._accountId === wanted) {
      return true;
    }
    return candidate?.relationship?._accountId === wanted;
  };

  return options.records.filter((candidate) => belongsToAccount(candidate));
}
/** Options accepted by {@link consume}. */
type ConsumeOptions = {
  /** Names of the Kinesis streams to read. */
  streams: string[];
  /** Sent as `Limit` on every `getRecords` call. */
  batchSize: number;
  /** Sent as `ShardIteratorType` when a shard is first opened. */
  shardIteratorType: 'TRIM_HORIZON' | 'AT_TIMESTAMP';
  /** When set, only items matching this account id are printed. */
  accountId?: string;
};

/**
 * Tails the given Kinesis streams and pretty-prints every decoded item to stdout.
 *
 * One shard iterator is opened per shard up front. The shards are then polled
 * round-robin, and each `getRecords` response's `NextShardIterator` is used
 * for the following read so the consumer actually advances through the shard.
 * (Requesting a fresh iterator on every pass, as this function used to do,
 * restarts at the same position each time and re-reads the same batch.)
 *
 * The function returns only when every opened shard has been read to its end
 * (a closed shard reports no `NextShardIterator`). Shards created by a
 * reshard after start-up are not picked up. Any SDK error rejects the promise.
 *
 * NOTE(review): the Kinesis GetShardIterator API documents a `Timestamp`
 * parameter for `AT_TIMESTAMP`; none is supplied here, so that mode is
 * expected to be rejected by the service. Confirm before relying on it.
 *
 * @param options Streams to read plus batch size, start position and optional account filter.
 */
async function consume(options: ConsumeOptions) {
  const { streams, shardIteratorType, accountId, batchSize } = options;
  // Pause applied after a polling pass that returned no records at all.
  const idlePauseMs = 1000;

  // "<stream>/<shardId>" -> iterator to use for that shard's next read.
  const iterators = new Map<string, string>();
  for (const stream of streams) {
    const description = await kinesis.describeStream({ StreamName: stream }).promise();
    for (const shard of description.StreamDescription.Shards) {
      const { ShardIterator } = await kinesis.getShardIterator({
        ShardId: shard.ShardId,
        ShardIteratorType: shardIteratorType,
        StreamName: stream,
      }).promise();
      if (ShardIterator) {
        iterators.set(`${stream}/${shard.ShardId}`, ShardIterator);
      }
    }
  }

  while (iterators.size > 0) {
    let sawRecords = false;

    // Iterate over a snapshot because the map is updated inside the loop.
    for (const [shardKey, shardIterator] of Array.from(iterators)) {
      const response = await kinesis.getRecords({
        ShardIterator: shardIterator,
        Limit: batchSize,
      }).promise();

      for (const record of response.Records) {
        sawRecords = true;
        const decoded = await toJSON(record);
        // maybe filter on accountId
        const filtered = maybeFilterAccountId({ accountId, records: decoded });
        for (const item of filtered) {
          console.log(JSON.stringify(item, null, 2));
        }
      }

      if (response.NextShardIterator) {
        iterators.set(shardKey, response.NextShardIterator);
      } else {
        // No further iterator: the shard is closed and fully read.
        iterators.delete(shardKey);
      }
    }

    if (!sawRecords) {
      // Nothing new anywhere; back off instead of spinning on the API.
      await new Promise<void>((resolve) => setTimeout(resolve, idlePauseMs));
    }
  }
}
/**
 * Builds the command-line interface and runs the requested command.
 *
 * Registers a single `consume <streams...>` command with optional account-id,
 * batch-size (default '50') and shard-iterator-type (default 'TRIM_HORIZON')
 * flags. The arguments are parsed with `parseAsync` so the returned promise
 * settles only after the async action handler has finished.
 *
 * A failure inside the action is logged to stderr and marks the process as
 * failed through `process.exitCode`; it is not rethrown.
 */
async function main() {
  const program = new Command()
    .name('Kinesis Consumer')
    .description('Consumers the streams')
    .version('0.0.1');

  program
    .command('consume <streams...>')
    .description('consume provided streams')
    .option('-a, --accountId <accountId>', 'AccountId to filter on')
    .option('-b, --batchSize <batchSize>', 'Batch size to consume', '50')
    .option('-s, --shardIteratorType <shardIteratorType>', 'Shard iterator type', 'TRIM_HORIZON')
    .action(async (streams: string[], options: ConsumeOptions) => {
      try {
        const { batchSize, shardIteratorType, accountId } = options;
        // Option values arrive as strings at runtime, hence the explicit parse.
        const parsedBatchSize = parseInt(`${batchSize}`, 10);
        if (!Number.isInteger(parsedBatchSize) || parsedBatchSize < 1) {
          throw new Error(`Invalid --batchSize "${batchSize}": expected a positive integer`);
        }
        await consume({
          streams,
          batchSize: parsedBatchSize,
          accountId,
          shardIteratorType,
        });
      } catch (err) {
        console.error(err);
        // Report the failure to the calling shell without cutting off pending output.
        process.exitCode = 1;
      }
    });

  await program.parseAsync(process.argv);
}

main().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment