Created
March 26, 2018 17:45
-
-
Save danj3/e024f61cd6079c942d2e9d3a1b26d38d to your computer and use it in GitHub Desktop.
Elixir: GenStage producer from an :elmdb (LMDB) database using a cursor for linear sequence
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule ElmdbStage do | |
@moduledoc """ | |
Create a GenStage for an :elmdb based LMDB database cursor. | |
This is for linearly reading from a db | |
""" | |
use GenStage | |
def start_link( env, db ) do | |
GenStage.start_link( __MODULE__, { env, db } ) | |
end | |
def init( { env, db } ) do | |
with { :ok, txn } = :elmdb.ro_txn_begin( env ), | |
{ :ok, cur } = :elmdb.ro_txn_cursor_open( txn, db ) do | |
{ :producer, { cur, txn, :first } } | |
else | |
any -> { :error, any } | |
end | |
end | |
def get( cur, pos ) do | |
with { :ok, key, val } <- :elmdb.ro_txn_cursor_get( cur, pos ) do | |
{ key, val } | |
else | |
:not_found -> nil | |
end | |
end | |
def get_demand( cur, demand, r \\ [] ) | |
def get_demand( cur, 0, [] ), do: nil | |
def get_demand( cur, 0, r ), do: Enum.reverse( r ) | |
def get_demand( cur, demand, r ) do | |
case get( cur, :next ) do | |
nil -> get_demand( cur, 0, r ) | |
g -> get_demand( cur, demand-1, [ g | r ] ) | |
end | |
end | |
def handle_demand( demand, { cur, txn, :first } ) do | |
case get( cur, :first ) do | |
nil -> { :stop, :normal, { cur, txn, nil } } | |
f -> { :noreply, [ f | | |
get_demand( cur, demand-1 ) | |
], { cur, txn, :next } } | |
end | |
end | |
def handle_demand( demand, { cur, txn, :next } ) do | |
case get_demand( cur, demand ) do | |
nil -> { :stop, :normal, { cur, txn, nil } } | |
r -> { :noreply, r, { cur, txn, :next } } | |
end | |
end | |
def handle_cancel( reason, state ) do | |
# IO.puts("closing cursor") | |
{ :stop, :normal, state } | |
end | |
def terminate( reason, { cur, txn, where } ) do | |
# IO.puts("closing cursor (terminate)") | |
:ok = :elmdb.ro_txn_cursor_close( cur ) | |
:ok = :elmdb.ro_txn_commit( txn ) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment