Skip to content

Instantly share code, notes, and snippets.

@fchabouis
Last active March 31, 2022 05:12
Show Gist options
  • Save fchabouis/928d9d64230dd7e952e90ba3e8c8628d to your computer and use it in GitHub Desktop.
Save fchabouis/928d9d64230dd7e952e90ba3e8c8628d to your computer and use it in GitHub Desktop.
Elixir : Stream a paginated API to a Postgres database, all included
Mix.install([
{:ecto_sql, "~> 3.7.0"},
{:postgrex, "~> 0.15.0"},
{:httpoison, ">= 0.0.0"},
{:jason, ">= 0.0.0"}
])
require Logger
# SETUP DATABASE, THANKS TO https://github.com/wojtekmach/mix_install_examples
# Put your credentials here to access your local Postgres database.
# The URL is stored under the `Repo` key of the `:foo` application environment;
# `:foo` is the `otp_app` that the `Repo` module below declares.
Application.put_env(:foo, Repo, url: "ecto://postgres:postgres@localhost/datasets")
defmodule Repo do
  # Ecto repository for this script: Postgres adapter, configuration looked up
  # under the `:foo` application environment.
  use Ecto.Repo, otp_app: :foo, adapter: Ecto.Adapters.Postgres
end
defmodule Migration0 do
  # Single migration of the script: creates the "datasets" table with two
  # string columns, `title` and `external_id`.
  use Ecto.Migration

  def change do
    create table("datasets") do
      add :title, :string
      add :external_id, :string
    end
  end
end
defmodule Dataset do
  # Ecto schema mapped onto the "datasets" table created by `Migration0`.
  use Ecto.Schema

  schema "datasets" do
    field :title, :string
    field :external_id, :string
  end
end
defmodule DB do
  # Database bootstrap for the script.

  # Recreates the storage configured for `Repo`, starts `Repo` under a supervisor
  # and runs `Migration0`.
  #
  # WARNING: `storage_down` is called first, so any existing storage for this
  # repo configuration is torn down before being created again.
  #
  # Returns the result of `Ecto.Migrator.run/4`.
  # Raises if the storage cannot be created or if the supervisor fails to start.
  def setup do
    children = [Repo]

    # Best effort: the result is deliberately ignored (the storage may not exist yet).
    _ = Repo.__adapter__().storage_down(Repo.config())

    case Repo.__adapter__().storage_up(Repo.config()) do
      :ok ->
        :ok

      {:error, :already_up} ->
        Logger.info("storage already up")

      {:error, reason} ->
        raise "could not create the storage for Repo: #{inspect(reason)}"
    end

    # The repo must be running in every non-failing case, including
    # `:already_up`, otherwise the migrator below has no repo process to talk to.
    {:ok, _} = Supervisor.start_link(children, strategy: :one_for_one)

    Ecto.Migrator.run(Repo, [{0, Migration0}], :up, all: true, log_sql: :debug)
  end
end

DB.setup()
# SET UP API STREAMING, TAKEN FROM https://francis.chabouis.fr/posts/stream-api-with-elixir/
# Given the first URL to query, builds the zero-arity function that yields the
# initial accumulator of the stream: the URL wrapped in a 1-tuple.
start_fun = fn first_url -> fn -> {first_url} end end
# Step function of the stream.
#
# The accumulator is `{url}`, where `url` is the next page to fetch. Each step
# returns `{items, {next_url}}` to emit the items of the page, or `{:halt, nil}`
# to stop the stream.
next_fun = fn
  nil ->
    {:halt, nil}

  # The last page carries no "next_page" value, which leaves `{nil}` as the
  # accumulator. Halt here; otherwise the `{url}` clause below would match with
  # `url = nil` and issue a request with a nil URL instead of ending the stream.
  {nil} ->
    {:halt, nil}

  {url} ->
    # we make a GET request on the url
    case HTTPoison.get(url) do
      {:ok, %{status_code: 200, body: body}} ->
        # we decode the body from a string to an Elixir map
        {:ok, content} = Jason.decode(body)
        # we keep only what interests us: here, what is under the "data" key
        items = Map.get(content, "data", [])
        Logger.info("fetch #{Enum.count(items)} items from the API coming from #{url}")
        # we return the results, and the next url to query
        {items, {Map.get(content, "next_page")}}

      {:ok, %{status_code: status}} ->
        # unexpected HTTP status: stop the stream, but leave a trace of why
        Logger.error("stopping the stream: got HTTP status #{status} from #{url}")
        {:halt, nil}

      other ->
        # something went wrong with the API call, we stop the stream
        Logger.error("stopping the stream: request to #{url} failed: #{inspect(other)}")
        {:halt, nil}
    end
end
# Cleanup callback of the stream: there is nothing to release, so it ignores
# the final accumulator and returns nil.
after_fun = fn _final_acc -> nil end
# Builds a lazy stream over the paginated API, starting at `url`.
# Nothing is fetched until the stream is enumerated.
stream_api = fn url ->
  initial_acc_fun = start_fun.(url)
  Stream.resource(initial_acc_fun, next_fun, after_fun)
end
datasets = stream_api.("https://www.data.gouv.fr/api/1/datasets/")

# WE ARE READY
# START STREAMING THE API CONTENT TO THE DB!
#
# The first 200 items of the stream are reduced to the two columns of the
# "datasets" table and inserted in batches of 100, all within one transaction.
#
# The transaction stays open while the pages are fetched over HTTP, so its
# duration depends on the remote API. Ecto's default transaction timeout
# (15 seconds per the Ecto.Repo documentation) could abort the import on slow
# responses, hence the explicit `timeout: :infinity`.
Repo.transaction(
  fn ->
    datasets
    |> Stream.map(fn dataset ->
      %{
        external_id: dataset["id"],
        title: dataset["title"]
      }
    end)
    |> Stream.take(200)
    |> Stream.chunk_every(100)
    # `rows` are plain maps (not changesets), which is what `insert_all` expects
    |> Stream.each(fn rows -> Repo.insert_all(Dataset, rows) end)
    |> Stream.run()
  end,
  timeout: :infinity
)
@fchabouis
Copy link
Author

fchabouis commented Oct 8, 2021

This gist is part of a blog post explaining in detail what it does.

In a nutshell, if you run this Elixir script, it will create a Postgres database called datasets on your computer and start streaming data from an external paginated API into that database.

All the Ecto database setup is taken from https://github.com/wojtekmach/mix_install_examples and I thank @wojtekmach for his nice work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment