Last active
September 19, 2016 22:34
-
-
Save phaer/0cd7083d34879aebc7ccd4b7e7f1033d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Lwt.Infix | |
open Websocket_lwt | |
open Ppx_lwt | |
open Frame | |
(* Address and TCP port the websocket server binds to. *)
let ip_port = "127.0.0.1", 3000;;

(* Registry of connected websocket clients: maps a client id to the
   function used to send a frame to that client. *)
let clients = Hashtbl.create 10;;

(* Log section under which this program's messages are reported. *)
let section = Lwt_log.Section.make "stream_shape";;

(* Enable debug-level logging for every section. *)
let () = Lwt_log.add_rule "*" Lwt_log.Debug;;
(** [handle_message id req recv send] is the per-connection websocket handler.

    The client's [send] function is registered in [clients] as soon as the
    connection is handed to us, so that [broadcast] can reach clients that
    have not sent any frame yet. The entry is removed again on every exit
    path (clean close, protocol error, or exception), so [clients] never
    keeps a [send] function for a connection that is gone.

    Frame handling:
    - [Ping]  is answered with a [Pong] carrying the same content;
    - [Pong]  is ignored;
    - [Text]  is answered with the text frame ["foo"];
    - [Close] is echoed back and ends the handler;
    - any other opcode closes the connection with status code 1002.

    [req] is accepted to match the callback shape expected by
    [establish_server]; it is not used here.

    Any exception raised while serving the client is logged and then
    re-raised to the caller. *)
let handle_message id req recv send =
  let rec react () =
    recv () >>= fun fr ->
    Lwt_log.debug_f ~section "Client %d: %s" id Frame.(show fr)
    >>= fun () ->
    match fr.opcode with
    | Opcode.Ping ->
      send Frame.(create ~opcode:Opcode.Pong ~content:fr.content ())
      >>= react
    | Opcode.Pong ->
      react ()
    | Opcode.Text ->
      send Frame.(create ~opcode:Opcode.Text ~content:"foo" ())
      >>= react
    | Opcode.Close ->
      (* Deregister before replying so no broadcast is queued behind the
         close frame. *)
      Hashtbl.remove clients id;
      send Frame.(create ~opcode:Opcode.Close ~content:fr.content ())
    | _ ->
      send Frame.(close 1002)
  in
  Lwt_log.info_f ~section "Connection from client id %d" id >>= fun () ->
  (* [replace] keeps at most one binding per id. *)
  Hashtbl.replace clients id send;
  Lwt.finalize
    (fun () ->
      Lwt.catch react (fun exn ->
        Lwt_log.error_f ~section ~exn "Client %d error" id >>= fun () ->
        Lwt.fail exn))
    (fun () ->
      (* Runs whether [react] finished normally or failed. *)
      Hashtbl.remove clients id;
      Lwt.return ());;
(** [broadcast msg] sends [msg] as a text frame to every client currently
    registered in [clients]. Sends are started with [Lwt.async] and are not
    awaited.

    A failure while sending to one client is logged and that client is
    removed from [clients]. Without this, the rejected promise would be
    handed to [Lwt.async_exception_hook], whose default behaviour is to
    terminate the program. *)
let broadcast msg =
  (* Work on a snapshot: the failure handler below removes entries from
     [clients], which must not happen while the table is being iterated. *)
  let targets =
    Hashtbl.fold (fun id send acc -> (id, send) :: acc) clients []
  in
  List.iter
    (fun (id, send) ->
      Lwt.async (fun () ->
        Lwt.catch
          (fun () ->
            send Frame.(create ~opcode:Opcode.Text ~content:msg ()))
          (fun exn ->
            Hashtbl.remove clients id;
            Lwt_log.error_f ~section ~exn
              "Broadcast to client %d failed" id)))
    targets;;
(** Promise for the running websocket server: resolves the [`TCP] endpoint
    built from [ip_port] into a Conduit server mode, then starts a server in
    that mode with [handle_message] as the per-connection handler. *)
let start_server =
  let open Conduit_lwt_unix in
  let host, port = ip_port in
  let endpoint = `TCP (Ipaddr.of_string_exn host, port) in
  endp_to_server ~ctx:default_ctx endpoint >>= fun mode ->
  establish_server ~ctx:default_ctx ~mode handle_message;;
(** [redis_subscribe host port channels] connects to the Redis server at
    [host]:[port], subscribes to [channels], and then forwards every reply
    list read from the connection's stream to all websocket clients through
    [broadcast].

    Each reply list is rendered as ["{" ^ fields ^ "}"], where the fields are
    joined with [", "]. A bulk string is labelled by its position in the
    list: index 0 is ["type"], index 1 is ["channel"], anything later is
    ["message"]. Error, status and integer replies get their own labels, and
    every other reply is rendered as an "invalid redis message" error.

    NOTE(review): keys are emitted unquoted and string values are not
    escaped, so the output is not valid JSON and a value containing ['"']
    is ambiguous. The format is left unchanged here because websocket
    clients may depend on it. *)
let redis_subscribe host port channels =
  let open Redis_lwt.Client in
  let quote s = "\"" ^ s ^ "\"" in
  (* Label an already-quoted string according to its position in the reply. *)
  let format_string s = function
    | 0 -> "type: " ^ s
    | 1 -> "channel: " ^ s
    | _ -> "message: " ^ s
  in
  let format_value index = function
    | `Bulk (Some str) -> format_string (quote str) index
    | `Error str -> "error: " ^ quote str
    | `Status str -> "status: " ^ quote str
    | `Int i -> "value: " ^ string_of_int i
    | _ -> "error: \"invalid redis message\""
  in
  let broadcast_value replies =
    let fields = List.mapi format_value replies in
    broadcast ("{" ^ String.concat ", " fields ^ "}");
    Lwt.return ()
  in
  connect { host; port } >>= fun conn ->
  (* Wait for the subscription request to complete so that a failure is
     propagated to the caller instead of being silently discarded. *)
  subscribe conn channels >>= fun () ->
  Lwt_stream.iter_s broadcast_value (stream conn);;

(* Program entry point: run the websocket server and the Redis subscriber
   side by side until both promises have resolved. *)
let () =
  Lwt_main.run
  @@ Lwt.join [ start_server; redis_subscribe "127.0.0.1" 6379 [ "test" ] ];;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment