Skip to content

Instantly share code, notes, and snippets.

@filmaj
Last active April 6, 2024 03:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save filmaj/e4965efbc1aae013d611b00231325301 to your computer and use it in GitHub Desktop.
Save filmaj/e4965efbc1aae013d611b00231325301 to your computer and use it in GitHub Desktop.
Readable DynamoDB query stream using aws-lite
const { Readable } = require('node:stream');
const DocumentSource = require('./documentsource');
module.exports = class DocumentClientQueryReadable extends Readable {
constructor (client, params) {
super({ objectMode: true });
this.source = new DocumentSource(client, params);
this.source.on('data', this.onData.bind(this));
this.source.on('end', this.onEnd.bind(this));
}
_read () {
this.source.readStart();
}
onData (chunk) {
this.push(chunk);
}
onEnd () {
this.push(null);
this.source.removeListener('data', this.onData);
this.source.removeListener('end', this.onEnd);
}
};
const { EventEmitter } = require('node:events');
module.exports = class DocumentSource extends EventEmitter {
constructor (client, params) {
super();
this.client = client;
this.params = params;
}
readStart () {
this.read().then((res) => {
if (res.LastEvaluatedKey) {
this.params = {
...this.params,
ExclusiveStartKey: res.LastEvaluatedKey
};
this.emit('data', res);
} else {
this.emit('data', res);
this.emit('end');
}
});
}
read () {
return this.client.DynamoDB.Query(this.params);
}
}
const awsLite = require('@aws-lite/client');
const DocumentClientQueryReadable = require('./documentclientqueryreadable');
const JSONStream = require('JSONStream');
/**
 * Logs a failure and marks the process as failed without cutting off output
 * that is still being flushed to stdout.
 * @param {Error} err - The client-setup or query error to report.
 */
function reportFailure (err) {
  console.error(err);
  process.exitCode = 1;
}

// Example usage: run one query and write each result to stdout as JSON.
awsLite({ plugins: [import('@aws-lite/dynamodb')]}).then((aws) => {
  const query = new DocumentClientQueryReadable(aws, {
    TableName: 'whatevers',
    IndexName: 'PartitionedByYear',
    KeyConditionExpression: 'yearUpdated = :yearUpdated AND startdate BETWEEN :start AND :end',
    ExpressionAttributeValues: {
      ':yearUpdated': `2024`,
      ':start': `2024-03-01`,
      ':end': `2024-03-31`
    },
    ScanIndexForward: false // sort descending to get most recent records
  });
  // pipe() does not forward errors to the destination, so read failures have
  // to be handled on the source stream itself.
  query.on('error', reportFailure);
  query.pipe(JSONStream.stringify(false)).pipe(process.stdout);
}).catch(reportFailure); // covers client/plugin initialisation failures
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment