Last active
November 4, 2021 21:20
-
-
Save rneswold/0d80560a80314ce3f1aa57a64ee406dd to your computer and use it in GitHub Desktop.
Functional streams for LWT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Lwt.Infix | |
exception Source_terminated | |
type 'a node = N of ('a * 'a node Lwt.t) | |
type 'a t = 'a node Lwt.t | |
let push ref_waker v = | |
let new_node, new_waker = Lwt.wait () in | |
begin | |
Lwt.wakeup_later !ref_waker @@ N (v, new_node); | |
ref_waker := new_waker | |
end | |
let final rw = | |
Lwt_gc.finalise Lwt.(fun _ -> wrap2 wakeup_exn !rw Source_terminated) | |
let create_push () = | |
let node, awakener = Lwt.wait () in | |
let rw = ref awakener in | |
let p = push rw in | |
( final rw p; (node, p) ) | |
let create_pull f i = | |
let rec loop v = | |
let%lwt (v, n) = f v in | |
Lwt.return @@ N (v, loop n) | |
in | |
loop i | |
let of_list l = | |
let f = function | |
| [] -> | |
Lwt.fail Source_terminated | |
| h :: t -> | |
Lwt.return (h, t) in | |
create_pull f l | |
let clone t = t | |
let is_empty t = match Lwt.state t with | |
| Lwt.Return _ | Lwt.Sleep -> false | |
| Lwt.Fail _ -> true | |
let next t = | |
let%lwt N v = t in | |
Lwt.return v | |
let iter f t = | |
let rec loop t = | |
match%lwt next t with | |
| (v, t) -> | |
(f v; (loop[@tailcall]) t) | |
| exception _ -> | |
Lwt.return_unit in | |
loop t | |
let map f t = | |
let rec tMap tCurr = | |
let%lwt v, tNext = next tCurr in | |
Lwt.return @@ N (f v, tMap tNext) in | |
tMap t | |
let filter f t = | |
let rec tFilter tCurr = | |
let%lwt v, tNext = next tCurr in | |
if f v then | |
Lwt.return @@ N (v, tFilter tNext) | |
else | |
(tFilter[@tailcall]) tNext | |
in | |
tFilter t | |
let append a b = | |
let rec tAppend tA = | |
match%lwt next tA with | |
| (v, t) -> | |
Lwt.return @@ N (v, tAppend t) | |
| exception _ -> | |
b | |
in | |
tAppend a | |
let combine a b = | |
let rec tCombine tA tB = | |
let%lwt vA, tNextA = next tA | |
and vB, tNextB = next tB in | |
Lwt.return @@ N ((vA, vB), tCombine tNextA tNextB) in | |
tCombine a b |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
exception Source_terminated | |
type 'a t | |
val create_push : unit -> 'a t * ('a -> unit) | |
val create_pull : ('a -> ('b * 'a) Lwt.t) -> 'a -> 'b t | |
val of_list : 'a list -> 'a t | |
val clone : 'a t -> 'a t | |
val is_empty : 'a t -> bool | |
val next : 'a t -> ('a * 'a t) Lwt.t | |
val iter : ('a -> unit) -> 'a t -> unit Lwt.t | |
val map : ('a -> 'b) -> 'a t -> 'b t | |
val filter : ('a -> bool) -> 'a t -> 'a t | |
val append : 'a t -> 'a t -> 'a t | |
val combine : 'a t -> 'b t -> ('a * 'b) t |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment