Last active
October 9, 2019 00:15
-
-
Save joachimdb/c1b9ebe2ac8d8d70d6678bba3337852a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule MarketState.Filter.ActivationPrice do | |
use GenStage | |
alias __MODULE__ | |
@moduledoc """ | |
GenServer/Genstage that consumes `market_state` events and produces `{:breach, market, price}` events | |
- Call `monitor(market, breach_price)` to produce events when the price of `market` breaches `breach_price` | |
- Call `demonitor(market, breach_price)` to stop triggering events | |
""" | |
# | |
# Client | |
# | |
def start_link() do | |
GenStage.start_link(__MODULE__, nil, name: __MODULE__) | |
end | |
def monitor(market, breach_price) do | |
GenServer.cast(__MODULE__, {:monitor, market, breach_price}) | |
end | |
def demonitor(market, breach_price) do | |
GenServer.cast(__MODULE__, {:monitor, market, breach_price}) | |
end | |
# | |
# GenServer | |
# | |
def init() do | |
initial_state = %{} | |
{:producer_consumer, initial_state, subscribe_to: [MarketState.Producer]} | |
end | |
@doc """ | |
iex> handle_cast({:monitor, "ETH-BTC", "0.02"}, %{}) | |
{:noreply, %{"ETH-BTC" => %{breach_prices: ["0.02"], last_price: nil}}} | |
iex> handle_cast({:monitor, "ETH-BTC", "0.03"}, %{"ETH-BTC" => %{breach_prices: ["0.02"], last_price: nil}}) | |
{:noreply, %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"], last_price: nil}}} | |
iex> handle_cast({:monitor, "ADA-BTC", "0.00012"}, %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"], last_price: nil}}) | |
{:noreply, %{"ADA-BTC" => %{breach_prices: ["0.00012"], last_price: nil}, | |
"ETH-BTC" => %{breach_prices: ["0.03", "0.02"], last_price: nil}}} | |
""" | |
def handle_cast({:monitor, market, breach_price}, state) do | |
# add breach_price to the list of breach prices for market | |
new_state = | |
Map.update( | |
state, | |
market, | |
%{breach_prices: [breach_price], last_price: nil}, | |
# TODO: sort price points and optimize check algorithm | |
fn s -> Map.update!(s, :breach_prices, &[breach_price | &1]) end | |
) | |
{:noreply, new_state} | |
end | |
@doc """ | |
iex> handle_cast({:demonitor, "ETH-BTC", "0.02"}, %{}) | |
{:noreply, %{}} | |
iex> handle_cast({:demonitor, "ETH-BTC", "0.02"}, %{"ETH-BTC" => %{breach_prices: ["0.02"]}}) | |
{:noreply, %{}} | |
iex> handle_cast({:demonitor, "ETH-BTC", "0.03"}, %{"ETH-BTC" => %{breach_prices: ["0.02"]}}) | |
{:noreply, %{"ETH-BTC" => %{breach_prices: ["0.02"]}}} | |
iex> handle_cast({:demonitor, "ADA-BTC", "0.00012"}, %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"]}, "ADA-BTC" => %{breach_prices: ["0.00012"]}}) | |
{:noreply, %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"]}}} | |
iex> handle_cast({:demonitor, "ETH-BTC", "0.02"}, %{"ETH-BTC" => %{breach_prices: ["0.03", "0.02"]}, "ADA-BTC" => %{breach_prices: ["0.00012"]}}) | |
{:noreply, %{"ETH-BTC" => %{breach_prices: ["0.03"]}, "ADA-BTC" => %{breach_prices: ["0.00012"]}}} | |
""" | |
def handle_cast({:demonitor, market, breach_price}, state) do | |
# Remove breach_price to the list of breach prices for market. Remove the entry for market | |
# alltogether if there are no other breach prices | |
new_state = | |
case Map.get(state, market) do | |
nil -> | |
state | |
entry -> | |
new_breach_prices = Enum.reject(entry.breach_prices, &Decimal.equal?(&1, breach_price)) | |
case new_breach_prices do | |
[] -> Map.delete(state, market) | |
pcs -> Map.update!(state, market, &Map.put(&1, :breach_prices, new_breach_prices)) | |
end | |
end | |
{:noreply, new_state} | |
end | |
# | |
# GenStage | |
# | |
@doc """ | |
Consumes market_state events and produces `{:breach, market, price}` events | |
""" | |
def handle_events(market_states, _from, state) do | |
# market_states need to be matched against the markets and breach points in state | |
{state, breach_events} = | |
market_states | |
# reject unmonitored markets | |
|> Enum.filter(&Map.has_key?(state, &1.market)) | |
|> Enum.reduce({state, []}, fn market_state, {state, breach_events} -> | |
{state, breached} = update_state_and_compute_breaches(state, market_state) | |
# TODO: optimize by making use of fact that breach pints are sorted | |
{state, | |
breach_events ++ Enum.map(breached, fn b -> {:breach, market_state.market, b} end)} | |
end) | |
{:noreply, breach_events, state} | |
end | |
# | |
# Implementation | |
# | |
@doc """ | |
iex> update_state_and_compute_breaches(%{avg_price: 0.01, market: "ETH-BTC"}, %{}) | |
{%{}, []} | |
iex> update_state_and_compute_breaches(%{avg_price: 0.01, market: "ETH-BTC"}, %{"ETH-BTC" => %{breach_prices: [0.1], last_price: nil}}) | |
{%{"ETH-BTC" => %{breach_prices: [0.1], last_price: nil, nil: 0.01}}, []} | |
iex> update_state_and_compute_breaches(%{avg_price: 0.3, market: "ETH-BTC"}, %{"ETH-BTC" => %{breach_prices: [0.1], last_price: nil}}) | |
{%{"ETH-BTC" => %{breach_prices: [0.1], last_price: 0.3}}, []} | |
iex> update_state_and_compute_breaches(%{avg_price: 0.15, market: "ETH-BTC"}, %{"ETH-BTC" => %{breach_prices: [0.1, 0.2, 0.4], last_price: 0.3}}) | |
{%{"ETH-BTC" => %{breach_prices: [0.1, 0.2, 0.4], last_price: 0.15}}, [0.2]} | |
""" | |
def update_state_and_compute_breaches(market_state, state) do | |
market = market_state.market | |
case Map.get(state, market) do | |
nil -> | |
{state, []} | |
%{last_price: last_price, breach_prices: breach_prices} -> | |
current_price = market_state.avg_price | |
new_state = put_in(state, [market, :last_price], current_price) | |
breached = | |
if is_nil(last_price) do | |
[] | |
else | |
Enum.filter(breach_prices, &breached?(last_price, current_price, &1)) | |
end | |
{new_state, breached} | |
end | |
end | |
@doc """ | |
Return true if breach_point lies in between previous and current value | |
iex> breached?(0, 1, 2) | |
false | |
iex> breached?(0, 2, 1) | |
true | |
iex> breached?(2, 0, 1) | |
true | |
iex> breached?(2, 1, 0) | |
false | |
""" | |
def breached?(previous, current, breach_point) do | |
if previous <= breach_point do | |
current >= breach_point | |
else | |
current <= breach_point | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment