demo parser/chunker

A Node.js demo of a transaction-file chunker: it streams a pipe-delimited CSV from a bucket, renders each row through a squirrelly template, writes the results back as fixed-size JSON chunks, records the account ids it saw, and reports any failure to Datadog.
const fs = require("fs");
const axios = require("axios");
const { parse } = require("csv-parse");
const Renderer = require("squirrelly");

// thin wrapper around a squirrelly template string
class Template {
  constructor(template) {
    this.template = template;
  }
  render(data) {
    return Renderer.render(this.template, data);
  }
}

// stand-in for an s3 bucket
class Bucket {
  constructor(bucket) {
    this.name = bucket;
  }
  async write(key, data) {
    console.log(`Writing to ${this.name}/${key}`);
    console.log(data); // here we'd actually write to s3
  }
  async getReadStream(key) {
    console.log(`Getting read stream from ${this.name}/${key}`);
    return fs.createReadStream("sample.csv"); // here we'd get the actual s3 read stream
  }
}

// sends error events to datadog, tagged with the bucket and key being processed
class Reporter {
  constructor(apiKey, bucket, key) {
    const baseUrl = "https://api.datadoghq.com/api/v1";
    this.url = `${baseUrl}/events?api_key=${apiKey}`;
    this.tags = {
      bucket,
      key,
    };
  }
  async sendError(text) {
    const payload = {
      alert_type: "error",
      title: "transaction.ingestion.chunker",
      service: "transaction.ingestion",
      aggregation_key: "transaction.ingestion.chunker",
      priority: "normal",
      source_type_name: "amazon lambda",
      date_happened: Math.floor(new Date().getTime() / 1000),
      tags: this.tags,
      text,
    };
    return axios.post(this.url, payload);
  }
}

// wraps a readable stream in a pipe-delimited csv parser
class Parser {
  constructor(stream) {
    this.stream = stream.pipe(
      parse({
        columns: true,
        delimiter: "|",
        trim: true,
      })
    );
  }
}

class Processor {
  constructor(bucket, key, template, chunkSize = 2) {
    this.bucket = bucket;
    this.key = key;
    const { basePath, fileName } = this.parseKey(key);
    this.basePath = basePath;
    this.fileName = fileName;
    this.template = template;
    this.chunkSize = chunkSize;
    this.accounts = new Set();
    this.chunks = [];
  }
  // e.g., "upload/transaction/2022/10/08/raw/sample.csv" yields
  // basePath "upload/transaction/2022/10/08" and fileName "sample"
  parseKey(key) {
    const regexp =
      "^(?<basePath>upload/transaction/(?<datePath>\\d{4}/\\d{2}/\\d{2}(T\\d{2}_\\d{2})?))/raw/(?<fileName>\\w+)\\.[tc]sv$";
    return key.match(regexp).groups;
  }
  // persist list of account ids that had transactions loaded
  async writeAccounts() {
    const data = JSON.stringify(Array.from(this.accounts), null, 4);
    const key = `${this.basePath}/accounts/${this.fileName}.json`;
    return await this.bucket.write(key, data); // write records to s3
  }
  // write a chunk of parsed records to s3 bucket
  async writeChunk(chunkNumber, records) {
    const data = JSON.stringify(records, null, 4);
    const key = `${this.basePath}/chunks/${this.fileName}${chunkNumber}.json`;
    this.chunks.push(key);
    return await this.bucket.write(key, data); // write records to s3
  }
  // parse and chunk an s3 file in the given bucket
  async process() {
    let chunkNumber = 1;
    let parsedRecords = [];
    const stream = await this.bucket.getReadStream(this.key);
    const parser = new Parser(stream);
    // iterate over each record in stream
    for await (const record of parser.stream) {
      const rendered = this.template.render(record);
      const parsed = JSON.parse(rendered);
      this.accounts.add(parsed.accountId);
      parsedRecords.push(parsed);
      // write out a chunk
      if (parsedRecords.length === this.chunkSize) {
        await this.writeChunk(chunkNumber, parsedRecords);
        chunkNumber = chunkNumber + 1;
        parsedRecords = [];
      }
    }
    // if records remain, write out a last chunk
    if (parsedRecords.length) {
      await this.writeChunk(chunkNumber, parsedRecords);
    }
    // persist list of account ids that had transactions loaded
    await this.writeAccounts();
    console.log(`Parsed and chunked ${this.bucket.name}/${this.key}`);
  }
}

(async () => {
  const apiKey = "DATA_DOG_API_KEY";
  const bucketName = "CU_BUCKET_NAME";
  const key = "upload/transaction/2022/10/08/raw/sample.csv";
  const reporter = new Reporter(apiKey, bucketName, key);
  const bucket = new Bucket(bucketName);
  const template = new Template(
    '{"accountId": "{{it.AccountID}}","amount": "{{it.Amount}}","date": "{{it.Date}}"}'
  );
  try {
    const processor = new Processor(bucket, key, template);
    await processor.process();
  } catch (err) {
    const message = `Failed to process ${bucketName}/${key}: ${err.message}`;
    console.error(message);
    await reporter.sendError(message);
  }
})().catch(console.error);
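
A note on trying this out: Bucket only mocks S3, so running the script against the sample file below (saved as sample.csv next to the script) just logs its writes, and Reporter only fires if processing throws. Given the demo key and the default chunkSize of 2, the three sample rows should produce log lines roughly like the following (with the chunk contents echoed between the "Writing to" lines):

Getting read stream from CU_BUCKET_NAME/upload/transaction/2022/10/08/raw/sample.csv
Writing to CU_BUCKET_NAME/upload/transaction/2022/10/08/chunks/sample1.json
Writing to CU_BUCKET_NAME/upload/transaction/2022/10/08/chunks/sample2.json
Writing to CU_BUCKET_NAME/upload/transaction/2022/10/08/accounts/sample.json
Parsed and chunked CU_BUCKET_NAME/upload/transaction/2022/10/08/raw/sample.csv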
package.json
{
  "dependencies": {
    "axios": "^1.1.3",
    "csv-parse": "^5.3.1",
    "squirrelly": "^8.0.8"
  }
}
sample.csv
Column1|Column2|Column3
1-1|1-2|1-3
2-1|2-2|2-3
3-1|3-2|3-3
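
One caveat: these column headers don't match the fields the template reads (it.AccountID, it.Amount, it.Date), so the rendered accountId, amount, and date values won't be drawn from Column1 through Column3. A sample that lines up with the template, shown purely as a hypothetical illustration, would look like:

AccountID|Amount|Date
ACCT-001|10.00|2022-10-08
ACCT-002|15.50|2022-10-08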