Skip to content

Instantly share code, notes, and snippets.

@ymakino
Last active September 1, 2023 07:02
Show Gist options
  • Save ymakino/c90dffe26304b4c22dda40113ab09c3e to your computer and use it in GitHub Desktop.
Save ymakino/c90dffe26304b4c22dda40113ab09c3e to your computer and use it in GitHub Desktop.
A sample implementation of a task scheduler with channel events in JS.
const MAX_PRIORITY = 10;
const MAX_CHANNELS = 1000;
const MAX_EVENTS = 1000;
let task_queues = [];
let event_queue = [];
let event_waiting_tasks = [];
let channels = [];
let step_delay=50;
function debug_println() {
console.log(...arguments);
}
function trace_println() {
// console.log(...arguments);
}
for (let i=0; i<=MAX_PRIORITY; i++) {
task_queues.push([]);
}
function schedule_task(task) {
if (task.priority < 0 || MAX_PRIORITY <= task.priority) {
throw new Error(`invalid priority: ${task.name}:${task.priority}`);
}
trace_println(`RUN: ${task.name}`);
task_queues[task.priority].unshift(task);
}
function done_task(task) {
trace_println(`REMOVE: ${task.name}`);
}
function new_task(priority, call, name) {
trace_println(`NEW: ${name}`);
return {priority: priority, call: call, name: name};
}
for (let i=0; i<MAX_EVENTS; i++) {
event_waiting_tasks.push([]);
}
function event_wait_task(ev, task) {
if (ev < 0 || MAX_EVENTS <= ev) {
throw new Error(`invalid event: ${ev} (${task.name})`);
}
trace_println(`WAIT: ${ev} ${task.name}`);
event_waiting_tasks[ev].unshift(task);
}
for (let i=0; i<MAX_CHANNELS; i++) {
channels.push([]);
}
function channel_is_empty(index) {
return channels[index].length == 0;
}
function channel_dequeue(index) {
let len = channels[index].length;
trace_println(`DEQUEUE: ${index} ${channels[index][len - 1]}`);
return channels[index].pop();
}
function channel_enqueue(index, value) {
trace_println(`ENQUEUE: ${index} ${value}`);
return channels[index].unshift(value);
}
function event_generate(ev) {
if (!event_queue.includes(ev)) {
trace_println(`EVENT: ${ev}`);
event_queue.unshift(ev);
}
}
function channel_generate_events() {
for (let i in channels) {
if (channels[i].length > 0) {
event_generate(i);
}
}
}
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
rl.on('line', (line) => { channel_enqueue(0, line); });
rl.on('close', () => { process.exit(0); });
function execute_task() {
for (let q of task_queues) {
if (q.length > 0) {
let task = q.pop();
trace_println(`ACTIVE BEGIN: ${task.name}`);
let ret = task.call(task);
trace_println(`ACTIVE END: ${task.name} -> ${ret}`);
if (ret == undefined) {
schedule_task(task);
} else if (ret >= 0) {
event_wait_task(ret, task);
} else {
done_task(task);
}
break;
}
}
}
function process_events() {
while (event_queue.length > 0) {
let ev = event_queue.pop();
if (event_waiting_tasks[ev].length > 0) {
while (event_waiting_tasks[ev].length > 0) {
let task = event_waiting_tasks[ev].pop();
schedule_task(task);
}
}
}
}
function schedule_run() {
execute_task();
channel_generate_events();
process_events();
setTimeout(schedule_run, step_delay);
}
function parse_command(task) {
if (channel_is_empty(0)) {
return 0;
}
let line = channel_dequeue(0);
if (line.trim() == '') {
return 0;
}
let words = line.split(' ');
for (let i in words) { words[i] = words[i].trim(); }
switch (words[0]) {
case 'show': channel_enqueue(3, words); break;
case 'event': channel_enqueue(4, words); break;
default: channel_enqueue(1, 'Commands: show | event');
}
return 0;
}
function command_show(task) {
if (channel_is_empty(3)) {
return 3;
}
let words = channel_dequeue(3);
switch (words[1]) {
case 'running': channel_enqueue(1, task_queues); break;
case 'events': channel_enqueue(1, event_queue); break;
case 'waiting': channel_enqueue(1, event_waiting_tasks); break;
case 'channels': channel_enqueue(1, channels); break;
default: channel_enqueue(1, 'Commands: show [running | events | waiting | channels]');
}
return 3;
}
function command_event(task) {
if (channel_is_empty(4)) {
return 4;
}
let words = channel_dequeue(4);
let ev = Number.parseInt(words[1]);
if (!Number.isInteger(ev)) {
channel_enqueue(1, 'Commands: event EVENT');
return 4;
}
event_generate(ev);
return 4;
}
function uppercase(task) {
if (channel_is_empty(0)) {
return 0;
}
let line = channel_dequeue(0);
channel_enqueue(1, line.toUpperCase());
return;
}
function lowercase(task) {
if (channel_is_empty(0)) {
return 0;
}
let line = channel_dequeue(0);
channel_enqueue(1, line.toLowerCase());
return;
}
function reverse(task) {
if (channel_is_empty(1)) {
return 1;
}
let line = channel_dequeue(1);
channel_enqueue(2, line.split('').reverse().join(''));
return;
}
function println(task) {
while (!channel_is_empty(1)) {
let value = channel_dequeue(1);
console.log(value);
}
return 1;
}
let cur_ch = 10;
function new_channel() {
if (cur_ch >= MAX_CHANNELS) {
throw new Error("too many channels");
}
return cur_ch++;
}
let ch_sort_in = new_channel();
let ch_sort_out = new_channel();
function pipe(ch_in, ch_out) {
function pipe_task(task) {
if (channel_is_empty(ch_in)) {
return ch_in;
}
let list = channel_dequeue(ch_in);
debug_println(ch_in, ch_out, `PIPE ${task.name}: ${ch_in} -> [${list}] => [${list}] -> ${ch_out}`);
channel_enqueue(ch_out, list);
return -1;
}
schedule_task(new_task(1, pipe_task, `pipe_${ch_in}_${ch_out}`));
}
function qsort(ch_in, ch_out) {
let in1 = new_channel();
let out1 = new_channel();
let in2 = new_channel();
let out2 = new_channel();
let left = [];
let right = [];
function split(task) {
if (channel_is_empty(ch_in)) {
return ch_in;
}
let list = channel_dequeue(ch_in);
if (list.length < 2) {
left = list;
right = [];
pipe(in1, out1);
pipe(in2, out2);
} else {
let pivot = list[0];
let center = list.filter(v => v == pivot);
left = list.filter(v => v < pivot);
right = list.filter(v => pivot < v);
if (left.length == 0) {
left = center;
} else if (right.length == 0) {
right = center;
} else {
left = left.concat(center);
}
if (left.length == 0 || right.length == 0) {
pipe(in1, out1);
pipe(in2, out2);
} else {
qsort(in1, out1);
qsort(in2, out2);
}
}
debug_println(ch_in, ch_out, `SPLIT ${task.name}: ${ch_in} -> [${list}] => ([${left}] -> ${in1}) -> ${out1}, ([${right}] -> ${in2}) -> ${out2}`);
channel_enqueue(in1, left);
channel_enqueue(in2, right);
return -1;
}
function join(task) {
if (channel_is_empty(out1)) {
return out1;
}
if (channel_is_empty(out2)) {
return out2;
}
let left = channel_dequeue(out1)
let right = channel_dequeue(out2);
let list = left.concat(right);
debug_println(ch_in, ch_out, `JOIN ${task.name}: ${out1} -> [${left}], ${out2} -> [${right}] => [${list}] -> ${ch_out}`);
channel_enqueue(ch_out, list);
return -1;
}
schedule_task(new_task(1, split, `split_${ch_in}_${ch_out}`));
schedule_task(new_task(1, join, `join_${ch_in}_${ch_out}`));
}
let input_list = [860, 156, 458, 895, 470, 787, 499, 261, 603, 693, 740, 603, 181, 236, 280, 317,
231, 289, 738, 853, 718, 256, 736, 428, 23, 698, 330, 312, 14, 147, 680, 212,
32, 224, 927, 212, 223, 582, 682, 500, 956, 878, 371, 420, 411, 678, 651, 618,
504, 27, 159, 488, 781, 317, 303, 315, 142, 552, 478, 754, 306, 205, 607, 899,
894, 838, 858, 770, 164, 614, 801, 810, 109, 767, 650, 619, 474, 653, 798, 233,
927, 422, 51, 689, 575, 493, 250, 358, 297, 425, 214, 5, 580, 932, 845, 312,
456, 218, 43, 251, 641, 655, 112, 680, 673, 721, 160, 146, 453, 475, 755, 729,
49, 45, 671, 990, 858, 120, 966, 906, 754, 744, 686, 539, 86, 866, 912, 938];
// let input_list = [6, 7, 3, 2, 5, 8, 0, 9, 1, 4];
function sort_run(task) {
qsort(ch_sort_in, ch_sort_out);
channel_enqueue(ch_sort_in, input_list);
return -1;
}
function sort_show(task) {
if (channel_is_empty(ch_sort_out)) {
return ch_sort_out;
}
let list = channel_dequeue(ch_sort_out);
channel_enqueue(1, list);
return ch_sort_out;
}
schedule_task(new_task(0, parse_command, 'parse_command'));
schedule_task(new_task(0, command_show, 'command_show'));
schedule_task(new_task(0, command_event, 'command_event'));
schedule_task(new_task(1, println, 'println'));
/*
schedule_task(new_task(1, uppercase, 'uppercase'));
schedule_task(new_task(1, lowercase, 'lowercase'));
schedule_task(new_task(1, reverse, 'reverse'));
*/
schedule_task(new_task(0, sort_run, 'sort_run'));
schedule_task(new_task(0, sort_show, 'sort_show'));
schedule_run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment