Skip to content

Instantly share code, notes, and snippets.

@nwalker
Last active November 1, 2021 18:16
Show Gist options
  • Save nwalker/f98bdcc3e02b9e59fd4128b5e1c9f6be to your computer and use it in GitHub Desktop.
Save nwalker/f98bdcc3e02b9e59fd4128b5e1c9f6be to your computer and use it in GitHub Desktop.
defmodule DbcEpgsql do
# really basic dbconnection impl for postgres using epgsql
# * no status support
# * no savepoints and nested transactions
# * no error handling
# * hardcoded connection params
#
# BUT: it ships tests with tracing enabled, so you can quickly see what is going on.
# Just change the params in connect/1, run DbcEpgsql.Test.query in iex, and start exploring.
# $ iex -S mix
# iex(1)> DbcEpgsql.Test.query
# .... tons of info!
# Callback state threaded through every DBConnection callback in this module.
# `conn` holds the epgsql connection returned by :epgsql.connect/1 (nil until connected).
defmodule State do
  defstruct [:conn]
end
# Query struct used with this adapter.
#   s    - the SQL text handed to :epgsql.parse/4
#   ps   - the parsed statement, filled in by handle_prepare/3
#   name - the statement name (handle_prepare/3 falls back to [] when it is nil)
defmodule Query do
  defstruct [:s, :ps, :name]

  # Pass-through DBConnection.Query implementation: the query is returned unchanged by
  # parse/describe, and parameters / results are returned unchanged by encode/decode.
  defimpl DBConnection.Query, for: __MODULE__ do
    def parse(query, _opts) do
      query
    end

    def describe(query, _opts) do
      query
    end

    def encode(_query, params, _opts) do
      params
    end

    def decode(_query, result, _opts) do
      result
    end
  end
end
use DBConnection
# DBConnection `connect` callback: opens an epgsql connection.
#
# `opts` is the keyword list that was given to DBConnection.start_link/2. It is converted
# to a map and layered on top of the built-in defaults below, so calling with `[]` behaves
# exactly as before, while a caller may now override any default
# (for example `host: ~c"db.internal"`) instead of having it silently discarded.
#
# Returns `{:ok, %State{}}` holding the epgsql connection, or
# `{:error, %DBConnection.ConnectionError{}}` whose message includes the epgsql reason.
def connect(opts) do
  defaults = %{
    host: ~c"localhost",
    port: 5432,
    username: ~c"test",
    password: ~c"test",
    database: ~c"static_test"
  }

  # Caller-supplied keys win over the defaults.
  params = Map.merge(defaults, Map.new(opts))

  case :epgsql.connect(params) do
    {:ok, e_conn} ->
      {:ok, %State{conn: e_conn}}

    {:error, reason} ->
      # Put the reason in the message as well: a bare "epgsql" says nothing about the cause.
      message = "epgsql connect failed: " <> inspect(reason)
      {:error, DBConnection.ConnectionError.exception(message, reason)}
  end
end
# DBConnection `disconnect` callback: closes the epgsql connection held in the state.
# The error that caused the disconnect is ignored; the result of :epgsql.close/1 is returned.
def disconnect(_err, state) do
  connection = state.conn
  :epgsql.close(connection)
end
# DBConnection `checkout` callback: nothing to prepare, the state is handed back untouched.
def checkout(state), do: {:ok, state}
# DBConnection `ping` callback: checks the connection by issuing :epgsql.sync/1.
#
# Returns `{:ok, state}` when the sync succeeds. When epgsql reports an error the callback
# returns `{:disconnect, exception, state}` — the failure shape defined for `ping` — rather
# than throwing, so DBConnection can disconnect instead of the
# ping crashing with an uncaught throw.
def ping(state) do
  case :epgsql.sync(state.conn) do
    :ok ->
      {:ok, state}

    {:error, reason} ->
      message = "epgsql sync failed: " <> inspect(reason)
      {:disconnect, DBConnection.ConnectionError.exception(message), state}
  end
end
# DBConnection `handle_begin` callback: runs a plain "BEGIN" through the simple query protocol.
# Savepoints / nested transactions would have to be handled here; they are not supported.
# Any reply that is not a 3-tuple starting with :ok is thrown as-is.
def handle_begin(_opts, state) do
  reply = :epgsql.squery(state.conn, "BEGIN")

  if match?({:ok, _, _}, reply) do
    {:ok, nil, state}
  else
    throw(reply)
  end
end
# DBConnection `handle_commit` callback: runs "COMMIT" via :epgsql.squery/2.
# Any reply that is not a 3-tuple starting with :ok is thrown as-is.
def handle_commit(_opts, state) do
  with {:ok, _columns, _rows} <- :epgsql.squery(state.conn, "COMMIT") do
    {:ok, nil, state}
  else
    unexpected -> throw(unexpected)
  end
end
# DBConnection `handle_rollback` callback: runs "ROLLBACK" via :epgsql.squery/2.
# Any reply that is not a 3-tuple starting with :ok is thrown as-is.
def handle_rollback(_opts, state) do
  state.conn
  |> :epgsql.squery("ROLLBACK")
  |> case do
    {:ok, _columns, _rows} -> {:ok, nil, state}
    unexpected -> throw(unexpected)
  end
end
# DBConnection `handle_prepare` callback: parses `query.s` with :epgsql.parse/4.
# The statement name is `query.name`, or [] when no name was given. On success the query is
# returned with `ps` set to the parsed statement and `name` set to the name actually used;
# any other reply from epgsql is thrown as-is.
def handle_prepare(query, _opts, state) do
  statement_name = query.name || []

  with {:ok, statement} <- :epgsql.parse(state.conn, statement_name, query.s, []) do
    {:ok, %{query | ps: statement, name: statement_name}, state}
  else
    unexpected -> throw(unexpected)
  end
end
# DBConnection `handle_declare` callback: binds the prepared statement to a fresh portal.
# The portal name is a charlist built from a base64-encoded serialized reference, so every
# call gets its own name. The cursor handed back is `{:cursor, portal_name, max_rows}`, where
# max_rows comes from `opts[:max_rows]` and falls back to 0.
# Any reply from :epgsql.bind/4 other than :ok is thrown as-is.
def handle_declare(query, params, opts, state) do
  max_rows = opts[:max_rows] || 0
  portal = String.to_charlist(Base.encode64(:erlang.term_to_binary(make_ref())))
  bound = :epgsql.bind(state.conn, query.ps, portal, params)

  if bound == :ok do
    {:ok, query, {:cursor, portal, max_rows}, state}
  else
    throw(bound)
  end
end
# DBConnection `handle_execute` callback: runs the prepared statement with `params`.
# Every reply tuple whose first element is :ok (whatever its size) is passed back unchanged
# as the result; anything else is thrown as-is.
def handle_execute(query, params, _opts, state) do
  reply = :epgsql.prepared_query(state.conn, query.ps, params)

  if is_tuple(reply) and tuple_size(reply) > 0 and elem(reply, 0) == :ok do
    {:ok, query, reply, state}
  else
    throw(reply)
  end
end
# DBConnection `handle_fetch` callback: pulls up to `size` rows from the portal `name`.
# `{:partial, rows}` yields `{:cont, rows, state}`; a reply tuple starting with :ok is
# returned whole as `{:halt, reply, state}`; anything else is thrown as-is.
def handle_fetch(query, {:cursor, name, size}, _opts, state) do
  reply = :epgsql.execute(state.conn, query.ps, name, size)

  cond do
    match?({:partial, _}, reply) ->
      {:cont, elem(reply, 1), state}

    is_tuple(reply) and tuple_size(reply) > 0 and elem(reply, 0) == :ok ->
      {:halt, reply, state}

    true ->
      throw(reply)
  end
end
# DBConnection `handle_deallocate` callback: closes the portal that backs the cursor.
# An `{:error, reason}` reply from :epgsql.close/3 is turned into `throw(reason)`.
def handle_deallocate(_query, {:cursor, name, _}, _opts, state) do
  closed = :epgsql.close(state.conn, :portal, name)

  case closed do
    {:error, reason} -> throw(reason)
    :ok -> {:ok, nil, state}
  end
end
# DBConnection `handle_close` callback: closes the prepared statement stored in `query.ps`.
# An `{:error, reason}` reply from :epgsql.close/2 is turned into `throw(reason)`.
def handle_close(query, _opts, state) do
  closed = :epgsql.close(state.conn, query.ps)

  case closed do
    {:error, reason} -> throw(reason)
    :ok -> {:ok, nil, state}
  end
end
# Manual smoke tests meant to be run from iex (e.g. `DbcEpgsql.Test.query`).
# Each entry point switches on :dbg call tracing, starts a pool, runs a few operations
# inside a transaction while printing the results, and finally tears everything down.
defmodule Test do
  alias DbcEpgsql.Query, as: Q
  alias :dbg, as: Dbg

  # Match spec shared by all trace patterns: match every call and also trace its return value.
  @return_trace [{:_, [], [{:return_trace}]}]

  # Prepares and executes two named queries, then re-executes the first one with a new
  # parameter. Returns the transaction result, or the inspected error if something failed.
  def query() do
    with_traced_pool(fn pool ->
      DBConnection.transaction(pool, fn conn ->
        q1 = %Q{name: "q1", s: "SELECT NULL as test, $1::integer as test2"}
        q2 = %Q{name: "q2", s: "SELECT tab.* FROM static.revision AS tab LIMIT 0"}

        {:ok, ps1, _} =
          DBConnection.prepare_execute(conn, q1, [1]) |> IO.inspect(label: "prepare_execute")

        {:ok, _ps2, _} =
          DBConnection.prepare_execute(conn, q2, []) |> IO.inspect(label: "prepare_execute")

        {:ok, _, _} = DBConnection.execute(conn, ps1, [2]) |> IO.inspect(label: "execute")
        # {:ok, _} = DBConnection.close(conn, ps)
      end)
    end)
  end

  # Prepares one query and streams its rows two at a time (max_rows: 2), then closes it.
  # Returns the transaction result, or the inspected error if something failed.
  def stream() do
    with_traced_pool(fn pool ->
      DBConnection.transaction(pool, fn conn ->
        q1 = %Q{s: "SELECT unnest($1::int[])", name: "some"}
        # q2 = %Q{name: "q2", s: "SELECT tab.* FROM static.product_out AS tab LIMIT 0"}
        {:ok, ps1} = DBConnection.prepare(conn, q1) |> IO.inspect(label: "prepare")
        # DBConnection.execute(conn, ps1, [[1, 2, 3, 4, 5]]) |> IO.inspect(label: "execute")

        DBConnection.stream(conn, ps1, [[1, 2, 3, 4, 5]], max_rows: 2)
        |> Enum.to_list()
        |> IO.inspect(label: "stream")

        {:ok, _} = DBConnection.close(conn, ps1)
      end)
    end)
  end

  # Starts the :dbg tracer and installs call/return trace patterns for the adapter module,
  # the Query protocol implementation and :epgsql, then enables call tracing for all processes.
  def setup_trace() do
    Dbg.start()
    Dbg.tracer()
    Dbg.tpl(DbcEpgsql, :_, :_, @return_trace)
    Dbg.tpl(DBConnection.Query.DbcEpgsql.Query, :_, :_, @return_trace)
    Dbg.tp(:epgsql, :_, :_, @return_trace)
    Dbg.p(:all, :c)
  end

  # Stops tracing and clears the installed trace patterns.
  def stop_trace(), do: Dbg.stop_clear()

  # Shared scaffolding for the entry points: enables tracing, starts a pool, runs `fun`
  # with the pool pid, prints (and returns) any raised / thrown / exited error, and always
  # stops tracing and kills the pool afterwards.
  defp with_traced_pool(fun) do
    setup_trace()
    {:ok, pool} = DBConnection.start_link(DbcEpgsql, [])

    try do
      fun.(pool)
    rescue
      e -> IO.inspect(e)
    catch
      c, e -> IO.inspect({c, e})
    after
      stop_trace()
      # The pool was started with start_link, so it is linked to the caller. Unlink first:
      # otherwise killing it would send a :killed exit signal to the calling process too
      # and take down the iex evaluator that ran the test.
      Process.unlink(pool)
      Process.exit(pool, :kill)
    end
  end
end
end
# Mix project definition for the dbc_epgsql demo application.
defmodule DbcEpgsql.MixProject do
  use Mix.Project

  @app :dbc_epgsql
  @version "0.1.0"

  # Project settings read by Mix.
  def project do
    [
      app: @app,
      version: @version,
      elixir: "~> 1.12",
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # OTP application settings (see "mix help compile.app").
  # :logger and :runtime_tools are listed as extra applications.
  def application, do: [extra_applications: [:logger, :runtime_tools]]

  # Dependencies (see "mix help deps"): db_connection from Hex, epgsql from git at tag 4.6.0.
  defp deps do
    db_connection = {:db_connection, "~> 2.4.1"}
    epgsql = {:epgsql, git: "https://github.com/epgsql/epgsql.git", tag: "4.6.0"}

    [db_connection, epgsql]
  end
end
# Dependency lock map (this looks like the project's mix.lock — presumably generated by Mix,
# so prefer regenerating it over editing by hand). It pins: connection 1.1.0,
# db_connection 2.4.1, telemetry 1.0.0 (all from hexpm) and epgsql from git at tag 4.6.0.
%{
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"db_connection": {:hex, :db_connection, "2.4.1", "6411f6e23f1a8b68a82fa3a36366d4881f21f47fc79a9efb8c615e62050219da", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ea36d226ec5999781a9a8ad64e5d8c4454ecedc7a4d643e4832bf08efca01f00"},
"epgsql": {:git, "https://github.com/epgsql/epgsql.git", "f7530f63ae40ea2b81bae7d4a33292212349b761", [tag: "4.6.0"]},
"telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"},
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment