Last active
March 14, 2018 09:15
-
-
Save alvises/656cacf7b1962927b34f2708e52ebc6f to your computer and use it in GitHub Desktop.
GenStage and twitter stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Twitter do | |
def start(_type, _args) do | |
import Supervisor.Spec | |
workers = [ | |
worker(Twitter.Producer, ["#Rio2016"]), | |
worker(Twitter.Consumer, [], id: 1), | |
worker(Twitter.Consumer, [], id: 2), | |
worker(Twitter.Consumer, [], id: 3) | |
] | |
Supervisor.start_link(workers, strategy: :one_for_one) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
alias Experimental.GenStage | |
defmodule Twitter.Consumer do | |
use GenStage | |
def start_link do | |
GenStage.start_link(__MODULE__, []) | |
end | |
def init(tweets) do | |
IO.inspect [self, "consumer"] | |
{:consumer, tweets, subscribe_to: [{Twitter.Producer, max_demand: 20}]} | |
end | |
def handle_events(events, _from, tweets) do | |
new_tweets = events |> Enum.map(fn e -> e.text end) | |
new_tweets |> Enum.each(fn t -> IO.inspect([self,t]) end) | |
IO.inspect [self,"sleeping for 10s"] | |
:timer.sleep(10_000) | |
IO.inspect [self,"awake"] | |
{:noreply, [], nil} | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[#PID<0.177.0>, | |
"RT @BBCSport: It was a sensational #Rio2016... Welcome home @TeamGB 👏 https://t.co/IerPuEso63"] | |
[#PID<0.177.0>, | |
"RT @WhiteHouse: ICYMI: Go behind the scenes with the the U.S. delegation at the #Rio2016 Closing Ceremony: https://t.co/G6iq5occa4 https://…"] | |
[#PID<0.177.0>, | |
"RT @sportskh: #Rio2016 #리우 #올림픽 #수영 호주 캠벨, 탈장 상태로 올림픽 출전\n\n자세한 기사는\nhttps://t.co/8ZR6cBEfng https://t.co/58wXQ1rv3Z"] | |
[#PID<0.177.0>, | |
"RT @SocialSA_: Van Niekerk will receive an extra R150k, Caster an extra R100k‚ Silver winners receive extra R70k , and for Bronze R50k extr…"] | |
[#PID<0.177.0>, | |
"RT @Ajay_Chandrakar: Congratulations #PVSindhu for Getting First Ever #Silver in Badminton @ #Rio2016"] | |
[#PID<0.177.0>, | |
"RT @miriamsteimer: Die @zdfheute-Kamera ist bereit, um kurz nach 11h soll der #Olympia-#Siegerflieger landen. #Rio2016 https://t.co/Yi1PbaK…"] | |
[#PID<0.177.0>, | |
"#Breaking144 – Team GB return home from #Rio2016 Olympics – live updates – Guardian –… https://t.co/ZlcqCfED4y https://t.co/61mlA5sYqn"] | |
[#PID<0.177.0>, | |
"RT @DamGiordano: #GinnasticaArtistica\nPer me hanno vinto loro.\n#Cina #Olimpiadi #Rio2016 https://t.co/BB1gm5n7jR"] | |
[#PID<0.177.0>, | |
"RT @98rosjon: If #Croydon was a country, it would have come joint 78th in #Rio2016 medals table with Portugal, Austria and Nigeria."] | |
[#PID<0.177.0>, | |
"RT @afpfr: #Rio2016 En Grande-Bretagne aussi, l'équipe olympique vient d'arriver, à l'aéroport d'Heathrow près de Londres #AFP https://t.co…"] | |
[#PID<0.177.0>, "sleeping for 10s"] | |
[#PID<0.177.0>, "awake"] | |
[#PID<0.177.0>, | |
"@Dwayneeeboy reviews the Rio Olympics 2016 @Rio2016_en: https://t.co/h4OboSWyqo. Competition winners also announced!\n\n#Rio2016 #Olympics"] | |
[#PID<0.177.0>, | |
"RT @olimpicsoniisan: 閉会式の映像でキャプテン翼が話題で、ジダンが「翼を見て本格的にサッカー始めた」、メッシが「翼見るために学校サボった」など美談がたくさん出回ってるが、私の中での最高の美談は、イタリアのトッティがタイガーショットの練習して故障し、ワンシーズ…"] | |
[#PID<0.177.0>, | |
"RT @signal107: NEWS: #TeamGB arrive back to a heroes' welcome after historic #Rio2016 >>https://t.co/6laNjOeV6e https://t.co/awrNOQ9QW5"] | |
[#PID<0.177.0>, | |
"RT @h_723: レースの前は、Make a wishを一番聴いていました🎵本当に私の心の支えになっていました😌✨\n#rio2016 #Japonism #嵐"] | |
[#PID<0.177.0>, | |
"#Breaking144 – Team GB return home from #Rio2016 Olympics – live updates – Guardian –… https://t.co/RCiRgkgnwL https://t.co/HmdRv0GefQ"] | |
[#PID<0.177.0>, | |
"Belgium-iPhone : Rio 2016 : Michael Phelps plus populaire qu'Usain Bolt sur Instagram #Instagram #Rio2016 https://t.co/MClHEblPsH"] | |
[#PID<0.177.0>, | |
"Voting: Wir suchen euren #Rio2016-Moment: ^slz https://t.co/moqsCzPJoz"] | |
[#PID<0.177.0>, | |
"RT @PSG_inside: Félicitations à @marquinhos_m5 pour sa médaille d'#or avec le Brésil à #Rio2016 !! 👏👏 https://t.co/AzzjsAgRlL"] | |
[#PID<0.177.0>, | |
"RT @elcomercio: Las insólitas labores a las que vuelven los atletas de #Rio2016 ► https://t.co/41EEQXemYk https://t.co/6iHWwvWHAn"] | |
[#PID<0.177.0>, | |
"Very exciting! Lovely gold-nosed plane as well 👏🏻 #Rio2016 https://t.co/2RJCLprg8D"] | |
[#PID<0.177.0>, "sleeping for 10s"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
alias Experimental.GenStage | |
defmodule Twitter.Producer do | |
use GenStage | |
def start_link(track) do | |
IO.puts "track: #{track}" | |
GenStage.start_link(__MODULE__, track, name: __MODULE__) | |
end | |
def init(track) do | |
stream = twitter_stream(track) | |
#stream = bigfile_stream | |
{:producer, stream} | |
end | |
def handle_demand(demand, stream) when demand > 0 do | |
events = stream |> Enum.take(demand) | |
{:noreply, events, stream} | |
end | |
defp twitter_stream(track) do | |
ExTwitter.stream_filter([track: track], :infinity) | |
end | |
defp bigfile_stream do | |
stream = File.stream!("../genstage/stagetest/big.txt",[],:line) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment