Skip to content

Instantly share code, notes, and snippets.

@niku
Last active September 1, 2015 08:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niku/866526b3feda16b30d47 to your computer and use it in GitHub Desktop.
Save niku/866526b3feda16b30d47 to your computer and use it in GitHub Desktop.
複数の INPUT と OUTPUT を Stream で繋ぐ例
# 複数の INPUT と OUTPUT を Stream で繋ぐ例
{:ok, manager} = GenEvent.start_link()
#
# 複数個の OUTPUT を取り扱える
#
Task.start(fn ->
for x <- GenEvent.stream(manager), do: IO.puts "OUTPUT#1: #{x}"
end)
# パターンマッチできる
Task.start(fn ->
for x <- GenEvent.stream(manager),
x == "foo"
do
IO.puts "OUTPUT#2: foo と書かれました"
end
end)
#
# 複数個の INPUT を取り扱える
#
# 3 秒毎に 1 足していく
Task.start(fn ->
Stream.iterate(0, &(&1+1))
|> Stream.zip(Stream.interval(3000))
|> Stream.each(fn {v, _} -> GenEvent.notify(manager, v) end)
|> Stream.run
end)
# シェルから手で入力できる
IO.stream(:stdio, :line)
|> Stream.each(fn line ->
GenEvent.notify(manager, String.rstrip(line))
end)
|> Enum.to_list
# % elixir io_stream.ex
# OUTPUT#1: 0
# OUTPUT#1: 1
# OUTPUT#1: 2
# OUTPUT#1: 3
# hoge
# OUTPUT#1: hoge
# fuga
# OUTPUT#1: fuga
# OUTPUT#1: 4
# foo
# OUTPUT#2: foo と書かれました
# OUTPUT#1: foo
# v2
# INPUT にメタ情報を付けて,OUTPUT で扱えるようにした例
defmodule Message.Meta do
@undefined :__undefined__
defstruct label: @undefined
end
defmodule Message do
defstruct meta: %Message.Meta{}, body: ""
end
defmodule Bot do
def run do
{:ok, manager} = GenEvent.start_link()
#
# 複数個の OUTPUT を取り扱える
#
Task.start(fn ->
for %Message{body: x} <- GenEvent.stream(manager), do: IO.puts "OUTPUT#1: #{x}"
end)
# パターンマッチできる
Task.start(fn ->
for %Message{body: x} <- GenEvent.stream(manager),
x == "foo"
do
IO.puts "OUTPUT#2: foo と書かれました"
end
end)
#
# 複数個の INPUT を取り扱える
#
# 3 秒毎に 1 足していく
Task.start(fn ->
Stream.iterate(0, &(&1+1))
|> Stream.zip(Stream.interval(3000))
|> Stream.each(fn {v, _} -> GenEvent.notify(manager, %Message{meta: %Message.Meta{label: :incriment}, body: v}) end)
|> Stream.run
end)
# シェルから手で入力できる
IO.stream(:stdio, :line)
|> Stream.each(fn line ->
GenEvent.notify(manager, %Message{meta: %Message.Meta{label: :shell}, body: String.rstrip(line)})
end)
|> Enum.to_list
end
end
Bot.run
# v3
# Stream を INPUT と OUTPUT の 2 つ持つことにした
defmodule Message.Meta do
@undefined :__undefined__
defstruct label: @undefined
end
defmodule Message do
defstruct meta: %Message.Meta{}, body: ""
end
defmodule Bot do
def run do
{:ok, input} = GenEvent.start_link()
{:ok, output} = GenEvent.start_link()
#
# 複数個の Handler を取り扱える
#
Task.start(fn ->
for %Message{body: x} <- GenEvent.stream(input) do
GenEvent.notify(output, %Message{meta: %Message.Meta{label: :incriment}, body: "OUTPUT#1: #{x}"})
end
end)
# パターンマッチできる
Task.start(fn ->
for %Message{body: x} <- GenEvent.stream(input),
x == "foo"
do
GenEvent.notify(output, %Message{meta: %Message.Meta{label: :incriment}, body: "OUTPUT#2: foo と書かれました"})
end
end)
#
# 複数個の Adapter を取り扱える
#
# Adapter #1 IN
# 3 秒毎に 1 足していく
Task.start(fn ->
Stream.iterate(0, &(&1+1))
|> Stream.zip(Stream.interval(3000))
|> Stream.each(fn {v, _} -> GenEvent.notify(input, %Message{meta: %Message.Meta{label: :incriment}, body: v}) end)
|> Stream.run
end)
# Adapter #2 OUT
Task.start(fn ->
for %Message{body: x} <- GenEvent.stream(output), do: IO.puts x
end)
# Adapter #2 IN
# シェルから手で入力できる
IO.stream(:stdio, :line)
|> Stream.each(fn line ->
GenEvent.notify(input, %Message{meta: %Message.Meta{label: :shell}, body: String.rstrip(line)})
end)
|> Enum.to_list
end
end
Bot.run
# v4
# Handler をプラグイン化した
defmodule Message.Meta do
defstruct label: nil
end
defmodule Message do
defstruct meta: %Message.Meta{}, body: ""
end
defmodule Handler do
@doc false
defmacro __using__(_opts) do
quote do
@before_compile Handler
use GenServer
def handle_cast(message, state = {input, output}) do
case on(message) do
{:reply, {body, meta}} ->
meta = Map.merge(%Message.Meta{label: __MODULE__}, Enum.into(meta, Map.new))
message = %Message{meta: meta, body: body}
GenEvent.notify(output, message)
{:noreply, state}
:noreply ->
{:noreply, state}
end
end
end
end
defmacro __before_compile__(_env) do
quote do
# Catch any pattern matching except we defined and then just ignore
def on(_), do: :noreply
end
end
end
defmodule Handler.Echo do
@moduledoc """
入力を単に出力するだけのハンドラ
"""
use Handler
def on(%Message{body: body}), do: {:reply, {body, []}}
end
defmodule Handler.CheckFoo do
@moduledoc """
"foo" と入力されたときにだけ "foo と書かれました" と出力するハンドラ
"""
use Handler
# パターンマッチで指定できる
def on(%Message{body: "foo"}), do: {:reply, {"foo と書かれました", []}}
end
defmodule Bot do
def run do
{:ok, input} = GenEvent.start_link()
{:ok, output} = GenEvent.start_link()
#
# 複数個の Handler を取り扱える
#
handlers = [Handler.Echo, Handler.CheckFoo]
for handle <- handlers do
{:ok, echo} = GenServer.start_link(handle, {input, output})
Task.start(fn ->
for x <- GenEvent.stream(input) do
GenServer.cast(echo, x)
end
end)
end
#
# 複数個の Adapter を取り扱える
#
# Adapter #1 IN
# 3 秒毎に 1 足していく
Task.start(fn ->
Stream.iterate(0, &(&1+1))
|> Stream.zip(Stream.interval(3000))
|> Stream.each(fn {v, _} -> GenEvent.notify(input, %Message{meta: %Message.Meta{label: :incriment}, body: v}) end)
|> Stream.run
end)
# Adapter #2 OUT
Task.start(fn ->
for %Message{body: x} <- GenEvent.stream(output), do: IO.puts x
end)
# Adapter #2 IN
# シェルから手で入力できる
IO.stream(:stdio, :line)
|> Stream.each(fn line ->
GenEvent.notify(input, %Message{meta: %Message.Meta{label: :shell}, body: String.rstrip(line)})
end)
|> Enum.to_list
end
end
Bot.run
# v5
# Adapter をプラグイン化した
defmodule Message.Meta do
defstruct label: nil
end
defmodule Message do
defstruct meta: %Message.Meta{}, body: ""
end
defmodule Handler do
@doc false
defmacro __using__(_opts) do
quote do
@before_compile Handler
use GenServer
def handle_cast(message, state = {input, output}) do
case on(message) do
{:reply, {body, meta}} ->
meta = Map.merge(%Message.Meta{label: __MODULE__}, Enum.into(meta, Map.new))
message = %Message{meta: meta, body: body}
GenEvent.notify(output, message)
{:noreply, state}
:noreply ->
{:noreply, state}
end
end
end
end
defmacro __before_compile__(_env) do
quote do
# Catch any pattern matching except we defined and then just ignore
def on(_), do: :noreply
end
end
end
defmodule Handler.Echo do
@moduledoc """
入力を単に出力するだけのハンドラ
"""
use Handler
def on(%Message{body: body}), do: {:reply, {body, []}}
end
defmodule Handler.CheckFoo do
@moduledoc """
"foo" と入力されたときにだけ "foo と書かれました" と出力するハンドラ
"""
use Handler
# パターンマッチで指定できる
def on(%Message{body: "foo"}), do: {:reply, {"foo と書かれました", []}}
end
defmodule Adapter do
@doc false
defmacro __using__(_opts) do
quote do
def input, do: nil
def output(x), do: nil
def generate_input_stream do
input()
|> Stream.map(fn {body, meta} ->
meta = Map.merge(%Message.Meta{label: __MODULE__}, Enum.into(meta, Map.new))
%Message{meta: meta, body: body}
end)
end
def writeout_stream(output_stream) do
for x <- GenEvent.stream(output_stream), do: output(x)
end
defoverridable [input: 0, output: 1]
end
end
end
defmodule Adapter.Incriment do
use Adapter
def input do
Stream.iterate(0, &(&1+1))
|> Stream.zip(Stream.interval(3000))
|> Stream.map(fn {v, _} -> {v, []} end)
end
end
defmodule Adapter.Shell do
use Adapter
def input do
IO.stream(:stdio, :line)
|> Stream.map(&String.rstrip/1)
|> Stream.map(fn x -> {x, []} end)
end
def output(%Message{body: x}) do
IO.puts x
end
end
defmodule Bot do
def run do
{:ok, input} = GenEvent.start_link()
{:ok, output} = GenEvent.start_link()
#
# 複数個の Handler を取り扱える
#
handlers = [Handler.Echo, Handler.CheckFoo]
for handle <- handlers do
{:ok, echo} = GenServer.start_link(handle, {input, output})
Task.start(fn ->
for x <- GenEvent.stream(input) do
GenServer.cast(echo, x)
end
end)
end
#
# 複数個の Adapter を取り扱える
#
# Adapter #1 IN
# 3 秒毎に 1 足していく
#
# Adapter #2 OUT
# シェルに出力する
#
# Adapter #2 IN
# シェルから手で入力できる
adapters = [Adapter.Incriment, Adapter.Shell]
for adapter <- adapters do
Task.start(fn ->
adapter.generate_input_stream()
|> Stream.each(&GenEvent.notify(input, &1))
|> Stream.run
end)
Task.start(fn ->
adapter.writeout_stream(output)
end)
end
:timer.sleep(:infinity)
end
end
Bot.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment