-
-
Save InfiniteLee/381da79c89e75a0571c905e4717ae3fb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#file project_web/transports/flat_buffers_serializer.ex | |
defmodule Project.Transports.FlatBuffersSerializer do | |
@moduledoc false | |
@behaviour Phoenix.Transports.Serializer | |
alias Phoenix.Socket.{Reply, Message, Broadcast} | |
def fastlane!(%Broadcast{event: "presence_diff"} = broadcast) do | |
payload = %{ | |
joins: format_presence(broadcast.payload.joins), | |
leaves: format_presence(broadcast.payload.leaves) | |
} | |
broadcast | |
|> Map.put(:payload, payload) | |
|> encode_and_pack! | |
end | |
def fastlane!(%Broadcast{} = broadcast) do | |
encode_and_pack!(broadcast) | |
end | |
def encode!(%Reply{} = reply) do | |
reply | |
|> Map.put(:event, "phx_reply") | |
|> Map.put(:payload, %{status: reply.status, response: reply.payload}) | |
|> encode_and_pack! | |
end | |
def encode!(%Message{event: "presence_state"} = msg) do | |
%Message{msg | payload: %{state: format_presence(msg.payload)}} | |
|> encode_and_pack! | |
end | |
def encode!(%Message{} = msg) do | |
encode_and_pack!(msg) | |
end | |
def decode!(raw_message, _opts \\ []) do | |
port = FlatbufferPort.open_port() | |
load_schema(port) | |
FlatbufferPort.fb_to_json(port, raw_message) | |
case collect_response() do | |
{:response, "error: " <> error} -> | |
Port.close(port) | |
IO.puts "decode! error: " <> error | |
{:response, json} -> | |
Port.close(port) | |
json | |
|> Poison.decode!(as: %Message{}) | |
end | |
end | |
def pack_data(json) do | |
port = FlatbufferPort.open_port() | |
load_schema(port) | |
FlatbufferPort.json_to_fb(port, json) | |
case collect_response() do | |
{:response, "error: " <> error} -> | |
Port.close(port) | |
{:error, "pack_data error: " <> error} | |
{:response, response} -> | |
Port.close(port) | |
{:ok, response} | |
end | |
end | |
def load_schema(port) do | |
{:ok, schema} = File.read("chat.fbs") | |
FlatbufferPort.load_schema(port, schema) | |
case collect_response() do | |
{:response, "error: " <> error} -> | |
throw "load_schema error: " <> error | |
{:response, "ok"} -> | |
{:ok, "load schema ok"} | |
end | |
end | |
defp encode_and_pack!(%{} = msg) do | |
{:ok, data} = pack_data(Poison.encode!(msg)) | |
{:socket_push, :binary, data} | |
end | |
defp format_presence(payload) do | |
for {key, %{metas: metas}} <- payload, into: [] do | |
%{user: key, metas: metas} | |
end | |
end | |
defp collect_response() do | |
receive do | |
{_port, {:data, data}} -> {:response, data} | |
after | |
3000 -> :timeout | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment