(* Coroutine pipes in OCaml. Slides: http://odis.io/talks/ocaml-coro.pdf *)
(** A type declared without any constructor. It is used by the derived
    pipe types to mark the side of a pipe that carries no values. *)
type void

(* Core pipe type *)

(** A pipe reading inputs of type ['a], emitting outputs of type ['b] and
    finishing with a result of type ['r].

    - [Yield (b, rest)] emits [b]; forcing [rest] gives the pipe that follows.
    - [Await k] needs one input; [k a] is the pipe that follows once [a] is
      supplied.
    - [Ready r] has finished with result [r]. *)
type ('a, 'b, 'r) pipe =
  | Yield of ('b * (unit -> ('a, 'b, 'r) pipe))
  | Await of ('a -> ('a, 'b, 'r) pipe)
  | Ready of 'r
(* Derived pipe types *)

(** A pipe whose input type is [void]: it emits ['b] and returns ['r]. *)
type ('b, 'r) producer = (void, 'b, 'r) pipe

(** A pipe whose output type is [void]: it reads ['a] and returns ['r]. *)
type ('a, 'r) consumer = ('a, void, 'r) pipe

(** A pipe closed on both sides: input and output types are both [void]. *)
type 'r pipeline = (void, void, 'r) pipe
(* Monad instance *)

(** [return result] is the pipe that has already finished with [result]. *)
let return result = Ready result
(** [pipe >>= continue] behaves like [pipe] until it finishes with some
    result [r], then carries on as [continue r]. Every [Yield] and [Await]
    step of [pipe] is kept; the bind is pushed into the continuation of the
    step, so it is only performed when that continuation is run. *)
let rec ( >>= ) pipe continue =
  match pipe with
  | Ready result -> continue result
  | Await feed -> Await (fun input -> feed input >>= continue)
  | Yield (out, rest) -> Yield (out, fun () -> rest () >>= continue)
(** [first >> later] runs [first], discards its result, then continues with
    the pipe obtained by calling [later ()]. The right operand is a thunk so
    that the second pipe is only built once the first one has finished. *)
let ( >> ) first later =
  first >>= fun _ -> later ()
(** [forever m] runs [m] again and again, discarding its result each time.
    The next round is only built when the previous one finishes. *)
let rec forever m =
  m >>= fun _ -> forever m
(** [replicate n m] runs the pipe [m] [n] times in sequence, discarding each
    result, and finishes with [()].

    A count of zero or less produces the pipe that finishes immediately.
    The test is [n <= 0] rather than [n = 0] because a negative count
    decremented by one never reaches zero, which would repeat [m] without
    end. *)
let rec replicate n m =
  if n <= 0 then return ()
  else m >> fun () -> replicate (n - 1) m
(* Pipes creation *)

(** The pipe that has already finished with [()]. *)
let empty = Ready ()

(** [yield b] emits the single value [b] and then finishes with [()]. *)
let yield b = Yield (b, fun () -> Ready ())

(** [await] requests one input and finishes with that input as its result. *)
let await = Await (fun input -> Ready input)
(* Category instance *)

(** The identity pipe: it awaits an input, yields it unchanged, and then
    starts over, so every input is forwarded.

    It is written as a recursive value so that [id] stays a plain pipe
    value rather than a function. The earlier definition,
    [Await (fun a -> yield a)], finished after forwarding a single element;
    composing with it therefore cut the stream after one element, which also
    truncated every pipe that falls back on [id] to pass the remaining
    elements through. *)
let rec id = Await (fun a -> Yield (a, fun () -> id))
(** [compose d u] connects the upstream pipe [u] to the downstream pipe [d].

    The downstream pipe drives the computation:
    - if [d] has finished, the composition finishes with its result;
    - if [d] yields, the composition yields the same value;
    - if [d] awaits, the upstream pipe is inspected: a value yielded by [u]
      is fed to [d]; if [u] itself awaits, the composition awaits and
      forwards the input to [u]; if [u] has finished, the composition
      finishes with the result of [u]. *)
let rec compose d u =
  match d with
  | Ready r -> Ready r
  | Yield (b, d') -> Yield (b, fun () -> compose (d' ()) u)
  | Await feed_d ->
    (match u with
     | Yield (b, u') -> compose (feed_d b) (u' ())
     | Await feed_u -> Await (fun a -> compose d (feed_u a))
     | Ready r -> Ready r)
(** [downstream <-< upstream] is [compose downstream upstream]. *)
let ( <-< ) downstream upstream = compose downstream upstream

(** [upstream >-> downstream] is [compose downstream upstream]: data flows
    from left to right. *)
let ( >-> ) upstream downstream = compose downstream upstream
(* Consumers *)

(** [next pipe] takes one step of [pipe]. If the pipe is yielding, the
    result is [Some (value, rest)] where [rest] is the pipe obtained by
    resuming it; a pipe that awaits or has finished gives [None]. *)
let next = function
  | Yield (value, resume) -> Some (value, resume ())
  | Await _ | Ready _ -> None
(** [collect pipe] gathers every value yielded by [pipe], in order, into a
    list. Collection stops as soon as [next] returns [None]. *)
let collect pipe =
  let rec gather acc current =
    match next current with
    | None -> List.rev acc
    | Some (item, remaining) -> gather (item :: acc) remaining
  in
  gather [] pipe
(** [fold f acc source] combines the values yielded by [source] from left to
    right with [f], starting from [acc]. The fold ends at the first step of
    [source] that is not a yield and returns the accumulator reached so far. *)
let rec fold f acc source =
  match source with
  | Yield (item, resume) ->
    (* Resume the source before applying [f], matching the order in which
       [next] would have evaluated things. *)
    let rest = resume () in
    fold f (f acc item) rest
  | Await _ | Ready _ -> acc
(** [nth n source] returns [Some v] where [v] is the value yielded at
    zero-based position [n] of [source], or [None] when [source] stops
    yielding before that position.

    A negative [n] returns [None] immediately. Without that check the
    counter would be decremented away from zero, so the whole stream would
    be consumed before answering [None], and an endless producer would never
    answer at all. *)
let nth n source =
  let rec loop remaining current =
    match next current with
    | None -> None
    | Some (item, rest) ->
      if remaining = 0 then Some item
      else loop (remaining - 1) rest
  in
  if n < 0 then None else loop n source
(** [sum source] adds up the integers yielded by [source], starting at 0. *)
let sum source = fold (fun total x -> total + x) 0 source

(** [length source] counts the values yielded by [source]. *)
let length source = fold (fun seen _ -> succ seen) 0 source
(* Transformers *)

(** [map f] awaits an input [a], yields [f a], and repeats. *)
let rec map f =
  await >>= fun a ->
  yield (f a) >>= fun () ->
  map f
(** [filter p] awaits inputs one at a time and yields only those for which
    [p] holds; the others are skipped. It then repeats. *)
let rec filter p =
  await >>= fun a ->
  let continue () = filter p in
  if p a then yield a >> continue
  else continue ()
(** [take n] forwards the first [n] inputs and then finishes with [()].

    A count of zero or less finishes immediately without awaiting anything.
    The test is [n <= 0] rather than [n = 0] because a negative count
    decremented by one never reaches zero, which would forward inputs
    without ever stopping. *)
let rec take n =
  if n <= 0 then empty
  else await >>= fun a -> yield a >> fun () -> take (n - 1)
(** [take_while pred] forwards inputs for as long as [pred] holds. The first
    input for which [pred] is false is consumed but not forwarded, and the
    pipe finishes with [()]. *)
let rec take_while pred =
  await >>= fun a ->
  match pred a with
  | true -> yield a >>= (fun () -> take_while pred)
  | false -> empty
(** [drop n] awaits and discards [n] inputs, then hands over to [id] for
    whatever follows.

    A count of zero or less discards nothing. The test is [n <= 0] rather
    than [n = 0] because a negative count decremented by one never reaches
    zero, which would discard every input without end. *)
let rec drop n =
  if n <= 0 then id
  else await >>= fun _ -> drop (n - 1)
(** [drop_while pred] discards inputs for as long as [pred] holds. The first
    input for which [pred] is false is yielded, after which the pipe hands
    over to [id] for whatever follows.

    That first non-matching input has already been taken from upstream by
    the time [pred] is evaluated, so it must be yielded explicitly here;
    switching straight to [id] would silently lose it. *)
let rec drop_while pred =
  await >>= fun a ->
  if pred a then drop_while pred
  else yield a >> fun () -> id
(** [slice i j] pipes [drop i] into [take (j - i)]. *)
let slice i j =
  let width = j - i in
  compose (take width) (drop i)
(* Producers *)

(** [count ()] yields 0, 1, 2, ... without end. Each following number is
    only produced when the pipe is resumed. *)
let count () =
  let rec from n =
    yield n >>= fun () -> from (succ n)
  in
  from 0
(** [repeat ?n x] yields [x] repeatedly: [n] times via [replicate] when [n]
    is given, and without end via [forever] otherwise. *)
let repeat ?n x =
  let item = yield x in
  match n with
  | None -> forever item
  | Some times -> replicate times item
(** [iota stop] pipes [count ()] into [take stop]. *)
let iota stop =
  compose (take stop) (count ())
(** [range start stop] pipes [count ()] into [take stop] and the result
    into [drop start]. *)
let range start stop =
  let below_stop = count () >-> take stop in
  below_stop >-> drop start
(** [file path] opens [path] for reading and yields its lines one by one,
    as returned by [input_line].

    The file is opened, and its first line read, as soon as [file path] is
    called; later lines are read when the pipe is resumed. The channel is
    closed when [End_of_file] is reached, and only then: if the pipe is
    abandoned before the end of the file, this function does not close it. *)
let file path =
  let chan = open_in path in
  let rec read_lines () =
    match input_line chan with
    | exception End_of_file ->
      close_in chan;
      empty
    | line -> yield line >> read_lines
  in
  read_lines ()
(* Examples *)

(* Self-check run at start-up: the even numbers among the first ten
   naturals, rendered as strings. *)
let () =
  let is_even n = n mod 2 = 0 in
  let stream =
    count () >-> take 10 >-> filter is_even >-> map string_of_int
  in
  assert (collect stream = ["0"; "2"; "4"; "6"; "8"])
(* Pipes usage example
 *
 * Problem:
 *   Compute the total bytes sent to a particular host for
 *   successful responses until a given date.
 *
 * Build:
 *   $ ocamlbuild successful_bytes_sent.native
 *
 * Source: http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html *)
(* String extension *)

(** The standard [String] module extended with two helpers. Because of the
    [include], every standard function stays available under [String]. *)
module String = struct
  include String

  (** [split ~on str] cuts [str] at every occurrence of the character [on]
      (a space by default) and returns the pieces in order. Adjacent,
      leading or trailing separators produce empty strings, and a string
      without any separator gives a one-element list.

      This delegates to the standard [String.split_on_char], which has the
      same semantics as the previous hand-written index bookkeeping. *)
  let split ?(on = ' ') str = String.split_on_char on str

  (** [starts_with s2 s1] is [true] when [s1] begins with [s2]. Note the
      argument order: the prefix comes first, so that the function can be
      partially applied to the prefix and used with [|>]. This positional
      definition takes precedence over any [starts_with] brought in by the
      [include] above. *)
  let starts_with s2 s1 =
    let len2 = String.length s2 in
    (* The length test keeps [String.sub] within the bounds of [s1]. *)
    String.length s1 >= len2 && String.sub s1 0 len2 = s2
end
(* Total bytes sent to a host until a given date *)

(** [successful_bytes_sent input ~host ~until] reads the file [input] line
    by line, splits each line on spaces, and sums field 9 (parsed as an
    integer) of the lines whose field 0 equals [host] and whose field 8 is
    ["200"].

    Reading stops at the first such line whose field 3 does not start with
    ["["] followed by [until].

    Fields are fetched with [List.nth] and parsed with [int_of_string], so
    a selected line with too few fields or a non-numeric field 9 raises the
    exception of those functions. *)
let successful_bytes_sent input ~host ~until:date =
  let field i entries = List.nth entries i in
  let date_prefix = "[" ^ date in
  let from_host entries = field 0 entries = host in
  let succeeded entries = field 8 entries = "200" in
  let within_date entries =
    String.starts_with date_prefix (field 3 entries)
  in
  let sizes =
    file input
    >-> map (String.split ~on:' ')
    >-> filter from_host
    >-> filter succeeded
    >-> take_while within_date
    >-> map (field 9)
    >-> map int_of_string
  in
  fold (+) 0 sizes
open Printf | |
(* Entry point of the example: computes the total for a fixed host and date
   over the NASA access log and prints it on standard output. *)
let () =
  let host = "port26.annex2.nwlink.com"
  and date = "01/Jul" in
  let total =
    successful_bytes_sent "NASA_access_log_Jul95" ~host ~until:date
  in
  Printf.printf "Total successful bytes sent to %s until %s: %d" host date total
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment