Skip to content

Instantly share code, notes, and snippets.

@sneako
Last active March 19, 2019 12:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sneako/f7cb0dba96c5a9beb9c4310a2f999a87 to your computer and use it in GitHub Desktop.
Save sneako/f7cb0dba96c5a9beb9c4310a2f999a87 to your computer and use it in GitHub Desktop.
Kinesis Writer
defmodule KinesisWriter.Writer do
@type t :: %__MODULE__{
aggregator: ExKpl.t(),
client: module(),
client_opts: Keyword.t()
}
defstruct aggregator: ExKpl.new(),
client: KinesisWriter.Client,
client_opts: []
def new(opts \\ []) do
struct(__MODULE__, opts)
end
def write(%__MODULE__{aggregator: agg} = state, message) do
case ExKpl.add(agg, {partition_key(), message}) do
{:undefined, updated_agg} ->
%{state | aggregator: updated_agg}
{{pk, full_record, _ehk}, updated_agg} ->
state.client.put_record(format_record(pk, full_record), state.client_opts)
%{state | aggregator: updated_agg}
end
end
def flush(%{aggregator: agg, client: client} = state) do
{{pk, record, _}, new_agg} = ExKpl.finish(agg)
client.put_record(format_record(pk, record), state.client_opts)
%{state | aggregator: new_agg}
end
defp format_record(pk, record), do: %{data: record, partition_key: pk}
defp partition_key, do: Ecto.UUID.generate()
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment