Skip to content

Instantly share code, notes, and snippets.

@athaeryn
Last active June 28, 2021 15:43
Show Gist options
  • Save athaeryn/e85a4b2fdd35b9df934a1ca162c9323d to your computer and use it in GitHub Desktop.
Save athaeryn/e85a4b2fdd35b9df934a1ca162c9323d to your computer and use it in GitHub Desktop.
ReScript worker threads WIP design
let app = Express.App.make()
let taskPool = TaskPool.make()
// How can I just get the Pause.output back out of this, and avoid
// doing extra work to check if the output is from the wrong Task?
app.get(."/pause", (. _req, res) => {
taskPool.run(Pause({howLongMs: 4200}))
->Promise.thenResolve(result => {
switch result {
| Ok(Pause(output)) => res.send(. output->Tasks.Pause.output_encode->Js.Json.stringify)
| Ok(_) => Express.status(res, 500).send(. "Wrong output")
| Error(message) => Express.status(res, 500).send(. message)
}
})
->ignore
})
app.get(."/fibonacci", (. _req, res) => {
taskPool.run(Fibonacci({n: 45}))
->Promise.thenResolve(result => {
switch result {
| Ok(Fibonacci(output)) => res.send(. output->Tasks.Fibonacci.output_encode->Js.Json.stringify)
| Ok(_) => Express.status(res, 500).send(. "Wrong output")
| Error(message) => Express.status(res, 500).send(. message)
}
})
->ignore
})
let staticPath = Path.fromRoot(["static"])
app.use(. Express.static(staticPath))
let port = 3030
app.listen(.port, () => {
Js.log(j`Listening on port $port`)
Js.log(j`Serving static resources from $staticPath`)
Js.log(Os.cpuCount()->Int.toString ++ " CPUs")
Js.log(Os.cpus()->Array.map(cpu => cpu.model))
})
let workerPath = Path.fromRoot(["src", "TaskRunner.bs.js"])
type t = {run: Tasks.input => Promise.t<result<Tasks.output, string>>}
let make = () => {
let workerCount = Os.cpuCount()
Js.log(j`creating pool of $workerCount workers...`)
// TODO create multiple (workerCount) workers
let worker = WorkerThreads.make(workerPath)
{
run: input => {
Promise.make((resolve, _reject) => {
worker->WorkerThreads.once(
#message(
json => {
Js.log(json)
let output = switch Tasks.output_decode(json) {
| Ok(output) => Ok(output)
| Error({message}) => Error(message)
}
resolve(. output)
},
),
)
worker->WorkerThreads.postMessage(input->Tasks.input_encode)
})
},
}
}
// This is the file run as a worker
open WorkerThreads
Js.log(j`Worker $threadId started`)
Parent.port->Parent.on(
#message(
json => {
switch json->Tasks.input_decode {
| Ok(input) =>
switch input {
| Pause(input) =>
Tasks.Pause.run(input)
->Promise.thenResolve(output => {
Parent.port->Parent.postMessage(Pause(output)->Tasks.output_encode)
})
->ignore
| Fibonacci(input) =>
Tasks.Fibonacci.run(input)
->Promise.thenResolve(output => {
Parent.port->Parent.postMessage(Fibonacci(output)->Tasks.output_encode)
})
->ignore
}
| Error({message}) => Parent.port->Parent.postMessage(Js.Json.string(message))
}
},
),
)
module Pause = {
@decco
type input = {howLongMs: int}
@decco
type output = {pausedFor: int}
let run = input => {
Promise.make((resolve, _) => {
Js.Global.setTimeout(() => {
resolve(. {pausedFor: input.howLongMs})
}, input.howLongMs)->ignore
})
}
}
module Fibonacci = {
@decco
type input = {n: int}
@decco
type output = {result: int}
let rec getFib = n => {
switch n {
| 0 => 0
| 1 => 1
| n => getFib(n - 1) + getFib(n - 2)
}
}
let run = input => {
Promise.make((resolve, _reject) => {
resolve(. {result: getFib(input.n)})
})
}
}
// I'd like to get rid of these combined input/output types…
@decco
type input = Pause(Pause.input) | Fibonacci(Fibonacci.input)
@decco
type output = Pause(Pause.output) | Fibonacci(Fibonacci.output)
type t
@module("worker_threads") @new
external make: string => t = "Worker"
@module("worker_threads") @val
external threadId: int = "threadId"
@send
external on: (t, @string [#message(Js.Json.t => unit)]) => unit = "on"
@send
external once: (t, @string [#message(Js.Json.t => unit)]) => unit = "once"
@send
external postMessage: (t, Js.Json.t) => unit = "postMessage"
module Parent = {
type t
@module("worker_threads") @val
external port: t = "parentPort"
@send
external on: (t, @string [#message(Js.Json.t => unit)]) => unit = "on"
@send
external once: (t, @string [#message(Js.Json.t => unit)]) => unit = "once"
@send
external postMessage: (t, Js.Json.t) => unit = "postMessage"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment