Skip to content

Instantly share code, notes, and snippets.

@rneswold
Last active November 4, 2021 21:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rneswold/0d80560a80314ce3f1aa57a64ee406dd to your computer and use it in GitHub Desktop.
Save rneswold/0d80560a80314ce3f1aa57a64ee406dd to your computer and use it in GitHub Desktop.
Functional streams for LWT
open Lwt.Infix
exception Source_terminated
type 'a node = N of ('a * 'a node Lwt.t)
type 'a t = 'a node Lwt.t
let push ref_waker v =
let new_node, new_waker = Lwt.wait () in
begin
Lwt.wakeup_later !ref_waker @@ N (v, new_node);
ref_waker := new_waker
end
let final rw =
Lwt_gc.finalise Lwt.(fun _ -> wrap2 wakeup_exn !rw Source_terminated)
let create_push () =
let node, awakener = Lwt.wait () in
let rw = ref awakener in
let p = push rw in
( final rw p; (node, p) )
let create_pull f i =
let rec loop v =
let%lwt (v, n) = f v in
Lwt.return @@ N (v, loop n)
in
loop i
let of_list l =
let f = function
| [] ->
Lwt.fail Source_terminated
| h :: t ->
Lwt.return (h, t) in
create_pull f l
let clone t = t
let is_empty t = match Lwt.state t with
| Lwt.Return _ | Lwt.Sleep -> false
| Lwt.Fail _ -> true
let next t =
let%lwt N v = t in
Lwt.return v
let iter f t =
let rec loop t =
match%lwt next t with
| (v, t) ->
(f v; (loop[@tailcall]) t)
| exception _ ->
Lwt.return_unit in
loop t
let map f t =
let rec tMap tCurr =
let%lwt v, tNext = next tCurr in
Lwt.return @@ N (f v, tMap tNext) in
tMap t
let filter f t =
let rec tFilter tCurr =
let%lwt v, tNext = next tCurr in
if f v then
Lwt.return @@ N (v, tFilter tNext)
else
(tFilter[@tailcall]) tNext
in
tFilter t
let append a b =
let rec tAppend tA =
match%lwt next tA with
| (v, t) ->
Lwt.return @@ N (v, tAppend t)
| exception _ ->
b
in
tAppend a
let combine a b =
let rec tCombine tA tB =
let%lwt vA, tNextA = next tA
and vB, tNextB = next tB in
Lwt.return @@ N ((vA, vB), tCombine tNextA tNextB) in
tCombine a b
exception Source_terminated
type 'a t
val create_push : unit -> 'a t * ('a -> unit)
val create_pull : ('a -> ('b * 'a) Lwt.t) -> 'a -> 'b t
val of_list : 'a list -> 'a t
val clone : 'a t -> 'a t
val is_empty : 'a t -> bool
val next : 'a t -> ('a * 'a t) Lwt.t
val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val map : ('a -> 'b) -> 'a t -> 'b t
val filter : ('a -> bool) -> 'a t -> 'a t
val append : 'a t -> 'a t -> 'a t
val combine : 'a t -> 'b t -> ('a * 'b) t
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment