Skip to content

Instantly share code, notes, and snippets.

@krainboltgreene
Last active June 16, 2020 11:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krainboltgreene/dd2ed2677217c505f9a96872b9b1db2c to your computer and use it in GitHub Desktop.
defmodule LiveDataImport.ESI do
  @moduledoc """
  HTTP client for EVE Online's ESI API that yields results as a `Flow`.

  `read/2` issues a request via `Mojito`, fans the response out into one
  request per page (driven by the `x-pages` response header), decodes each
  JSON body with `Jason`, and drops pages that failed. Responses and their
  etags are cached in the `:live_data_import` `Cachex` cache so that a later
  `304 Not Modified` can be served from the cache.
  """

  require Logger

  # Base URL for all ESI requests.
  @default_host URI.parse("https://esi.evetech.net/")
  # Query parameters applied to every request (caller-supplied keys win).
  @default_fetch_query %{datasource: "tranquility"}

  @type allowed_verbs :: :get | :patch | :post | :put

  @doc """
  Fetches every page of `resource` and returns a `Flow` of decoded JSON
  bodies. Pages that fail (HTTP error status or JSON decode failure) are
  logged and rejected from the flow.
  """
  @spec read(String.t(), map) :: Flow.t()
  # Default now reuses @default_fetch_query instead of duplicating the
  # literal; fetch/1 merges the defaults in anyway, so behavior is unchanged.
  def read(resource, query \\ @default_fetch_query)
      when is_binary(resource) and is_map(query) do
    {:get, resource, query}
    |> fetch()
    |> paginate()
    |> Flow.map(&parse/1)
    |> Flow.map(&cleanup/1)
    |> Flow.reject(&is_nil/1)
  end

  @doc """
  Performs one HTTP request and returns `{verb, resource, query, result}`,
  where `result` is `{:ok, response}` (the `Mojito` response merged with
  `:uri`, `:last_modified`, `:pages`, and `:etag`) or `{:error, reason}`.

  A cached etag is sent as `If-None-Match`; on `304 Not Modified` the
  previously cached response is returned. `502`/`503`/`504` are retried.
  """
  @spec fetch({verb, resource, query}) ::
          {verb, resource, query, any}
        when verb: allowed_verbs, resource: String.t(), query: map
  def fetch({verb, resource, query})
      when is_atom(verb) and is_binary(resource) and is_map(query) do
    encoded_query = URI.encode_query(Map.merge(@default_fetch_query, query))

    # FIX: set the encoded query on the URI's `:query` field. The previous
    # `URI.merge(URI.merge(@default_host, resource), encoded_query)` treated
    # "datasource=..." as a *relative path* reference, which per RFC 3986
    # merge rules replaces the last path segment instead of appending `?...`.
    base_uri = URI.merge(@default_host, resource)
    uri = URI.to_string(%{base_uri | query: encoded_query})

    etag_headers =
      case Cachex.get(:live_data_import, "esi/#{verb} #{uri}/etag") do
        {:ok, nil} ->
          Logger.info("Not using etag")
          []

        {:ok, value} ->
          Logger.info("Using etag", etag: value)
          [{"If-None-Match", value}]
      end

    headers =
      [
        {"Accept", "application/json"},
        {"User-Agent", "Affinity MATRiX Service Cluster v1 (kurtis@rainbolt-greene.online)"}
      ] ++ etag_headers

    Logger.info("Initiating request", uri: uri, verb: verb, query: encoded_query)

    {
      verb,
      resource,
      query,
      Mojito.request(
        verb,
        uri,
        headers
      )
      |> case do
        # Annotate the response with the headers downstream stages rely on.
        {:ok, %{headers: response_headers} = response} ->
          last_modified = Utilities.find_header(response_headers, "last-modified")

          # Defaults to one page when the server sends no "x-pages" header.
          pages =
            Utilities.find_header(response_headers, "x-pages", {"x-pages", "1"})
            |> String.to_integer()

          etag = Utilities.find_header(response_headers, "etag")

          Logger.info("Merging important headers into response",
            uri: uri,
            last_modified: last_modified,
            pages: pages,
            etag: etag
          )

          {:ok,
           Map.merge(response, %{
             uri: uri,
             last_modified: last_modified,
             pages: pages,
             etag: etag
           })}

        {:error, _} = rejection ->
          rejection
      end
      |> case do
        {:ok, %{status_code: 200, etag: etag}} = response ->
          Logger.info("Server: OK", status_code: 200)

          # NOTE(review): `etag` may be nil when the server omits the header,
          # in which case the cache keys below interpolate "" — confirm.
          Logger.info("Storing response etag into cache",
            key: "esi/#{verb} #{uri}/etag",
            etag: etag
          )

          {:ok, true} =
            Cachex.put(
              :live_data_import,
              "esi/#{verb} #{uri}/etag",
              etag
            )

          Logger.info("Storing response", key: "esi/#{verb} #{uri}/#{etag}")

          {:ok, true} =
            Cachex.put(
              :live_data_import,
              "esi/#{verb} #{uri}/#{etag}",
              response
            )

          response

        {:ok, %{status_code: 304, etag: etag}} ->
          Logger.info("Server: Not Modified", status_code: 304)
          Logger.info("Using stored response", key: "esi/#{verb} #{uri}/#{etag}", etag: etag)

          # NOTE(review): returns nil if the cached entry was evicted — the
          # error passthrough clauses downstream will not catch that case.
          Cachex.get!(
            :live_data_import,
            "esi/#{verb} #{uri}/#{etag}"
          )

        {:ok, %{status_code: 401}} = response ->
          Logger.warn("Server: Unauthorized", status_code: 401)
          {:error, response}

        {:ok, %{status_code: 404}} = response ->
          Logger.warn("Server: Not Found", status_code: 404)
          {:error, response}

        {:ok, %{status_code: 500}} = response ->
          Logger.warn("Server: Internal Server Exception", status_code: 500)
          {:error, response}

        {:ok, %{status_code: 501}} = response ->
          Logger.warn("Server: Not implemented", status_code: 501)
          {:error, response}

        {:ok, %{status_code: 502}} = response ->
          Logger.warn("Server: Bad Gateway", status_code: 502)
          {:retry, response}

        {:ok, %{status_code: 503}} = response ->
          Logger.warn("Server: Service Unavailable", status_code: 503)
          {:retry, response}

        {:ok, %{status_code: 504}} = response ->
          Logger.warn("Server: Gateway Timeout", status_code: 504)
          {:retry, response}

        {:error, message} = rejection ->
          Logger.warn("Client: Failure", message: message)
          rejection
      end
      |> case do
        {:retry, _} ->
          Logger.info("Retrying request")

          # FIX: the old code returned the whole `fetch(...)` 4-tuple here,
          # nesting it inside the outer 4-tuple and breaking the declared
          # spec (the author's NOTE admitted as much). Unwrap the retried
          # result so retries have the same shape as first attempts.
          # NOTE(review): still retries indefinitely with no backoff/limit.
          {^verb, ^resource, ^query, retried_result} = fetch({verb, resource, query})
          retried_result

        resolution ->
          resolution
      end
    }
  end

  @doc """
  Expands a fetched first-page result into a `Flow` with one element per
  page. Page 1 reuses the already-fetched response; pages `2..n` are fetched
  again with a `page` query parameter. A failed fetch becomes a one-element
  flow carrying the error, which `parse/1` passes through to `cleanup/1`.
  """
  @spec paginate({verb, resource, query, {:ok, map} | {:error, any}}) :: Flow.t()
        when verb: allowed_verbs, resource: String.t(), query: map
  def paginate({verb, resource, query, {:ok, %{pages: pages} = response}})
      when is_atom(verb) and is_binary(resource) and is_map(query) and is_map(response) and
             is_integer(pages) do
    Logger.info("Spinning up paginated requests", pages: pages)

    1..pages
    |> Flow.from_enumerable()
    |> Flow.map(fn
      1 ->
        {verb, resource, query, {:ok, response}}

      page ->
        Logger.info("Spawning page request", page: page)
        fetch({verb, resource, Map.merge(query, %{page: page})})
    end)
  end

  # Failure passthrough (resolves the old TODO): keep the error in the flow
  # so downstream `parse/1` / `cleanup/1` can log and drop it instead of
  # crashing on a missing clause.
  def paginate({verb, resource, query, {:error, _} = rejection})
      when is_atom(verb) and is_binary(resource) and is_map(query) do
    Logger.warn("Skipping pagination for failed request")
    Flow.from_enumerable([{verb, resource, query, rejection}])
  end

  @doc """
  Decodes the JSON body of a successful response, caching the decoded data
  per etag. Failed fetches and decode failures come back as
  `{:error, reason}` for `cleanup/1` to log and drop.
  """
  @spec parse({verb, resource, query, {:ok, map} | {:error, any}}) ::
          {:ok, any} | {:error, any}
        when verb: allowed_verbs, resource: String.t(), query: map
  def parse({verb, resource, query, {:ok, %{body: body, uri: uri, etag: etag} = response}})
      when is_atom(verb) and is_binary(resource) and is_map(query) and is_map(response) and
             is_binary(etag) and is_binary(uri) do
    Logger.info("Fetching JSON body from cache", key: "esi/#{verb} #{uri}/#{etag}/parsed")

    Cachex.fetch(
      :live_data_import,
      "esi/#{verb} #{uri}/#{etag}/parsed",
      fn _key ->
        Logger.info("Decoding JSON")

        Jason.decode(body)
        |> case do
          {:ok, data} ->
            Logger.info("Committing JSON to cache")
            {:commit, data}

          {:error, _} = failure ->
            Logger.info("Failed to decode JSON")
            {:ignore, failure}
        end
      end
    )
    |> case do
      {:ok, _} = success ->
        success

      {:commit, data} ->
        {:ok, data}

      # FIX: an ignored fallback already carries `{:error, %Jason.DecodeError{}}`;
      # the old `{:ignore, data} -> {:ok, data}` wrapped that error in `:ok`,
      # smuggling a decode failure past `cleanup/1` as if it were data.
      {:ignore, rejection} ->
        rejection

      rejection ->
        rejection
    end
  end

  # Failure passthrough (resolves the old TODO): hand the error straight to
  # `cleanup/1`.
  def parse({verb, resource, query, {:error, _} = rejection})
      when is_atom(verb) and is_binary(resource) and is_map(query) do
    rejection
  end

  @doc """
  Unwraps `{:ok, response}`; logs an `{:error, reason}` and returns `nil`
  so `read/2` can reject the element from the flow.
  """
  @spec cleanup({:error, any} | {:ok, response}) :: response | nil when response: any
  def cleanup({:ok, response}) do
    response
  end

  def cleanup({:error, message}) do
    Logger.error(Kernel.inspect(message))
    nil
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment