Skip to content

Instantly share code, notes, and snippets.

@rizo

rizo/pipes.ml

Last active Sep 26, 2018
Embed
What would you like to do?
Coroutine pipes in OCaml. Slides: <http://odis.io/talks/ocaml-coro.pdf>
(* Marker type declared without any constructors: nothing in this file can
   build a value of it. It is used below to mark an unused input or output
   side of a pipe. *)
type void
(* Core pipe type *)
(* A pipe with input type ['a], output type ['b] and final result type ['r].
   - [Yield (b, rest)] carries an output value [b] and a thunk producing the
     remainder of the pipe.
   - [Await k] carries a continuation [k] that expects one input value.
   - [Ready r] is a finished pipe holding its result [r]. *)
type ('a, 'b, 'r) pipe =
| Yield of ('b * (unit -> ('a, 'b, 'r) pipe))
| Await of ('a -> ('a, 'b, 'r) pipe)
| Ready of 'r
(* Derived pipe types *)
(* A producer has [void] as its input type and outputs values of type ['b]. *)
type ('b, 'r) producer = (void, 'b, 'r) pipe
(* A consumer takes inputs of type ['a] and has [void] as its output type. *)
type ('a, 'r) consumer = ( 'a, void, 'r) pipe
(* A pipeline has [void] on both sides; only its result ['r] is visible. *)
type ( 'r) pipeline = (void, void, 'r) pipe
(* Monad instance *)

(* [return r] is the pipe that has already finished with result [r]. *)
let return r = Ready r

(* [p >>= f] behaves like [p] until [p] finishes with some result [r], and
   then continues as [f r]. Values yielded by [p] and its requests for input
   are passed through unchanged. *)
let rec ( >>= ) p f =
  match p with
  | Ready r -> f r
  | Await k -> Await (fun a -> k a >>= f)
  | Yield (b, rest) -> Yield (b, fun () -> rest () >>= f)
(* [p1 >> p2'] runs [p1], discards its result, and then continues with the
   pipe obtained by calling the thunk [p2']. *)
let ( >> ) p1 p2' = ( >>= ) p1 (fun _ -> p2' ())

(* [forever m] restarts [m] every time it finishes. Because [>>=] calls its
   continuation immediately on a [Ready] pipe, an [m] that is already
   finished makes this loop without ever producing a pipe. *)
let rec forever m =
  let again () = forever m in
  m >> again
(* [replicate n m] runs [m] [n] times in sequence and finishes with [()].
   A non-positive [n] gives the finished pipe straight away: testing only
   [n = 0] would never match a negative count and the recursion would have
   no base case. *)
let rec replicate n m =
  if n <= 0 then return ()
  else m >> (fun () -> replicate (n - 1) m)
(* Pipes creation *)

(* The pipe that is already finished with result [()]. *)
let empty = Ready ()

(* [yield b] outputs the single value [b] and then finishes with [()]. *)
let yield b = Yield (b, fun () -> Ready ())

(* [await] requests one input value and finishes with that value as its
   result. *)
let await = Await return
(* Category instance *)

(* [id] is the identity pipe: it forwards every input it receives to its
   output, indefinitely. It never finishes on its own, so a composition with
   [id] ends only when the other side does. A one-shot version (await once,
   yield once, finish) is not an identity for [compose], because composing
   with it would cut the other pipe off after a single element. *)
let rec id = Await (fun a -> Yield (a, fun () -> id))
(* [compose d u] connects the output of the upstream pipe [u] to the input
   of the downstream pipe [d]. The downstream side drives the computation:
   - if [d] is finished, the composition is finished with its result;
   - if [d] yields, the value is yielded by the composition;
   - if [d] awaits, [u] is inspected: a yielded value is fed to [d], an
     awaiting [u] makes the composition await, and a finished [u] finishes
     the composition with its result. *)
let rec compose d u =
  match d with
  | Ready r -> Ready r
  | Yield (b, d') -> Yield (b, fun () -> compose (d' ()) u)
  | Await k ->
    (match u with
     | Yield (b, u') -> compose (k b) (u' ())
     | Await h -> Await (fun a -> compose d (h a))
     | Ready r -> Ready r)

(* [d <-< u] is [compose d u]: data flows from right to left. *)
let ( <-< ) = compose

(* [u >-> d] is [compose d u]: data flows from left to right. *)
let ( >-> ) u d = compose d u
(* Consumers *)

(* [next pipe] returns [Some (a, rest)] when [pipe] is currently yielding
   [a], where [rest] is the remainder of the pipe (its thunk is forced
   here). An awaiting or finished pipe gives [None]. *)
let next = function
  | Yield (a, rest) -> Some (a, rest ())
  | Await _ | Ready _ -> None
(* [collect pipe] gathers, in order, the values [pipe] yields until [next]
   reports nothing more, and returns them as a list. *)
let collect pipe =
  let rec gather acc p =
    match next p with
    | None -> List.rev acc
    | Some (a, rest) -> gather (a :: acc) rest
  in
  gather [] pipe
(* [fold f acc source] is a left fold over the values yielded by [source]:
   each value [a] updates the accumulator to [f acc a]. The final
   accumulator is returned once [next] reports nothing more. *)
let fold f acc source =
  let rec go acc src =
    match next src with
    | None -> acc
    | Some (a, rest) -> go (f acc a) rest
  in
  go acc source
(* [nth n source] returns [Some a] where [a] is the value yielded at
   zero-based position [n], or [None] if [source] stops yielding before
   that position is reached. *)
let rec nth n source =
  match next source with
  | None -> None
  | Some (a, _) when n = 0 -> Some a
  | Some (_, rest) -> nth (n - 1) rest
(* [sum source] adds up the integers yielded by [source]. *)
let sum source = fold (fun total x -> total + x) 0 source

(* [length source] counts the values yielded by [source]. *)
let length source = fold (fun seen _ -> succ seen) 0 source
(* Transformers *)

(* [map f] awaits a value, yields [f] applied to it, and repeats. *)
let rec map f =
  let emit a = yield (f a) >> fun () -> map f in
  await >>= emit
(* [filter p] forwards only the inputs that satisfy [p]; other inputs are
   consumed and discarded. *)
let rec filter p =
  let continue () = filter p in
  await >>= fun a ->
  match p a with
  | true -> yield a >> continue
  | false -> continue ()
(* [take n] forwards the first [n] inputs and then finishes with [()].
   A non-positive [n] finishes immediately without awaiting anything:
   testing only [n = 0] would let a negative count decrement forever and
   the limit would never apply. *)
let rec take n =
  if n <= 0 then empty
  else await >>= fun a -> yield a >> fun () -> take (n - 1)
(* [take_while pred] forwards inputs for as long as they satisfy [pred].
   The first input that fails [pred] is consumed, not forwarded, and the
   pipe finishes with [()]. *)
let rec take_while pred =
  let step a =
    if not (pred a) then empty
    else yield a >> fun () -> take_while pred
  in
  await >>= step
(* [drop n] discards the first [n] inputs and forwards every input after
   them. A non-positive [n] discards nothing.

   The forwarding phase is written out here as [drop 0] calling itself, so
   that all remaining inputs are passed on rather than only the next one. *)
let rec drop n =
  if n > 0 then await >>= (fun _ -> drop (n - 1))
  else await >>= fun a -> yield a >> fun () -> drop 0
(* [drop_while pred] discards inputs for as long as they satisfy [pred],
   then forwards the first input that fails [pred] and every input after
   it.

   The element that ends the dropping phase has already been consumed from
   upstream, so it is yielded explicitly before forwarding starts; otherwise
   it would be lost. *)
let rec drop_while pred =
  await >>= fun a ->
  if pred a then drop_while pred
  else
    (* Pass every remaining input through unchanged. *)
    let rec forward () = await >>= fun x -> yield x >> forward in
    yield a >> forward
(* [slice i j] pipes its input through [drop i] and then through
   [take (j - i)]. *)
let slice i j =
  let skip = drop i in
  let keep = take (j - i) in
  compose keep skip
(* Producers *)

(* [count ()] yields the integers 0, 1, 2, ... and never finishes on its
   own. *)
let count () =
  let rec from n =
    let continue () = from (n + 1) in
    yield n >> continue
  in
  from 0
(* [repeat ?n x] yields [x] repeatedly: [n] times via [replicate] when [n]
   is given, without end via [forever] otherwise.

   The function never calls itself, so it is defined without [rec] (the
   unused [rec] flag triggers compiler warning 39). *)
let repeat ?n x =
  let item = yield x in
  match n with
  | None -> forever item
  | Some n -> replicate n item
(* [iota stop] pipes [count ()] through [take stop]; for a positive [stop]
   this yields the integers from 0 to [stop - 1].

   The function never calls itself, so it is defined without [rec] (the
   unused [rec] flag triggers compiler warning 39). *)
let iota stop =
  count () >-> take stop
(* [range start stop] pipes [count ()] through [take stop] and then through
   [drop start]. *)
let range start stop =
  let upto_stop = compose (take stop) (count ()) in
  compose (drop start) upto_stop
(* [file path] is a producer yielding the lines of the file at [path], one
   per [yield], as read by [input_line].

   The file is opened, and its first line read, as soon as [file path] is
   evaluated; later lines are read as the consumer asks for them. The
   channel is closed when the end of the file is reached, or when reading
   fails with another exception (which is then re-raised). Nothing here
   closes it if the consumer stops pulling before the end of the file.

   Exceptions raised by [open_in] (e.g. [Sys_error]) propagate to the
   caller. The function never calls itself, so it is defined without
   [rec]. *)
let file path =
  let chan = open_in path in
  let rec loop () =
    match input_line chan with
    | line -> yield line >> (fun () -> loop ())
    | exception End_of_file -> close_in chan; empty
    | exception exn ->
      (* Do not leak the descriptor when the read itself fails. *)
      close_in_noerr chan;
      raise exn
  in
  loop ()
(* Examples *)

(* Smoke test: the even numbers among the first ten naturals, rendered as
   strings. *)
let () =
  let is_even n = n mod 2 = 0 in
  let stream =
    count () >-> take 10 >-> filter is_even >-> map string_of_int
  in
  assert (collect stream = ["0"; "2"; "4"; "6"; "8"])
(* Pipes usage example
*
* Problem:
* Compute the total bytes sent to a particular host for
* successful responses until a given date.
*
* Build:
* $ ocamlbuild successful_bytes_sent.native
*
* Source: http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html *)
(* String extension *)
(* Extends the standard [String] module with two helpers used by the log
   processing example. *)
module String = struct
  include String

  (* [split ?on str] cuts [str] at every occurrence of the character [on]
     (a space by default) and returns the pieces in order. Adjacent
     separators, or a separator at either end, produce empty strings, and
     the result is never empty: splitting the empty string gives a list
     holding one empty string.

     This delegates to the standard library, which avoids the stack growth
     of a recursive scan wrapped in an exception handler. *)
  let split ?(on = ' ') str = String.split_on_char on str

  (* [starts_with s2 s1] is [true] exactly when [s1] begins with [s2].
     Note the argument order: the prefix comes first. *)
  let starts_with s2 s1 =
    let len2 = String.length s2 in
    String.length s1 >= len2 && String.sub s1 0 len2 = s2
end
(* Total bytes sent to a host until a given date *)
(* [successful_bytes_sent input ~host ~until] reads the file [input] line by
   line, splits each line on spaces, and keeps the entries whose field 0
   equals [host] and whose field 8 equals the string 200. Reading stops at
   the first such entry whose field 3 does not start with an opening square
   bracket followed by [until]. The integers in field 9 of the kept entries
   are summed.

   [List.nth] and [int_of_string] raise on lines that have too few fields
   or a non-numeric field 9. *)
let successful_bytes_sent input ~host ~until:date =
  let field i entries = List.nth entries i in
  let date_prefix = "[" ^ date in
  file input
  >-> map (String.split ~on:' ')
  >-> filter (fun entries -> field 0 entries = host)
  >-> filter (fun entries -> field 8 entries = "200")
  >-> take_while (fun entries ->
      String.starts_with date_prefix (field 3 entries))
  >-> map (fun entries -> int_of_string (field 9 entries))
  |> sum
open Printf
(* Entry point: prints the total for one fixed host and date, read from the
   NASA access log expected in the current directory. *)
let () =
  let host = "port26.annex2.nwlink.com"
  and date = "01/Jul" in
  successful_bytes_sent "NASA_access_log_Jul95" ~host ~until:date
  |> printf "Total successful bytes sent to %s until %s: %d" host date
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment