Skip to content

Instantly share code, notes, and snippets.

@joshnuss
Last active March 31, 2023 09:02
Show Gist options
  • Save joshnuss/d036ffb224d6d90f331778cf141e13f4 to your computer and use it in GitHub Desktop.
Save joshnuss/d036ffb224d6d90f331778cf141e13f4 to your computer and use it in GitHub Desktop.
Streaming HTTP requests
defmodule MyApp.Integrations.Skubana do
@host "..."
@headers [ ... ]
# returns a stream of shipments
def get_shipments do
# start with page 1
start = fn -> 1 end
# create a stream, it will make HTTP requests until the page returned is empty
Stream.resource(start, &get_page/1, &Function.identity/1)
end
defp get_page(page) do
# build url
url = "#{@host}/v1/shipments?page=#{page}"
# make http request
{:ok, response} = Mojito.get(url, @headers)
# decode JSON
case Jason.decode!(response.body) do
# if empty array, halt the stream
[] -> {:halt, nil}
# otherwise, return data and move on to next page
shipments ->
{shipments, page+1}
end
end
end
# paging is completely hidden from the caller's perspective
MyApp.Integrations.Skubana.get_shipments()
|> Stream.each(&Fulfillment.create_shipment/1)
|> Stream.run()
@Jqnxyz
Copy link

Jqnxyz commented Feb 3, 2022

Isn't Stream.run() in usage.exs:5 redundant since calling Stream.each also triggers the Stream? Or am I mistaken

@joshnuss
Copy link
Author

joshnuss commented Feb 3, 2022

Stream.each won't trigger the stream, it's just defining a step in the pipeline.
To start streaming there needs to be a Stream.run or a call to one of the Enum functions, like Enum.to_list, Enum.map, Enum.reduce etc..

To see it, try this in iex:

Stream.repeatedly(&DateTime.utc_now/0) |> Stream.each(&IO.inspect/1)

@Jqnxyz
Copy link

Jqnxyz commented Feb 3, 2022

Ah I see, thank you for the explanation!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment