Skip to content

Instantly share code, notes, and snippets.

@fcruxen
Last active September 11, 2015 14:16
Show Gist options
  • Save fcruxen/c523c6be9dbaa7474a3c to your computer and use it in GitHub Desktop.
Save fcruxen/c523c6be9dbaa7474a3c to your computer and use it in GitHub Desktop.
defmodule ArkRaccoon.RaccoonServerController do
  @moduledoc """
  JSON endpoints polled by raccoon agents: `get_job/2` hands out the next
  queue entry and `save_result/2` stores a run result for a queue entry.
  """
  use ArkRaccoon.Web, :controller
  use Timex
  require Logger
  alias ArkRaccoon.Repo
  alias ArkRaccoon.RaccoonAgent
  alias ArkRaccoon.RaccoonQueue
  alias ArkRaccoon.RaccoonResult
  import Ecto.Query

  @doc """
  Assigns a queue entry to the agent identified by the `"name"` and `"key"`
  params.

  Responds with:

    * `200` and the job description as JSON when an unretrieved queue entry
      matches one of the agent's browsers and the agent's location (the most
      recently created entry is chosen);
    * `304` with an empty body when there is nothing to hand out;
    * `404` when `"name"`/`"key"` are missing or match no agent.

  Side effects: marks the agent active if it was not, and stamps the
  handed-out queue entry with the agent id and the retrieval time.
  """
  def get_job(conn, params) do
    case find_agent(params["name"], params["key"]) do
      nil ->
        conn |> put_status(404) |> json(nil)

      agent ->
        activate(agent)
        Logger.info("Agent connected:#{agent.name}")
        dispatch_queue(conn, agent)
    end
  end

  @doc """
  Stores the result posted under the `"result"` param for the queue entry
  whose id is `result["id"]`.

  Responds with `202` once the result is inserted, `422` when the result
  changeset is rejected, and `404` when the id is missing or no queue entry
  matches it.
  """
  def save_result(conn, params) do
    result = params["result"] || %{}
    queue_id = result["id"]

    # Repo.get/2 raises on a nil id, so check for presence first.
    if queue_id && Repo.get(RaccoonQueue, queue_id) do
      case Repo.insert(RaccoonResult.changeset(%RaccoonResult{}, result)) do
        {:ok, _result} ->
          conn |> put_status(202) |> json(nil)

        {:error, changeset} ->
          Logger.error("Could not save result for queue #{queue_id}: #{inspect(changeset.errors)}")
          conn |> put_status(422) |> json(nil)
      end
    else
      Logger.debug("Could not find queue using id: #{queue_id}")
      conn |> put_status(404) |> json(nil)
    end
  end

  # Looks up an agent by its credentials; a missing name or key never matches.
  defp find_agent(name, key) when is_nil(name) or is_nil(key), do: nil

  defp find_agent(name, key) do
    query =
      from agent in RaccoonAgent,
        where: agent.name == ^name and agent.key == ^key,
        limit: 1

    Repo.one(query)
  end

  # Flags the agent as active if it is not already.
  defp activate(%{active: true}), do: :ok
  defp activate(agent), do: Repo.update(Ecto.Changeset.change(agent, active: true))

  # Claims the next queue entry for `agent` and renders it, or replies 304.
  defp dispatch_queue(conn, agent) do
    case next_queue(agent) do
      nil ->
        # A 304 response must not carry a message body.
        send_resp(conn, 304, "")

      queue ->
        Logger.info("Agent #{agent.name} got queue: #{queue.id} for job: #{queue.raccoon_job.label}")
        # NOTE(review): select-then-update is not atomic; two agents polling at
        # the same time could be handed the same entry.
        Repo.update(Ecto.Changeset.change(queue, raccoon_agent_id: agent.id, date_retrieved: Date.now))
        json(conn, job_payload(queue))
    end
  end

  # Most recently created unretrieved entry for the agent's browsers/location,
  # with the associations job_payload/1 reads preloaded; nil when none exists.
  defp next_queue(agent) do
    query =
      from queue in RaccoonQueue,
        where: is_nil(queue.date_retrieved),
        where: queue.raccoon_browser_id in ^agent.raccoon_browser_ids,
        where: queue.raccoon_location_id == ^agent.raccoon_location_id,
        order_by: [desc: queue.created_at],
        limit: 1

    case Repo.one(query) do
      nil -> nil
      queue -> Repo.preload(queue, [:raccoon_browser, :raccoon_job, :raccoon_connection])
    end
  end

  # JSON description of the work an agent should perform for `queue`.
  defp job_payload(queue) do
    job = queue.raccoon_job
    connection = queue.raccoon_connection

    %{
      id: queue.id,
      browser: queue.raccoon_browser.abbr,
      upstream: connection.upstream,
      downstream: connection.downstream,
      latency: connection.latency,
      url: job.url,
      headers: job.headers,
      remap_dns: job.remap_dns,
      runs: job.runs,
      cached: job.cached,
      script: job.script,
      type: RaccoonQueue.type(queue),
      filmstrip: job.filmstrip
    }
  end
end
defmodule ArkRaccoon.RaccoonJob do
  @moduledoc """
  Ecto schema for `raccoon_jobs` rows. A job owns many queue entries and an
  embedded list of script steps.
  """
  use ArkRaccoon.Web, :model

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  schema "raccoon_jobs" do
    field :label
    field :description
    field :url
    field :detailed, :boolean
    field :resumed, :boolean
    field :priority, :boolean, default: false
    field :active, :boolean, default: false
    field :remap_dns, {:array, :map}
    field :headers, {:array, :map}
    field :blacklist, {:array, :map}
    field :date_start, Ecto.DateTime
    field :date_stop, Ecto.DateTime
    field :runs, :integer
    field :cached, :boolean, default: false
    field :filmstrip, :boolean, default: false
    has_many :raccoon_queues, ArkRaccoon.RaccoonQueue
    embeds_many :script, ArkRaccoon.RaccoonJobScript
  end

  # NOTE(review): both lists are empty, so changeset/2 currently casts no
  # fields at all; list the fields callers are allowed to set.
  @required_fields ~w()
  @optional_fields ~w()

  @doc """
  Creates a changeset based on `resource` and `params`.

  When `params` is `:empty` (the default), Ecto 1.x `cast/4` returns an
  invalid changeset with no validation performed.
  """
  def changeset(resource, params \\ :empty) do
    # The struct (not the params) is the first argument of cast/4.
    resource
    |> cast(params, @required_fields, @optional_fields)
  end
end
defmodule ArkRaccoon.RaccoonQueue do
  @moduledoc """
  Ecto schema for `raccoon_queues` rows. Each row links a job to a browser,
  location and connection profile, and to the agent that retrieved it.
  """
  use ArkRaccoon.Web, :model

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  schema "raccoon_queues" do
    field :priority, :boolean
    field :resumed, :boolean
    field :detailed, :boolean
    field :created_at, Timex.Ecto.DateTime
    field :date_retrieved, Timex.Ecto.DateTime
    field :date_processed, Timex.Ecto.DateTime
    field :alert, :boolean
    belongs_to :raccoon_agent, ArkRaccoon.RaccoonAgent
    belongs_to :raccoon_location, ArkRaccoon.RaccoonLocation
    belongs_to :raccoon_job, ArkRaccoon.RaccoonJob
    belongs_to :raccoon_browser, ArkRaccoon.RaccoonBrowser
    belongs_to :raccoon_connection, ArkRaccoon.RaccoonConnection
    has_many :raccoon_results, ArkRaccoon.RaccoonResult
  end

  # NOTE(review): both lists are empty, so changeset/2 currently casts no
  # fields at all; list the fields callers are allowed to set.
  @required_fields ~w()
  @optional_fields ~w()

  @doc """
  Returns `"Detailed"` when the entry's `detailed` flag is truthy and
  `"Resumed"` otherwise.
  """
  def type(queue) do
    if queue.detailed, do: "Detailed", else: "Resumed"
  end

  @doc """
  Creates a changeset based on `resource` and `params`.

  When `params` is `:empty` (the default), Ecto 1.x `cast/4` returns an
  invalid changeset with no validation performed.
  """
  def changeset(resource, params \\ :empty) do
    # The struct (not the params) is the first argument of cast/4.
    resource
    |> cast(params, @required_fields, @optional_fields)
  end
end
defmodule ArkRaccoon.RaccoonResult do
  @moduledoc """
  Ecto schema for `raccoon_results`: one run result reported by an agent for
  a queue entry.

  `changeset/2` derives aggregate metrics from the submitted HAR-style
  `navigation` log and the `summary` list.

  NOTE(review): `changeset/2` also touches the database before the caller
  inserts. It may delete an earlier result for the same run and mark the
  queue entry as processed. Consider moving those steps into a service that
  wraps `Repo.insert/1` in `Repo.transaction/1`.
  """
  use ArkRaccoon.Web, :model
  use Timex
  require Logger
  alias ArkRaccoon.RaccoonQueue
  alias ArkRaccoon.RaccoonResult
  alias ArkRaccoon.Repo

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  schema "raccoon_results" do
    field :run, :integer
    field :timing, {:array, :map}
    field :summary, {:array, :map}
    field :navigation, :map
    field :os, {:array, :map}
    field :cached, :boolean
    field :error
    field :fully_loaded, :integer
    field :document_complete, :integer
    field :ttfb, :integer
    field :requests, :integer
    field :dns_time, :integer
    field :total_size, :integer
    field :domain_list, {:array, :map}
    field :created_at, Timex.Ecto.DateTime
    belongs_to :raccoon_queue, ArkRaccoon.RaccoonQueue
    embeds_many :screenshots, ArkRaccoon.RaccoonResultScreenshot
    timestamps
  end

  # Fields that must be present unless the agent reported an `error`.
  @presence_fields [:summary, :timing, :fully_loaded, :document_complete,
                    :dns_time, :ttfb, :requests, :total_size]

  # Runs after the row is persisted, i.e. once `changeset.model.id` is set.
  after_insert :schedule_result

  @doc """
  Puts `:fully_loaded` (milliseconds): the latest `startedDateTime + time`
  minus the earliest `startedDateTime` across `navigation["log"]["entries"]`.
  Puts `0` when the log is missing, empty or unparsable.

  NOTE(review): start times go through `Date.to_secs/1`, presumably giving
  whole-second resolution before the `* 1000`; confirm that is acceptable.
  """
  def set_fully_loaded(changeset) do
    put_computed(changeset, :fully_loaded, fn changes ->
      spans =
        Enum.map(har_entries(changes), fn entry ->
          start = har_start_ms(entry)
          {start, start + entry["time"]}
        end)

      # HAR readers must not assume entries are sorted, so use min/max
      # rather than the first entry.
      first_start = spans |> Enum.map(&elem(&1, 0)) |> Enum.min
      last_finish = spans |> Enum.map(&elem(&1, 1)) |> Enum.max
      last_finish - first_start
    end)
  end

  @doc """
  Puts `:total_size` as the sum of `response.bodySize` over the HAR entries.
  Negative sizes (HAR's `-1` = not available) and missing sizes are skipped.
  Puts `0` when the log is missing.
  """
  def set_total_size(changeset) do
    put_computed(changeset, :total_size, fn changes ->
      changes
      |> har_entries
      |> Enum.map(&(&1["response"]["bodySize"]))
      |> sum_known
    end)
  end

  @doc """
  Puts `:dns_time` as the sum of `timings.dns` over the HAR entries.
  Negative timings (HAR's `-1` = does not apply) and missing timings are
  skipped. Puts `0` when the log is missing.
  """
  def set_dns_time(changeset) do
    put_computed(changeset, :dns_time, fn changes ->
      changes
      |> har_entries
      |> Enum.map(&(&1["timings"]["dns"]))
      |> sum_known
    end)
  end

  @doc """
  Puts `:ttfb` as the sum of `"time_to_first_byte"` over the `summary`
  entries, or `0` when that cannot be computed.
  """
  def set_ttfb(changeset) do
    put_computed(changeset, :ttfb, fn changes ->
      changes.summary |> Enum.map(&(&1["time_to_first_byte"])) |> Enum.sum
    end)
  end

  @doc """
  Puts `:document_complete` as the sum of `"response_time"` over the
  `summary` entries, or `0` when that cannot be computed.
  """
  def set_document_complete(changeset) do
    put_computed(changeset, :document_complete, fn changes ->
      changes.summary |> Enum.map(&(&1["response_time"])) |> Enum.sum
    end)
  end

  @doc """
  Puts `:requests` as the number of HAR entries, or `0` when the log is
  missing.
  """
  def set_requests(changeset) do
    put_computed(changeset, :requests, fn changes ->
      changes |> har_entries |> Enum.count
    end)
  end

  @doc """
  Adds a `"<field> cannot be blank"` error on `field` when its value is `nil`
  or `""`, unless the changeset carries a truthy `:error`.
  """
  def validate_field_presence_unless_error(changeset, field) do
    if Ecto.Changeset.get_field(changeset, :error) do
      changeset
    else
      # Keep the field *name* separate from its value; the error is keyed by name.
      value = Ecto.Changeset.get_field(changeset, field)

      if value in [nil, ""] do
        add_error(changeset, field, "#{field} cannot be blank")
      else
        changeset
      end
    end
  end

  @doc "Puts `:created_at` to the current time."
  def set_create_date(changeset) do
    put_change(changeset, :created_at, Date.now)
  end

  @doc """
  Clears `:navigation` when the referenced queue entry is `resumed`, after
  the metrics have been derived from it.
  """
  def set_navigation(changeset) do
    queue = fetch_queue(changeset)

    if queue && queue.resumed do
      put_change(changeset, :navigation, nil)
    else
      changeset
    end
  end

  @doc """
  Deletes a previously stored result with the same queue, run and cached
  flag, so the new one replaces it. Skipped for invalid changesets, so a
  rejected submission never removes the existing result.
  """
  def clear_failed(changeset) do
    if changeset.valid? do
      queue_id = Ecto.Changeset.get_field(changeset, :raccoon_queue_id)
      run = Ecto.Changeset.get_field(changeset, :run)
      cached = Ecto.Changeset.get_field(changeset, :cached)

      query =
        from r in RaccoonResult,
          where: r.raccoon_queue_id == ^queue_id,
          where: r.run == ^run,
          where: r.cached == ^cached,
          limit: 1

      case Repo.one(query) do
        nil -> :ok
        previous -> Repo.delete(previous)
      end
    end

    changeset
  end

  @doc """
  Stamps the queue entry's `date_processed` when this result is the job's
  last run (matching `runs` and `cached`). Skipped for invalid changesets.
  """
  def set_processed(changeset) do
    queue = if changeset.valid?, do: fetch_queue(changeset)

    if queue do
      job = Repo.preload(queue, [:raccoon_job]).raccoon_job
      run = Ecto.Changeset.get_field(changeset, :run)
      cached = Ecto.Changeset.get_field(changeset, :cached)

      if job.runs == run and job.cached == cached do
        Logger.info("queue: #{queue.id} processed all results")
        Repo.update(Ecto.Changeset.change(queue, date_processed: Date.now))
      end
    end

    changeset
  end

  @doc """
  `after_insert` callback: enqueues a `"RaccoonResultCdnWorker"` Exq job with
  the inserted result's id when the queue entry is `detailed`.
  """
  def schedule_result(changeset) do
    queue = fetch_queue(changeset)

    if queue && queue.detailed do
      # NOTE(review): the enqueuer is addressed as :exq_enqueuer below, so
      # start_link presumably registers that name. A concurrent second call
      # would then likely get {:error, {:already_started, pid}} and crash on
      # this match. Consider starting it once in the supervision tree.
      {:ok, _enqueuer} = Exq.Enqueuer.start_link(Application.get_env(:ark_raccoon, :exq))
      {:ok, _jid} = Exq.Enqueuer.enqueue(:exq_enqueuer, "default", "RaccoonResultCdnWorker", [changeset.model.id])
      Exq.Enqueuer.stop(:exq_enqueuer)
    end

    changeset
  end

  @doc """
  Creates a changeset based on `model` and `params`.

  When `params` is `:empty` (the default), Ecto 1.x `cast/4` returns an
  invalid changeset; no repository writes are performed for it.
  """
  def changeset(model, params \\ :empty) do
    model
    |> cast(params, ~w(run cached raccoon_queue_id), ~w(timing summary fully_loaded document_complete dns_time ttfb requests total_size error navigation created_at screenshots))
    |> set_create_date()
    # Presence is checked against the client-supplied values, before the
    # derived metrics below overwrite them.
    |> validate_presence_unless_error()
    |> set_fully_loaded()
    |> set_total_size()
    |> set_requests()
    |> set_ttfb()
    |> set_document_complete()
    |> set_dns_time()
    |> set_navigation()
    |> clear_failed()
    |> set_processed()
    # schedule_result/1 runs via after_insert; calling it here would enqueue
    # a duplicate job carrying a nil id (the id is generated on insert).
  end

  # Applies validate_field_presence_unless_error/2 to each @presence_fields entry.
  defp validate_presence_unless_error(changeset) do
    Enum.reduce(@presence_fields, changeset, &validate_field_presence_unless_error(&2, &1))
  end

  # Stores round(compute.(changes)) under `field`. Deliberately best-effort:
  # any failure (missing keys, bad dates, non-numeric values, empty lists)
  # stores 0. Rounding keeps float HAR timings valid for :integer columns.
  defp put_computed(changeset, field, compute) do
    value =
      try do
        round(compute.(changeset.changes))
      rescue
        _ -> 0
      end

    put_change(changeset, field, value)
  end

  # The HAR entries list from the submitted navigation log.
  defp har_entries(changes), do: changes.navigation["log"]["entries"]

  # An entry's ISO-8601 `startedDateTime`, converted via Date.to_secs/1 to ms.
  defp har_start_ms(entry) do
    {:ok, started} = DateFormat.parse(entry["startedDateTime"], "{ISO}")
    Date.to_secs(started) * 1000
  end

  # Sums the non-negative numbers, skipping HAR's -1 markers and missing values.
  defp sum_known(values) do
    values |> Enum.filter(&(is_number(&1) and &1 >= 0)) |> Enum.sum
  end

  # The queue entry referenced by the changeset, or nil when there is none.
  defp fetch_queue(changeset) do
    case Ecto.Changeset.get_field(changeset, :raccoon_queue_id) do
      nil -> nil
      queue_id -> Repo.get(RaccoonQueue, queue_id)
    end
  end
end
@josevalim
Copy link

Here is what we should do:

  1. The model should only create the changeset. It should never call the repository.
  2. You are going to move clear_failed, set_processed and schedule_result to the controller for now. Instead of Repo.insert, you will do something like:
Repo.transaction fn ->
  case Repo.insert(changeset) do
    {:ok, model} ->
      model |> clear_failed |> set_processed |> schedule_result
    {:error, changeset} ->
      Repo.rollback(changeset)
  end
end
  3. However, the code above and the clear_failed function do not really belong in the controller either. Move them somewhere else; let's call it Raccoon.QueueService:
defmodule Raccoon.QueueService do
  def insert_result_and_update_queues(changeset) do
    Repo.transaction fn ->
      case Repo.insert(changeset) do
        {:ok, model} ->
          model |> clear_failed |> set_processed |> schedule_result
        {:error, changeset} ->
          Repo.rollback(changeset)
      end
    end
  end
end
  4. Now your controller will look like this:
def create(conn, params) do
  changeset = Result.changeset(%Result{}, params)
  case Raccoon.QueueService.insert_result_and_update_queues(changeset) do
    {:ok, model} -> ...
    {:error, changeset} -> ...
  end
end

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment