Skip to content

Instantly share code, notes, and snippets.

@phaer
Last active September 19, 2016 22:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phaer/0cd7083d34879aebc7ccd4b7e7f1033d to your computer and use it in GitHub Desktop.
Save phaer/0cd7083d34879aebc7ccd4b7e7f1033d to your computer and use it in GitHub Desktop.
open Lwt.Infix
open Websocket_lwt
open Ppx_lwt
open Frame
let ip_port = ("127.0.0.1", 3000);;
let clients = Hashtbl.create 10;;
let section = Lwt_log.Section.make "stream_shape";;
Lwt_log.(add_rule "*" Debug);;
let handle_message id req recv send =
let rec react () =
recv () >>= fun fr ->
Hashtbl.remove clients id;
Hashtbl.add clients id send;
Lwt_log.debug_f ~section "Client %d: %s" id Frame.(show fr)
>>= fun () ->
match fr.opcode with
| Opcode.Ping ->
send Frame.(create ~opcode:Opcode.Pong ~content:fr.content ())
>>= react
| Opcode.Pong ->
react ()
| Opcode.Text ->
send Frame.(create ~opcode:Opcode.Text ~content:"foo" ())
>>= react
| Opcode.Close ->
Hashtbl.remove clients id;
send Frame.(create ~opcode:Opcode.Close ~content:fr.content ())
| _ ->
send Frame.(close 1002)
in
Lwt_log.info_f ~section "Connection from client id %d" id >>= fun () ->
try%lwt react () with exn ->
Lwt_log.error_f ~section ~exn "Client %d error" id >>= fun () ->
Lwt.fail exn;;
let broadcast msg = Hashtbl.iter
(fun id send -> Lwt.async (fun () -> send Frame.(create ~opcode:Opcode.Text ~content:msg ())))
clients;;
let start_server =
let open Conduit_lwt_unix in
let endpoint = `TCP (Ipaddr.of_string_exn @@ fst ip_port, snd ip_port) in
endp_to_server ~ctx:default_ctx endpoint >>= fun server ->
establish_server ~ctx:default_ctx ~mode:server handle_message;;
let redis_subscribe host port channels =
let open Redis_lwt.Client in
let quote s = "\"" ^ s ^ "\""
and format_string s = function
| 0 -> "type: " ^ s
| 1 -> "channel: " ^ s
| _ -> "message: " ^ s in
let format_value index = function
| `Bulk Some str -> format_string (quote str) index
| `Error str -> "error: " ^ quote(str)
| `Status str -> "status: " ^ quote(str)
| `Int i -> "value: " ^ string_of_int i
| _ -> "error: \"invalid redis message\"" in
let broadcast_value l =
Lwt.return @@ broadcast @@ "{" ^ (String.concat ", " @@ List.mapi format_value l) ^ "}"
in
(connect {host=host; port=port})
>>= (fun conn -> subscribe conn channels; Lwt.return conn)
>>= (fun conn -> Lwt_stream.iter_s broadcast_value (stream conn)) in
Lwt_main.run @@ Lwt.join [start_server; redis_subscribe "127.0.0.1" 6379 ["test"] ];;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment