Skip to content

Instantly share code, notes, and snippets.

@taxilian
Last active November 1, 2022 22:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taxilian/d5fcf6ac5a91ee849da92960118b6252 to your computer and use it in GitHub Desktop.
Save taxilian/d5fcf6ac5a91ee849da92960118b6252 to your computer and use it in GitHub Desktop.
Example service using the abstraction I'm working on. A "-" in a filename stands for a directory separator.
import { sendEmailTemplate, EmailDomains as ValidDomains } from '@/lib/email/mailer';
import { remoteMethod } from '@/lib/nats/types';
/**
 * NATS-capable email service.
 *
 * `sendEmail` is registered through the `remoteMethod` decorator with a
 * `timeout` of 10000 (presumably milliseconds -- confirm against the decorator).
 */
class EmailService {
  /**
   * Checks the requested domain against the known email domains and, if it is
   * listed, hands the request to `sendEmailTemplate`.
   *
   * @param domain - Sending domain; must be an entry of `EmailDomains`.
   * @param tplName - Template name, forwarded as-is.
   * @param data - Template data, forwarded as-is.
   * @param options - Send options, forwarded as-is.
   * @returns Whatever `sendEmailTemplate` resolves with.
   * @throws Error if `domain` is not one of the known email domains.
   */
  @remoteMethod({ timeout: 10000 })
  async sendEmail(domain: string, tplName: string, data: any, options: any) {
    const isKnownDomain = ValidDomains.includes(domain as ValidDomains);
    if (!isKnownDomain) {
      throw new Error(`Invalid email domain: ${domain}`);
    }
    // The membership check above is the runtime justification for this cast.
    const checkedDomain = domain as ValidDomains;
    return await sendEmailTemplate(checkedDomain as any, tplName, data as unknown, options);
  }
}
export default new EmailService();
import emailService from './EmailService';
import { connectNats } from '@/lib/nats';
import { natsEnv } from '@/hostInfo';
import { NatsService } from '@/lib/nats/NatsService';
import { LocalService } from '@/lib/nats/LocalService';
// NATS subject used for the email service, prefixed with the environment name.
const serviceSubject = `${natsEnv}.service.email`;
// Memoized promise for the consumer built by the first getConsumer() call.
let emailServiceDfd: Promise<typeof emailService>;

/**
 * Returns a consumer (client-side handle) for the email service, creating it
 * on first use and reusing it afterwards.
 *
 * The in-flight promise itself is cached, so concurrent first calls share one
 * construction. Previously the cache variable was read but never written, so
 * every call built a fresh consumer.
 *
 * Note: because the result is cached, `type` only has an effect on the first
 * call in a process.
 *
 * @param type - `'nats'` routes calls over NATS via `NatsService`; `'local'`
 *   wraps the in-process service via `LocalService`.
 * @returns The consumer produced by the selected `makeConsumer` factory.
 */
export async function getConsumer(type: 'nats' | 'local' = 'nats') {
  if (emailServiceDfd) return emailServiceDfd;
  emailServiceDfd = (async () => {
    // The connection is established for both modes, as before.
    const natsConnection = await connectNats();
    if (type === 'nats') {
      return NatsService.makeConsumer(natsConnection, serviceSubject, emailService);
    } else {
      return LocalService.makeConsumer(emailService);
    }
  })();
  return await emailServiceDfd;
}
/**
 * Builds the provider (server side) for the email service: a `NatsService`
 * bound to the shared subject and the local `emailService` instance.
 *
 * @returns The newly constructed `NatsService`; the caller is responsible for
 *   starting it.
 */
export async function getNatsProvider() {
  const conn = await connectNats();
  return new NatsService(conn, serviceSubject, emailService);
}
import { connectNats } from '@/lib/nats';
import { Worker } from 'worker_threads';
import pdfService from './PDFService';
import { natsEnv } from '@/hostInfo';
import { NatsService } from '@/lib/nats/NatsService';
import { WorkerService } from '@/lib/nats/WorkerService';
import { LocalService } from '@/lib/nats/LocalService';
import path from 'path';
// NATS subject used for the PDF service, prefixed with the environment name.
const serviceSubject = `${natsEnv}.service.etPdf`;
// Memoized promise for the consumer built by the first getConsumer() call;
// stays unset until then.
let pdfServiceDfd: Promise<typeof pdfService>;
/**
 * Creates a `PDFServiceViaWorker` instance.
 *
 * The module is pulled in with `require` at call time instead of a top-level
 * import -- presumably to avoid a circular import, since `PDFServiceViaWorker`
 * itself imports from this module (confirm before changing).
 */
function getServiceViaWorker() {
  type ViaWorkerCtor = typeof import('./PDFServiceViaWorker').default;
  const ViaWorker = require('./PDFServiceViaWorker').default as ViaWorkerCtor;
  return new ViaWorker();
}
/**
 * Returns a consumer (client-side handle) for the PDF service, building it on
 * the first call and reusing the same promise afterwards.
 *
 * Because the promise is cached, `type` only matters on the first call; later
 * calls get the cached consumer regardless of the argument.
 *
 * @param type - `'nats'` routes calls over NATS; `'local'` wraps an in-process
 *   service, which is worker-backed unless `DISABLE_WORKER` is set in the
 *   environment.
 * @returns The consumer produced by the selected `makeConsumer` factory.
 */
export async function getConsumer(type: 'nats' | 'local' = 'nats') {
  if (!pdfServiceDfd) {
    pdfServiceDfd = (async () => {
      // The connection is established for both modes.
      const conn = await connectNats();
      if (type !== 'nats') {
        const backing = process.env.DISABLE_WORKER ? pdfService : getServiceViaWorker();
        return LocalService.makeConsumer(backing);
      }
      return NatsService.makeConsumer(conn, serviceSubject, pdfService);
    })();
  }
  return await pdfServiceDfd;
}
/**
 * Builds the provider (server side) for the PDF service.
 *
 * The backing implementation is the in-process `pdfService` when the
 * `DISABLE_WORKER` environment variable is set, and the worker-backed service
 * otherwise.
 *
 * @returns The newly constructed `NatsService`; the caller is responsible for
 *   starting it.
 */
export async function getNatsProvider() {
  const conn = await connectNats();
  const backing = process.env.DISABLE_WORKER ? pdfService : getServiceViaWorker();
  return new NatsService(conn, serviceSubject, backing);
}
/**
 * Spawns a worker thread running the compiled `PDFService.js` that sits next
 * to this module and returns a consumer that talks to it.
 *
 * Every call starts a new worker thread.
 *
 * @returns The result of `WorkerService.makeConsumer` for the new worker.
 */
export function getWorkerService() {
  const workerEntry = path.join(__dirname, 'PDFService.js');
  const thread = new Worker(workerEntry);
  return WorkerService.makeConsumer(thread, pdfService);
}
import '../../initPathAlias';
import { ApplicantSessionSummary, SessionManifestData } from '@/examtools/actions/applicant';
import { generateManifestPdf } from '@/lib/examForms/ManifestForm';
import { ExamData, Form605Data } from '@/lib/examForms/SharedTypes';
import { remoteMethod, remoteStream } from '@/lib/nats/types';
import { createDocument } from '@/lib/pdf/pdflib';
import { VECList } from '@shared/licenseInfo';
import * as form605 from '@/lib/examForms/form605generator';
import * as csce from '@/lib/examForms/formCSCEgenerator';
import * as examResults from "@/lib/results/generatePdfLib";
import stream from 'stream';
import { WorkerService } from '@/lib/nats/WorkerService';
/** Render request for a session manifest; `data` is passed to `generateManifestPdf`. */
export interface PDFRenderManifestDoc {
// Discriminant used by PDFService.makePDF to pick the renderer.
type: 'manifest';
data: SessionManifestData;
}
/** Render request for a CSCE form; all fields are passed to `csce.renderToPdf`. */
export interface PDFRenderCSCEDoc {
// Discriminant used by PDFService.makePDF to pick the renderer.
type: 'csce';
vec: VECList;
applicant: ApplicantSessionSummary;
session: ExamData;
// Forwarded to the renderer unchanged; presumably marks the output as a draft -- confirm.
isDraft: boolean;
}
/** Render request for a 605 form; `data` is passed to `form605.renderToPdf` as a one-element list. */
export interface PDFRender605Doc {
// Discriminant used by PDFService.makePDF to pick the renderer.
type: '605';
vec: VECList;
data: Form605Data;
}
/** Render request for exam results; fields are passed to `examResults.generateResultsPdf`. */
export interface PDFRenderExamResultsDoc {
// Discriminant used by PDFService.makePDF to pick the renderer.
type: 'examResults';
results: examResults.ResultDocument[];
// Presumably the number of copies to print for passing / failing results -- confirm with the generator.
passCopies: number;
failCopies: number;
}
/** Any document that `PDFService.makePDF` can render, discriminated by `type`. */
export type PDFRenderDoc = PDFRenderManifestDoc | PDFRenderCSCEDoc | PDFRender605Doc | PDFRenderExamResultsDoc;
/**
 * NATS-capable PDF rendering service.
 *
 * `makePDF` is registered through the `remoteStream` decorator.
 */
class PDFService {
  /**
   * Renders every requested document, in order, into one PDF document and
   * returns the saved bytes as a readable stream.
   *
   * Rendering runs asynchronously after the stream has been returned. The
   * whole PDF is pushed as a single chunk once it has been saved, followed by
   * end-of-stream.
   *
   * @param pages - Documents to render, appended in the given order.
   * @returns A byte stream carrying the finished PDF. If rendering fails the
   *   stream is destroyed with the error, so consumers see `'error'` followed
   *   by `'close'` instead of a stream that never finishes.
   */
  @remoteStream({})
  makePDF(pages: PDFRenderDoc[]): stream.Readable {
    const outStream = new stream.Readable({
      read() {}, // There is no way to pause the stream, so we just ignore this
    });
    (async () => {
      const pdfDoc = await createDocument();
      for (const page of pages) {
        switch (page.type) {
          case 'manifest':
            await generateManifestPdf(page.data, pdfDoc);
            break;
          case 'csce':
            await csce.renderToPdf(page.vec, page.applicant, page.session, page.isDraft, pdfDoc);
            break;
          case '605':
            await form605.renderToPdf([page.data], page.vec, pdfDoc);
            break;
          case 'examResults':
            await examResults.generateResultsPdf(pdfDoc, page.results, page.passCopies, page.failCopies);
            break;
          default:
            // Requests may arrive from another process, so the tag is checked
            // at runtime even though the union is exhaustive at compile time.
            throw new Error(`Unknown PDFRenderDoc type ${(page as { type: unknown }).type}`);
        }
      }
      const savedBytes = await pdfDoc.save();
      outStream.push(savedBytes);
      outStream.push(null);
    })().catch((err: unknown) => {
      // destroy() both reports the error and tears the stream down; a bare
      // emit('error') would leave the stream open and never closed.
      outStream.destroy(err instanceof Error ? err : new Error(String(err)));
    });
    return outStream;
  }
}
// Single shared PDFService instance for this module.
const service = new PDFService();
// Runs on every import of this module. Presumably this only attaches the
// service to the parent port when the module is loaded inside a worker
// thread -- confirm against WorkerService.
WorkerService.createOnWorkerThread(service);
export default service;
import { remoteStream } from '@/lib/nats/types';
import stream from 'stream';
import * as threads from 'worker_threads';
import { PDFRenderDoc } from './PDFService';
import PDFService from './PDFService';
import { getWorkerService } from './index';
/**
 * NATS-capable PDF service that does no rendering itself: each instance
 * obtains a worker-backed consumer from `getWorkerService()` and forwards
 * `makePDF` calls to it.
 */
class PDFServiceViaWorker {
  /** Consumer for the PDF service running on a worker thread. */
  service: typeof PDFService = getWorkerService();
  /** Declared for callers' benefit but never assigned in this class. */
  worker: threads.Worker;

  /**
   * Forwards the render request to the worker-backed service.
   *
   * @param pages - Documents to render.
   * @returns The stream returned by the worker-backed `makePDF`.
   */
  @remoteStream({})
  makePDF(pages: PDFRenderDoc[]): stream.Readable {
    const delegate = this.service;
    return delegate.makePDF(pages);
  }
}
export default PDFServiceViaWorker;
// This is the service runner -- call it like `node dist/service email`, etc
// to run a NATS microservice provider for emailService
import './initPathAlias';
import './log_tweaks';
import fs from 'fs';
import path from 'path';
// Make sure we have source maps for useful stack traces even if compiled in node
import * as smp from 'source-map-support';
smp.install();
import * as K8sLifecycle from 'k8s-lifecycle';
import { connectNats } from './lib/nats';
import { NatsService } from './lib/nats/NatsService';
// Name of the service to run: the last command-line argument, lowercased so
// it can be compared against the lowercased keys of the service map.
const serviceName = process.argv.slice(-1)[0]?.toLowerCase();
/**
 * Reports whether `filePath` exists and is a regular file.
 *
 * Any `stat` failure (missing path, a path component that is not a
 * directory, permission problems) is treated as "does not exist".
 */
async function fileExists(filePath: string) {
  try {
    const stat = await fs.promises.stat(filePath);
    return !!(stat?.isFile());
  } catch {
    return false;
  }
}

// Matches single-file services such as `fooService.ts` / `fooService.js`.
const serviceFile = /Service\.(ts|js)$/;

/**
 * Scans `serviceDir` and builds a map from lowercased service name to the
 * path that should be `require`d to load that service.
 *
 * Two layouts are recognised:
 *  - a file named `<name>Service.ts|js`, mapped to the file itself;
 *  - a directory containing `index.ts` or `index.js`, mapped to the
 *    directory, with a trailing `Service` stripped from its name.
 *
 * Keys are lowercased in both cases. The runner lowercases the requested
 * service name, so a key that kept its original casing (for example `Email`
 * from `EmailService.js`) could never be selected.
 *
 * @param serviceDir - Directory to scan (not recursive).
 * @returns Record of lowercased service name to module path.
 */
async function findServices(serviceDir: string) {
  const filesInDir = await fs.promises.readdir(serviceDir);
  const services: Record<string, string> = {};
  for (const file of filesInDir) {
    const filePath = path.join(serviceDir, file);
    if (serviceFile.test(file)) {
      services[file.replace(serviceFile, '').toLowerCase()] = filePath;
    } else {
      // For plain files these stats fail and fileExists() reports false, so
      // only directories with an index module are registered here.
      const [
        tsExists, jsExists
      ] = await Promise.all([
        fileExists(path.join(filePath, 'index.ts')),
        fileExists(path.join(filePath, 'index.js')),
      ]);
      if (tsExists || jsExists) {
        services[file.replace(/Service$/, '').toLowerCase()] = filePath;
      }
    }
  }
  return services;
}
// Shape the runner expects from a service entry module loaded via require().
interface ServiceModule {
// Not called by the runner itself; part of the module contract.
getConsumer(type?: 'nats' | 'local'): Promise<{}>;
// Called by start() to obtain the NatsService that is then started.
getNatsProvider(): Promise<NatsService<any>>;
}
/**
 * Entry point of the service runner.
 *
 * Looks up the requested service (`serviceName`) among the modules found
 * under `./services`, connects to NATS, asks the module for its NATS
 * provider, starts it, and registers readiness and shutdown hooks with
 * `K8sLifecycle`.
 *
 * If the service is unknown, the available names are printed and the process
 * exit code is set to 1 so the failure is visible to whatever launched the
 * process. If the module does not export `getNatsProvider`, the process exits
 * immediately with status 1.
 */
async function start() {
  const allServices = await findServices(path.join(__dirname, 'services'));
  // Own-property check: `in` would also accept inherited keys such as
  // "constructor" or "tostring" and then try to require a non-path value.
  if (!Object.prototype.hasOwnProperty.call(allServices, serviceName)) {
    console.error(`Service ${serviceName} not found`);
    console.error(`Available services: ${Object.keys(allServices).join(', ')}`);
    // exitCode rather than exit() so the messages above are flushed first.
    process.exitCode = 1;
    return;
  }

  console.log("Connecting to NATS server:");
  const natsConn = await connectNats();
  const serviceModule = require(allServices[serviceName]) as ServiceModule;
  if (!('getNatsProvider' in serviceModule)) {
    console.error(`Service ${serviceName} does not export a getNatsProvider function`);
    process.exit(1);
  }
  const serviceServer = await serviceModule.getNatsProvider();
  serviceServer.start();
  K8sLifecycle.onReadyCheck(async () => {
    return serviceServer.isReady();
  });
  K8sLifecycle.onShutdown(async () => {
    // Stop the service before draining the connection it was started with.
    await serviceServer.stop();
    await natsConn.drain();
  });
}
// Only bootstrap when this file is executed directly, not when it is imported.
if (require.main === module) {
  // Logs promise rejections that nothing handled.
  const logUnhandledRejection = (reason: unknown, p: Promise<unknown>) => {
    console.log('Unhandled Rejection at: Promise', p, 'reason:', reason);
    // application specific logging, throwing an error, or other logic here
    // statsClient?.increment(StatNames.error.unhandledRejection);
  };

  // Logs uncaught exceptions; a port-in-use failure is additionally reported
  // to K8sLifecycle as unrecoverable.
  const logUncaughtException = (err: Error) => {
    const errCode = (err as any)?.code;
    if (errCode === 'EADDRINUSE') {
      K8sLifecycle.setUnrecoverableError(err);
    }
    console.log('Caught unhandled exception: ', err);
    console.log("Stack: ", err.stack);
    // statsClient?.increment(StatNames.error.unhandledRejection, {errorType: err?.name ?? 'unknown'});
    // const hub = Sentry.getCurrentHub();
    // hub.withScope(async (scope) => {
    //   scope.setLevel(Sentry.Severity.Critical);
    //   hub.captureException(err, { originalException: err });
    // });
  };

  process.on('unhandledRejection', logUnhandledRejection);
  process.on('uncaughtException', logUncaughtException);
  start();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment