Skip to content

Instantly share code, notes, and snippets.

@alvises
Last active Mar 14, 2018
Embed
What would you like to do?
GenStage and twitter stream
defmodule Twitter do
  @moduledoc """
  Entry point of the tweet pipeline.

  Starts a supervisor with one `Twitter.Producer` tracking the `#Rio2016`
  hashtag and three `Twitter.Consumer` processes.
  """

  # `start/2` has the shape of the `Application` start callback; declaring the
  # behaviour gives the module the default `stop/1` and callback checking.
  use Application

  @doc """
  Starts the supervision tree.

  Both arguments are ignored. Returns whatever `Supervisor.start_link/2`
  returns for the child list built here.
  """
  def start(_type, _args) do
    import Supervisor.Spec

    # Each consumer needs a distinct child id because they share one module.
    consumers = for id <- 1..3, do: worker(Twitter.Consumer, [], id: id)

    # The producer goes first: children start in list order, and the consumers
    # subscribe to the producer by its registered name.
    workers = [worker(Twitter.Producer, ["#Rio2016"]) | consumers]

    Supervisor.start_link(workers, strategy: :one_for_one)
  end
end
alias Experimental.GenStage
defmodule Twitter.Consumer do
  @moduledoc """
  GenStage consumer that prints the text of every tweet it receives and then
  sleeps for ten seconds to simulate slow processing.
  """
  use GenStage

  @doc """
  Starts an unnamed consumer whose initial state is an empty list.
  """
  def start_link do
    GenStage.start_link(__MODULE__, [])
  end

  @doc """
  Announces the consumer on stdout and subscribes it to `Twitter.Producer`,
  asking for at most 20 events at a time.
  """
  def init(tweets) do
    # `self()` needs parentheses: a bare `self` is read as a variable.
    IO.inspect([self(), "consumer"])
    {:consumer, tweets, subscribe_to: [{Twitter.Producer, max_demand: 20}]}
  end

  @doc """
  Prints the `text` field of each event, tagged with this process's pid, then
  blocks for ten seconds before returning.

  Emits no events. The state is passed through unchanged.
  """
  def handle_events(events, _from, tweets) do
    pid = self()

    # Extract every text first so that a malformed event fails the whole batch
    # before anything is printed.
    events
    |> Enum.map(& &1.text)
    |> Enum.each(&IO.inspect([pid, &1]))

    IO.inspect([pid, "sleeping for 10s"])
    :timer.sleep(10_000)
    IO.inspect([pid, "awake"])

    # Keep the state given to us instead of replacing it with nil.
    {:noreply, [], tweets}
  end
end
[#PID<0.177.0>,
"RT @BBCSport: It was a sensational #Rio2016... Welcome home @TeamGB 👏 https://t.co/IerPuEso63"]
[#PID<0.177.0>,
"RT @WhiteHouse: ICYMI: Go behind the scenes with the the U.S. delegation at the #Rio2016 Closing Ceremony: https://t.co/G6iq5occa4 https://…"]
[#PID<0.177.0>,
"RT @sportskh: #Rio2016 #리우 #올림픽 #수영 호주 캠벨, 탈장 상태로 올림픽 출전\n\n자세한 기사는\nhttps://t.co/8ZR6cBEfng https://t.co/58wXQ1rv3Z"]
[#PID<0.177.0>,
"RT @SocialSA_: Van Niekerk will receive an extra R150k, Caster an extra R100k‚ Silver winners receive extra R70k , and for Bronze R50k extr…"]
[#PID<0.177.0>,
"RT @Ajay_Chandrakar: Congratulations #PVSindhu for Getting First Ever #Silver in Badminton @ #Rio2016"]
[#PID<0.177.0>,
"RT @miriamsteimer: Die @zdfheute-Kamera ist bereit, um kurz nach 11h soll der #Olympia-#Siegerflieger landen. #Rio2016 https://t.co/Yi1PbaK…"]
[#PID<0.177.0>,
"#Breaking144 – Team GB return home from #Rio2016 Olympics – live updates – Guardian –… https://t.co/ZlcqCfED4y https://t.co/61mlA5sYqn"]
[#PID<0.177.0>,
"RT @DamGiordano: #GinnasticaArtistica\nPer me hanno vinto loro.\n#Cina #Olimpiadi #Rio2016 https://t.co/BB1gm5n7jR"]
[#PID<0.177.0>,
"RT @98rosjon: If #Croydon was a country, it would have come joint 78th in #Rio2016 medals table with Portugal, Austria and Nigeria."]
[#PID<0.177.0>,
"RT @afpfr: #Rio2016 En Grande-Bretagne aussi, l'équipe olympique vient d'arriver, à l'aéroport d'Heathrow près de Londres #AFP https://t.co…"]
[#PID<0.177.0>, "sleeping for 10s"]
[#PID<0.177.0>, "awake"]
[#PID<0.177.0>,
"@Dwayneeeboy reviews the Rio Olympics 2016 @Rio2016_en: https://t.co/h4OboSWyqo. Competition winners also announced!\n\n#Rio2016 #Olympics"]
[#PID<0.177.0>,
"RT @olimpicsoniisan: 閉会式の映像でキャプテン翼が話題で、ジダンが「翼を見て本格的にサッカー始めた」、メッシが「翼見るために学校サボった」など美談がたくさん出回ってるが、私の中での最高の美談は、イタリアのトッティがタイガーショットの練習して故障し、ワンシーズ…"]
[#PID<0.177.0>,
"RT @signal107: NEWS: #TeamGB arrive back to a heroes' welcome after historic #Rio2016 &gt;&gt;https://t.co/6laNjOeV6e https://t.co/awrNOQ9QW5"]
[#PID<0.177.0>,
"RT @h_723: レースの前は、Make a wishを一番聴いていました🎵本当に私の心の支えになっていました😌✨\n#rio2016 #Japonism #嵐"]
[#PID<0.177.0>,
"#Breaking144 – Team GB return home from #Rio2016 Olympics – live updates – Guardian –… https://t.co/RCiRgkgnwL https://t.co/HmdRv0GefQ"]
[#PID<0.177.0>,
"Belgium-iPhone : Rio 2016 : Michael Phelps plus populaire qu'Usain Bolt sur Instagram #Instagram #Rio2016 https://t.co/MClHEblPsH"]
[#PID<0.177.0>,
"Voting: Wir suchen euren #Rio2016-Moment: ^slz https://t.co/moqsCzPJoz"]
[#PID<0.177.0>,
"RT @PSG_inside: Félicitations à @marquinhos_m5 pour sa médaille d'#or avec le Brésil à #Rio2016 !! 👏👏 https://t.co/AzzjsAgRlL"]
[#PID<0.177.0>,
"RT @elcomercio: Las insólitas labores a las que vuelven los atletas de #Rio2016 ► https://t.co/41EEQXemYk https://t.co/6iHWwvWHAn"]
[#PID<0.177.0>,
"Very exciting! Lovely gold-nosed plane as well 👏🏻 #Rio2016 https://t.co/2RJCLprg8D"]
[#PID<0.177.0>, "sleeping for 10s"]
alias Experimental.GenStage
defmodule Twitter.Producer do
  @moduledoc """
  GenStage producer, registered under its module name, that serves events
  taken from a Twitter filter stream for a given track term.
  """
  use GenStage

  @doc """
  Prints the track term and starts the producer registered as
  `Twitter.Producer`, passing `track` to `init/1`.
  """
  def start_link(track) do
    IO.puts("track: #{track}")
    GenStage.start_link(__MODULE__, track, name: __MODULE__)
  end

  @doc """
  Builds the Twitter stream for `track` and keeps it as the producer state.
  """
  def init(track) do
    # Use bigfile_stream() here instead to serve lines from a local file.
    {:producer, twitter_stream(track)}
  end

  @doc """
  Answers a positive `demand` with up to that many events taken from the
  stream held in the state.
  """
  def handle_demand(demand, stream) when demand > 0 do
    # The state is returned untouched, so every call enumerates the same
    # stream value again.
    # NOTE(review): for the file-backed stream this would start from the first
    # line each time; confirm restarting the Twitter stream per demand is
    # intended.
    batch = Enum.take(stream, demand)
    {:noreply, batch, stream}
  end

  # Filter stream for `track`, created with an `:infinity` timeout argument.
  defp twitter_stream(track), do: ExTwitter.stream_filter([track: track], :infinity)

  # Line-by-line stream over a local text file; alternative event source.
  defp bigfile_stream, do: File.stream!("../genstage/stagetest/big.txt", [], :line)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment