Skip to content

Instantly share code, notes, and snippets.

@rbishop
Created August 18, 2015 15:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rbishop/8ea8b8cbdc8dc991e8a1 to your computer and use it in GitHub Desktop.
Save rbishop/8ea8b8cbdc8dc991e8a1 to your computer and use it in GitHub Desktop.
Replacing imperative if/else with a lazy, infinite Stream. I had to remove some of the specifics for security purposes, but hopefully this shows the general idea.
# Streams the ids for `model` in batches of up to 20 and starts one supervised
# `Worker.sanitize/2` task per batch, then recurses into the next entry of
# `models`.
#
# When `models` is empty, sends `:done` to the process registered as
# `:expunge_main` and exits the calling process with reason `:normal`, so this
# function never returns normally to its caller.
def loop(model, models) do
  task_sup = Process.whereis(:worker_supervisor)

  Stream.resource(
    fn -> {model, 0} end,
    &func_that_gets_data/1,
    fn _ -> :ok end
  )
  # chunk_every/2 also emits the final, shorter batch. The previous
  # Stream.chunk(20) silently discarded any trailing ids that did not fill a
  # complete chunk of 20, so those rows were never sanitized.
  |> Stream.chunk_every(20)
  # NOTE(review): the tasks started here are never awaited, so their reply
  # messages are left in this process's mailbox. If the results are not needed,
  # Task.Supervisor.start_child/4 may be the better fit -- confirm intent.
  |> Stream.map(fn ids ->
    Task.Supervisor.async(task_sup, Worker, :sanitize, [model, ids])
  end)
  |> Stream.run()

  case models do
    [] ->
      main_pid = Process.whereis(:expunge_main)
      send(main_pid, :done)
      exit(:normal)

    [next_model | rest] ->
      loop(next_model, rest)
  end
end
# Imperative polling loop: while the worker supervisor has fewer than 6
# children, fetches the next batch of row ids for `model_type` (starting after
# `last_row_id`) and starts a supervised task for it; otherwise sleeps 25ms and
# polls again.
#
# A batch of fewer than 10 ids is treated as "this model is exhausted": the loop
# moves on to the next entry in `models` with the cursor reset to 0. When no
# models remain, it sends `:done` to the process registered as `:main_task` and
# exits the calling process with reason `:normal`, so this function never
# returns normally to its caller.
def loop(model_type, %{models: models, last_row_id: id} = state) do
  task_sup = Process.whereis(:worker_supervisor)
  children = Task.Supervisor.children(task_sup)

  # Each branch returns the {model_type, state} pair for the next iteration.
  # Rebinding `state`/`model_type` inside `if`/`case` does not leak out of
  # those blocks in current Elixir, so relying on that would recurse with the
  # unchanged cursor and reprocess the same rows forever.
  {next_model_type, next_state} =
    if length(children) < 6 do
      row_ids = func_that_gets_data(model_type, id)
      Task.Supervisor.start_child(task_sup, Mod, :fun, [model_type, row_ids])

      if length(row_ids) < 10 do
        case models do
          [] ->
            main_pid = Process.whereis(:main_task)
            send(main_pid, :done)
            exit(:normal)

          [next_model | remaining] ->
            {next_model, %{models: remaining, last_row_id: 0}}
        end
      else
        # Advance the cursor to the last id handed out in this batch.
        {model_type, %{state | last_row_id: List.last(row_ids)}}
      end
    else
      # Supervisor is at capacity: wait 25ms and try again.
      :timer.sleep(25)
      {model_type, state}
    end

  loop(next_model_type, next_state)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment