demo parser/chunker
@joyrexus · Last active November 9, 2022

const fs = require("fs");
const axios = require("axios");
const { parse } = require("csv-parse");
const Renderer = require("squirrelly");
class Template {
  constructor(template) {
    this.template = template;
  }

  render(data) {
    return Renderer.render(this.template, data);
  }
}
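
// Usage example (Squirrelly interpolates {{it.<field>}} from the data object):
//   new Template('{"a": "{{it.x}}"}').render({ x: 1 })  // => '{"a": "1"}'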

class Bucket {
  constructor(bucket) {
    this.name = bucket;
  }

  async write(key, data) {
    console.log(`Writing to ${this.name}/${key}`);
    console.log(data); // here we'd actually write to s3
  }

  async getReadStream(key) {
    console.log(`Getting read stream from ${this.name}/${key}`);
    return fs.createReadStream("sample.csv"); // here we'd get the actual s3 read stream
  }
}
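
/*
A possible real implementation of Bucket, sketched against the AWS SDK v3
(@aws-sdk/client-s3 -- an assumption; it is not among this gist's
dependencies). Left commented out so the demo stays runnable as-is:

  const { S3Client, PutObjectCommand, GetObjectCommand } = require("@aws-sdk/client-s3");
  const s3 = new S3Client({});

  class S3Bucket {
    constructor(bucket) {
      this.name = bucket;
    }

    async write(key, data) {
      // upload the serialized records to s3://<bucket>/<key>
      await s3.send(new PutObjectCommand({ Bucket: this.name, Key: key, Body: data }));
    }

    async getReadStream(key) {
      // in Node.js, the GetObject response Body is a Readable stream,
      // so it can be piped straight into the csv parser
      const { Body } = await s3.send(new GetObjectCommand({ Bucket: this.name, Key: key }));
      return Body;
    }
  }
*/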

class Reporter {
  constructor(apiKey, bucket, key) {
    const baseUrl = "https://api.datadoghq.com/api/v1";
    this.url = `${baseUrl}/events?api_key=${apiKey}`;
    this.tags = {
      bucket,
      key,
    };
  }

  async sendError(text) {
    const payload = {
      alert_type: "error",
      title: "transaction.ingestion.chunker",
      service: "transaction.ingestion",
      aggregation_key: "transaction.ingestion.chunker",
      priority: "normal",
      source_type_name: "amazon lambda",
      date_happened: Math.floor(new Date().getTime() / 1000),
      tags: this.tags,
      text,
    };
    return axios.post(this.url, payload);
  }
}

class Parser {
  constructor(stream) {
    this.stream = stream.pipe(
      parse({
        columns: true,
        delimiter: "|",
        trim: true,
      })
    );
  }
}
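
// With columns: true, csv-parse treats the first row as headers, so each
// record yielded by the stream is an object keyed by column name, e.g. the
// sample file's first data row parses to:
//   { AccountID: "1-1", Amount: "1-2", Date: "1-3" }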

class Processor {
  constructor(bucket, key, template, chunkSize = 2) {
    this.bucket = bucket;
    this.key = key;
    const { basePath, fileName } = this.parseKey(key);
    this.basePath = basePath;
    this.fileName = fileName;
    this.template = template;
    this.chunkSize = chunkSize;
    this.accounts = new Set();
    this.chunks = [];
  }

  parseKey(key) {
    const regexp =
      /^(?<basePath>upload\/transaction\/(?<datePath>\d{4}\/\d{2}\/\d{2}(T\d{2}_\d{2})?))\/raw\/(?<fileName>\w+)\.[tc]sv$/;
    return key.match(regexp).groups;
  }
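
  // e.g. parseKey("upload/transaction/2022/10/08/raw/sample.csv") returns
  //   {
  //     basePath: "upload/transaction/2022/10/08",
  //     datePath: "2022/10/08",
  //     fileName: "sample",
  //   }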

  // persist list of account ids that had transactions loaded
  async writeAccounts() {
    const data = JSON.stringify(Array.from(this.accounts), null, 4);
    const key = `${this.basePath}/accounts/${this.fileName}.json`;
    return await this.bucket.write(key, data); // write records to s3
  }

  // write a chunk of parsed records to s3 bucket
  async writeChunk(chunkNumber, records) {
    const data = JSON.stringify(records, null, 4);
    const key = `${this.basePath}/chunks/${this.fileName}${chunkNumber}.json`;
    this.chunks.push(key);
    return await this.bucket.write(key, data); // write records to s3
  }

  // parse and chunk an s3 file in the given cu bucket
  async process() {
    let chunkNumber = 1;
    let parsedRecords = [];
    const stream = await this.bucket.getReadStream(this.key);
    const parser = new Parser(stream);
    // iterate over each record in stream
    for await (const record of parser.stream) {
      const rendered = this.template.render(record);
      const parsed = JSON.parse(rendered);
      this.accounts.add(parsed.accountId);
      parsedRecords.push(parsed);
      // write out a chunk
      if (parsedRecords.length === this.chunkSize) {
        await this.writeChunk(chunkNumber, parsedRecords);
        chunkNumber += 1;
        parsedRecords = [];
      }
    }
    // if records remain, write out a last chunk
    if (parsedRecords.length) {
      await this.writeChunk(chunkNumber, parsedRecords);
    }
    // persist list of account ids that had transactions loaded
    await this.writeAccounts();
    console.log(`Parsed and chunked ${this.bucket.name}/${this.key}`);
  }
}
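
// Running process() on the three-row sample file with the default chunkSize
// of 2 produces two chunk files plus the accounts file, all under basePath:
//   upload/transaction/2022/10/08/chunks/sample1.json   (rows 1-2)
//   upload/transaction/2022/10/08/chunks/sample2.json   (row 3, the remainder)
//   upload/transaction/2022/10/08/accounts/sample.json  (unique account ids)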

(async () => {
  const apiKey = "DATA_DOG_API_KEY";
  const bucketName = "CU_BUCKET_NAME";
  const key = "upload/transaction/2022/10/08/raw/sample.csv";
  const reporter = new Reporter(apiKey, bucketName, key);
  const bucket = new Bucket(bucketName);
  const template = new Template(
    '{"accountId": "{{it.AccountID}}","amount": "{{it.Amount}}","date": "{{it.Date}}"}'
  );

  try {
    const processor = new Processor(bucket, key, template);
    await processor.process();
  } catch (err) {
    const message = `Failed to process ${bucketName}/${key}: ${err.message}`;
    console.error(message);
    await reporter.sendError(message);
  }
})().catch(console.error);

package.json

{
  "dependencies": {
    "axios": "^1.1.3",
    "csv-parse": "^5.3.1",
    "squirrelly": "^8.0.8"
  }
}
sample.csv

AccountID|Amount|Date
1-1|1-2|1-3
2-1|2-2|2-3
3-1|3-2|3-3
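
Each row above flows through csv-parse and the template; the first row, for
example, renders to the JSON record:

  { "accountId": "1-1", "amount": "1-2", "date": "1-3" }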