Instantly share code, notes, and snippets.

Embed
What would you like to do?
Low Volume GenStage
defmodule DNSimpleNumbers do
def fetch() do
HTTPotion.get("http://localhost:4567/")
|> case do
%HTTPotion.Response{body: ""} -> nil
%HTTPotion.Response{body: number} -> String.to_integer(number)
end
end
def process(number) do
:math.sqrt(number)
end
end
defmodule Fetcher do
use GenStage
def init(_) do
schedule_next_poll()
{:producer, :there_is_no_state}
end
def handle_demand(demand, state) when demand > 0 do
{:noreply, [], state}
end
def handle_info(:poll, state) do
schedule_next_poll()
numbers = collect_numbers([], DNSimpleNumbers.fetch)
{:noreply, numbers, state}
end
defp collect_numbers(acc, nil), do: Enum.reverse(acc)
defp collect_numbers(acc, number), do: collect_numbers([number | acc], DNSimpleNumbers.fetch)
defp schedule_next_poll do
Process.send_after(self(), :poll, :timer.seconds(2))
end
end
require 'sinatra'
set :server, "webrick"
$numbers = [42]
get '/' do
$numbers.pop.to_s
end
Thread.new do
while(true) do
$numbers.push(rand(1..100))
sleep(rand(10..20))
end
end
defmodule Processor do
use GenStage
def init(_opts) do
{:consumer, :there_is_no_state, subscribe_to: [Fetcher]}
end
def handle_events(events, _from, state) do
Enum.map(events, &DNSimpleNumbers.process/1)
|> IO.inspect
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
[2018-08-17 11:40:56] INFO WEBrick 1.3.1
[2018-08-17 11:40:56] INFO ruby 2.4.2 (2017-09-14) [x86_64-darwin17]
== Sinatra (v2.0.3) has taken the stage on 4567 for development with backup from WEBrick
[2018-08-17 11:40:56] INFO WEBrick::HTTPServer#start: pid=61132 port=4567
127.0.0.1 - - [17/Aug/2018:11:41:03 +0200] "GET / HTTP/1.1" 200 1 0.0054
127.0.0.1 - - [17/Aug/2018:11:41:03 CEST] "GET / HTTP/1.1" 200 1
- -> /
127.0.0.1 - - [17/Aug/2018:11:41:05 +0200] "GET / HTTP/1.1" 200 2 0.0004
127.0.0.1 - - [17/Aug/2018:11:41:05 CEST] "GET / HTTP/1.1" 200 2
- -> /
127.0.0.1 - - [17/Aug/2018:11:41:07 +0200] "GET / HTTP/1.1" 200 2 0.0004
127.0.0.1 - - [17/Aug/2018:11:41:07 CEST] "GET / HTTP/1.1" 200 2
- -> /
127.0.0.1 - - [17/Aug/2018:11:41:09 +0200] "GET / HTTP/1.1" 200 - 0.0003
127.0.0.1 - - [17/Aug/2018:11:41:09 CEST] "GET / HTTP/1.1" 200 0
- -> /
127.0.0.1 - - [17/Aug/2018:11:41:11 +0200] "GET / HTTP/1.1" 200 - 0.0004
127.0.0.1 - - [17/Aug/2018:11:41:11 CEST] "GET / HTTP/1.1" 200 0
- -> /
HTTPotion.start
DNSimpleNumbers.fetch
|> DNSimpleNumbers.process
|> IO.inspect
HTTPotion.start
GenStage.start_link(Fetcher, :there_is_no_state, name: Fetcher)
GenStage.start_link(Processor, :there_is_no_state, name: Processor)
Erlang/OTP 21 [erts-10.0.4] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)
[9.539392014169456, 6.4031242374328485, 7.416198487095663, 2.8284271247461903,
8.48528137423857, 7.280109889280518, 9.486832980505138, 9.486832980505138,
8.366600265340756, 9.643650760992955, 1.4142135623730951, 6.244997998398398,
3.7416573867739413, 3.0, 3.872983346207417, 9.219544457292887,
8.602325267042627, 6.0, 6.48074069840786]
[7.54983443527075]
[7.483314773547883]
iex(1)> %
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment