Skip to content

Instantly share code, notes, and snippets.

@vheathen
Last active August 8, 2016 12:45
Show Gist options
  • Save vheathen/a21be4cd275e10679e553c5d3de54931 to your computer and use it in GitHub Desktop.
Save vheathen/a21be4cd275e10679e553c5d3de54931 to your computer and use it in GitHub Desktop.
Elixir + Phoenix Channels memory consumtion
defmodule Myapp.PlaylistsUserChannel do
use Myapp.Web, :channel
import Ecto.Query
alias Myapp.Repo
alias Myapp.Playlist
alias Phoenix.Socket.Message
intercept ["playlists:added", "playlists:deleted", "playlists:updated"]
def join("playlists:" <> user_id, _params, socket) do
current_user = socket.assigns.current_user
if (user_id) == current_user.id do
{:ok, socket}
else
{:error, %{reason: "Invalid user"}}
end
end
#####################################################################
## Playlists section
#####################################################################
# Content list request handler
def handle_in("playlists:list", _payload, socket) do
{ opid, socket } = get_operation_id(socket)
send(self, :list)
{:reply, :ok, socket}
end
####################################################################
####################################################################
def handle_out(event, payload, socket) do
{ opid, socket } = get_operation_id(socket)
payload = payload |> Map.put(:opid, opid)
push socket, event, payload
{:noreply, socket}
end
#####################
##### async ops #####
#####################
def handle_info(:list, socket) do
event = "playlists:list"
fun = fn ->
try do
res = socket.assigns.current_user
|> Playlist.get_by_user
|> order_by([desc: :updated_at])
|> Repo.all
%{transport_pid: transport_pid, topic: topic} = socket
msg = %Message{topic: topic,
event: event,
payload: %{
data: res,
opid: socket.assigns.opid } }
encoded_msg = {:socket_push, :text, Poison.encode_to_iodata!(msg)}
send transport_pid, encoded_msg
{ :nodata }
catch
res ->
Logger.debug "No playlists or something wrong: #{inspect res}"
{ :push, %{error: "No playlists"} }
end
end
socket = run_task(socket.assigns.opid, event, fun, socket)
{:noreply, socket}
end
################################
# Elixir.Task results receiver #
################################
def handle_info({ref, message}, socket) when is_reference(ref) do
{ opid, subj, socket } = get_task(ref, socket)
unless opid == nil do
case message do
{ :push, payload } ->
push socket, subj, Map.merge(%{opid: opid}, payload)
{ :broadcast, payload } ->
broadcast socket, subj, Map.merge(%{opid: opid}, payload)
other ->
Logger.debug "Got another message from async task: #{inspect other}"
end
end
{:noreply, socket}
end
def handle_info({:DOWN, ref, _, _, reason}, socket) when is_reference(ref) do
{ opid, subj, socket } = get_task(ref, socket)
unless opid == nil || reason == :normal do
push socket, subj, %{opid: opid, error: reason}
end
{:noreply, socket}
end
# Get operation id and subject (event)
defp get_task(ref, socket) when is_reference(ref) do
tasks = Map.keys(socket.assigns.running)
if task = Enum.find(tasks, &(&1.ref == ref)) do
{ opid, subj } = socket.assigns.running[task]
socket = assign(socket, :running, Map.delete(socket.assigns.running, task))
{ opid, subj, socket }
else
{ nil, nil, socket }
end
end
defp run_task(id, subject, fun, socket) do
task = Task.async(fun)
socket = assign(socket, :running, Map.put(socket.assigns.running, task, {id, subject}))
socket
end
defp get_operation_id(socket) do
opid_next = socket.assigns.opid + 1
socket = assign(socket, :opid, opid_next)
{ opid_next, socket }
end
############################################################################
############################################################################
# Add authorization logic here as required.
defp authorized?(_payload) do
true
end
end
defmodule Myapp.Playlist do
use Myapp.Web, :model
@derive {Poison.Encoder, only: [:id, :title, :description, :updated_at, :inserted_at]}
schema "playlists" do
field :title, :string
field :description, :string
belongs_to :user, Myapp.User
has_many :tracks, Myapp.Track
timestamps()
end
@required_fields ~w(title)
@optional_fields ~w(description)
@doc """
Builds a changeset based on the `struct` and `params`.
"""
def changeset(struct, params \\ %{}) do
struct
|> cast(params, @required_fields, @optional_fields)
|> unique_constraint(:id, name: "playlists_pkey")
end
def get_by_user(user) do
user |> assoc(:playlists)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment