Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Infinite Streams with Elixir
# Elixir has lazily evaluated enumerable objects that allow you
# to work with enumerable objects like lists either only as needed
# or infinitely.
# Start up iex to play around
$ iex
# Typical enumeration is done eagerly where the result is computed ASAP
iex> Enum.map(1..10, fn i -> i * 2 end)
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# So if we do map twice we'll see that it executes twice
# Note: `1..10 |> Enum.map(fun)` is the same as `Enum.map(1..10, fun)`
# The |> pipe operator passes the result of the previous function into the first
# parameter of the function to the right. Therefore, `a |> b |> c |> d` is equivalent to
# `d(c(b(a)))` which is harder to read since it executes inside out. This is the form Elixir
# compiles it down to, so theres no runtime difference between the two forms.
iex> Enum.map(1..10, fn i -> IO.inspect(i) end) |> Enum.map(fn i -> IO.inspect(i) end)
1
2
3
4
5
6
7
8
9
10
1
2
3
4
5
6
7
8
9
10
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# However a stream will delay execution until absoutely necessary
iex> Stream.map(1..10, fn i -> IO.inspect(i) end) |> Stream.map(fn i -> IO.inspect(i) end)
#Stream<[enum: 1..10,
funs: [#Function<0.98695279/1 in Stream.map/2>,
#Function<0.98695279/1 in Stream.map/2>]]>
# Notice how it wasn't executed, it just returned a data structure representing
# what we want to do, including both functions we want to execute.
# We can execute it by performing an Enum function on it. We can shorten this up by using
# the anonymous function syntax instead
iex> Stream.map(1..10, &IO.inspect/1) |> Stream.map(&IO.inspect/1) |> Enum.take(1)
1
1
[1]
# So if we only need the first element, we won't map the whole list and then just grab the first
# as we might do in other languages (wasting the computation performed on the rest of the list).
iex> Stream.map(1..10, fn i -> i * 2 end) |> Enum.take(1)
[2] # Never computed for 2..10
# We can compose multiple streams together also.
iex> 1..10 |> Stream.map(fn i -> i * 2 end) |> Stream.filter(fn(i) -> i >= 4 end) |> Enum.take(1)
[4]
# Because of the order, we're asking to double each element and the drop the ones that are less than 4, then
# just grabbing the first that matches that condition. That means we can also reverse the order to do the filtering
# first and then perform the work as little as possible, if our business logic allowed it.
iex> 1..10 |> Stream.filter(fn(i) -> i >= 3 end) |> Stream.map(fn(i)-> i * 2 end) |> Enum.take(1)
[6]
# Elixir also has infinite streams that never end. Basically we're going to ask to generate a repeating stream.
iex> Stream.cycle(1..10)
#Function<60.98695279/2 in Stream.unfold/2>
# And then we can take as many as we want
iex> Stream.cycle(1..10) |> Enum.take(10)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
iex> Stream.cycle(1..10) |> Enum.take(20)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# So we can perform work if needing to by taking an infinite stream and turning it into a list (which will never end)
iex> Stream.cycle(1..10) |>
...> # Filter out odd numbers (rem/2 is the modulus function, calculating remainder)
...> Stream.filter(&(rem(&1, 2) == 0)) |>
...> # Reject the number 6 just cause
...> Stream.reject(&match?(6, &1)) |>
...> # Print out the number
...> Stream.map(&IO.inspect/1) |>
...> # Take the first 10 items
...> Enum.take(10)
2
4
8
10
2
4
8
10
2
4
[2, 4, 8, 10, 2, 4, 8, 10, 2, 4]
# If instead we attempt to convert the infinite list to a finite list, it will never complete
iex> Stream.cycle(1..10) |>
...> Stream.filter(&(rem(&1, 2) == 0)) |>
...> Stream.reject(&match?(6, &1)) |>
...> Stream.map(&IO.inspect/1) |>
...> # This call here will lock only the current process into performing this work until the end of time
...> Enum.to_list
2
4
8
10
2
4
8
10
2
4
8
10
2
4
8
10
2
# This output never ends... CTRL+C twice to exit
# We're going to process data from twitter this way!
# Start a new Phoenix project
$ mix phoenix.new twitter_stream
# Open up mix.exs, and add extwitter
def application do
[mod: {TwitterStream, []},
applications: [:phoenix, :phoenix_html, :cowboy, :logger, :gettext,
:phoenix_ecto, :postgrex, :extwitter]] # Add extwitter here
end
# And change the deps to
defp deps do
[
{:phoenix, "~> 1.1.4"},
{:postgrex, ">= 0.0.0"},
{:phoenix_ecto, "~> 2.0"},
{:phoenix_html, "~> 2.4"},
{:phoenix_live_reload, "~> 1.0", only: :dev},
{:gettext, "~> 0.9"},
{:cowboy, "~> 1.0"},
{:oauth, github: "tim/erlang-oauth"}, # Add this and the line below
{:extwitter, "~> 0.7.1"}
]
end
# And add the configuration keys you'll need to hit their API into config/config.exs. You'll
# need to get these values from apps.twitter.com (make an app and get its keys).
config :extwitter, :oauth,
consumer_key: System.get_env("TWITTER_CONSUMER_KEY"),
consumer_secret: System.get_env("TWITTER_CONSUMER_SECRET"),
access_token: System.get_env("TWITTER_ACCESS_TOKEN"),
access_token_secret: System.get_env("TWITTER_ACCESS_SECRET")
# Fetch dependencies
$ mix deps.get
# Open up iex with all our code loaded
$ iex -S mix
# We can fetch the last value with
iex> ExTwitter.search("apple") |> Enum.map(&( &1.text ))
...
# ExTwitter gives us an Elixir Stream we can use:
iex> ExTwitter.stream_filter(track: "apple")
#Function<10.98695279/2 in Stream.resource/3>
# This is possible because of `Stream.resource/3`
iex> h Stream.resource/3
# Basically, if we provide 3 functions: an initialization function, a get next function, and a
# cleanup function; Elixir will let us create our own data streams.
# We can take a few values from the extwitter stream, and it won't return until its got all 5 values
iex> ExTwitter.stream_filter(track: "apple") |> Enum.take(5)
# But if we stream map it, we'll see data coming through as it comes in:
iex> ExTwitter.stream_filter(track: "apple") |> Stream.map(&(&1.text)) |> Stream.map(&IO.inspect/1) |> Enum.take(10)
...
[ ... ]
# Lets stream this onto the page in our phoenix app:
$ mix phoenix.gen.channel Tweet tweets
# Open up web/channels/tweet_channel.ex and change the join function to
def join("tweets:" <> _tweet, payload, socket) do
{:ok, socket}
end
# You can delete the `authorized?/1` function and both `handle_in/3` functions if you'd like.
# Uncomment the last line of web/static/javascript/app.js to include the socket.js file
# Add this line to the bottom of the socket.js file before the export line:
let channel = socket.channel(`tweets:${prompt("Twitter to search for")}`, {})
channel.on('tweet', tweet => { $('#tweets').prepend(tweet.html) })
channel.join()
# Open web/channels/user_socket.ex and allow the user to join that channel by replacing line 5 with:
channel "tweets:*", TwitterStream.TweetChannel
# Add jquery to the head tag of web/templates/layout/app.html.eex (normally this is done via package.json)
<script src="https://code.jquery.com/jquery-2.2.3.min.js"></script>
# Boot up the phoenix server
$ iex -S mix phoenix.server
# Replace web/templates/pages/index.html.eex with a div for our tweets:
<div id="tweets">
</div>
# Add a file for rendering a specific tweet into web/templates/pages/tweet.html.eex
<div class="col-xs-12">
<img src="<%= @tweet.profile_url %>"/>
<%= @tweet.text %>
</div>
# We can render that view supplying the data easily
iex> html = Phoenix.View.render_to_string(TwitterStream.PageView, "tweet.html", tweet: %{profile_url: "https://pbs.twimg.com/profile_images/714638875377999872/DawaNC9x_normal.jpg", text: "This is the text of the tweet"})
# And then broadcast it and it should show up on the screen
iex> TwitterStream.Endpoint.broadcast "tweets:apple", "tweet", %{html: html}
# So to stream them in we can simply do
iex> ExTwitter.stream_filter(track: "apple") |>
...> # Take the huge amount of data and break it down into just the fields we want
...> Stream.map(&(%{profile_url: &1.user.profile_image_url, text: &1.text})) |>
...> # Print it out so we see it coming in on the console too
...> Stream.map(&IO.inspect/1) |>
...> # Render the tweet to HTML
...> Stream.map(&(Phoenix.View.render_to_string(TwitterStream.PageView, "tweet.html", tweet: &1))) |>
...> # Broadcast it to all listening users
...> Stream.map(&(TwitterStream.Endpoint.broadcast("tweets:apple", "tweet", %{html: &1}))) |>
...> # Do this forever!
...> Enum.to_list
# CTRL + C twice to exit
# To make this final we'll just kick this off in a separate process whenever a join occurs. Change the
# web/channels/tweet_channel.ex to the following:
# NOTE: This isn't the right place to add this, but its a quick hack and it works for the demo
def join("tweets:" <> tweet, payload, socket) do
spawn_link fn ->
[track: tweet]
|> ExTwitter.stream_filter
|> Stream.map(&(%{profile_url: &1.user.profile_image_url, text: &1.text}))
|> Stream.map(&IO.inspect/1)
|> Stream.map(&(Phoenix.View.render_to_string(TwitterStream.PageView, "tweet.html", tweet: &1)))
|> Stream.map(&(TwitterStream.Endpoint.broadcast("tweets:" <> tweet, "tweet", %{html: &1})))
|> Enum.to_list
end
{:ok, socket}
end
# Launch the server (if it isnt running already) and test it out!
$ mix phoenix.server
@hopewise

This comment has been minimized.

Copy link

@hopewise hopewise commented Jul 30, 2018

wow!

@WannesFransen1994

This comment has been minimized.

Copy link

@WannesFransen1994 WannesFransen1994 commented Oct 21, 2019

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment