Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Infinite Streams with Elixir
# Elixir has lazily evaluated enumerable objects that allow you
# to work with enumerable objects like lists either only as needed
# or infinitely.
# Start up iex to play around
$ iex
# Typical enumeration is done eagerly where the result is computed ASAP
iex> Enum.map(1..10, fn i -> i * 2 end)
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# So if we do map twice we'll see that it executes twice
# Note: `1..10 |> Enum.map(fun)` is the same as `Enum.map(1..10, fun)`
# The |> pipe operator passes the result of the previous function into the first
# parameter of the function to the right. Therefore, `a |> b |> c |> d` is equivalent to
# `d(c(b(a)))` which is harder to read since it executes inside out. This is the form Elixir
# compiles it down to, so theres no runtime difference between the two forms.
iex> Enum.map(1..10, fn i -> IO.inspect(i) end) |> Enum.map(fn i -> IO.inspect(i) end)
1
2
3
4
5
6
7
8
9
10
1
2
3
4
5
6
7
8
9
10
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# However a stream will delay execution until absoutely necessary
iex> Stream.map(1..10, fn i -> IO.inspect(i) end) |> Stream.map(fn i -> IO.inspect(i) end)
#Stream<[enum: 1..10,
funs: [#Function<0.98695279/1 in Stream.map/2>,
#Function<0.98695279/1 in Stream.map/2>]]>
# Notice how it wasn't executed, it just returned a data structure representing
# what we want to do, including both functions we want to execute.
# We can execute it by performing an Enum function on it. We can shorten this up by using
# the anonymous function syntax instead
iex> Stream.map(1..10, &IO.inspect/1) |> Stream.map(&IO.inspect/1) |> Enum.take(1)
1
1
[1]
# So if we only need the first element, we won't map the whole list and then just grab the first
# as we might do in other languages (wasting the computation performed on the rest of the list).
iex> Stream.map(1..10, fn i -> i * 2 end) |> Enum.take(1)
[2] # Never computed for 2..10
# We can compose multiple streams together also.
iex> 1..10 |> Stream.map(fn i -> i * 2 end) |> Stream.filter(fn(i) -> i >= 4 end) |> Enum.take(1)
[4]
# Because of the order, we're asking to double each element and the drop the ones that are less than 4, then
# just grabbing the first that matches that condition. That means we can also reverse the order to do the filtering
# first and then perform the work as little as possible, if our business logic allowed it.
iex> 1..10 |> Stream.filter(fn(i) -> i >= 3 end) |> Stream.map(fn(i)-> i * 2 end) |> Enum.take(1)
[6]
# Elixir also has infinite streams that never end. Basically we're going to ask to generate a repeating stream.
iex> Stream.cycle(1..10)
#Function<60.98695279/2 in Stream.unfold/2>
# And then we can take as many as we want
iex> Stream.cycle(1..10) |> Enum.take(10)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
iex> Stream.cycle(1..10) |> Enum.take(20)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# So we can perform work if needing to by taking an infinite stream and turning it into a list (which will never end)
iex> Stream.cycle(1..10) |>
...> # Filter out odd numbers (rem/2 is the modulus function, calculating remainder)
...> Stream.filter(&(rem(&1, 2) == 0)) |>
...> # Reject the number 6 just cause
...> Stream.reject(&match?(6, &1)) |>
...> # Print out the number
...> Stream.map(&IO.inspect/1) |>
...> # Take the first 10 items
...> Enum.take(10)
2
4
8
10
2
4
8
10
2
4
[2, 4, 8, 10, 2, 4, 8, 10, 2, 4]
# If instead we attempt to convert the infinite list to a finite list, it will never complete
iex> Stream.cycle(1..10) |>
...> Stream.filter(&(rem(&1, 2) == 0)) |>
...> Stream.reject(&match?(6, &1)) |>
...> Stream.map(&IO.inspect/1) |>
...> # This call here will lock only the current process into performing this work until the end of time
...> Enum.to_list
2
4
8
10
2
4
8
10
2
4
8
10
2
4
8
10
2
# This output never ends... CTRL+C twice to exit
# We're going to process data from twitter this way!
# Start a new Phoenix project
$ mix phoenix.new twitter_stream
# Open up mix.exs, and add extwitter
def application do
[mod: {TwitterStream, []},
applications: [:phoenix, :phoenix_html, :cowboy, :logger, :gettext,
:phoenix_ecto, :postgrex, :extwitter]] # Add extwitter here
end
# And change the deps to
defp deps do
[
{:phoenix, "~> 1.1.4"},
{:postgrex, ">= 0.0.0"},
{:phoenix_ecto, "~> 2.0"},
{:phoenix_html, "~> 2.4"},
{:phoenix_live_reload, "~> 1.0", only: :dev},
{:gettext, "~> 0.9"},
{:cowboy, "~> 1.0"},
{:oauth, github: "tim/erlang-oauth"}, # Add this and the line below
{:extwitter, "~> 0.7.1"}
]
end
# And add the configuration keys you'll need to hit their API into config/config.exs. You'll
# need to get these values from apps.twitter.com (make an app and get its keys).
config :extwitter, :oauth,
consumer_key: System.get_env("TWITTER_CONSUMER_KEY"),
consumer_secret: System.get_env("TWITTER_CONSUMER_SECRET"),
access_token: System.get_env("TWITTER_ACCESS_TOKEN"),
access_token_secret: System.get_env("TWITTER_ACCESS_SECRET")
# Fetch dependencies
$ mix deps.get
# Open up iex with all our code loaded
$ iex -S mix
# We can fetch the last value with
iex> ExTwitter.search("apple") |> Enum.map(&( &1.text ))
...
# ExTwitter gives us an Elixir Stream we can use:
iex> ExTwitter.stream_filter(track: "apple")
#Function<10.98695279/2 in Stream.resource/3>
# This is possible because of `Stream.resource/3`
iex> h Stream.resource/3
# Basically, if we provide 3 functions: an initialization function, a get next function, and a
# cleanup function; Elixir will let us create our own data streams.
# We can take a few values from the extwitter stream, and it won't return until its got all 5 values
iex> ExTwitter.stream_filter(track: "apple") |> Enum.take(5)
# But if we stream map it, we'll see data coming through as it comes in:
iex> ExTwitter.stream_filter(track: "apple") |> Stream.map(&(&1.text)) |> Stream.map(&IO.inspect/1) |> Enum.take(10)
...
[ ... ]
# Lets stream this onto the page in our phoenix app:
$ mix phoenix.gen.channel Tweet tweets
# Open up web/channels/tweet_channel.ex and change the join function to
def join("tweets:" <> _tweet, payload, socket) do
{:ok, socket}
end
# You can delete the `authorized?/1` function and both `handle_in/3` functions if you'd like.
# Uncomment the last line of web/static/javascript/app.js to include the socket.js file
# Add this line to the bottom of the socket.js file before the export line:
let channel = socket.channel(`tweets:${prompt("Twitter to search for")}`, {})
channel.on('tweet', tweet => { $('#tweets').prepend(tweet.html) })
channel.join()
# Open web/channels/user_socket.ex and allow the user to join that channel by replacing line 5 with:
channel "tweets:*", TwitterStream.TweetChannel
# Add jquery to the head tag of web/templates/layout/app.html.eex (normally this is done via package.json)
<script src="https://code.jquery.com/jquery-2.2.3.min.js"></script>
# Boot up the phoenix server
$ iex -S mix phoenix.server
# Replace web/templates/pages/index.html.eex with a div for our tweets:
<div id="tweets">
</div>
# Add a file for rendering a specific tweet into web/templates/pages/tweet.html.eex
<div class="col-xs-12">
<img src="<%= @tweet.profile_url %>"/>
<%= @tweet.text %>
</div>
# We can render that view supplying the data easily
iex> html = Phoenix.View.render_to_string(TwitterStream.PageView, "tweet.html", tweet: %{profile_url: "https://pbs.twimg.com/profile_images/714638875377999872/DawaNC9x_normal.jpg", text: "This is the text of the tweet"})
# And then broadcast it and it should show up on the screen
iex> TwitterStream.Endpoint.broadcast "tweets:apple", "tweet", %{html: html}
# So to stream them in we can simply do
iex> ExTwitter.stream_filter(track: "apple") |>
...> # Take the huge amount of data and break it down into just the fields we want
...> Stream.map(&(%{profile_url: &1.user.profile_image_url, text: &1.text})) |>
...> # Print it out so we see it coming in on the console too
...> Stream.map(&IO.inspect/1) |>
...> # Render the tweet to HTML
...> Stream.map(&(Phoenix.View.render_to_string(TwitterStream.PageView, "tweet.html", tweet: &1))) |>
...> # Broadcast it to all listening users
...> Stream.map(&(TwitterStream.Endpoint.broadcast("tweets:apple", "tweet", %{html: &1}))) |>
...> # Do this forever!
...> Enum.to_list
# CTRL + C twice to exit
# To make this final we'll just kick this off in a separate process whenever a join occurs. Change the
# web/channels/tweet_channel.ex to the following:
# NOTE: This isn't the right place to add this, but its a quick hack and it works for the demo
def join("tweets:" <> tweet, payload, socket) do
spawn_link fn ->
[track: tweet]
|> ExTwitter.stream_filter
|> Stream.map(&(%{profile_url: &1.user.profile_image_url, text: &1.text}))
|> Stream.map(&IO.inspect/1)
|> Stream.map(&(Phoenix.View.render_to_string(TwitterStream.PageView, "tweet.html", tweet: &1)))
|> Stream.map(&(TwitterStream.Endpoint.broadcast("tweets:" <> tweet, "tweet", %{html: &1})))
|> Enum.to_list
end
{:ok, socket}
end
# Launch the server (if it isnt running already) and test it out!
$ mix phoenix.server
@hopewise

This comment has been minimized.

Copy link

commented Jul 30, 2018

wow!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.