Skip to content

Instantly share code, notes, and snippets.

@kei-s
Created April 21, 2010 07:13
Show Gist options
  • Save kei-s/373523 to your computer and use it in GitHub Desktop.
Save kei-s/373523 to your computer and use it in GitHub Desktop.
user streams to AMQP
# vim:fileencoding=utf-8
require 'time'
require 'rubygems'
require 'mq'
require 'yajl'
require 'pp'
# Consumer: binds the 'tweetfan' queue to the 'tweet' fanout exchange and
# prints one formatted line per status or event received.
AMQP.start do
  channel = MQ.new
  queue = channel.queue('tweetfan').bind(channel.fanout('tweet'))

  # On INT or TERM: delete the queue, then stop AMQP and the EM reactor.
  [:INT, :TERM].each do |signal|
    Signal.trap(signal) do
      queue.delete
      AMQP.stop { EM.stop }
    end
  end

  queue.subscribe do |tweet|
    data = Yajl::Parser.parse(tweet)
    # Evaluated lazily so that payloads which are only dumped with pp never
    # have their 'created_at' field parsed.
    stamp = lambda { Time.parse(data['created_at']).strftime("%m/%d %H:%M:%S") }
    begin
      if data['text']
        # Status message: either a retweet or a plain tweet.
        if data['retweeted_status']
          puts "#{stamp.call} retweet >#{data['user']['screen_name']}< | <#{data['retweeted_status']['user']['screen_name']}> #{data['text']}"
        else
          puts "#{stamp.call} <#{data['user']['screen_name']}> #{data['text']}"
        end
      elsif data['event']
        event = data['event']
        if event == 'favorite'
          puts "#{stamp.call} favorite >#{data['source']['screen_name']}< | <#{data['target']['screen_name']}> #{data['target_object']['text']}"
        elsif event == 'unfavorite'
          puts "#{stamp.call} unfavorite >#{data['source']['screen_name']}< | <#{data['target']['screen_name']}> #{data['target_object']['text']}"
        elsif event == 'follow'
          puts "#{stamp.call} follow <#{data['source']['screen_name']}> => <#{data['target']['screen_name']}>"
        else
          # Unrecognised event type: dump the raw structure.
          pp data
        end
      else
        # Neither a status nor an event: dump the raw structure.
        pp data
      end
    rescue => error
      # Any failure while formatting is reported and ends the process.
      puts error.message
      puts error.backtrace
      exit
    end
  end
end
# vim:fileencoding=utf-8
require 'uri'
require 'time'
require 'rubygems'
require 'mq'
require 'yajl'
require 'MeCab'
# Tagger used to split tweet text into words.
@mecab = MeCab::Tagger.new
# Number of text-bearing tweets handled so far.
tw_count = 0
# Maps each word to the number of times it has been seen.
bucket = {}

# Consumer: binds the 'mecabtweet' queue to the 'tweet' fanout exchange,
# counts the words of every tweet and prints the ten most frequent ones.
AMQP.start do
  channel = MQ.new
  queue = channel.queue('mecabtweet').bind(channel.fanout('tweet'))

  # On INT or TERM: delete the queue, then stop AMQP and the EM reactor.
  [:INT, :TERM].each do |signal|
    Signal.trap(signal) do
      queue.delete
      AMQP.stop { EM.stop }
    end
  end

  queue.subscribe do |tweet|
    data = Yajl::Parser.parse(tweet)
    begin
      if data['text']
        # Strip http/https URLs before handing the text to the tagger.
        without_urls = data['text'].gsub(URI.regexp(['http','https']),'')

        # Keep the first tab-separated field of every output line except
        # the 'EOS' marker lines.
        words = []
        @mecab.parse(without_urls).each_line do |raw|
          line = raw.chomp
          next if line == 'EOS'
          words << line.split("\t")[0]
        end

        words.each do |word|
          bucket[word] = bucket.fetch(word, 0) + 1
        end

        puts "#{Time.parse(data['created_at']).strftime("%m/%d %H:%M:%S")} <#{data['user']['screen_name']}> #{data['text']}"

        # Ten entries with the highest counts, printed as "count word".
        top_ten = bucket.sort_by { |_word, count| -count }.take(10)
        puts top_ten.map { |word, count| "#{count} #{word}" }

        tw_count += 1
        puts "tw: #{tw_count} words: #{bucket.size}"
      end
    rescue => error
      # Any failure while processing is reported and ends the process.
      puts error.message
      puts error.backtrace
      exit
    end
  end
end
# vim:fileencoding=utf-8
require 'rubygems'
require 'mq'
require 'yajl'
# checkout http://gist.github.com/132372
require 'saykana'
# Consumer: binds the auto-delete 'saytweet' queue to the 'tweet' fanout
# exchange on libelabo.jp; every tweet from a non-protected user is printed
# and its text is passed to sayKana.
AMQP.start(:host => 'libelabo.jp') do
  channel = MQ.new
  exchange = channel.fanout('tweet')
  queue = channel.queue('saytweet', :auto_delete => true).bind(exchange)

  # On INT or TERM: delete the queue, then stop AMQP and the EM reactor.
  [:INT, :TERM].each do |signal|
    Signal.trap(signal) do
      queue.delete
      AMQP.stop { EM.stop }
    end
  end

  queue.subscribe do |tweet|
    data = Yajl::Parser.parse(tweet)
    if data['text'] && !data['user']['protected']
      puts "<#{data['user']['screen_name']}> #{data['text']}"
      # '@' is replaced by a space before the text goes to sayKana.
      spoken = data['text'].gsub('@',' ')
      sayKana(spoken)
    end
  end
end
# vim:fileencoding=utf-8
require 'time'
require 'rubygems'
require 'mq'
require 'yajl'
require 'pp'
# Parses a time string and renders it as "MM/DD HH:MM:SS".
#
# str - a String understood by Time.parse.
#
# Returns the formatted String.
def to_time(str)
  parsed = Time.parse(str)
  parsed.strftime("%m/%d %H:%M:%S")
end
# Filter: consumes the 'tweet' fanout exchange through the 'tweetfilter'
# queue and republishes to the 'tweetpublic' fanout exchange only those
# messages in which none of the checked 'protected' flags is set.
AMQP.start do
  channel = MQ.new
  publicqueue = channel.fanout('tweetpublic')
  queue = channel.queue('tweetfilter').bind(channel.fanout('tweet'))

  queue.subscribe do |tweet|
    data = Yajl::Parser.parse(tweet)
    begin
      if data['text']
        # Status message: forward unless its author is protected.
        publicqueue.publish(tweet) unless data['user']['protected']
      elsif data['event']
        case data['event']
        when 'favorite', 'unfavorite', 'retweet'
          # Both the acting user and the author of the target object must be
          # unprotected.
          forward = !data['source']['protected'] && !data['target_object']['user']['protected']
          publicqueue.publish(tweet) if forward
        when 'follow'
          # Both ends of the follow must be unprotected.
          forward = !data['source']['protected'] && !data['target']['protected']
          publicqueue.publish(tweet) if forward
        end
        # Any other event type is not forwarded.
      else
        # Neither a status nor an event: dump the raw structure.
        pp data
      end
    rescue => error
      # Any failure while filtering is reported and ends the process.
      puts error.message
      puts error.backtrace
      exit
    end
  end
end
# vim:fileencoding=utf-8
require 'uri'
require 'rubygems'
require 'yajl/http_stream'
require 'pit'
require 'mongo'
require 'bunny'
# Publisher: reads the Twitter user stream and, for every decoded object,
# prints it, publishes it as JSON to the 'tweet' fanout exchange and stores
# it in the 'kei' collection of the 'tweetstream' MongoDB database.
db = Mongo::Connection.new.db('tweetstream')
collection = db.collection('kei')

# Stream credentials are looked up through Pit under the "tweetstream" name.
config = Pit.get("tweetstream", :require => {
  "login" => "login",
  "password" => "password"
})
uri = URI.parse("http://#{config['login']}:#{config['password']}@chirpstream.twitter.com/2b/user.json")

b = Bunny.new
b.start
begin
  exch = b.exchange('tweet', :type => :fanout)
  Yajl::HttpStream.get(uri) do |data|
    puts data.inspect
    # The payload is encoded before the insert below, so the published JSON
    # reflects the object exactly as it arrived from the stream.
    # NOTE(review): the us-ascii tag presumably works around an encoding
    # issue in the AMQP client — confirm before removing.
    exch.publish(Yajl::Encoder.encode(data).force_encoding('us-ascii'))
    collection.insert(data)
  end
ensure
  # Close the broker connection even when the stream ends with an exception
  # (HTTP error, dropped connection, Ctrl-C); previously this was skipped.
  b.stop
end
# vim:fileencoding=utf-8
require 'rubygems'
require 'em-websocket'
require 'mq'
# Bridge: pushes every message from the 'tweetpublic' fanout exchange into an
# EM::Channel and relays that channel to each connected WebSocket client.
EventMachine.run do
  @channel = EM::Channel.new

  twitter = MQ.new
  feed = twitter.queue('websocket').bind(twitter.fanout('tweetpublic'))
  feed.subscribe { |t| @channel.push t }

  EM::WebSocket.start(:host => '0.0.0.0', :port => 8080) do |ws|
    ws.onopen do
      # Each client gets its own channel subscription; the returned id is
      # used for logging and for unsubscribing on close.
      sid = @channel.subscribe do |msg|
        ws.send msg
      end
      puts "#{sid} connected"

      # Incoming client messages are only logged.
      ws.onmessage do |msg|
        puts "<#{sid}>: #{msg}"
      end

      ws.onclose do
        @channel.unsubscribe(sid)
        puts "#{sid} closed"
      end
    end
  end

  puts "Server started"
end
@ursm
Copy link

ursm commented Apr 28, 2010

Capped Collection!

@kei-s
Copy link
Author

kei-s commented Apr 28, 2010

Thank you for your advice!
This is the first time I have heard of capped collections.
I want to archive all tweets in this project.
So I think the automatic LRU age-out behaviour of a capped collection is not suitable for what I want to do.

You may think that MongoDB isn't a good answer for what I want.
But I wanted to learn MongoDB anyway ;)

@ursm
Copy link

ursm commented Apr 28, 2010

I see. In ordinary (non-capped) collections, documents are not guaranteed to be returned in insertion order. You might have to add an index on the created_at attribute.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment