Skip to content

Instantly share code, notes, and snippets.

@CyrusOfEden
Last active August 29, 2015 14:05
Show Gist options
  • Save CyrusOfEden/b5836b93caa55a66378c to your computer and use it in GitHub Desktop.
Save CyrusOfEden/b5836b93caa55a66378c to your computer and use it in GitHub Desktop.
Streamer
class Event < ActiveRecord::Base
scope :starting, { where(start_time: 5.minutes.ago..5.minutes.from_now) }
scope :ending, { where(end_time: 5.minutes.ago..5.minutes.from_now) }
# ...
end
every 30.minutes do
rake 'streamer:update'
end
namespace :streamer do
desc "Update streamer's params"
task :update => [:environment] do
ActiveRecord::Base.transaction do
Tweet.orphan.each do |tweet|
tweet.assign_event!
tweet.save
end
end
Event.starting.each do |event|
$streamer.add_params(event.stream_params)
end
Event.ending.each do |event|
$streamer.remove_params(event.stream_params)
end
$streamer.reload!
end
end
class Streamer
attr_reader :threads
def initialize
@threads = {}
end
def stop!
threads.each { |name, thread| kill(thread) }
end
def reload!
stop!
run!
end
end
class Tweet < ActiveRecord::Base
scope :orphan, { where(event_id: nil) }
# ...
end
class TwitterStreamer < Streamer
attr_reader :params
def initialize
super
@params = {}
end
def run!
@threads[:tweets] = Thread.new do
load_streaming_client.filter(params) do |tweet|
Tweet.new_from_attrs(tweet.attrs).save
end
end
end
def add_params(params)
# do stuff to modify @params
end
def remove_params(params)
# do stuff to modify @params
end
end
$streamer = TwitterStreamer.new
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment