Skip to content

Instantly share code, notes, and snippets.

@minidfx
Last active December 14, 2017 21:14
Show Gist options
  • Save minidfx/9599b64d858ad53cbedcc224e7f4ae95 to your computer and use it in GitHub Desktop.
Save minidfx/9599b64d858ad53cbedcc224e7f4ae95 to your computer and use it in GitHub Desktop.
Contains modules for downloading medias from a friend plex server.
defmodule Plex.Downloader do
@moduledoc """
Module to download a media from a PlexServer asynchronously.
"""
require Logger
use GenServer
alias Plex.PlexDownloadRequest
alias Plex.PlexMedia
alias Plex.Profile
@typedoc """
Type representing the tuple of a download media.
"""
@type download_arguments :: {PlexMedia.t, PlexDownloadRequest.t, binary}
@spec start_link() :: {:ok, pid} | {:error, bitstring} | :ignore
def start_link do
Logger.debug("Plex GenServer started.")
GenServer.start_link(__MODULE__, %{}, name: :plex)
end
@spec init(args :: any) :: {:ok, any}
def init(args), do: {:ok, args}
@spec terminate(reason :: charlist, args :: any) :: any
def terminate(reason, _) do
Logger.error("Plex GenServer stopping, please check next message.")
IO.inspect(reason)
Logger.debug("Plex GenServer stopped.")
end
@doc """
Returns the downloads which are downloading a media.
"""
@spec downloads() :: %{pid => download_arguments}
def downloads, do: GenServer.call(:plex, :downloads)
@callback handle_call(:downloads, map) :: {:reply, map}
def handle_call(:downloads, _, state), do: {:reply, state, state}
@callback handle_info(HTTPoison.AsyncStatus.t, any) :: {:noreply, any}
def handle_info(%HTTPoison.AsyncStatus{:code => code}, state) do
Logger.debug("Server response: #{code}")
{:noreply, state}
end
@callback handle_info(HTTPoison.AsyncChunk.t, any) :: {:noreply, any}
def handle_info(%HTTPoison.AsyncChunk{:id => response_id} = response, state) do
result = case Map.has_key?(state, response_id) do
false -> {:ok, state}
true -> handle_chunk(response, state)
end
case result do
{:ok, state} -> {:noreply, state}
{:pending, state} -> {:noreply, state}
{:error, state} -> {:stop, "Cannot write the file, please check permissions", state}
end
end
@callback handle_info(HTTPoison.AsyncEnd.t, any) :: {:noreply, any}
def handle_info(%HTTPoison.AsyncEnd{:id => response_id} = response, state) do
Logger.debug("Download finished.")
result = case Map.has_key?(state, response_id) do
false -> state
true -> handle_last_chunk(response, state)
end
case result do
{:ok, state} -> {:noreply, state}
{:error, state} ->
Logger.debug("Error to write the file.")
{:stop, "Cannot write the file, please check permissions.", state}
end
end
@callback handle_info(HTTPoison.AsyncHeaders.t, any) :: {:noreply, any}
def handle_info(%HTTPoison.AsyncHeaders{}, state), do: {:noreply, state}
@callback handle_info(HTTPoison.AsyncError.t, any) :: {:noreply, any}
def handle_info(%HTTPoison.Error{:id => response_id, :reason => reason}, state) do
Logger.error("Error!")
IO.inspect(reason)
new_state = case Map.has_key?(state, response_id) do
false -> state
true ->
{
{
%PlexMedia{:file_name => file_name},
%PlexDownloadRequest{:callback_server_id => callback_server_id},
_
},
local_state
} = Map.get_and_update!(state, response_id, fn _ -> :pop end)
GenServer.cast(callback_server_id, {:plex_important_message, "Failed to download the #{file_name}."})
local_state
end
{:noreply, new_state}
end
@callback handle_info(any, state :: map) :: {:noreply, state :: map}
def handle_info(request, state) do
Logger.debug("Unknown state received.")
IO.inspect(request)
{:noreply, state}
end
@callback handle_cast({:stop, pid}, state :: map) :: {:noreply, state :: map}
def handle_cast({:stop, pid}, state) do
:hackney.stop_async(pid)
{:noreply, Map.delete(state, pid)}
end
@callback handle_cast({:start, Profile.t, media_id :: integer, callback_process :: pid}, state :: map) :: {:noreply, state :: map}
def handle_cast({:start, %Profile{:host => host, :token => token, :port => port}, media_id, callback_process}, state) do
download(%PlexDownloadRequest{host: host,
port: port,
media_id: media_id,
plex_token: token,
callback_server_id: callback_process})
{:noreply, state}
end
@callback handle_cast({:init, integer, PlexMedia.t, PlexDownloadRequest.t}, state :: any) :: {:noreply, state :: map}
def handle_cast({:init,
response_id,
%PlexMedia{:file_name => file_name} = media,
request},
state) do
Logger.debug("Filename updated for #{file_name}")
new_state = Map.update(state,
response_id,
{media, request, <<>>},
fn {_, _, chunks} -> {media, request, chunks} end)
{:noreply, new_state}
end
@spec handle_last_chunk(HTTPoison.AsyncEnd.t, state :: map) :: state :: map
defp handle_last_chunk(%HTTPoison.AsyncEnd{:id => response_id}, state) do
{local_download, new_state} = Map.get_and_update!(state, response_id, fn _ -> :pop end)
{
%PlexMedia{:file_name => file_name} = media,
%PlexDownloadRequest{:callback_server_id => callback_server_id} = request,
_
} = local_download
case local_download |> save_latest_chunk_to_disk do
:ok ->
Logger.debug("Latest chunk saved.")
rename_file({media, request, <<>>})
GenServer.cast(callback_server_id, {:plex_important_message, "#{file_name} downloaded successfully and saved."})
{:ok, new_state}
{:error, new_state} ->
Logger.error("Unable to write the latest buffer to the disk.")
GenServer.cast(callback_server_id, {:plex_important_message, "Unable to write the latest buffer to the disk."})
{:error, Map.delete(new_state, response_id)}
end
end
@spec handle_chunk(HTTPoison.AsyncChunk.t, state :: map) :: state :: map
defp handle_chunk(%HTTPoison.AsyncChunk{:id => response_id, :chunk => chunk}, state) do
{_, %{^response_id => local_download} = local_state} = Map.get_and_update!(state, response_id, fn value -> update_chunks(value, chunk) end)
{
%PlexMedia{:file_name => file_name},
%PlexDownloadRequest{:callback_server_id => callback_server_id},
_
} = local_download
case local_download |> save_to_disk do
:ok ->
local_download |> print_download_stats
# Reset the chunks saved in memory
{_, local_new_state} = Map.get_and_update(state,
response_id,
&reset_chunk/1)
{:ok, local_new_state}
:pending -> {:ok, local_state}
{:error, reason} ->
Logger.error("Unable to write the buffer for #{file_name}: #{reason}")
GenServer.cast(callback_server_id, {:plex_important_message, "An error occurred to write the buffer to the disk."})
{:error, Map.delete(local_state, response_id)}
end
end
@spec percentage(PlexMedia.t, bitstring) :: float
defp percentage(%PlexMedia{:size => size},
chunks_bytes_length),
do: chunks_bytes_length * 100 / size |> Float.round(2)
@spec download(PlexDownloadRequest.t) :: {:ok, bitstring} | {:error, bitstring}
defp download(%PlexDownloadRequest{:plex_token => plex_token} = request) do
request |> metadata_url
|> HTTPoison.get(headers(plex_token, "text/xml"), ssl: [{:server_name_indication, :disable}])
|> download_media_metadata(request)
|> download_media(request)
end
@spec reset_chunk(download_arguments) :: {download_arguments, download_arguments}
defp reset_chunk({media, request, _} = value), do: {value, {media, request, <<>>}}
@spec save_to_disk(download_arguments) :: :ok | {:error, reason :: bitstring} | :pending
defp save_to_disk({media, _, chunks}) do
case percentage(media, bit_size(chunks) / 8) >= 25 do
true ->
media |> absolute_file_path
|> append_tmp_extension
|> File.write(chunks, [:append])
false -> :pending
end
end
@spec save_latest_chunk_to_disk(download_arguments) :: :ok | {:error, atom}
defp save_latest_chunk_to_disk({media, _, chunks}) do
media |> absolute_file_path
|> append_tmp_extension
|> File.write(chunks, [:append])
end
@spec update_chunks(download_arguments | nil, binary) :: {download_arguments, download_arguments}
defp update_chunks({%PlexMedia{:progress => progress} = media, request, chunks} = value, chunk),
do: {value, {%{media | progress: progress + bit_size(chunk) / 8}, request, chunks <> chunk}}
defp update_chunks(nil, chunk), do: {nil, {nil, nil, chunk}}
@spec headers(token :: bitstring, content_type :: bitstring) :: headers :: any
defp headers(plex_token, content_type), do: [Accept: content_type, "X-Plex-Token": plex_token]
@spec metadata_url(PlexDownloadRequest.t) :: bitstring
defp metadata_url(%PlexDownloadRequest{:host => host, :port => port, :media_id => media_id}) do
url = "https://#{host}:#{port}/library/metadata/#{media_id}"
Logger.debug(url)
url
end
@spec download_media_metadata({:ok, HTTPoison.Response.t} | {:error, HTTPoison.Error.t}, PlexDownloadRequest.t) :: {:ok, PlexMedia.t} | {:error, reason :: bitstring}
defp download_media_metadata({:ok, %HTTPoison.Response{:status_code => 200, :body => xml}},
%PlexDownloadRequest{:host => host, :port => port}) do
doc = Exml.parse(xml)
xml_queries = [
{:media_path, Exml.get(doc, "//Part[1]/@key")},
{:file_name, Exml.get(doc, "//Part[1]/@file")},
{:size, Exml.get(doc, "//Part[1]/@size")}
]
case Enum.filter(xml_queries, fn {_, value} -> value == nil end) do
[] ->
media_path = Keyword.get(xml_queries, :media_path)
file_name = Keyword.get(xml_queries, :file_name) |> Path.basename
{size, _} = Keyword.get(xml_queries, :size) |> Integer.parse
url = "https://#{host}:#{port}#{media_path}"
Logger.debug("Media: file_name(#{file_name}), url(#{url}), size(#{size})")
{:ok, %PlexMedia{file_name: file_name, url: url, size: size}}
x ->
invalid_keys = x |> Enum.map(fn {key, _} -> key |> Atom.to_string end) |> Enum.join(",")
{:error, "Following values cannot be read from the XML response, perhaps an invalid media id: #{invalid_keys}"}
end
end
defp download_media_metadata({:ok, %HTTPoison.Response{:status_code => 401}}, %PlexDownloadRequest{:host => host}),
do: {:error, "Unauthorized to access to the server #{host}, please check the Plex token."}
defp download_media_metadata({:ok, %HTTPoison.Response{:status_code => status_code}}, %PlexDownloadRequest{:host => host}),
do: {:error, "HTTP error response: #{host}, #{status_code}"}
defp download_media_metadata({:error, error}, %PlexDownloadRequest{:host => host, :port => port}) do
IO.inspect(error)
{:error, "Cannot contact the host #{host}:#{port}, please check server logs."}
end
@spec download_media({:ok, PlexMedia.t} | {:error, reason :: bitstring}, PlexDownloadRequest.t) :: :ok | {:error, reason :: bitstring}
defp download_media({:ok, media},
%PlexDownloadRequest{:callback_server_id => callback_server_id} = request) do
path = absolute_file_path(media)
path_temp = path |> append_tmp_extension
case {path |> File.exists?, path_temp |> File.exists?} do
{false, true} ->
File.rm!(path_temp)
start_download(media, request)
{false, false} ->
start_download(media, request)
_ ->
GenServer.cast(callback_server_id, {:plex_important_message, "The output file #{path} or the temp file #{path_temp} already exist."})
{:error, "The output file #{path} or the temp file #{path_temp} already exist."}
end
end
defp download_media({:error, reason} = error, %PlexDownloadRequest{:callback_server_id => callback_server_id}) do
Logger.error(reason)
GenServer.cast(callback_server_id, {:plex_message, reason})
error
end
@spec start_download(PlexMedia.t, PlexDownloadRequest.t) :: :ok
defp start_download(%PlexMedia{:file_name => file_name, :url => url} = media, %PlexDownloadRequest{:plex_token => plex_token, :callback_server_id => callback_server_id} = request) do
%{:id => id} = url |> HTTPoison.get!(headers(plex_token, "application/octet-stream"),
stream_to: :plex,
ssl: [{:server_name_indication, :disable}])
media = %{media | start_downloading_datetime: NaiveDateTime.utc_now()}
GenServer.cast(:plex, {:init, id, media, request})
GenServer.cast(callback_server_id, {:plex_message, "#{file_name} download started."})
end
@spec rename_file(download_arguments) :: {:ok, download_arguments} | {:error, download_arguments}
defp rename_file({media, _, _} = local_download) do
path = media |> absolute_file_path
path_temp = path |> append_tmp_extension
case File.rename(path_temp, path) do
:ok ->
Logger.info("Media saved: #{path}")
{:ok, local_download}
{:error, reason} ->
Logger.error("Unable to save the media: #{reason}")
{:error, reason, local_download}
end
end
@spec print_download_stats(download_arguments) :: {:ok, percentage :: integer} | {:error, reason :: bitstring}
defp print_download_stats({%PlexMedia{:start_downloading_datetime => start,
:file_name => file_name,
:size => size,
:progress => progress} = media,
%PlexDownloadRequest{:callback_server_id => callback_server_id},
_}) do
local_percentage = percentage(media, progress)
total = size / 1024 / 1024 |> Float.round(2)
downloaded = progress / 1024 / 1024 |> Float.round(2)
time_elapsed = NaiveDateTime.diff(NaiveDateTime.utc_now, start)
speed = case time_elapsed do
0 -> downloaded |> Float.round(2)
x -> (downloaded / x) |> Float.round(2)
end
Logger.debug("#{file_name} downloaded: #{local_percentage}%, #{downloaded}MB, Avg. speed: #{speed}MB/s")
Logger.debug("Total: #{downloaded}, Time elasped: #{time_elapsed}")
message = "#{file_name}: #{downloaded}/#{total}MB, #{local_percentage}%, Avg. speed: #{speed}MB/s"
GenServer.cast(callback_server_id, {:plex_message, message})
:ok
end
@spec append_tmp_extension(bitstring) :: bitstring
defp append_tmp_extension(path), do: "#{path}.tmp"
@spec absolute_file_path(PlexMedia.t) :: path :: bitstring
defp absolute_file_path(%PlexMedia{:file_name => file_name}) do
download_path = Application.get_env(:bjbot, :download_path)
"#{download_path}/#{file_name}"
end
end
defmodule Plex.PlexDownloadRequest do
@doc """
Model representing the request which will send to the downloader.
"""
defstruct [:host, :port, :media_id, :plex_token, :callback_server_id]
@type t :: %__MODULE__{host: bitstring, port: integer, media_id: integer, plex_token: bitstring, callback_server_id: pid}
end
defmodule Plex.PlexMedia do
@doc """
Model representing the media for the downloader.
"""
defstruct [:file_name, :url, :start_downloading_datetime, size: 0.0, progress: 0.0]
@type t :: %__MODULE__{file_name: bitstring, url: bitstring, start_downloading_datetime: struct, size: float, progress: float}
end
defmodule Plex.Profile do
defstruct [:name, :token, :host, port: 32400]
@type t :: %__MODULE__{name: bitstring, token: bitstring, host: bitstring, port: integer}
end
@minidfx
Copy link
Author

minidfx commented Dec 13, 2017

Add the downloader into your supervisor tree and send a message to the process :plex as following to start the download:

GenServer.cast(:plex, {:start, %Profile{token: <token>, host: <host>, port: <port>}, <media_id>, <callback_process_pid>})

Callback methods:

handle_cast({:plex_important_message, <message>})
handle_cast({:plex_message, <message>})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment