Skip to content

Instantly share code, notes, and snippets.

@joachimdb
Last active October 9, 2019 00:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joachimdb/c1b9ebe2ac8d8d70d6678bba3337852a to your computer and use it in GitHub Desktop.
Save joachimdb/c1b9ebe2ac8d8d70d6678bba3337852a to your computer and use it in GitHub Desktop.
defmodule MarketState.Filter.ActivationPrice do
  @moduledoc """
  GenStage `:producer_consumer` that consumes `market_state` events and produces
  `{:breach, market, price}` events.

  - Call `monitor(market, breach_price)` to produce events when the price of `market`
    breaches `breach_price`.
  - Call `demonitor(market, breach_price)` to stop triggering events.

  The stage state is a map keyed by market. Each entry created by `monitor/2` has the
  shape `%{breach_prices: [price], last_price: price | nil}`.
  """

  use GenStage

  #
  # Client
  #

  @doc """
  Starts the stage, registered under the module name.

  The argument is ignored. It is accepted so that the stage can also be started by
  callers that invoke `start_link/1` (for example a supervisor child spec).
  """
  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @doc """
  Asynchronously starts monitoring `market` for breaches of `breach_price`.
  """
  @spec monitor(term, term) :: :ok
  def monitor(market, breach_price) do
    GenServer.cast(__MODULE__, {:monitor, market, breach_price})
  end

  @doc """
  Asynchronously stops monitoring `market` for breaches of `breach_price`.
  """
  @spec demonitor(term, term) :: :ok
  def demonitor(market, breach_price) do
    # Must send :demonitor; sending :monitor here would add the price again instead of
    # removing it.
    GenServer.cast(__MODULE__, {:demonitor, market, breach_price})
  end

  #
  # GenStage callbacks
  #

  # GenStage invokes init/1 with the argument given to start_link (nil here); the
  # argument is not used. The stage starts with no monitored markets.
  @impl true
  def init(_args) do
    {:producer_consumer, %{}, subscribe_to: [MarketState.Producer]}
  end

  @doc """
  Handles the `monitor/2` and `demonitor/2` requests.

  Neither request emits events, so the returned event list is always empty. The
  three-element `{:noreply, events, state}` tuple is the shape GenStage requires from
  `handle_cast/2`.

  ## `{:monitor, market, breach_price}`

  Prepends `breach_price` to the breach prices of `market`, creating the entry if needed.

      iex> handle_cast({:monitor, "ETH-BTC", "0.02"}, %{})
      {:noreply, [], %{"ETH-BTC" => %{breach_prices: ["0.02"], last_price: nil}}}
      iex> handle_cast({:monitor, "ETH-BTC", "0.03"}, %{"ETH-BTC" => %{breach_prices: ["0.02"], last_price: nil}})
      {:noreply, [], %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"], last_price: nil}}}
      iex> handle_cast({:monitor, "ADA-BTC", "0.00012"}, %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"], last_price: nil}})
      {:noreply, [], %{"ADA-BTC" => %{breach_prices: ["0.00012"], last_price: nil},
                       "ETH-BTC" => %{breach_prices: ["0.03", "0.02"], last_price: nil}}}

  ## `{:demonitor, market, breach_price}`

  Removes `breach_price` from the breach prices of `market`. The entry for `market` is
  removed altogether when no breach prices remain.

      iex> handle_cast({:demonitor, "ETH-BTC", "0.02"}, %{})
      {:noreply, [], %{}}
      iex> handle_cast({:demonitor, "ETH-BTC", "0.02"}, %{"ETH-BTC" => %{breach_prices: ["0.02"]}})
      {:noreply, [], %{}}
      iex> handle_cast({:demonitor, "ETH-BTC", "0.03"}, %{"ETH-BTC" => %{breach_prices: ["0.02"]}})
      {:noreply, [], %{"ETH-BTC" => %{breach_prices: ["0.02"]}}}
      iex> handle_cast({:demonitor, "ADA-BTC", "0.00012"}, %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"]}, "ADA-BTC" => %{breach_prices: ["0.00012"]}})
      {:noreply, [], %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"]}}}
      iex> handle_cast({:demonitor, "ETH-BTC", "0.02"}, %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"]}, "ADA-BTC" => %{breach_prices: ["0.00012"]}})
      {:noreply, [], %{"ETH-BTC" => %{breach_prices: ["0.03"]}, "ADA-BTC" => %{breach_prices: ["0.00012"]}}}
  """
  @impl true
  def handle_cast({:monitor, market, breach_price}, state) do
    # NOTE(review): the same breach price can be added more than once, which would
    # produce duplicate breach events - confirm whether callers rely on that.
    new_state =
      Map.update(
        state,
        market,
        %{breach_prices: [breach_price], last_price: nil},
        # TODO: sort price points and optimize check algorithm
        fn entry -> Map.update!(entry, :breach_prices, &[breach_price | &1]) end
      )

    {:noreply, [], new_state}
  end

  def handle_cast({:demonitor, market, breach_price}, state) do
    new_state =
      case Map.get(state, market) do
        nil ->
          # Market is not monitored: nothing to remove.
          state

        entry ->
          # NOTE(review): prices are compared with Decimal.equal?/2 here but with
          # Kernel comparison operators in breached?/3 - confirm the intended price type.
          case Enum.reject(entry.breach_prices, &Decimal.equal?(&1, breach_price)) do
            [] -> Map.delete(state, market)
            remaining -> put_in(state, [market, :breach_prices], remaining)
          end
      end

    {:noreply, [], new_state}
  end

  @doc """
  Consumes market_state events and produces `{:breach, market, price}` events.

  Each market state is matched against the markets and breach prices held in `state`;
  states for unmonitored markets produce no events and leave `state` untouched. Events
  are emitted in the order of the incoming market states.
  """
  @impl true
  def handle_events(market_states, _from, state) do
    # flat_map_reduce threads the state through while collecting events, avoiding the
    # repeated list appends of a manual reduce.
    # TODO: optimize by making use of fact that breach points are sorted
    {breach_events, new_state} =
      Enum.flat_map_reduce(market_states, state, fn market_state, acc ->
        {acc, breached} = update_state_and_compute_breaches(market_state, acc)
        {Enum.map(breached, &{:breach, market_state.market, &1}), acc}
      end)

    {:noreply, breach_events, new_state}
  end

  #
  # Implementation
  #

  @doc """
  Records the price of `market_state` as the last price of its market and returns
  `{new_state, breached_prices}`.

  `market_state` must have `:market` and `:avg_price` keys. When the market is not
  monitored the state is returned unchanged with no breaches. No breaches are reported
  for the first price seen for a market (`last_price` is `nil`), since there is no
  previous price to compare against.

      iex> update_state_and_compute_breaches(%{avg_price: 0.01, market: "ETH-BTC"}, %{})
      {%{}, []}
      iex> update_state_and_compute_breaches(%{avg_price: 0.01, market: "ETH-BTC"}, %{"ETH-BTC" => %{breach_prices: [0.1], last_price: nil}})
      {%{"ETH-BTC" => %{breach_prices: [0.1], last_price: 0.01}}, []}
      iex> update_state_and_compute_breaches(%{avg_price: 0.3, market: "ETH-BTC"}, %{"ETH-BTC" => %{breach_prices: [0.1], last_price: nil}})
      {%{"ETH-BTC" => %{breach_prices: [0.1], last_price: 0.3}}, []}
      iex> update_state_and_compute_breaches(%{avg_price: 0.15, market: "ETH-BTC"}, %{"ETH-BTC" => %{breach_prices: [0.1, 0.2, 0.4], last_price: 0.3}})
      {%{"ETH-BTC" => %{breach_prices: [0.1, 0.2, 0.4], last_price: 0.15}}, [0.2]}
  """
  def update_state_and_compute_breaches(market_state, state) do
    market = market_state.market

    case Map.get(state, market) do
      nil ->
        {state, []}

      %{last_price: last_price, breach_prices: breach_prices} ->
        current_price = market_state.avg_price
        new_state = put_in(state, [market, :last_price], current_price)
        {new_state, breaches_between(last_price, current_price, breach_prices)}
    end
  end

  # Returns the breach prices crossed when moving from `last_price` to `current_price`.
  defp breaches_between(nil, _current_price, _breach_prices), do: []

  defp breaches_between(last_price, current_price, breach_prices) do
    Enum.filter(breach_prices, &breached?(last_price, current_price, &1))
  end

  @doc """
  Return true if breach_point lies in between previous and current value.

      iex> breached?(0, 1, 2)
      false
      iex> breached?(0, 2, 1)
      true
      iex> breached?(2, 0, 1)
      true
      iex> breached?(2, 1, 0)
      false
  """
  # NOTE(review): values are compared with Erlang term ordering, which is only
  # numerically meaningful for integers and floats - confirm prices are plain numbers.
  # NOTE(review): when previous == breach_point an upward (or unchanged) move counts as
  # a breach but a downward move does not - confirm this asymmetry is intended.
  @spec breached?(term, term, term) :: boolean
  def breached?(previous, current, breach_point) when previous <= breach_point do
    current >= breach_point
  end

  def breached?(_previous, current, breach_point) do
    current <= breach_point
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment