Skip to content

Instantly share code, notes, and snippets.

@matlc
Last active March 11, 2021 07:12
Show Gist options
  • Save matlc/4854510fc1b5d69cec540b03e106a498 to your computer and use it in GitHub Desktop.
Save matlc/4854510fc1b5d69cec540b03e106a498 to your computer and use it in GitHub Desktop.
defmodule Filter do
  @moduledoc """
  Scans log files in parallel with `Flow` and collects, for every key found on
  a line mentioning `"publishing"`, the payload of the first such line seen.
  """

  @default_dir "/logs"

  @doc """
  Builds a `Flow` over every regular file in `dir` (defaults to `"/logs"`).

  Only lines containing `"publishing"` are considered. Each is parsed with
  `parse_line/1`; lines whose key or payload cannot be extracted are skipped
  rather than crashing the whole flow. Events are partitioned by key and
  reduced into a `%{key => payload}` map per partition, keeping the first
  payload seen for each key.

  The directory is listed eagerly, but file contents are only read when the
  flow is run (e.g. with `Enum.to_list/1`).
  """
  @spec start_flow(Path.t()) :: Flow.t()
  def start_flow(dir \\ @default_dir) do
    dir
    |> streams()
    |> Flow.from_enumerables()
    |> Flow.filter(&String.contains?(&1, "publishing"))
    |> Flow.flat_map(&parse_line_as_list/1)
    # Partition on the string key itself: converting log text to atoms would
    # leak atoms, which are never garbage-collected.
    |> Flow.partition(key: {:elem, 0})
    |> Flow.reduce(fn -> %{} end, fn {key, payload}, acc ->
      Map.put_new(acc, key, payload)
    end)
  end

  @doc """
  Extracts `{key, payload}` from a single log line.

  The payload is the text between the first `<` and the next `>`. The key is
  taken from the first rule whose marker appears in the line:

    1. `<{"service"` → the value of `"service":"..."`
    2. `"event_type"` → the value of `"event_type":"..."`
    3. `"type"` → the value of `"type":"..."`
    4. otherwise → the text inside `to [...]`

  Returns `{:ok, {key, payload}}`, or `:error` when the chosen key pattern or
  the payload pattern does not match.

  ## Examples

      iex> Filter.parse_line("publishing <hello> to [orders.v1]")
      {:ok, {"orders.v1", "hello"}}

      iex> Filter.parse_line("publishing <hello>")
      :error
  """
  @spec parse_line(String.t()) :: {:ok, {String.t(), String.t()}} | :error
  def parse_line(line) when is_binary(line) do
    case {extract_key(line), capture(~r/<(.+?)>/, line)} do
      {key, payload} when is_binary(key) and is_binary(payload) -> {:ok, {key, payload}}
      _ -> :error
    end
  end

  # One lazy line stream per regular file in `dir`. Subdirectories are skipped
  # because stream-reading a directory fails once the stream is enumerated.
  defp streams(dir) do
    dir
    |> File.ls!()
    |> Enum.map(&Path.join(dir, &1))
    |> Enum.filter(&File.regular?/1)
    |> Enum.map(&File.stream!(&1, read_ahead: 100_000))
  end

  # Adapts parse_line/1 for Flow.flat_map/2: parsable lines become a
  # one-element list, unparsable ones are dropped.
  defp parse_line_as_list(line) do
    case parse_line(line) do
      {:ok, pair} -> [pair]
      :error -> []
    end
  end

  # Picks the key rule by marker (first match wins), in the order documented
  # on parse_line/1. Returns nil when the chosen rule's pattern does not match.
  defp extract_key(line) do
    cond do
      String.contains?(line, ~s(<{"service")) -> capture(~r/"service":"(.+?)"/, line)
      String.contains?(line, ~s("event_type")) -> capture(~r/"event_type":"(.+?)"/, line)
      String.contains?(line, ~s("type")) -> capture(~r/"type":"(.+?)"/, line)
      true -> capture(~r/to\s\[(.+?)\]/, line)
    end
  end

  # Returns the first capture group of `regex` in `line`, or nil if it does not match.
  defp capture(regex, line) do
    case Regex.run(regex, line, capture: :all_but_first) do
      [value | _] -> value
      nil -> nil
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment