Skip to content

Instantly share code, notes, and snippets.

@fl0wo
Created September 22, 2023 21:30
Show Gist options
  • Save fl0wo/ddd54d3dd66e7ac4b1429f2b486a1f0f to your computer and use it in GitHub Desktop.
Save fl0wo/ddd54d3dd66e7ac4b1429f2b486a1f0f to your computer and use it in GitHub Desktop.
A way to batch-write records to an Amazon Timestream database
import {
_Record,
Dimension,
MeasureValue,
MeasureValueType, TimestreamWriteClient,
WriteRecordsCommand, WriteRecordsCommandOutput,
WriteRecordsRequest,
} from '@aws-sdk/client-timestream-write';
/**
 * Builds a `WriteRecordsRequest` containing exactly one multi-measure record.
 *
 * Every entry of `fields` becomes one DOUBLE measure value (the number is
 * stringified with `String`), and the record carries a single dimension.
 *
 * @param time Record timestamp; when `null`/`undefined`, `Date.now()` is used.
 *   No `TimeUnit` is set on the record, so the service default applies
 *   (NOTE(review): milliseconds per the AWS docs — confirm callers pass ms).
 * @param fieldDimensionName Name of the record's only dimension.
 * @param fieldDimensionValue Value of the record's only dimension.
 * @param fields Label/value pairs turned into the record's measure values.
 * @param fieldsMeasureName Measure name of the multi-measure record.
 * @param databaseName Target database, copied into the request.
 * @param tableName Target table, copied into the request.
 * @returns An object whose `params` property is the ready-to-send request.
 */
export const getInsertRecordParameterToTimeStream = (
  time: number | undefined,
  fieldDimensionName: string,
  fieldDimensionValue: string,
  fields: Array<{
    label: string;
    value: number;
  }>,
  fieldsMeasureName: string,
  databaseName: string,
  tableName: string,
): { params: WriteRecordsRequest } => {
  const currentTime = time ?? Date.now();

  const dimension: Dimension = {
    Name: fieldDimensionName,
    Value: fieldDimensionValue,
  };

  const measures: MeasureValue[] = fields.map((el) => ({
    Name: el.label,
    Type: MeasureValueType.DOUBLE,
    Value: String(el.value),
  }));

  // Typed as the SDK's `_Record` so a misspelled or mistyped field is a
  // compile error instead of being hidden behind `Record<any, any>`.
  const toInsertRecord: _Record = {
    Dimensions: [dimension],
    MeasureName: fieldsMeasureName,
    MeasureValues: measures,
    MeasureValueType: MeasureValueType.MULTI,
    Time: currentTime.toString(),
    // The timestamp doubles as the record version.
    Version: currentTime,
  };

  const params: WriteRecordsRequest = {
    DatabaseName: databaseName,
    TableName: tableName,
    Records: [toInsertRecord],
  };

  return { params };
};
/**
 * Flattens groups of write requests into one list of records and writes them
 * to a single database/table in batches of 100.
 *
 * The database and table names are taken from the first request that defines
 * a non-empty one; otherwise from `process.env.TIMESTREAM_DB_NAME` /
 * `process.env.TIMESTREAM_TABLE_NAME`. All records are sent to that one
 * target, regardless of the names on the other requests.
 *
 * @param params Nested list of write requests whose records are merged.
 * @returns The outputs of the individual batch writes; `[]` when there are no
 *   records to send.
 * @throws Error when there are records to write but no database or table name
 *   could be resolved.
 */
export const writeBatchToTimestream = async (
  params: WriteRecordsRequest[][],
): Promise<WriteRecordsCommandOutput[]> => {
  const writeClient = loadWriteTimeStreamClientBulk();
  const allWriteRecordRequests = params.flat();

  const DatabaseName =
    allWriteRecordRequests.find((el) => !!el.DatabaseName)?.DatabaseName ??
    process.env.TIMESTREAM_DB_NAME;
  const TableName =
    allWriteRecordRequests.find((el) => !!el.TableName)?.TableName ??
    process.env.TIMESTREAM_TABLE_NAME;

  // `Records` is optional on a request; treating a missing list as empty keeps
  // the result typed as `_Record[]` without needing `@ts-ignore`.
  const records: _Record[] = allWriteRecordRequests
    .flatMap((el) => el.Records ?? [])
    .filter((el) => !!el);

  console.log('Sending', records.length, ' total candles');

  if (records.length === 0) {
    // Nothing to write: same result as running zero batches.
    return [];
  }

  if (!DatabaseName || !TableName) {
    // Fail fast with a clear message instead of sending a request without a
    // target name.
    throw new Error(
      'Cannot write to Timestream: no DatabaseName/TableName found in the requests ' +
        'or in TIMESTREAM_DB_NAME/TIMESTREAM_TABLE_NAME',
    );
  }

  return saveToTimeStreamInBatches(
    DatabaseName,
    TableName,
    records,
    writeClient,
    // NOTE(review): presumably the WriteRecords per-request record limit — confirm.
    100,
  );
};
/**
 * Writes `records` to the given table in consecutive chunks of at most
 * `batchSize` records, one request at a time.
 *
 * @param DatabaseName Target database name.
 * @param TableName Target table name.
 * @param records Records to write.
 * @param writeClient Client used to send each request.
 * @param batchSize Maximum number of records per request (default 100).
 * @returns The outputs of the write requests, in the order they were sent.
 */
export default async function saveToTimeStreamInBatches(
  DatabaseName: string,
  TableName: string,
  records: _Record[],
  writeClient: TimestreamWriteClient,
  batchSize: number = 100,
): Promise<WriteRecordsCommandOutput[]> {
  const results: WriteRecordsCommandOutput[] = [];
  const chunks: _Record[][] = splitArrayInChunks(batchSize, records);

  // Requests are awaited one after another; a failing write rejects the whole
  // call and the remaining chunks are not sent.
  for (const chunk of chunks) {
    const response = await writeRecordsToTimeStream(
      DatabaseName,
      TableName,
      chunk,
      writeClient,
    );
    if (!response) {
      continue;
    }
    results.push(response);
  }

  return results;
}
/**
 * Sends one `WriteRecordsCommand` containing all of `records`.
 *
 * @param DatabaseName Target database name.
 * @param TableName Target table name.
 * @param records Records placed in the single request.
 * @param writeClient Client used to send the command.
 * @returns The output of the write command.
 */
async function writeRecordsToTimeStream(
  DatabaseName: string,
  TableName: string,
  records: _Record[],
  writeClient: TimestreamWriteClient,
): Promise<WriteRecordsCommandOutput> {
  // One shot all candles of different types and symbols
  const request: WriteRecordsRequest = {
    DatabaseName,
    TableName,
    Records: records,
  };
  const response = await writeClient.send(new WriteRecordsCommand(request));
  return response;
}
/**
 * Splits `array` into consecutive chunks of at most `chunkSize` elements.
 * The input array is not modified; the last chunk may be shorter.
 *
 * @param chunkSize Maximum number of elements per chunk; must be greater than 0.
 * @param array Elements to split.
 * @returns The chunks in original order; `[]` for an empty input.
 * @throws RangeError when `chunkSize` is not greater than 0 (including `NaN`),
 *   because the loop below could otherwise never advance.
 */
export const splitArrayInChunks = <T>(chunkSize: number, array: T[]): T[][] => {
  // Written as a negated comparison so that NaN is rejected as well.
  if (!(chunkSize > 0)) {
    throw new RangeError(`chunkSize must be greater than 0, got ${chunkSize}`);
  }

  const arrays: T[][] = [];
  for (let i = 0; i < array.length; i += chunkSize) {
    arrays.push(array.slice(i, i + chunkSize));
  }
  return arrays;
};
/**
 * Returns `obj` unless it is `null` or `undefined`, in which case `fallback`
 * is returned. Other falsy values (`0`, `''`, `false`) are kept as they are.
 *
 * @param obj Value to check.
 * @param fallback Value returned when `obj` is `null` or `undefined`.
 * @returns `obj` or `fallback`.
 */
export const getSafeNull = (obj: any, fallback: any) => obj ?? fallback;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment