Last active
November 4, 2021 21:20
-
-
Save rneswold/1f7feccf7383fe578c6d796b62ba654c to your computer and use it in GitHub Desktop.
Alternate `Lwt_stream` (structure)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Lwt.Infix | |
exception End_of_stream | |
type 'a node = N of 'a * 'a node Lwt.t | |
type 'a t = 'a node Lwt.t ref | |
let push ref_waker v = | |
let new_node, new_waker = Lwt.wait () in | |
begin | |
Lwt.wakeup !ref_waker @@ N (v, new_node); | |
ref_waker := new_waker | |
end | |
let final rw = | |
Lwt_gc.finalise (fun _ -> Lwt.wrap2 Lwt.wakeup_exn !rw End_of_stream) | |
let create () = | |
let node, waken = Lwt.wait () in | |
let rw = ref waken in | |
let p = push rw in | |
( final rw p; (ref node, p) ) | |
let clone t = ref !t | |
let next t = | |
let%lwt N (v, t') = !t in | |
( t := t'; Lwt.return v ) | |
let iter f t = | |
let rec loop () = | |
match%lwt next t with | |
| v -> f v >>= loop | |
| exception End_of_stream -> Lwt.return_unit in | |
loop () | |
let xform f t = | |
let t', p' = create () in | |
let rec loop () = | |
match%lwt next t with | |
| v -> Lwt.wrap2 f v p' >>= loop | |
| exception End_of_stream -> Lwt.return_unit | |
in | |
( Lwt.async loop; t' ) | |
let map f t = | |
xform (fun v p -> f v |> p) t | |
let filter f t = | |
xform (fun v p -> if f v then p v) t |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment