Created
June 24, 2016 19:32
-
-
Save hcarty/5f613dcd87052fc429e47180fafcea2c to your computer and use it in GitHub Desktop.
Medium-weight threads?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(* Based on module lwt_preemptive.ml from Lwt which is
 * Copyright (C) 2005 Nataliya Guts, Vincent Balat, Jérôme Vouillon
 * Laboratoire PPS - CNRS Université Paris Diderot
 * 2009 Jérémie Dimino
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, with linking exceptions;
 * either version 2.1 of the License, or (at your option) any later version.
 * See COPYING file for details.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *)
open Lwt.Infix | |
(* A worker pairs one system thread with the channel used to hand it
   work, and remembers the pool it was created for. *)
type worker = {
  (* Carries [(notification id, task)] pairs to the worker thread. *)
  task_channel : (int * (unit -> unit)) Event.channel;
  (* The thread running the worker loop. The field is mutable because
     [make_worker] builds the record first and creates the thread
     afterwards. *)
  mutable thread : Thread.t;
  (* The pool this worker was created for. *)
  pool : t;
}

(* A pool of worker threads. *)
and t = {
  (* Number of preemptive threads requested when the pool was made. *)
  num_threads : int;
  (* Maximum size of the waiting queue. The value is stored here; none
     of the code in this file consults it. *)
  max_threads_queued : int;
  (* Workers that are currently idle. *)
  workers : worker Queue.t;
  (* Clients waiting for a worker to become available. *)
  waiters : worker Lwt.u Lwt_sequence.t;
}
(* Body of a worker thread: wait for a [(notification id, job)] pair on
   the worker's channel, run the job, send the notification, then wait
   for the next pair. The loop only stops if a job raises. *)
let rec worker_loop worker =
  let incoming = Event.receive worker.task_channel in
  let notification_id, job = Event.sync incoming in
  job ();
  (* Signal the main thread that this job has finished. *)
  Lwt_unix.send_notification notification_id;
  worker_loop worker
(* Build a worker for [pool] and start its thread. The worker is
   returned without being registered in the pool; see [add_worker]. *)
let make_worker pool =
  (* The record needs some [Thread.t] before the real thread exists, so
     the calling thread is used as a stand-in and replaced below. *)
  let stand_in = Thread.self () in
  let channel = Event.new_channel () in
  let worker = { task_channel = channel; thread = stand_in; pool } in
  worker.thread <- Thread.create worker_loop worker;
  worker
(* Make [worker] available in [pool]. If a client is waiting, the
   waiter taken from the left end of [pool.waiters] is woken with the
   worker; otherwise the worker joins the idle queue. *)
let add_worker pool worker =
  match Lwt_sequence.take_opt_l pool.waiters with
  | Some waiter -> Lwt.wakeup waiter worker
  | None -> Queue.add worker pool.workers
(* [get_worker pool] returns a promise for a worker from [pool].

   If an idle worker is queued it is removed from [pool.workers] and
   returned in an already-resolved promise. Otherwise the caller is
   registered in [pool.waiters] and the promise resolves when
   [add_worker] hands a worker over.

   The function does not call itself, so it is a plain [let] rather
   than a [let rec]. *)
let get_worker pool =
  if Queue.is_empty pool.workers then
    Lwt.add_task_r pool.waiters
  else
    Lwt.return (Queue.take pool.workers)
(* [make ?max_threads_queued num_threads] creates a pool and starts
   [num_threads] workers in it. [max_threads_queued] defaults to 1000
   and is recorded in the pool. *)
let make ?(max_threads_queued = 1000) num_threads =
  let workers = Queue.create () in
  let waiters = Lwt_sequence.create () in
  let pool = { num_threads; max_threads_queued; workers; waiters } in
  for _i = 1 to num_threads do
    let fresh = make_worker pool in
    add_worker pool fresh
  done;
  pool
(* [detach pool f args] runs [f args] on a worker thread of [pool] and
   returns a promise for its outcome: the value returned by [f], or the
   exception it raised. The worker is handed back to the pool once the
   promise settles. *)
let detach pool f args =
  (* Placeholder outcome; the task overwrites it before the
     notification is sent. *)
  let result = ref (Lwt.make_error (Failure "Mwt.detach")) in
  (* What the worker thread runs: record either the value or the
     exception. *)
  let task () =
    let outcome =
      try Lwt.make_value (f args) with exn -> Lwt.make_error exn
    in
    result := outcome
  in
  get_worker pool >>= fun worker ->
  let waiter, wakener = Lwt.wait () in
  (* Invoked through the notification once the worker has finished. *)
  let deliver () = Lwt.wakeup_result wakener !result in
  let id = Lwt_unix.make_notification ~once:true deliver in
  let submit () =
    (* Hand the notification id and the task to the worker. *)
    Event.sync (Event.send worker.task_channel (id, task));
    waiter
  in
  let release () =
    add_worker pool worker;
    Lwt.return_unit
  in
  Lwt.finalize submit release
(* +-----------------------------------------------------------------+
   | Running Lwt threads in the main thread                          |
   +-----------------------------------------------------------------+ *)

(* Outcome of a job: [Value v] when it produced [v], [Error exn] when
   it failed with [exn]. *)
type 'a result = Value of 'a | Error of exn
(* Guards every access to [jobs]. *)
let jobs_mutex = Mutex.create ()

(* Pending jobs, each a [unit -> unit Lwt.t] function. *)
let jobs = Queue.create ()
(* Notification whose handler pops one job from [jobs] and starts it.
   The promise returned by the job is discarded. *)
let job_notification =
  (* Remove the first job while holding the mutex. The queue is never
     empty at this point. *)
  let next_job () =
    Mutex.lock jobs_mutex;
    let thunk = Queue.take jobs in
    Mutex.unlock jobs_mutex;
    thunk
  in
  let handler () =
    let thunk = next_job () in
    ignore (thunk ())
  in
  Lwt_unix.make_notification handler
(* [run_in_main f] queues [f] as a job, sends [job_notification], and
   blocks the calling thread until the job's outcome comes back on a
   private channel. Returns the value [f] resolved to, or re-raises the
   exception it failed with. *)
let run_in_main f =
  let reply = Event.new_channel () in
  (* Run [f] and turn its outcome into a [result]. *)
  let capture () =
    Lwt.try_bind f
      (fun v -> Lwt.return (Value v))
      (fun e -> Lwt.return (Error e))
  in
  (* The job: compute the outcome, then send it back to the caller. *)
  let job () =
    capture () >>= fun outcome ->
    Event.sync (Event.send reply outcome);
    Lwt.return_unit
  in
  (* Queue the job under the mutex. *)
  let enqueue () =
    Mutex.lock jobs_mutex;
    Queue.add job jobs;
    Mutex.unlock jobs_mutex
  in
  enqueue ();
  (* Notify the main thread. *)
  Lwt_unix.send_notification job_notification;
  (* Block until the outcome arrives. *)
  match Event.sync (Event.receive reply) with
  | Error e -> raise e
  | Value v -> v
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment