Last active
August 8, 2016 12:45
-
-
Save vheathen/a21be4cd275e10679e553c5d3de54931 to your computer and use it in GitHub Desktop.
Elixir + Phoenix Channels memory consumption
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Myapp.PlaylistsUserChannel do
  @moduledoc """
  Per-user playlists channel (topic `"playlists:<user_id>"`).

  Every outgoing message is tagged with an operation id (`opid`), a counter
  kept in the socket assigns. Long-running work is executed in an
  `Elixir.Task`; the tasks in flight are tracked in the `:running` assign as
  a map of `task_ref => {opid, event}` so their results (or failures) can be
  routed back to the client under the right `opid` and event name.
  """

  use Myapp.Web, :channel

  import Ecto.Query

  alias Myapp.Repo
  alias Myapp.Playlist
  alias Phoenix.Socket.Message

  # Logger.debug/1 is a macro, so Logger has to be required before use.
  require Logger

  intercept ["playlists:added", "playlists:deleted", "playlists:updated"]

  # Allows a join only on the topic that carries the current user's own id.
  #
  # The id taken from the topic is always a string, so the user's id is
  # stringified before comparing; this works for integer ids and is a no-op
  # for ids that are already binaries.
  def join("playlists:" <> user_id, _params, socket) do
    current_user = socket.assigns.current_user

    if user_id == to_string(current_user.id) do
      {:ok, socket}
    else
      {:error, %{reason: "Invalid user"}}
    end
  end

  #####################################################################
  ## Playlists section
  #####################################################################

  # Content list request handler.
  #
  # Allocates an operation id, acknowledges the request right away and defers
  # the actual work to handle_info/2. The opid travels with the message so a
  # handle_out/3 running in between cannot change the id used for this request.
  def handle_in("playlists:list", _payload, socket) do
    {opid, socket} = get_operation_id(socket)
    send(self(), {:list, opid})
    {:reply, :ok, socket}
  end

  ####################################################################
  ####################################################################

  # Intercepted broadcasts: tag the payload with a fresh opid and push it.
  def handle_out(event, payload, socket) do
    {opid, socket} = get_operation_id(socket)
    push(socket, event, Map.put(payload, :opid, opid))
    {:noreply, socket}
  end

  #####################
  ##### async ops #####
  #####################

  # Backward-compatible entry point: a bare :list message uses whatever opid
  # is current at the time the message is handled.
  def handle_info(:list, socket) do
    handle_info({:list, current_opid(socket)}, socket)
  end

  # Loads the current user's playlists (most recently updated first) in a
  # task and sends the encoded message straight to the transport process.
  #
  # Only the values the task needs are bound outside the closure, so the
  # whole socket struct is not copied into the task process.
  #
  # NOTE(review): Task.async/1 links the task to this channel process, so a
  # failure that escapes the task would also take the channel down unless
  # exits are trapped.
  def handle_info({:list, opid}, socket) do
    event = "playlists:list"
    %{transport_pid: transport_pid, topic: topic} = socket
    current_user = socket.assigns.current_user

    fun = fn ->
      try do
        playlists =
          current_user
          |> Playlist.get_by_user()
          |> order_by(desc: :updated_at)
          |> Repo.all()

        msg = %Message{topic: topic, event: event, payload: %{data: playlists, opid: opid}}
        send(transport_pid, {:socket_push, :text, Poison.encode_to_iodata!(msg)})

        # Everything was already delivered; nothing for the channel to push.
        {:nodata}
      rescue
        # `catch value ->` alone only sees throws; raised exceptions need rescue.
        error -> list_failure(error)
      catch
        thrown -> list_failure(thrown)
      end
    end

    {:noreply, run_task(opid, event, fun, socket)}
  end

  ################################
  # Elixir.Task results receiver #
  ################################

  # A task reply arrives as {ref, result}. Replies whose ref is not tracked
  # in :running are ignored.
  def handle_info({ref, message}, socket) when is_reference(ref) do
    case get_task(ref, socket) do
      {nil, nil, socket} ->
        {:noreply, socket}

      {opid, subj, socket} ->
        # The result is in, so the monitor (and a possibly queued :DOWN
        # message) is no longer needed.
        Process.demonitor(ref, [:flush])
        deliver(message, opid, subj, socket)
        {:noreply, socket}
    end
  end

  # A tracked task went down before delivering a result: report the failure
  # to the client, unless it was a normal exit.
  def handle_info({:DOWN, ref, _, _, reason}, socket) when is_reference(ref) do
    {opid, subj, socket} = get_task(ref, socket)

    if opid != nil and reason != :normal do
      push(socket, subj, %{opid: opid, error: encodable_reason(reason)})
    end

    {:noreply, socket}
  end

  # Routes a task result: {:push, payload} goes to this client,
  # {:broadcast, payload} to the whole topic, anything else is only logged.
  defp deliver({:push, payload}, opid, subj, socket) do
    push(socket, subj, Map.merge(%{opid: opid}, payload))
  end

  defp deliver({:broadcast, payload}, opid, subj, socket) do
    broadcast(socket, subj, Map.merge(%{opid: opid}, payload))
  end

  defp deliver(other, _opid, _subj, _socket) do
    Logger.debug "Got another message from async task: #{inspect other}"
  end

  # Logs a failed playlist load and builds the error result for the client.
  defp list_failure(reason) do
    Logger.debug "No playlists or something wrong: #{inspect reason}"
    {:push, %{error: "No playlists"}}
  end

  # Exit reasons are frequently tuples, which the JSON encoder cannot encode;
  # atoms and binaries pass through unchanged, everything else is inspected.
  defp encodable_reason(reason) when is_atom(reason) or is_binary(reason), do: reason
  defp encodable_reason(reason), do: inspect(reason)

  # Get operation id and subject (event) for a task ref and stop tracking it.
  # Returns {nil, nil, socket} when the ref is unknown.
  defp get_task(ref, socket) when is_reference(ref) do
    case Map.pop(running_tasks(socket), ref) do
      {nil, _running} ->
        {nil, nil, socket}

      {{opid, subj}, running} ->
        {opid, subj, assign(socket, :running, running)}
    end
  end

  # Starts `fun` in a task and records it under the task's monitor ref.
  defp run_task(id, subject, fun, socket) do
    %Task{ref: ref} = Task.async(fun)
    assign(socket, :running, Map.put(running_tasks(socket), ref, {id, subject}))
  end

  # Increments the operation counter; returns {new_opid, updated_socket}.
  defp get_operation_id(socket) do
    next = current_opid(socket) + 1
    {next, assign(socket, :opid, next)}
  end

  # The assigns below are not initialised anywhere in this module, so fall
  # back to sensible defaults instead of raising on a missing key.
  defp current_opid(socket), do: Map.get(socket.assigns, :opid, 0)
  defp running_tasks(socket), do: Map.get(socket.assigns, :running, %{})

  ############################################################################
  ############################################################################

  # Add authorization logic here as required.
  # NOTE(review): currently not called from anywhere in this module.
  defp authorized?(_payload) do
    true
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Myapp.Playlist do
  @moduledoc """
  Playlist schema: a titled, optionally described collection of tracks that
  belongs to a user.

  Only `id`, `title`, `description` and the timestamps are exposed when a
  playlist is JSON-encoded through `Poison`.
  """

  use Myapp.Web, :model

  @derive {Poison.Encoder, only: [:id, :title, :description, :updated_at, :inserted_at]}
  schema "playlists" do
    field :title, :string
    field :description, :string
    belongs_to :user, Myapp.User
    has_many :tracks, Myapp.Track

    timestamps()
  end

  # Atom lists: validate_required/2 expects atoms, and cast/3 accepts them on
  # every Ecto 2+ release.
  @required_fields ~w(title)a
  @optional_fields ~w(description)a

  @doc """
  Builds a changeset based on the `struct` and `params`.

  Casts `title` and `description`, requires `title`, and maps a violation of
  the `playlists_pkey` constraint to an error on `:id`.
  """
  def changeset(struct, params \\ %{}) do
    struct
    # cast/4 (separate required/optional lists) is deprecated; cast/3 plus
    # validate_required/2 is the supported equivalent.
    |> cast(params, @required_fields ++ @optional_fields)
    |> validate_required(@required_fields)
    |> unique_constraint(:id, name: "playlists_pkey")
  end

  @doc """
  Returns the query for the playlists associated with `user`.
  """
  def get_by_user(user), do: assoc(user, :playlists)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment