Skip to content

Instantly share code, notes, and snippets.

@rneswold
rneswold / lwt_fstream.ml
Last active November 4, 2021 21:20
Functional streams for LWT
open Lwt.Infix
(** Exception declared by this module. The code that raises it is not visible
    in this excerpt; presumably it reports that the producing side of a stream
    has gone away -- TODO confirm against the full file. *)
exception Source_terminated
(** A stream cell: constructor [N] carries a pair made of a value of type
    ['a] and an Lwt promise that resolves to another ['a node]. *)
type 'a node = N of ('a * 'a node Lwt.t)
(** A stream is represented as an Lwt promise of a ['a node]. *)
type 'a t = 'a node Lwt.t
let push ref_waker v =
let new_node, new_waker = Lwt.wait () in
@rneswold
rneswold / bench.ml
Last active November 1, 2016 21:33
Benchmarking module for LWT streams
(* Alias for the stream module the benchmark uses ([TestStream.create] is
   called when the test stream is built). *)
module TestStream = Nstream
let n_stream = 1_000 (* # of items to pre-load stream. *)
let n_threads = 1_000 (* # of threads reading the stream. *)
let s =
let s, p = TestStream.create () in
begin
for ii = 1 to n_stream do
p (Some ii)
@rneswold
rneswold / lwt_stream.ml
Last active November 4, 2021 21:20
Alternate `Lwt_stream` (structure)
open Lwt.Infix
(** Exception declared by this module. The code that raises it is not visible
    in this excerpt; presumably it is raised when reading past the end of a
    stream -- TODO confirm against the full file. *)
exception End_of_stream
(** A stream cell: constructor [N] takes two arguments, a value of type ['a]
    and an Lwt promise that resolves to another ['a node]. *)
type 'a node = N of 'a * 'a node Lwt.t
(** A stream is a mutable reference holding an Lwt promise of a ['a node]. *)
type 'a t = 'a node Lwt.t ref
let push ref_waker v =
let new_node, new_waker = Lwt.wait () in
@rneswold
rneswold / lwt_stream.mli
Last active June 24, 2016 19:01
Alternate `Lwt_stream` (signature)
(** Exception exported by this interface. The raising conditions are not
    visible here; presumably raised by readers once the stream has ended --
    TODO confirm against the implementation. *)
exception End_of_stream

(** Abstract stream carrying elements of type ['a]. *)
type 'a t

(** [create ()] returns a new stream paired with a function of type
    ['a -> unit] (presumably the function used to push values onto the
    stream -- TODO confirm). *)
val create : unit -> 'a t * ('a -> unit)

(** [clone s] returns a stream of the same element type as [s].
    NOTE(review): presumably an independent reader positioned at the same
    point as [s]; confirm against the implementation. *)
val clone : 'a t -> 'a t

(** [next s] returns a promise for a value of type ['a] taken from [s].
    NOTE(review): presumably the next unread element, with {!End_of_stream}
    on termination; confirm against the implementation. *)
val next : 'a t -> 'a Lwt.t

(** [iter f s] takes a promise-returning function [f] and a stream [s], and
    returns a [unit] promise.
    NOTE(review): presumably applies [f] to each element of [s] in order;
    confirm against the implementation. *)
val iter : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t