Last active
September 1, 2015 08:57
-
-
Save niku/866526b3feda16b30d47 to your computer and use it in GitHub Desktop.
複数の INPUT と OUTPUT を Stream で繋ぐ例
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example: wiring several INPUTs and OUTPUTs together with Streams.
#
# A single GenEvent manager is the hub: every input notifies it and every
# output consumes the manager's event stream.
{:ok, manager} = GenEvent.start_link()

#
# Several OUTPUTs can be attached.
#
# OUTPUT#1 prints every event.
Task.start(fn ->
  for event <- GenEvent.stream(manager) do
    IO.puts("OUTPUT#1: #{event}")
  end
end)

# OUTPUT#2 filters the events and reacts only to "foo".
Task.start(fn ->
  for event <- GenEvent.stream(manager), event == "foo" do
    IO.puts("OUTPUT#2: foo と書かれました")
  end
end)

#
# Several INPUTs can be attached.
#
# INPUT#1 pairs an incrementing counter with a 3000 ms interval stream and
# notifies the manager with each counter value.
Task.start(fn ->
  Stream.iterate(0, &(&1 + 1))
  |> Stream.zip(Stream.interval(3000))
  |> Stream.each(fn {value, _tick} -> GenEvent.notify(manager, value) end)
  |> Stream.run()
end)

# INPUT#2 forwards lines typed into the shell, with trailing whitespace removed.
# Stream.run/1 drives the stream purely for its side effects; Enum.to_list/1
# would additionally keep every line that was read in a list.
IO.stream(:stdio, :line)
|> Stream.each(fn line -> GenEvent.notify(manager, String.rstrip(line)) end)
|> Stream.run()

# Sample session:
#
# % elixir io_stream.ex
# OUTPUT#1: 0
# OUTPUT#1: 1
# OUTPUT#1: 2
# OUTPUT#1: 3
# hoge
# OUTPUT#1: hoge
# fuga
# OUTPUT#1: fuga
# OUTPUT#1: 4
# foo
# OUTPUT#2: foo と書かれました
# OUTPUT#1: foo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# v2
# Example that attaches meta information to each INPUT so that the OUTPUTs can use it.
defmodule Message.Meta do
  @moduledoc """
  Meta information carried by a message.

  The only field, `label`, defaults to the placeholder atom `:__undefined__`.
  """

  defstruct label: :__undefined__
end
defmodule Message do
  @moduledoc """
  An event passed through the bot.

  `meta` defaults to an empty `%Message.Meta{}` and `body` to the empty string.
  """

  defstruct [meta: %Message.Meta{}, body: ""]
end
defmodule Bot do
  @moduledoc """
  Demo bot (v2): one `GenEvent` manager connects two inputs to two outputs.
  Every event is a `%Message{}` whose meta label names the input it came from.
  """

  @doc """
  Starts the manager, both output tasks and the counter input, then forwards
  lines read from standard input to the manager until that stream ends.

  Returns `:ok` once standard input is exhausted.
  """
  def run do
    {:ok, manager} = GenEvent.start_link()

    #
    # Several OUTPUTs can be attached.
    #
    # OUTPUT#1 prints the body of every message.
    Task.start(fn ->
      for %Message{body: body} <- GenEvent.stream(manager) do
        IO.puts("OUTPUT#1: #{body}")
      end
    end)

    # OUTPUT#2 filters on the body and reacts only to "foo".
    Task.start(fn ->
      for %Message{body: body} <- GenEvent.stream(manager), body == "foo" do
        IO.puts("OUTPUT#2: foo と書かれました")
      end
    end)

    #
    # Several INPUTs can be attached.
    #
    # INPUT#1 pairs an incrementing counter with a 3000 ms interval stream and
    # sends each counter value labelled :incriment.
    Task.start(fn ->
      Stream.iterate(0, &(&1 + 1))
      |> Stream.zip(Stream.interval(3000))
      |> Stream.each(fn {value, _tick} ->
        GenEvent.notify(manager, %Message{meta: %Message.Meta{label: :incriment}, body: value})
      end)
      |> Stream.run()
    end)

    # INPUT#2 forwards lines typed into the shell, labelled :shell.
    # Stream.run/1 drives the stream purely for its side effects; Enum.to_list/1
    # would additionally keep every line that was read in a list.
    IO.stream(:stdio, :line)
    |> Stream.each(fn line ->
      GenEvent.notify(manager, %Message{
        meta: %Message.Meta{label: :shell},
        body: String.rstrip(line)
      })
    end)
    |> Stream.run()
  end
end
# Script entry point: start the bot.
Bot.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# v3
# Now uses two streams: one for INPUT and one for OUTPUT.
defmodule Message.Meta do
  @moduledoc """
  Meta information carried by a message.

  The only field, `label`, defaults to the placeholder atom `:__undefined__`.
  """

  defstruct label: :__undefined__
end
defmodule Message do
  @moduledoc """
  An event passed through the bot.

  `meta` defaults to an empty `%Message.Meta{}` and `body` to the empty string.
  """

  defstruct [meta: %Message.Meta{}, body: ""]
end
defmodule Bot do
  @moduledoc """
  Demo bot (v3): two `GenEvent` managers, `input` and `output`.

  Handlers consume the `input` stream and notify `output`; adapters feed
  `input` and consume `output`.
  """

  @doc """
  Starts both managers, the handler tasks and the adapter tasks, then forwards
  lines read from standard input to `input` until that stream ends.

  Returns `:ok` once standard input is exhausted.
  """
  def run do
    {:ok, input} = GenEvent.start_link()
    {:ok, output} = GenEvent.start_link()

    #
    # Several Handlers can be attached.
    #
    # Handler #1 re-emits every input body on the output, prefixed "OUTPUT#1: ".
    Task.start(fn ->
      for %Message{body: body} <- GenEvent.stream(input) do
        GenEvent.notify(output, %Message{
          meta: %Message.Meta{label: :incriment},
          body: "OUTPUT#1: #{body}"
        })
      end
    end)

    # Handler #2 filters on the body and reacts only to "foo".
    Task.start(fn ->
      for %Message{body: body} <- GenEvent.stream(input), body == "foo" do
        GenEvent.notify(output, %Message{
          meta: %Message.Meta{label: :incriment},
          body: "OUTPUT#2: foo と書かれました"
        })
      end
    end)

    #
    # Several Adapters can be attached.
    #
    # Adapter #1 IN: pairs an incrementing counter with a 3000 ms interval
    # stream and sends each counter value labelled :incriment.
    Task.start(fn ->
      Stream.iterate(0, &(&1 + 1))
      |> Stream.zip(Stream.interval(3000))
      |> Stream.each(fn {value, _tick} ->
        GenEvent.notify(input, %Message{meta: %Message.Meta{label: :incriment}, body: value})
      end)
      |> Stream.run()
    end)

    # Adapter #2 OUT: prints the body of every output message.
    Task.start(fn ->
      for %Message{body: body} <- GenEvent.stream(output) do
        IO.puts(body)
      end
    end)

    # Adapter #2 IN: forwards lines typed into the shell, labelled :shell.
    # Stream.run/1 drives the stream purely for its side effects; Enum.to_list/1
    # would additionally keep every line that was read in a list.
    IO.stream(:stdio, :line)
    |> Stream.each(fn line ->
      GenEvent.notify(input, %Message{
        meta: %Message.Meta{label: :shell},
        body: String.rstrip(line)
      })
    end)
    |> Stream.run()
  end
end
# Script entry point: start the bot.
Bot.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# v4
# Handlers are now plugins.
defmodule Message.Meta do
  @moduledoc """
  Meta information carried by a message.

  The only field, `label`, defaults to `nil`.
  """

  defstruct [:label]
end
defmodule Message do
  @moduledoc """
  An event passed through the bot.

  `meta` defaults to an empty `%Message.Meta{}` and `body` to the empty string.
  """

  defstruct [meta: %Message.Meta{}, body: ""]
end
defmodule Handler do
  @moduledoc """
  Plugin base for message handlers.

  `use Handler` turns the calling module into a `GenServer` whose state is an
  `{input, output}` tuple. Every cast message is passed to the module's `on/1`:

    * `{:reply, {body, meta}}` - a `%Message{}` carrying `body` is sent to
      `output` with `GenEvent.notify/2`. Its meta is a `%Message.Meta{}` whose
      `label` defaults to the handler module and can be overridden by `meta`.
    * `:noreply` - nothing is sent.

  A catch-all `on/1` clause returning `:noreply` is appended after the
  module's own clauses, so a handler only defines the patterns it cares about.
  """

  @doc false
  defmacro __using__(_opts) do
    quote do
      @before_compile Handler
      use GenServer

      # The input manager is part of the state but is not needed here, hence
      # `_input` (a plain `input` binding triggers an unused-variable warning).
      def handle_cast(message, {_input, output} = state) do
        case on(message) do
          {:reply, {body, meta}} ->
            # struct/2 applies only keys that Message.Meta defines, so the
            # result is always a well-formed struct; Map.merge/2 would let
            # arbitrary keys leak into it.
            reply_meta = struct(%Message.Meta{label: __MODULE__}, meta)
            GenEvent.notify(output, %Message{meta: reply_meta, body: body})
            {:noreply, state}

          :noreply ->
            {:noreply, state}
        end
      end
    end
  end

  @doc false
  defmacro __before_compile__(_env) do
    quote do
      # Appended last: catches every message the handler's own clauses did
      # not match and ignores it.
      def on(_), do: :noreply
    end
  end
end
defmodule Handler.Echo do
  @moduledoc """
  Handler that simply replies with the body of the message it received.
  """
  use Handler

  def on(%Message{body: text}) do
    {:reply, {text, []}}
  end
end
defmodule Handler.CheckFoo do
  @moduledoc """
  Handler that replies "foo と書かれました" only when the message body is "foo".
  """
  use Handler

  # The trigger is expressed as a pattern in the function head.
  def on(%Message{body: "foo"}) do
    {:reply, {"foo と書かれました", []}}
  end
end
defmodule Bot do
  @moduledoc """
  Demo bot (v4): handlers are plugin modules started as GenServers.

  Two `GenEvent` managers are used: adapters feed `input`, handlers turn
  `input` events into `output` events, and an adapter prints `output`.
  """

  @doc """
  Starts both managers, one GenServer plus forwarding task per handler, and
  the adapter tasks, then forwards lines read from standard input to `input`
  until that stream ends.

  Returns `:ok` once standard input is exhausted.
  """
  def run do
    {:ok, input} = GenEvent.start_link()
    {:ok, output} = GenEvent.start_link()

    #
    # Several Handlers can be attached.
    #
    # Each handler module gets its own GenServer and a task that casts every
    # input event to it.
    handlers = [Handler.Echo, Handler.CheckFoo]

    for handler <- handlers do
      {:ok, pid} = GenServer.start_link(handler, {input, output})

      Task.start(fn ->
        for event <- GenEvent.stream(input) do
          GenServer.cast(pid, event)
        end
      end)
    end

    #
    # Several Adapters can be attached.
    #
    # Adapter #1 IN: pairs an incrementing counter with a 3000 ms interval
    # stream and sends each counter value labelled :incriment.
    Task.start(fn ->
      Stream.iterate(0, &(&1 + 1))
      |> Stream.zip(Stream.interval(3000))
      |> Stream.each(fn {value, _tick} ->
        GenEvent.notify(input, %Message{meta: %Message.Meta{label: :incriment}, body: value})
      end)
      |> Stream.run()
    end)

    # Adapter #2 OUT: prints the body of every output message.
    Task.start(fn ->
      for %Message{body: body} <- GenEvent.stream(output) do
        IO.puts(body)
      end
    end)

    # Adapter #2 IN: forwards lines typed into the shell, labelled :shell.
    # Stream.run/1 drives the stream purely for its side effects; Enum.to_list/1
    # would additionally keep every line that was read in a list.
    IO.stream(:stdio, :line)
    |> Stream.each(fn line ->
      GenEvent.notify(input, %Message{
        meta: %Message.Meta{label: :shell},
        body: String.rstrip(line)
      })
    end)
    |> Stream.run()
  end
end
# Script entry point: start the bot.
Bot.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# v5
# Adapters are now plugins.
defmodule Message.Meta do
  @moduledoc """
  Meta information carried by a message.

  The only field, `label`, defaults to `nil`.
  """

  defstruct [:label]
end
defmodule Message do
  @moduledoc """
  An event passed through the bot.

  `meta` defaults to an empty `%Message.Meta{}` and `body` to the empty string.
  """

  defstruct [meta: %Message.Meta{}, body: ""]
end
defmodule Handler do
  @moduledoc """
  Plugin base for message handlers.

  `use Handler` turns the calling module into a `GenServer` whose state is an
  `{input, output}` tuple. Every cast message is passed to the module's `on/1`:

    * `{:reply, {body, meta}}` - a `%Message{}` carrying `body` is sent to
      `output` with `GenEvent.notify/2`. Its meta is a `%Message.Meta{}` whose
      `label` defaults to the handler module and can be overridden by `meta`.
    * `:noreply` - nothing is sent.

  A catch-all `on/1` clause returning `:noreply` is appended after the
  module's own clauses, so a handler only defines the patterns it cares about.
  """

  @doc false
  defmacro __using__(_opts) do
    quote do
      @before_compile Handler
      use GenServer

      # The input manager is part of the state but is not needed here, hence
      # `_input` (a plain `input` binding triggers an unused-variable warning).
      def handle_cast(message, {_input, output} = state) do
        case on(message) do
          {:reply, {body, meta}} ->
            # struct/2 applies only keys that Message.Meta defines, so the
            # result is always a well-formed struct; Map.merge/2 would let
            # arbitrary keys leak into it.
            reply_meta = struct(%Message.Meta{label: __MODULE__}, meta)
            GenEvent.notify(output, %Message{meta: reply_meta, body: body})
            {:noreply, state}

          :noreply ->
            {:noreply, state}
        end
      end
    end
  end

  @doc false
  defmacro __before_compile__(_env) do
    quote do
      # Appended last: catches every message the handler's own clauses did
      # not match and ignores it.
      def on(_), do: :noreply
    end
  end
end
defmodule Handler.Echo do
  @moduledoc """
  Handler that simply replies with the body of the message it received.
  """
  use Handler

  def on(%Message{body: text}) do
    {:reply, {text, []}}
  end
end
defmodule Handler.CheckFoo do
  @moduledoc """
  Handler that replies "foo と書かれました" only when the message body is "foo".
  """
  use Handler

  # The trigger is expressed as a pattern in the function head.
  def on(%Message{body: "foo"}) do
    {:reply, {"foo と書かれました", []}}
  end
end
defmodule Adapter do
  @moduledoc """
  Plugin base for adapters, the bot's connections to the outside world.

  `use Adapter` injects:

    * `input/0` (overridable) - an enumerable of `{body, meta}` tuples.
      Defaults to an empty list.
    * `output/1` (overridable) - called with each event from the output
      manager. Defaults to ignoring the event and returning `nil`.
    * `generate_input_stream/0` - maps `input/0` to `%Message{}` structs whose
      meta `label` defaults to the adapter module and can be overridden by the
      tuple's `meta`.
    * `writeout_stream/1` - passes every event streamed from the given
      GenEvent manager to `output/1`.
  """

  @doc false
  defmacro __using__(_opts) do
    quote do
      # An empty list (rather than nil) keeps generate_input_stream/0
      # enumerable for adapters that only implement output/1.
      def input, do: []

      def output(_message), do: nil

      def generate_input_stream do
        Stream.map(input(), fn {body, meta} ->
          # struct/2 applies only keys that Message.Meta defines; Map.merge/2
          # would let arbitrary keys leak into the struct.
          %Message{meta: struct(%Message.Meta{label: __MODULE__}, meta), body: body}
        end)
      end

      def writeout_stream(output_stream) do
        for message <- GenEvent.stream(output_stream), do: output(message)
      end

      defoverridable input: 0, output: 1
    end
  end
end
defmodule Adapter.Incriment do
  @moduledoc """
  Input adapter producing an incrementing counter (0, 1, 2, ...) paired with a
  3000 ms interval stream; each value is emitted with empty meta.
  """
  use Adapter

  def input do
    counter = Stream.iterate(0, fn n -> n + 1 end)
    ticks = Stream.interval(3000)

    counter
    |> Stream.zip(ticks)
    |> Stream.map(fn {value, _tick} -> {value, []} end)
  end
end
defmodule Adapter.Shell do
  @moduledoc """
  Shell adapter: reads lines from standard input as input messages and prints
  the body of each output message.
  """
  use Adapter

  # Each line has its trailing whitespace removed and is paired with empty meta.
  def input do
    Stream.map(IO.stream(:stdio, :line), fn line ->
      {String.rstrip(line), []}
    end)
  end

  def output(%Message{body: text}), do: IO.puts(text)
end
defmodule Bot do
  @moduledoc """
  Demo bot (v5): both handlers and adapters are plugin modules.

  Adapters feed the `input` manager, handlers turn `input` events into
  `output` events, and adapters write the `output` events out.
  """

  @doc """
  Starts both managers, every handler and every adapter, then sleeps forever.
  """
  def run do
    {:ok, input} = GenEvent.start_link()
    {:ok, output} = GenEvent.start_link()

    #
    # Several Handlers can be attached.
    #
    Enum.each([Handler.Echo, Handler.CheckFoo], fn handler ->
      start_handler(handler, input, output)
    end)

    #
    # Several Adapters can be attached.
    #
    # Adapter #1 IN:  emits an incrementing counter.
    # Adapter #2 OUT: writes to the shell.
    # Adapter #2 IN:  reads lines typed into the shell.
    Enum.each([Adapter.Incriment, Adapter.Shell], fn adapter ->
      start_adapter(adapter, input, output)
    end)

    :timer.sleep(:infinity)
  end

  # Starts the handler GenServer and a task casting every input event to it.
  defp start_handler(handler, input, output) do
    {:ok, pid} = GenServer.start_link(handler, {input, output})

    Task.start(fn ->
      for event <- GenEvent.stream(input) do
        GenServer.cast(pid, event)
      end
    end)
  end

  # Starts one task feeding the adapter's input into `input` and one task
  # handing `output` events to the adapter.
  defp start_adapter(adapter, input, output) do
    Task.start(fn ->
      adapter.generate_input_stream()
      |> Stream.each(fn message -> GenEvent.notify(input, message) end)
      |> Stream.run()
    end)

    Task.start(fn -> adapter.writeout_stream(output) end)
  end
end
# Script entry point: start the bot.
Bot.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment