Skip to content

Instantly share code, notes, and snippets.

@Harrisonl
Created December 8, 2016 10:18
Show Gist options
  • Save Harrisonl/ab3cc18a0271d0a8d600b4c76c2d3f3d to your computer and use it in GitHub Desktop.
Save Harrisonl/ab3cc18a0271d0a8d600b4c76c2d3f3d to your computer and use it in GitHub Desktop.
alias Experimental.GenStage
defmodule Go do
  @moduledoc """
  Entry point that wires a single `GsProducer` to a pool of `GsConsumer` stages.
  """

  @doc """
  Starts a `GsProducer` for `file`, starts `consumer_count` `GsConsumer`
  stages (8 by default) and subscribes each of them to the producer.

  All consumers are started before any of them is subscribed.

  Returns the result of the last `GenStage.sync_subscribe/2` call.
  Raises `MatchError` if the producer or any consumer fails to start.
  """
  def go(file, consumer_count \\ 8) when is_integer(consumer_count) and consumer_count > 0 do
    {:ok, producer} = GsProducer.start_link(file)

    # Start the whole pool first so that subscription (and therefore demand)
    # only begins once every consumer exists.
    consumers =
      for _ <- 1..consumer_count do
        {:ok, consumer} = GsConsumer.start_link()
        consumer
      end

    # Subscribe in start order; the value of the final subscription is returned.
    consumers
    |> Enum.map(fn consumer -> GenStage.sync_subscribe(consumer, to: producer) end)
    |> List.last()
  end
end
defmodule GsProducer do
  @moduledoc """
  GenStage producer that emits the lines of a file as events.

  The stage state is an open IO device. Each demand reads the *next* lines
  from the device's current position, so every line is emitted once.
  """
  use GenStage

  @doc """
  Starts the producer for the file at path `file`, registered under the
  module name. The path is printed with `IO.inspect/1` before starting.
  """
  def start_link(file) do
    IO.inspect(file)
    GenStage.start_link(__MODULE__, file, name: __MODULE__)
  end

  @doc """
  Opens `file` for reading and keeps the device as the stage state.

  Raises `File.Error` if the file cannot be opened.
  """
  def init(file) do
    # A lazy `File.stream!/1` kept as state restarts from the first line on
    # every enumeration, which made each demand re-emit the head of the file.
    # An open device remembers its read position between demands.
    device = File.open!(file, [:read, :read_ahead])
    {:producer, device}
  end

  @doc """
  Reads up to `demand` further lines from the device and emits them as events.

  Fewer than `demand` events (possibly none) are emitted once the end of the
  file has been reached.
  """
  def handle_demand(demand, device) when demand > 0 do
    # `Enum.take/2` halts the stream after exactly `demand` lines, so no
    # line is consumed from the device without being emitted.
    lines = device |> IO.binstream(:line) |> Enum.take(demand)
    {:noreply, lines, device}
  end
end
defmodule GsConsumer do
  @moduledoc """
  GenStage consumer that extracts the `GET /...` portion of each incoming
  line and appends it to `results.log`.
  """
  use GenStage

  @doc """
  Starts an unnamed consumer stage with an empty list as its initial state.
  """
  def start_link do
    GenStage.start_link(__MODULE__, [])
  end

  @doc false
  def init(state) do
    {:consumer, state}
  end

  @doc """
  Handles a batch of `lines`: the matched text of every line is appended to
  `results.log` (lines without a match contribute nothing).

  The whole batch is written with a single append. A failure to write is
  reported on stderr instead of being silently dropped. The consumer emits
  no events and keeps its state unchanged.
  """
  def handle_events(lines, _from, state) do
    # Build the batch as iodata so it is written in one call rather than
    # one write per line.
    matches = Enum.map(lines, &request_match/1)

    case File.write("results.log", matches, [:append]) do
      :ok ->
        :ok

      {:error, reason} ->
        IO.puts(:stderr, "GsConsumer: could not append to results.log: #{inspect(reason)}")
    end

    {:noreply, [], state}
  end

  # Returns the full regex match for `line`, or "" when the line has no match.
  # NOTE(review): matches are written back-to-back with no separator, exactly
  # as before — confirm whether a newline between entries was intended.
  defp request_match(line) do
    case Regex.run(~r/GET \/[a-z]{2,10}\b([-a-zA-Z0-9@:%_\+.~#?&\/\/=]*)/, line) do
      nil -> ""
      [match | _captures] -> match
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment