Last active
September 1, 2023 07:02
-
-
Save ymakino/c90dffe26304b4c22dda40113ab09c3e to your computer and use it in GitHub Desktop.
A sample implementation of a task scheduler with channel events in JS.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Scheduler limits.
const MAX_PRIORITY = 10;
const MAX_CHANNELS = 1000;
const MAX_EVENTS = 1000;

// Scheduler state. Each container is mutated in place and never reassigned.
// task_queues[p]         : runnable tasks of priority p (lower index is served first)
// event_queue            : event ids that have been raised but not yet processed
// event_waiting_tasks[e] : tasks parked until event e is raised
// channels[c]            : FIFO of values carried by channel c
const task_queues = [];
const event_queue = [];
const event_waiting_tasks = [];
const channels = [];

// Delay in milliseconds between two scheduler steps (passed to setTimeout).
let step_delay = 50;
/**
 * Prints a debug line; forwards every argument unchanged to console.log.
 * @param {...*} args - values to print
 */
function debug_println(...args) {
    console.log(...args);
}
/**
 * Trace hook used by the scheduler primitives. Tracing is switched off:
 * the body is intentionally empty. Uncomment the line below to enable it.
 * @param {...*} args - values that would be printed
 */
function trace_println(...args) {
    // console.log(...args);
}
// Allocate one run queue per priority level, 0 through MAX_PRIORITY inclusive.
for (let priority = 0; priority <= MAX_PRIORITY; priority++) {
    task_queues.push([]);
}
/**
 * Makes a task runnable by placing it in the run queue of its priority.
 *
 * Tasks are inserted at the front of the queue; execute_task() removes from
 * the back, so tasks of equal priority run in FIFO order.
 *
 * @param {{priority: number, name: string}} task - task to schedule
 * @throws {Error} if the priority is not an integer in 0..MAX_PRIORITY
 */
function schedule_task(task) {
    const { priority } = task;
    // Queues exist for 0..MAX_PRIORITY inclusive, so MAX_PRIORITY itself is valid.
    // A non-integer would index a queue that does not exist.
    if (!Number.isInteger(priority) || priority < 0 || priority > MAX_PRIORITY) {
        throw new Error(`invalid priority: ${task.name}:${task.priority}`);
    }
    trace_println(`RUN: ${task.name}`);
    task_queues[priority].unshift(task);
}
/**
 * Called when a task has finished. The task is simply not re-queued;
 * this function only emits a trace line.
 * @param {{name: string}} task - the finished task
 */
function done_task(task) {
    trace_println(`REMOVE: ${task.name}`);
}
/**
 * Builds a task record. The task is not scheduled by this function.
 *
 * @param {number} priority - run-queue index the task belongs to
 * @param {function} call - task body; execute_task() invokes it with the task itself
 * @param {string} name - label used in trace and debug output
 * @returns {{priority: number, call: function, name: string}} the new task
 */
function new_task(priority, call, name) {
    trace_println(`NEW: ${name}`);
    return { priority, call, name };
}
// Allocate one (initially empty) waiting list per event id, 0..MAX_EVENTS-1.
for (let ev = 0; ev < MAX_EVENTS; ev++) {
    event_waiting_tasks.push([]);
}
/**
 * Parks a task until the given event is raised.
 *
 * @param {number} ev - event id, an integer in 0..MAX_EVENTS-1
 * @param {{name: string}} task - task to park
 * @throws {Error} if ev is not an integer in the valid range
 */
function event_wait_task(ev, task) {
    // A non-integer id has no waiting list; report it as an invalid event
    // instead of failing later with a TypeError on an undefined list.
    if (!Number.isInteger(ev) || ev < 0 || MAX_EVENTS <= ev) {
        throw new Error(`invalid event: ${ev} (${task.name})`);
    }
    trace_println(`WAIT: ${ev} ${task.name}`);
    event_waiting_tasks[ev].unshift(task);
}
// Allocate every channel up front as an empty queue, ids 0..MAX_CHANNELS-1.
for (let ch = 0; ch < MAX_CHANNELS; ch++) {
    channels.push([]);
}
/**
 * Tells whether a channel currently holds no values.
 * @param {number} index - channel id
 * @returns {boolean} true when the channel queue is empty
 */
function channel_is_empty(index) {
    return channels[index].length === 0;
}
/**
 * Removes and returns the oldest value of a channel.
 *
 * Values are enqueued at the front of the array, so the oldest one sits at
 * the end.
 *
 * @param {number} index - channel id
 * @returns {*} the oldest value, or undefined when the channel is empty
 */
function channel_dequeue(index) {
    const queue = channels[index];
    trace_println(`DEQUEUE: ${index} ${queue[queue.length - 1]}`);
    return queue.pop();
}
/**
 * Appends a value to a channel (inserted at the front of the backing array;
 * channel_dequeue() takes from the back).
 *
 * @param {number} index - channel id
 * @param {*} value - value to send
 * @returns {number} the channel length after insertion
 */
function channel_enqueue(index, value) {
    trace_println(`ENQUEUE: ${index} ${value}`);
    const queue = channels[index];
    return queue.unshift(value);
}
/**
 * Raises an event by queueing its id for process_events().
 * An id that is already pending is not queued a second time.
 * @param {number} ev - event id
 */
function event_generate(ev) {
    if (event_queue.includes(ev)) {
        return;
    }
    trace_println(`EVENT: ${ev}`);
    event_queue.unshift(ev);
}
/**
 * Raises, for every non-empty channel, the event whose id equals the
 * channel id, so that tasks waiting on that channel are woken.
 */
function channel_generate_events() {
    // forEach supplies a numeric index. `for...in` would supply string keys,
    // and a string id never matches the numeric ids raised elsewhere when
    // event_generate() checks for an already-pending event.
    channels.forEach((queue, index) => {
        if (queue.length > 0) {
            event_generate(index);
        }
    });
}
// Console input: every line typed on stdin is sent to channel 0.
const readline = require('readline');
const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout,
});
rl.on('line', (line) => {
    channel_enqueue(0, line);
});
// End of input terminates the whole program, including the scheduler loop.
rl.on('close', () => {
    process.exit(0);
});
/**
 * Runs a single scheduler step.
 *
 * Takes the oldest task from the first non-empty run queue (queues are
 * scanned from index 0 upwards), invokes it, and dispatches on its result:
 *   - undefined/null : the task is scheduled again,
 *   - a value >= 0   : the task is parked on that event id,
 *   - anything else  : the task is finished.
 * Does nothing when every queue is empty.
 */
function execute_task() {
    const queue = task_queues.find((candidate) => candidate.length > 0);
    if (queue === undefined) {
        return;
    }

    const task = queue.pop();
    trace_println(`ACTIVE BEGIN: ${task.name}`);
    const ret = task.call(task);
    trace_println(`ACTIVE END: ${task.name} -> ${ret}`);

    if (ret == null) {
        schedule_task(task);
        return;
    }
    if (ret >= 0) {
        event_wait_task(ret, task);
        return;
    }
    done_task(task);
}
/**
 * Drains the pending-event queue. For each raised event, every task parked
 * on it is moved back to its run queue, oldest waiter first.
 */
function process_events() {
    while (event_queue.length > 0) {
        const ev = event_queue.pop();
        const waiting = event_waiting_tasks[ev];
        // An id without a waiting list (out of range, e.g. raised through the
        // interactive `event` command) has nobody to wake; skip it rather
        // than crash the scheduler loop.
        if (waiting === undefined) {
            continue;
        }
        while (waiting.length > 0) {
            schedule_task(waiting.pop());
        }
    }
}
/**
 * Scheduler main loop. One iteration runs one task, turns non-empty
 * channels into events, wakes the tasks waiting on those events, and then
 * re-arms itself with setTimeout after step_delay milliseconds.
 */
function schedule_run() {
    execute_task();
    channel_generate_events();
    process_events();
    setTimeout(schedule_run, step_delay);
}
/**
 * Task body: reads one input line from channel 0 and routes it.
 *
 * `show ...` goes to channel 3 and `event ...` to channel 4, both as an
 * array of words; anything else sends a usage message to channel 1.
 * Blank lines are ignored.
 *
 * @param {object} task - the running task (unused)
 * @returns {number} always 0: wait for the next event on channel 0
 */
function parse_command(task) {
    if (channel_is_empty(0)) {
        return 0;
    }
    const line = channel_dequeue(0).trim();
    if (line === '') {
        return 0;
    }
    // Split on runs of whitespace so leading or repeated blanks do not
    // produce empty words that hide the command name.
    const words = line.split(/\s+/);
    switch (words[0]) {
        case 'show':
            channel_enqueue(3, words);
            break;
        case 'event':
            channel_enqueue(4, words);
            break;
        default:
            channel_enqueue(1, 'Commands: show | event');
    }
    return 0;
}
/**
 * Task body for the `show` command: takes a word list from channel 3 and
 * sends the requested scheduler structure (the live object, not a copy) to
 * channel 1. An unknown or missing sub-command yields a usage message.
 *
 * @param {object} task - the running task (unused)
 * @returns {number} always 3: wait for the next event on channel 3
 */
function command_show(task) {
    if (channel_is_empty(3)) {
        return 3;
    }
    const words = channel_dequeue(3);
    // Built per call so the table always refers to the current globals.
    const views = new Map([
        ['running', task_queues],
        ['events', event_queue],
        ['waiting', event_waiting_tasks],
        ['channels', channels],
    ]);
    const topic = words[1];
    if (views.has(topic)) {
        channel_enqueue(1, views.get(topic));
    } else {
        channel_enqueue(1, 'Commands: show [running | events | waiting | channels]');
    }
    return 3;
}
/**
 * Task body for the `event` command: takes a word list from channel 4 and
 * raises the event whose id is given as the second word. An id that is
 * missing, not a number, or outside 0..MAX_EVENTS-1 yields a usage message
 * on channel 1 instead.
 *
 * @param {object} task - the running task (unused)
 * @returns {number} always 4: wait for the next event on channel 4
 */
function command_event(task) {
    if (channel_is_empty(4)) {
        return 4;
    }
    const words = channel_dequeue(4);
    const ev = Number.parseInt(words[1], 10);
    // Ids outside the allocated range have no waiting list; reject them here
    // rather than letting them reach the event queue.
    const valid = Number.isInteger(ev) && ev >= 0 && ev < MAX_EVENTS;
    if (!valid) {
        channel_enqueue(1, 'Commands: event EVENT');
        return 4;
    }
    event_generate(ev);
    return 4;
}
/**
 * Task body: moves one line from channel 0 to channel 1, upper-cased.
 *
 * @param {object} task - the running task (unused)
 * @returns {number|undefined} 0 (wait on channel 0) when there is no input;
 *     undefined after handling a line, which makes the scheduler run the
 *     task again
 */
function uppercase(task) {
    if (channel_is_empty(0)) {
        return 0;
    }
    const text = channel_dequeue(0);
    channel_enqueue(1, text.toUpperCase());
    return undefined;
}
/**
 * Task body: moves one line from channel 0 to channel 1, lower-cased.
 *
 * @param {object} task - the running task (unused)
 * @returns {number|undefined} 0 (wait on channel 0) when there is no input;
 *     undefined after handling a line, which makes the scheduler run the
 *     task again
 */
function lowercase(task) {
    if (channel_is_empty(0)) {
        return 0;
    }
    const text = channel_dequeue(0);
    channel_enqueue(1, text.toLowerCase());
    return undefined;
}
/**
 * Task body: moves one line from channel 1 to channel 2 with its characters
 * in reverse order.
 *
 * @param {object} task - the running task (unused)
 * @returns {number|undefined} 1 (wait on channel 1) when there is no input;
 *     undefined after handling a line, which makes the scheduler run the
 *     task again
 */
function reverse(task) {
    if (channel_is_empty(1)) {
        return 1;
    }
    const line = channel_dequeue(1);
    // Spread iterates by code point, so surrogate pairs (emoji, etc.) stay
    // intact; split('') would cut them into two broken halves.
    const reversed = [...line].reverse().join('');
    channel_enqueue(2, reversed);
    return undefined;
}
/**
 * Task body: prints every value currently queued on channel 1 with
 * console.log, oldest first.
 *
 * @param {object} task - the running task (unused)
 * @returns {number} always 1: wait for the next event on channel 1
 */
function println(task) {
    for (;;) {
        if (channel_is_empty(1)) {
            return 1;
        }
        console.log(channel_dequeue(1));
    }
}
// Next channel id to hand out. Allocation starts at 10, so lower ids are
// never returned by new_channel(); the fixed tasks in this file use 0-4.
let cur_ch = 10;

/**
 * Allocates a fresh channel id.
 * @returns {number} an id that no earlier call has returned
 * @throws {Error} once all ids below MAX_CHANNELS are used up
 */
function new_channel() {
    if (cur_ch >= MAX_CHANNELS) {
        throw new Error("too many channels");
    }
    const id = cur_ch;
    cur_ch += 1;
    return id;
}
// Channels of the top-level sort pipeline: sort_run() feeds the unsorted
// list into ch_sort_in, sort_show() reads the result from ch_sort_out.
const ch_sort_in = new_channel();
const ch_sort_out = new_channel();
/**
 * Schedules a one-shot priority-1 task that forwards a single value from
 * ch_in to ch_out unchanged.
 *
 * @param {number} ch_in - channel to read from
 * @param {number} ch_out - channel to write to
 */
function pipe(ch_in, ch_out) {
    // Task body: waits on ch_in until a value is present, forwards it, and
    // then finishes (returns -1).
    function pipe_task(task) {
        if (channel_is_empty(ch_in)) {
            return ch_in;
        }
        const list = channel_dequeue(ch_in);
        debug_println(ch_in, ch_out, `PIPE ${task.name}: ${ch_in} -> [${list}] => [${list}] -> ${ch_out}`);
        channel_enqueue(ch_out, list);
        return -1;
    }

    const task = new_task(1, pipe_task, `pipe_${ch_in}_${ch_out}`);
    schedule_task(task);
}
/**
 * Builds one stage of a quicksort network made of tasks and channels.
 *
 * Four fresh channels are allocated (in1/out1 for the lower part, in2/out2
 * for the upper part) and two one-shot priority-1 tasks are scheduled:
 *   - split: reads a list from ch_in, partitions it around its first
 *     element, creates the two sub-stages, and feeds them;
 *   - join: reads the two sub-results and writes their concatenation to
 *     ch_out.
 *
 * @param {number} ch_in - channel that delivers the unsorted list
 * @param {number} ch_out - channel that receives the sorted list
 */
function qsort(ch_in, ch_out) {
    const in1 = new_channel();
    const out1 = new_channel();
    const in2 = new_channel();
    const out2 = new_channel();

    // Task body: waits on ch_in, then partitions and dispatches the list.
    function split(task) {
        if (channel_is_empty(ch_in)) {
            return ch_in;
        }
        const list = channel_dequeue(ch_in);

        // Lists shorter than two elements go through unchanged on the left.
        let left = list;
        let right = [];
        if (list.length >= 2) {
            const pivot = list[0];
            const smaller = list.filter((v) => v < pivot);
            const larger = list.filter((v) => pivot < v);
            // Everything that is neither smaller nor larger stays with the
            // pivot, so values that do not compare (e.g. NaN) are kept
            // instead of silently disappearing from the output.
            const center = list.filter((v) => !(v < pivot) && !(pivot < v));
            if (smaller.length === 0) {
                left = center;
                right = larger;
            } else if (larger.length === 0) {
                left = smaller;
                right = center;
            } else {
                left = smaller.concat(center);
                right = larger;
            }
        }

        // An empty side means the other side holds only pivot-equivalent
        // values (or the list was trivial): nothing left to sort, so plain
        // pipes carry both parts through. Otherwise recurse on both parts.
        if (left.length === 0 || right.length === 0) {
            pipe(in1, out1);
            pipe(in2, out2);
        } else {
            qsort(in1, out1);
            qsort(in2, out2);
        }

        debug_println(ch_in, ch_out, `SPLIT ${task.name}: ${ch_in} -> [${list}] => ([${left}] -> ${in1}) -> ${out1}, ([${right}] -> ${in2}) -> ${out2}`);
        channel_enqueue(in1, left);
        channel_enqueue(in2, right);
        return -1;
    }

    // Task body: waits until both sub-results are available, then merges.
    function join(task) {
        if (channel_is_empty(out1)) {
            return out1;
        }
        if (channel_is_empty(out2)) {
            return out2;
        }
        const left = channel_dequeue(out1);
        const right = channel_dequeue(out2);
        const list = left.concat(right);
        debug_println(ch_in, ch_out, `JOIN ${task.name}: ${out1} -> [${left}], ${out2} -> [${right}] => [${list}] -> ${ch_out}`);
        channel_enqueue(ch_out, list);
        return -1;
    }

    schedule_task(new_task(1, split, `split_${ch_in}_${ch_out}`));
    schedule_task(new_task(1, join, `join_${ch_in}_${ch_out}`));
}
// Demo data handed to the sort pipeline by sort_run().
const input_list = [
    860, 156, 458, 895, 470, 787, 499, 261, 603, 693, 740, 603, 181, 236, 280, 317,
    231, 289, 738, 853, 718, 256, 736, 428, 23, 698, 330, 312, 14, 147, 680, 212,
    32, 224, 927, 212, 223, 582, 682, 500, 956, 878, 371, 420, 411, 678, 651, 618,
    504, 27, 159, 488, 781, 317, 303, 315, 142, 552, 478, 754, 306, 205, 607, 899,
    894, 838, 858, 770, 164, 614, 801, 810, 109, 767, 650, 619, 474, 653, 798, 233,
    927, 422, 51, 689, 575, 493, 250, 358, 297, 425, 214, 5, 580, 932, 845, 312,
    456, 218, 43, 251, 641, 655, 112, 680, 673, 721, 160, 146, 453, 475, 755, 729,
    49, 45, 671, 990, 858, 120, 966, 906, 754, 744, 686, 539, 86, 866, 912, 938,
];
// Smaller alternative input:
// const input_list = [6, 7, 3, 2, 5, 8, 0, 9, 1, 4];
/**
 * Task body: builds the sort network between ch_sort_in and ch_sort_out and
 * feeds input_list into it.
 *
 * @param {object} task - the running task (unused)
 * @returns {number} always -1: the task is finished after one run
 */
function sort_run(task) {
    qsort(ch_sort_in, ch_sort_out);
    channel_enqueue(ch_sort_in, input_list);
    return -1;
}
/**
 * Task body: forwards one result list from ch_sort_out to channel 1.
 *
 * @param {object} task - the running task (unused)
 * @returns {number} always ch_sort_out: wait for the next result
 */
function sort_show(task) {
    if (!channel_is_empty(ch_sort_out)) {
        const sorted = channel_dequeue(ch_sort_out);
        channel_enqueue(1, sorted);
    }
    return ch_sort_out;
}
// Register the interactive command tasks and the printer.
schedule_task(new_task(0, parse_command, 'parse_command'));
schedule_task(new_task(0, command_show, 'command_show'));
schedule_task(new_task(0, command_event, 'command_event'));
schedule_task(new_task(1, println, 'println'));

// Disabled text-filter demo tasks.
/*
schedule_task(new_task(1, uppercase, 'uppercase'));
schedule_task(new_task(1, lowercase, 'lowercase'));
schedule_task(new_task(1, reverse, 'reverse'));
*/

// Register the sort demo: one task builds and feeds the network, the other
// forwards the result to the printer's channel.
schedule_task(new_task(0, sort_run, 'sort_run'));
schedule_task(new_task(0, sort_show, 'sort_show'));

// Start the scheduler loop.
schedule_run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment