Skip to content

Instantly share code, notes, and snippets.

@menduz
Last active February 7, 2022 18:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save menduz/e22808d90591585430c0bd113b86aeac to your computer and use it in GitHub Desktop.
Save menduz/e22808d90591585430c0bd113b86aeac to your computer and use it in GitHub Desktop.
import { ILoggerComponent, IMetricsComponent } from '@well-known-components/interfaces'
import { validateMetricsDeclaration } from '@well-known-components/metrics'
import PQueue from 'p-queue'
export type ISequecuentialJobExecutorComponent = {
/**
* Runs sequential jobs with a max concurrency of 1 per jobName.
*/
run<T>(jobName: string, fn: () => Promise<T>): Promise<T>
}
type SequentialJobComponents = {
metrics: IMetricsComponent<keyof typeof sequentialJobMetrics>
logs: ILoggerComponent
}
export function createSequecuentialJobExecutor(
components: SequentialJobComponents
): ISequecuentialJobExecutorComponent {
const { metrics, logs } = components
const logger = logs.getLogger('SequentialJobComponent')
const queues = new Map<string, PQueue>()
function getQueue(jobName: string) {
if (queues.has(jobName)) return queues.get(jobName)!
const queue = new PQueue({ autoStart: true, concurrency: 1, throwOnTimeout: false })
queues.set(jobName, queue)
return queue
}
function run<T>(jobName: string, fn: () => Promise<T>): Promise<T> {
const queue = getQueue(jobName)
metrics.increment('wkc_sequential_job_total', { job_name: jobName })
return queue.add<T>(async () => {
const timer = metrics.startTimer('wkc_sequential_job_duration_seconds', { job_name: jobName })
try {
const result = await fn()
metrics.increment('wkc_sequential_job_run_total', { job_name: jobName, error: 'false' })
return result
} catch (err) {
metrics.increment('wkc_sequential_job_run_total', { job_name: jobName, error: 'true' })
logger.error('Error running job.', { jobName, error: err.message || err.toString() })
throw err
} finally {
timer.end()
}
})
}
return {
run
}
}
export const sequentialJobMetrics = validateMetricsDeclaration({
wkc_sequential_job_total: {
help: 'Total number of sequential jobs scheduled',
type: IMetricsComponent.CounterType,
labelNames: ['job_name']
},
wkc_sequential_job_run_total: {
help: 'Total number of sequential jobs run',
type: IMetricsComponent.CounterType,
labelNames: ['job_name', 'error']
},
wkc_sequential_job_duration_seconds: {
help: 'Histogram of runtime per job_name',
type: IMetricsComponent.HistogramType,
labelNames: ['job_name']
}
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment