Last active
November 1, 2022 22:17
-
-
Save taxilian/d5fcf6ac5a91ee849da92960118b6252 to your computer and use it in GitHub Desktop.
Example service using the abstraction I'm working on; a "-" in a filename stands for a directory separator.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { sendEmailTemplate, EmailDomains as ValidDomains } from '@/lib/email/mailer'; | |
import { remoteMethod } from '@/lib/nats/types'; | |
/**
 * NATS-capable email service. `sendEmail` is registered through the
 * `remoteMethod` decorator with a `timeout` of 10000.
 */
class EmailService {
  /**
   * Checks `domain` against `ValidDomains` and, when it is accepted, forwards
   * the request to `sendEmailTemplate`.
   *
   * @param domain - Sending domain; must be contained in `ValidDomains`.
   * @param tplName - Template name handed to `sendEmailTemplate`.
   * @param data - Template data, forwarded unchanged.
   * @param options - Send options, forwarded unchanged.
   * @returns The value `sendEmailTemplate` resolves with.
   * @throws Error when `domain` is not contained in `ValidDomains`.
   */
  @remoteMethod({ timeout: 10000 })
  async sendEmail(domain: string, tplName: string, data: any, options: any) {
    const isKnownDomain = ValidDomains.includes(domain as ValidDomains);
    if (!isKnownDomain) {
      throw new Error(`Invalid email domain: ${domain}`);
    }
    return await sendEmailTemplate(domain as any, tplName, data as unknown, options);
  }
}

export default new EmailService();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import emailService from './EmailService'; | |
import { connectNats } from '@/lib/nats'; | |
import { natsEnv } from '@/hostInfo'; | |
import { NatsService } from '@/lib/nats/NatsService'; | |
import { LocalService } from '@/lib/nats/LocalService'; | |
/** NATS subject used for the email service, prefixed with `natsEnv`. */
const serviceSubject = `${natsEnv}.service.email`;

/** Memoized consumer promise; stays unset until the first `getConsumer` call. */
let emailServiceDfd: Promise<typeof emailService> | undefined;

/**
 * Returns a consumer for the email service, creating it on the first call and
 * reusing the same promise afterwards.
 *
 * The cache is not keyed by `type`: whichever `type` is passed on the first
 * call decides the consumer every later caller receives.
 *
 * @param type - `'nats'` builds the consumer with `NatsService.makeConsumer`
 *   on `serviceSubject`; `'local'` builds it with `LocalService.makeConsumer`.
 * @returns The (shared) consumer promise.
 */
export async function getConsumer(type: 'nats' | 'local' = 'nats') {
  if (!emailServiceDfd) {
    // Store the promise itself (not the awaited value) so concurrent callers
    // that arrive before it settles still share a single consumer.
    emailServiceDfd = (async () => {
      const natsConnection = await connectNats();
      if (type === 'nats') {
        return NatsService.makeConsumer(natsConnection, serviceSubject, emailService);
      }
      return LocalService.makeConsumer(emailService);
    })();
  }
  return emailServiceDfd;
}
/**
 * Connects to NATS and constructs a `NatsService` for the email service on
 * `serviceSubject`. The provider is returned as constructed; nothing else is
 * called on it here.
 */
export async function getNatsProvider() {
  return new NatsService(await connectNats(), serviceSubject, emailService);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { connectNats } from '@/lib/nats'; | |
import { Worker } from 'worker_threads'; | |
import pdfService from './PDFService'; | |
import { natsEnv } from '@/hostInfo'; | |
import { NatsService } from '@/lib/nats/NatsService'; | |
import { WorkerService } from '@/lib/nats/WorkerService'; | |
import { LocalService } from '@/lib/nats/LocalService'; | |
import path from 'path'; | |
/** NATS subject used for the PDF service, prefixed with `natsEnv`. */
const serviceSubject = `${natsEnv}.service.etPdf`;

/** Memoized consumer promise, assigned by the first `getConsumer` call. */
let pdfServiceDfd: Promise<typeof pdfService>;

/**
 * Creates a new `PDFServiceViaWorker` instance.
 *
 * The module is loaded with `require` at call time rather than with a
 * top-level import. NOTE(review): presumably this avoids a circular import,
 * since `PDFServiceViaWorker` imports `getWorkerService` from `./index` — confirm.
 */
function getServiceViaWorker() {
  type ViaWorkerCtor = typeof import('./PDFServiceViaWorker').default;
  const ViaWorker: ViaWorkerCtor = require('./PDFServiceViaWorker').default;
  return new ViaWorker();
}
/**
 * Returns a consumer for the PDF service, creating it on the first call and
 * reusing the cached promise afterwards (the cache ignores `type`).
 *
 * @param type - `'nats'` builds the consumer with `NatsService.makeConsumer`
 *   on `serviceSubject`. `'local'` builds it with `LocalService.makeConsumer`,
 *   backed by `pdfService` directly when `process.env.DISABLE_WORKER` is set
 *   and by a `PDFServiceViaWorker` instance otherwise.
 */
export async function getConsumer(type: 'nats' | 'local' = 'nats') {
  if (pdfServiceDfd) {
    return pdfServiceDfd;
  }

  const buildConsumer = async () => {
    // The NATS connection is established for both consumer types.
    const natsConnection = await connectNats();
    if (type !== 'nats') {
      const localTarget = process.env.DISABLE_WORKER ? pdfService : getServiceViaWorker();
      return LocalService.makeConsumer(localTarget);
    }
    return NatsService.makeConsumer(natsConnection, serviceSubject, pdfService);
  };

  pdfServiceDfd = buildConsumer();
  return await pdfServiceDfd;
}
/**
 * Connects to NATS and constructs a `NatsService` for the PDF service on
 * `serviceSubject`.
 *
 * The backing implementation is `pdfService` itself when
 * `process.env.DISABLE_WORKER` is set, otherwise a fresh `PDFServiceViaWorker`.
 */
export async function getNatsProvider() {
  const natsConnection = await connectNats();
  const backingService = process.env.DISABLE_WORKER
    ? pdfService
    : getServiceViaWorker();
  return new NatsService(natsConnection, serviceSubject, backingService);
}
/**
 * Starts a worker thread from `PDFService.js` in this module's directory and
 * returns the consumer `WorkerService.makeConsumer` builds for that worker.
 * Every call starts a new worker.
 */
export function getWorkerService() {
  const workerScript = path.join(__dirname, 'PDFService.js');
  return WorkerService.makeConsumer(new Worker(workerScript), pdfService);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import '../../initPathAlias'; | |
import { ApplicantSessionSummary, SessionManifestData } from '@/examtools/actions/applicant'; | |
import { generateManifestPdf } from '@/lib/examForms/ManifestForm'; | |
import { ExamData, Form605Data } from '@/lib/examForms/SharedTypes'; | |
import { remoteMethod, remoteStream } from '@/lib/nats/types'; | |
import { createDocument } from '@/lib/pdf/pdflib'; | |
import { VECList } from '@shared/licenseInfo'; | |
import * as form605 from '@/lib/examForms/form605generator'; | |
import * as csce from '@/lib/examForms/formCSCEgenerator'; | |
import * as examResults from "@/lib/results/generatePdfLib"; | |
import stream from 'stream'; | |
import { WorkerService } from '@/lib/nats/WorkerService'; | |
/** Render request for a session manifest (`type: 'manifest'`). */
export interface PDFRenderManifestDoc {
  type: 'manifest';
  data: SessionManifestData;
}

/** Render request for a CSCE form (`type: 'csce'`). */
export interface PDFRenderCSCEDoc {
  type: 'csce';
  vec: VECList;
  applicant: ApplicantSessionSummary;
  session: ExamData;
  isDraft: boolean;
}

/** Render request for a form 605 (`type: '605'`). */
export interface PDFRender605Doc {
  type: '605';
  vec: VECList;
  data: Form605Data;
}

/** Render request for exam result sheets (`type: 'examResults'`). */
export interface PDFRenderExamResultsDoc {
  type: 'examResults';
  results: examResults.ResultDocument[];
  passCopies: number;
  failCopies: number;
}

/** Any document request `makePDF` accepts; discriminated by the `type` field. */
export type PDFRenderDoc =
  | PDFRenderManifestDoc
  | PDFRenderCSCEDoc
  | PDFRender605Doc
  | PDFRenderExamResultsDoc;
/**
 * NATS-capable PDF rendering service. `makePDF` is registered through the
 * `remoteStream` decorator.
 */
class PDFService {
  /**
   * Renders every requested document, in order, into one PDF document and
   * delivers the saved bytes through the returned stream.
   *
   * The stream is returned immediately; rendering happens asynchronously. On
   * success the saved bytes are pushed as a single chunk followed by
   * end-of-stream. On failure the stream is destroyed with the error.
   *
   * @param pages - Documents to render; each entry's `type` selects the renderer.
   * @returns A readable stream carrying the saved PDF bytes.
   */
  @remoteStream({})
  makePDF(pages: PDFRenderDoc[]): stream.Readable {
    const outStream = new stream.Readable({
      read() {}, // There is no way to pause the stream, so we just ignore this
    });

    (async () => {
      const pdfDoc = await createDocument();
      for (const page of pages) {
        switch (page.type) {
          case 'manifest':
            await generateManifestPdf(page.data, pdfDoc);
            break;
          case 'csce':
            await csce.renderToPdf(page.vec, page.applicant, page.session, page.isDraft, pdfDoc);
            break;
          case '605':
            await form605.renderToPdf([page.data], page.vec, pdfDoc);
            break;
          case 'examResults':
            await examResults.generateResultsPdf(pdfDoc, page.results, page.passCopies, page.failCopies);
            break;
          default:
            throw new Error(`Unknown PDFRenderDoc type ${(<any>page).type}`);
        }
      }
      const savedBytes = await pdfDoc.save();
      outStream.push(savedBytes);
      outStream.push(null);
    })().catch((err) => {
      // destroy() is the supported way to fail a Readable: it emits 'error'
      // and also tears the stream down, whereas a hand-emitted 'error' event
      // leaves the stream open and is documented as undefined behaviour.
      outStream.destroy(err instanceof Error ? err : new Error(String(err)));
    });

    return outStream;
  }
}

// Shared instance: handed to WorkerService.createOnWorkerThread at module load
// and exported as the default service object.
const service = new PDFService();
WorkerService.createOnWorkerThread(service);
export default service;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { remoteStream } from '@/lib/nats/types'; | |
import stream from 'stream'; | |
import * as threads from 'worker_threads'; | |
import { PDFRenderDoc } from './PDFService'; | |
import PDFService from './PDFService'; | |
import { getWorkerService } from './index'; | |
/**
 * NATS-capable PDF service that uses a worker thread to do the work: every
 * instance obtains its own worker-backed service from `getWorkerService()`
 * and delegates `makePDF` to it.
 */
class PDFServiceViaWorker {
  /** Worker-backed service that performs the rendering. */
  service: typeof PDFService = getWorkerService();
  /** Declared but never assigned within this class. */
  worker: threads.Worker;

  /**
   * Forwards the render request to the worker-backed service.
   *
   * @param pages - Documents to render.
   * @returns The stream produced by the worker-backed `makePDF`.
   */
  @remoteStream({})
  makePDF(pages: PDFRenderDoc[]): stream.Readable {
    return this.service.makePDF(pages);
  }
}

export default PDFServiceViaWorker;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is the service runner -- call it like `node dist/service email`, etc | |
// to run a NATS microservice provider for emailService | |
import './initPathAlias'; | |
import './log_tweaks'; | |
import fs from 'fs'; | |
import path from 'path'; | |
// Make sure we have source maps for useful stack traces even if compiled in node | |
import * as smp from 'source-map-support'; | |
smp.install(); | |
import * as K8sLifecycle from 'k8s-lifecycle'; | |
import { connectNats } from './lib/nats'; | |
import { NatsService } from './lib/nats/NatsService'; | |
/** Requested service: the last command-line argument, lower-cased. */
const serviceName = process.argv[process.argv.length - 1]?.toLowerCase();
/**
 * Resolves to `true` only when `filePath` can be stat'ed and is a regular
 * file. Any stat failure (for example a missing path) resolves to `false`.
 */
async function fileExists(filePath: string) {
  try {
    const stat = await fs.promises.stat(filePath);
    return !!(stat?.isFile());
  } catch {
    return false;
  }
}

/** Matches single-file services such as `FooService.ts` / `FooService.js`. */
const serviceFile = /Service\.(ts|js)$/;

/**
 * Scans `serviceDir` and builds a map from lower-cased service name to the
 * path that should be `require`d for it.
 *
 * Two layouts are recognised:
 * - a file named `<Name>Service.ts|js`, registered as `<name>`;
 * - any other entry containing an `index.ts` or `index.js`, registered under
 *   its own name with a trailing `Service` removed.
 *
 * Keys are always lower-cased so they can be compared with the lower-cased
 * service name taken from the command line.
 *
 * @param serviceDir - Directory whose immediate entries are inspected.
 * @returns Map of service name to file or directory path.
 */
async function findServices(serviceDir: string) {
  const filesInDir = await fs.promises.readdir(serviceDir);
  const services: Record<string, string> = {};
  for (const file of filesInDir) {
    const filePath = path.join(serviceDir, file);
    if (serviceFile.test(file)) {
      // Lower-case here as well; otherwise `EmailService.js` would be keyed
      // as `Email` and could never match the requested name.
      services[file.replace(serviceFile, '').toLowerCase()] = filePath;
      continue;
    }
    const [tsExists, jsExists] = await Promise.all([
      fileExists(path.join(filePath, 'index.ts')),
      fileExists(path.join(filePath, 'index.js')),
    ]);
    if (tsExists || jsExists) {
      services[file.replace(/Service$/, '').toLowerCase()] = filePath;
    }
  }
  return services;
}
/** Shape a service module must export to be runnable by this script. */
interface ServiceModule {
  getConsumer(type?: 'nats' | 'local'): Promise<{}>;
  getNatsProvider(): Promise<NatsService<any>>;
}

/**
 * Looks up the requested service under `./services`, starts its NATS provider
 * and registers the readiness and shutdown hooks with `K8sLifecycle`.
 *
 * When the service is unknown, the available names are printed and the
 * process exit code is set to 1. When the module lacks `getNatsProvider`, the
 * process exits with status 1.
 */
async function start() {
  const allServices = await findServices(path.join(__dirname, 'services'));

  // Own-property check: a bare `in` test would also accept inherited keys such
  // as `constructor`, and the lookup would then hand a non-path to require().
  if (serviceName === undefined || !Object.prototype.hasOwnProperty.call(allServices, serviceName)) {
    console.error(`Service ${serviceName} not found`);
    console.error(`Available services: ${Object.keys(allServices).join(', ')}`);
    // Report failure to the caller, matching the missing-getNatsProvider path.
    process.exitCode = 1;
    return;
  }

  console.log("Connecting to NATS server:");
  const natsConn = await connectNats();
  const serviceModule = require(allServices[serviceName]) as ServiceModule;
  if (!('getNatsProvider' in serviceModule)) {
    console.error(`Service ${serviceName} does not export a getNatsProvider function`);
    process.exit(1);
  }

  const serviceServer = await serviceModule.getNatsProvider();
  serviceServer.start();

  K8sLifecycle.onReadyCheck(async () => {
    return serviceServer.isReady();
  });
  K8sLifecycle.onShutdown(async () => {
    // Stop the service first, then drain the NATS connection.
    await serviceServer.stop();
    await natsConn.drain();
  });
}
// Entry point: runs only when this file is executed directly, not when it is
// loaded via require() from another module.
if (require.main === module) {
  // Log-and-continue handler for promise rejections nobody handled.
  process.on('unhandledRejection', (reason, p) => {
    console.log('Unhandled Rejection at: Promise', p, 'reason:', reason);
    // application specific logging, throwing an error, or other logic here
    // statsClient?.increment(StatNames.error.unhandledRejection);
  });

  // Log-and-continue handler for uncaught exceptions; EADDRINUSE is also
  // reported to K8sLifecycle as unrecoverable.
  process.on('uncaughtException', function (err) {
    if ((<any>err)?.code === 'EADDRINUSE') {
      K8sLifecycle.setUnrecoverableError(err);
    }
    console.log('Caught unhandled exception: ', err);
    console.log("Stack: ", err.stack);
    // statsClient?.increment(StatNames.error.unhandledRejection, {errorType: err?.name ?? 'unknown'});
    // const hub = Sentry.getCurrentHub();
    // hub.withScope(async (scope) => {
    //   scope.setLevel(Sentry.Severity.Critical);
    //   hub.captureException(err, { originalException: err });
    // });
  });

  // A rejected start() means the service never came up. Handle it explicitly
  // and exit non-zero instead of leaving it to the generic rejection logger.
  start().catch((err) => {
    console.error('Service failed to start: ', err);
    process.exit(1);
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment