Skip to content

Instantly share code, notes, and snippets.

@identiq
Created October 19, 2023 15:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save identiq/7edc0fb21bee4c5accea618d7162e31c to your computer and use it in GitHub Desktop.
Save identiq/7edc0fb21bee4c5accea618d7162e31c to your computer and use it in GitHub Desktop.
node:stream/promises
import dayjs from 'dayjs'
import { parse } from 'csv-parse'
import { fecs, type Company, type Fec } from './schema'
import { type Readable, Transform } from 'node:stream'
import { stringify } from 'csv-stringify'
import { drz, sqlClient } from './drizzle'
import { and, eq, sql } from 'drizzle-orm'
import { pipeline } from 'node:stream/promises'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { s3 } from '~/services/s3/s3.server'
import { CompanyStatus, FileStatus } from './enums'
import { formatFileDate, isRejected } from './utils'
import { FEC_COLUMNS } from './constants'
import { getFilesByCompanyId } from '~/models/company/get-file'
import { updateCompany } from '~/models/company/update-company'
import { updateFile } from '~/models/company/update-file'
/**
 * Builds an object-mode Transform that turns parsed FEC records into the rows
 * written to the `fec` table.
 *
 * Records whose entry month (the first 7 characters of the formatted entry
 * date) falls outside the inclusive window `[fromDateStr, untilDateStr]`, or
 * whose date cannot be formatted, are dropped silently. For the remaining
 * records the decimal comma in `debit` and `credit` is replaced by a dot.
 *
 * Every failure is reported through the stream callback rather than thrown,
 * so the surrounding pipeline rejects instead of an exception escaping from
 * inside the stream machinery.
 *
 * @param companyId - Identifier attached to every emitted row.
 * @param untilDateStr - Last accepted month, compared as a string against the
 *   formatted date prefix (callers pass `YYYY-MM`).
 * @param fromDateStr - First accepted month, same format as `untilDateStr`.
 * @returns A Transform emitting `{ companyId, ecritureDate, compteNum, debit, credit }`.
 */
const transformFec = (companyId: string, untilDateStr: string, fromDateStr: string) => {
  return new Transform({
    objectMode: true,
    transform(
      { ecritureNum, debit, credit, ecritureDate: ecritureDateRaw, compteNum }: Fec,
      _encoding,
      cb,
    ) {
      let statement
      try {
        const ecritureDate = formatFileDate(ecritureDateRaw)
        const ecritureDateStr = ecritureDate?.slice(0, 7)
        const isValid =
          ecritureDateStr &&
          ecritureDateStr >= fromDateStr &&
          ecritureDateStr <= untilDateStr
        if (isValid) {
          // Short rows can leave these columns unset; fail with a readable
          // message instead of a TypeError on `.replace`.
          if (typeof debit !== 'string' || typeof credit !== 'string') {
            throw new Error(
              `FEC entry ${ecritureNum ?? '?'} is missing its debit or credit amount`,
            )
          }
          statement = {
            companyId,
            ecritureDate,
            compteNum,
            debit: debit.replace(/,/g, '.'),
            credit: credit.replace(/,/g, '.'),
          }
        }
      } catch (error) {
        return cb(error instanceof Error ? error : new Error(String(error)))
      }
      // The callback is invoked outside the try block so that an exception
      // raised downstream can never cause it to be called a second time.
      if (!statement) return cb()
      cb(null, statement)
    },
  })
}
/**
 * Ingests every pending FEC file of a company into the `fec` table.
 *
 * For each file whose status is `FileStatus.Pending`:
 *  1. the file is marked `Processing`;
 *  2. its content is fetched from S3 (`process.env.BUCKET_NAME`, key `file.data`);
 *  3. existing `fec` rows of the company inside the 12-month window ending at
 *     the month of `untilDate` (inclusive) are deleted;
 *  4. the tab-separated file is parsed, filtered to that same window and
 *     streamed into Postgres through `COPY ... FROM STDIN`.
 *
 * All pending files are processed concurrently. Afterwards each file is marked
 * `Complete` or `Failed` (with the error text), and the company is set to
 * `Failed` if any file failed, otherwise back to `Pending`.
 *
 * @param companyId - Company whose pending files should be processed.
 * @returns The company's files unchanged when none is pending; otherwise the
 *   results of `updateFile` for each processed file.
 * @throws The first file-processing failure, after all statuses have been
 *   written. Non-Error rejection reasons are wrapped in an `Error`.
 */
export async function processFiles(companyId: Company['id']) {
  const companyFiles = await getFilesByCompanyId(companyId)
  const pendingFiles = companyFiles.filter(({ status }) => status === FileStatus.Pending)
  if (!pendingFiles.length) {
    return companyFiles
  }

  await updateCompany(companyId, { status: CompanyStatus.Processing })

  // allSettled: one failing file must not prevent the others from finishing.
  const streams = await Promise.allSettled(
    pendingFiles.map(async ({ id, data, untilDate }) => {
      await updateFile(id, { status: FileStatus.Processing })

      return await drz.transaction(async (tx) => {
        const obj = await s3.send(
          new GetObjectCommand({ Bucket: process.env.BUCKET_NAME, Key: data }),
        )
        if (!obj.Body) throw new Error('File has no Body')

        // Inclusive 12-month window ending at the month of `untilDate`,
        // e.g. until 2023-12 -> from 2023-01.
        const untilDateJs = dayjs(untilDate)
        const untilDateStr = untilDateJs.format('YYYY-MM')
        const fromDateStr = untilDateJs
          .subtract(1, 'year')
          .add(1, 'month')
          .format('YYYY-MM')

        const parser = parse({
          delimiter: '\t',
          trim: true,
          skip_empty_lines: true,
          relax_column_count: true,
          relax_quotes: true,
          columns: FEC_COLUMNS,
          // Column names come from FEC_COLUMNS; start at the second record
          // (presumably to skip the file's own header row).
          from: 2,
        })

        // Clear the rows this file is about to replace.
        await tx
          .delete(fecs)
          .where(
            and(
              eq(fecs.companyId, companyId),
              sql`to_char(${fecs.ecritureDate}, 'YYYY-MM') <= ${untilDateStr}`,
              sql`to_char(${fecs.ecritureDate}, 'YYYY-MM') >= ${fromDateStr}`,
            ),
          )

        // Open the COPY stream only after the delete succeeded: if it were
        // opened first and the delete failed, nothing would end or destroy
        // it. From here on `pipeline` owns the stream and destroys it on error.
        // NOTE(review): this goes through `sqlClient`, not `tx`, so the COPY
        // presumably runs outside the surrounding transaction — confirm.
        const ingestStream =
          await sqlClient`copy fec (company_id, credit, debit, compte_num, ecriture_date) from stdin`.writable()

        return pipeline(
          obj.Body as Readable,
          parser,
          transformFec(companyId, untilDateStr, fromDateStr),
          stringify({
            delimiter: '\t',
            // Must list the same fields, in the same order, as the COPY column list.
            columns: ['companyId', 'credit', 'debit', 'compteNum', 'ecritureDate'],
          }),
          ingestStream,
        )
      })
    }),
  )

  // `streams[i]` is the outcome for `pendingFiles[i]`.
  const processedFiles = await Promise.all(
    pendingFiles.map(async (file, i) => {
      const p = streams[i]
      return updateFile(file.id, {
        status: p.status === 'rejected' ? FileStatus.Failed : FileStatus.Complete,
        processedAt: p.status === 'fulfilled' ? new Date() : undefined,
        // String() instead of .toString(): a null/undefined rejection reason
        // must not crash the status bookkeeping.
        errorMessage: p.status === 'rejected' ? String(p.reason) : undefined,
      })
    }),
  )

  const firstFailure = streams.find(isRejected)
  const hasError = firstFailure !== undefined
  await updateCompany(companyId, {
    status: hasError ? CompanyStatus.Failed : CompanyStatus.Pending,
  })

  if (firstFailure) {
    // Rethrow the original error so its message, stack and type survive.
    const reason: unknown = firstFailure.reason
    throw reason instanceof Error ? reason : new Error(String(reason))
  }
  return processedFiles
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment