Skip to content

Instantly share code, notes, and snippets.

@hscstudio
Created July 14, 2020 11:18
Show Gist options
  • Save hscstudio/31731a63b5c9c88077c74ea8c179c82a to your computer and use it in GitHub Desktop.
Save hscstudio/31731a63b5c9c88077c74ea8c179c82a to your computer and use it in GitHub Desktop.
Simple Queue with ExpressJS
const express = require('express')
const bodyParser = require('body-parser')
const methodOverride = require('method-override')
const app = express()
app.use(bodyParser.urlencoded({ limit: '50mb', extended: true }))
app.use(bodyParser.json({limit: '50mb'}))
app.use(methodOverride())
app.use((err, req, res, next) => {
console.error(err.stack)
res.status(500).send('Something broke!')
})
global.queue = {}
const processTask = async (task) => {
task.status = 'process'
task.retry--
console.log(task)
/* start process task */
await new Promise(resolve => setTimeout(resolve, 1000))
const result = Math.random() < 0.7;
if (result) {
task.status = 'success'
} else {
task.status = 'failed'
}
/* end process task */
console.log(task)
processJob(task.jobId)
}
const processJob = async (jobId) => {
const currentJob = queue[jobId]
if (currentJob){
currentJob.index++
if (currentJob.tasks && currentJob.tasks.length>0){
const readyTasks = currentJob.tasks.filter((task) => {
return (task.status == 'new' || task.status == 'failed') && task.retry > 0
})
if (readyTasks && readyTasks.length>0){
console.log(readyTasks[0])
processTask(readyTasks[0])
} else {
currentJob.end = Date.now()
console.log('Job '+jobId+' done!')
}
}
}
}
const addExecJob = async (req, res) => {
const jobId = req.params.jobId
const retry = req.params.retry
const jobDatas = req.body.jobDatas
queue = {
[jobId]: null
}
let id = 0
let tasks = []
for (let jobData of jobDatas) {
id++;
tasks.push({
id,
jobId,
data: jobData,
retry,
status: 'new'
})
}
res.send({
status:'success',
message: id+' data sedang dalam proses...',
data: tasks
})
// jalankan dibelakang layar
queue[jobId] = {
tasks,
index: 0,
failed: 0,
taskCount: tasks.length,
start: Date.now(),
end: Date.now(),
}
processJob(jobId);
}
const executeJob = async (req, res) => {
const jobId = req.params.jobId
const retry = req.params.retry
const currentJob = queue[jobId]
if (currentJob && currentJob.tasks){
const readyTasks = currentJob.tasks.filter((task) => {
if ((task.status == 'new' || task.status == 'failed')){
task.retry = retry
return true
} else {
return false
}
})
if (readyTasks && readyTasks.length>0){
currentJob.index = 0
currentJob.failed = 0
currentJob.start = Date.now()
currentJob.end = Date.now()
res.send({
status:'success',
message: 'Job-'+jobId+' sedang diproses...',
})
processJob(jobId);
} else {
res.send({
status:'warning',
message: 'Job-'+jobId+' not ready',
})
}
} else {
res.send({
status:'warning',
message: 'Job-'+jobId+' not ready',
})
}
}
const viewJob = async (req, res) => {
const jobId = req.params.jobId
if (queue[jobId]){
const tasks = queue[jobId].tasks;
const newTasks = tasks.filter(task=>{
return task.status == 'new'
})
const processTasks = tasks.filter(task=>{
return task.status == 'process'
})
const successTasks = tasks.filter(task=>{
return task.status == 'success'
})
const failedTasks = tasks.filter(task=>{
return task.status == 'failed'
})
const duration = Math.floor((queue[jobId].end - queue[jobId].start)/1000);
res.json({
new: newTasks.length,
process: processTasks.length,
success: successTasks.length,
failed: failedTasks.length,
duration: duration+' seconds',
detailTasks: queue[jobId].tasks
});
} else {
res.json({
message: 'Please add & execute job!'
})
}
}
app.get('/', (req, res) => {
res.json({
message:'queue system',
});
})
app.post('/add-exec-job/:jobId/:retry', addExecJob)
app.post('/exec-job/:jobId/:retry', executeJob)
app.get('/check-job/:jobId', viewJob)
let port = 666
app.listen(port, () => console.log(`app listening on port ${port}!`))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment