Skip to content

Instantly share code, notes, and snippets.

@cristianrasch
Created March 27, 2019 11:47
Show Gist options
  • Save cristianrasch/5a5d6c99886c616198254c3a8d1a7a18 to your computer and use it in GitHub Desktop.
Save cristianrasch/5a5d6c99886c616198254c3a8d1a7a18 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
# sed 's/https:\/\/www.twitter.com\///' -i ~/Downloads/twitter_accounts.csv
require "csv"
require "English"
require "logger"
require "pathname"
require "set"
require_relative "../models" # also sets up Bundler
require_relative "../config/initializers/twitter"
require "active_support/time"
# Input list of Twitter handles (one per line) and the CSV cache of resolved
# user ids that lives next to it (same path, ".csv" extension).
ACCOUNTS_TXT_PATH = Pathname(ENV.fetch("ACCOUNTS_TXT_PATH"))
ACCOUNTS_CSV_PATH = ACCOUNTS_TXT_PATH.sub_ext(".csv")

# Column names of the accounts CSV.
ID_HEADER = "id".freeze
SCREEN_NAME_HEADER = "screen_name".freeze
ACCOUNTS_CSV_HEADERS = [ID_HEADER, SCREEN_NAME_HEADER].freeze

# Number of handles sent per users/lookup request.
HANDLES_BY_USERS_LOOKUP_CALL = 100

# id => screen name cache filled while syncing accounts (deliberately mutable).
SCREEN_NAME_BY_ID = {}

# Request budgets per 15-minute rate-limit window.
USR_LOOKUPS_PER_15_MIN_WIN = 300
STATUS_LOOKUPS_PER_15_MIN_WIN = 1500

# Seconds to back off once a rate-limit window is exhausted.
SLEEP_TIME = 15.minutes.to_i

# statuses/user_timeline parameters: newest MAX_USR_TL_STATUS_CNT tweets,
# excluding replies and retweets.
MAX_USR_TL_STATUS_CNT = 200
USR_TL_PARAMS = {
  count: MAX_USR_TL_STATUS_CNT,
  trim_user: true,
  exclude_replies: true,
  include_rts: false,
  contributor_details: false
}.freeze

# Time zone used to compute the beginning of the current week.
TZ = ENV.fetch("AS_TZ")

# Plain-text URL matcher, used in addition to the tweet's URL entities.
URL_RE = %r{https?://\S+}i

# Consecutive errors a worker thread tolerates before giving up.
MAX_ERR_CNT = 5
# Optional keyword blacklist: one keyword per line in the file named by
# BLACKLIST_PATH. Each keyword becomes a case-insensitive whole-word Regexp.
# An unset variable or a missing file yields an empty list; previously an
# unset variable raised KeyError despite the "file may not exist" fallback.
# Keywords are stripped so stray spaces/CRs do not end up inside the pattern,
# and blank lines are skipped.
bl_env = ENV["BLACKLIST_PATH"]
bl_path = Pathname(bl_env) if bl_env
BLACKLIST = if bl_path&.exist?
  bl_path.readlines.
    map(&:strip).
    reject(&:empty?).
    map { |word| /\b#{Regexp.escape(word)}\b/i }.
    freeze
else
  [].freeze
end
# Seconds the main thread sleeps between checks of the shutdown flag.
MAIN_THR_SLEEP_TIME = 15
# Seconds to wait for each worker thread to finish before killing it.
JOIN_WORKER_TIMEOUT = 10
# Runtime environment name; "development" when RACK_ENV is not set.
RACK_ENV = ENV.fetch("RACK_ENV") { "development" }.freeze
# true => store only tweets containing URLs; false => store only tweets without.
SAVE_ONLY_TWEETS_W_URLS = ENV.fetch("SAVE_ONLY_TWEETS_W_URLS", nil) == "true"
# Log to log/<script name>.log, keeping 7 rotated files of at most 1 MiB each.
# Logger is given the path rather than a File opened here: it then opens the
# file itself in append + sync mode, so lines are not lost in an unflushed
# buffer on a crash, and it owns size-based rotation of that file.
log_dir = Pathname("log")
log_dir.mkpath
$script_name = Pathname(__FILE__).basename
log_path = log_dir.join($script_name).sub_ext(".log")
$logger = Logger.new(log_path.to_s, 7, 1024 * 1024)
# Verbosity: DEBUG in development or with VERBOSE=true, INFO with DEBUG=true,
# ERROR otherwise.
$logger.level =
  if RACK_ENV == "development" || ENV["VERBOSE"] == "true"
    Logger::DEBUG
  elsif ENV["DEBUG"] == "true"
    Logger::INFO
  else
    Logger::ERROR
  end
# Writes this process's PID to tmp/pids/<script name>.pid, creating the
# directory when needed, and stores the file's path in $pid_path so the
# shutdown handler can remove it later.
def write_pid_file
  pid_dir = Pathname("tmp/pids").tap(&:mkpath)
  $pid_path = pid_dir.join($script_name).sub_ext(".pid")
  $pid_path.write(Process.pid.to_s)
end
write_pid_file
# Cooperative stop flag polled by the worker threads and the main loop.
$shutdown = false

# Cleanup shared by INT/TERM and process exit: remove the PID file if it is
# still there (a signal may already have removed it before at_exit runs), log
# the pending exception if any, and request shutdown.
shutdown_handler = proc do
  $pid_path.unlink if $pid_path.exist?
  $logger.error "#{$ERROR_INFO} - POS: #{$ERROR_POSITION}" if $ERROR_INFO
  $shutdown = true
end

%w[INT TERM].each { |signal_name| Signal.trap(signal_name, &shutdown_handler) }
at_exit(&shutdown_handler)
# Appends one "id,screen_name" row per entry to ACCOUNTS_CSV_PATH. Screen names
# are written downcased. When the file does not exist yet (or is empty), the
# header row is written first as a single CSV row.
#
# @param screen_name_by_id [Hash] user id => screen name to append; defaults to
#   the global SCREEN_NAME_BY_ID cache. Only this hash is written (the global is
#   no longer written in its place, which dropped or duplicated batches).
def write_user_ids_to_csv_file(screen_name_by_id = SCREEN_NAME_BY_ID)
  # Decide before opening: CSV.open in append mode creates the file.
  write_headers = !ACCOUNTS_CSV_PATH.exist? || ACCOUNTS_CSV_PATH.zero?
  CSV.open(ACCOUNTS_CSV_PATH, "ab") do |csv|
    # `csv <<` joins the headers into one row; `IO#puts(array)` printed one
    # header per line, which broke `CSV.foreach(..., headers: true)` later.
    csv << ACCOUNTS_CSV_HEADERS if write_headers
    screen_name_by_id.each do |user_id, screen_name|
      csv << [user_id, screen_name.downcase]
    end
  end
end
# Logs a warning, then blocks the calling thread for SLEEP_TIME seconds so an
# exhausted rate-limit window can reset.
def pause
  minutes = SLEEP_TIME / 60
  $logger.warn("Sleeping for #{minutes} minutes")
  sleep SLEEP_TIME
end
# rest_client.users => GET users/lookup
# Requests / 15-min window (app auth) 300
#
# Resolves `handles` with a client borrowed from the $rest_clients pool and
# yields the users returned by the API. It yields an empty array when the API
# raises Twitter::Error::NotFound. On Twitter::Error::TooManyRequests the client
# is returned to the pool, the thread pauses for one window and the lookup is
# retried with the next pooled client.
#
# @param handles [Array<String>] screen names to look up
# @yieldparam users [Array] users returned by the API (possibly empty)
def users_lookup(handles, &block)
  users = loop do
    rest_cli = $rest_clients.shift
    begin
      break rest_cli.users(handles.join(","), include_entities: false,
                                              tweet_mode: :extended)
    rescue Twitter::Error::NotFound => err
      $logger.warn err.message
      break []
    rescue Twitter::Error::TooManyRequests => err
      $logger.warn err.message
    ensure
      # Runs on every exit from this begin, including the breaks above, so each
      # borrowed client goes back to the pool exactly once. (`retry` would skip
      # `ensure` and leak a client per rate-limited attempt.)
      $rest_clients << rest_cli
    end
    # Rate limited: wait out the window, then try again.
    pause
  end
  block.call(users)
end
# Handles listed in ACCOUNTS_TXT_PATH, one per line, normalized to lowercase.
# `strip` replaces `chomp!`, which returns nil for a last line without a
# trailing newline (crashing on `.downcase`); it also removes stray spaces/CRs.
# Blank lines and duplicate handles are dropped.
new_screen_names = ACCOUNTS_TXT_PATH.readlines.
                     map { |line| line.strip.downcase }.
                     reject(&:empty?).
                     uniq
# Sync the users table and the CSV cache with the handle list.
# - CSV cache present: load it into SCREEN_NAME_BY_ID, look up the handles
#   missing from it (creating User rows and appending them to the CSV), then
#   delete users whose handles are no longer listed and rewrite the CSV.
# - No CSV yet: look up every handle, creating User rows and writing the CSV.
# Lookups go HANDLES_BY_USERS_LOOKUP_CALL handles at a time; before every
# USR_LOOKUPS_PER_15_MIN_WIN-th batch the script pauses for one window.
# missing_screen_names is also read by the worker threads (User.seed guard).
missing_screen_names = []
if ACCOUNTS_CSV_PATH.exist?
CSV.foreach(ACCOUNTS_CSV_PATH, headers: true) do |row|
SCREEN_NAME_BY_ID[row[ID_HEADER]] = row[SCREEN_NAME_HEADER]
end
existing_screen_names = SCREEN_NAME_BY_ID.values
missing_screen_names = new_screen_names - existing_screen_names
unless missing_screen_names.empty?
$logger.debug "MISSING_SCREEN_NAMES: #{missing_screen_names.size} -- #{missing_screen_names.inspect}"
missing_screen_names.each_slice(HANDLES_BY_USERS_LOOKUP_CALL).
with_index do |handles, i|
# idx is 1-based batch number.
idx = i + 1
pause if (idx % USR_LOOKUPS_PER_15_MIN_WIN).zero?
$logger.debug "SLICE ##{idx}: #{handles.size} handles"
# Append each resolved user to the existing CSV as soon as it is created.
CSV.open(ACCOUNTS_CSV_PATH, "ab") do |csv|
users_lookup(handles) do |users|
users.each do |user|
if User.create(id: user.id, screen_name: user.screen_name)
SCREEN_NAME_BY_ID[String(user.id)] = user.screen_name
csv << [user.id, user.screen_name.downcase]
end
end
end
end
end
end
# Handles present in the CSV but no longer listed.
# NOTE(review): CSV names are lowercase, while User rows are created with
# user.screen_name as returned by the API (not downcased), so this `where`
# only matches when the stored name is already lowercase or the column
# compares case-insensitively -- confirm.
legacy_screen_names = existing_screen_names - new_screen_names
unless legacy_screen_names.empty?
User.where(screen_name: legacy_screen_names).delete
legacy_screen_names = Set.new(legacy_screen_names)
SCREEN_NAME_BY_ID.delete_if { |_, screen_name| legacy_screen_names.include?(screen_name) }
# Rebuild the CSV from the pruned cache.
ACCOUNTS_CSV_PATH.unlink
write_user_ids_to_csv_file
end
else
# First run (no CSV cache): resolve every listed handle.
new_screen_names.each_slice(HANDLES_BY_USERS_LOOKUP_CALL).
with_index do |handles, i|
idx = i + 1
pause if (idx % USR_LOOKUPS_PER_15_MIN_WIN).zero?
$logger.debug "SLICE ##{idx}: #{handles.size} handles"
screen_name_by_id = {}
users_lookup(handles) do |users|
users.each do |user|
if User.create(id: user.id, screen_name: user.screen_name)
screen_name_by_id[String(user.id)] = user.screen_name
end
end
end
# Persist this batch's id => name pairs, then add them to the global cache.
write_user_ids_to_csv_file(screen_name_by_id)
SCREEN_NAME_BY_ID.merge!(screen_name_by_id)
end
end
# Start one worker thread per REST client in $rest_clients. Each worker loops
# until $shutdown is set (INT/TERM handler) or it hits more than MAX_ERR_CNT
# consecutive errors. In each iteration it locks the next user to check
# (FOR UPDATE SKIP LOCKED, so workers skip rows another worker holds), reads
# that user's timeline, stores the tweets that pass the filters and records a
# timeline status check, all inside one DB transaction.
workers = $rest_clients.map.with_index { |rest_client, i|
Thread.new(rest_client, i) do |rest_cli, j|
# Worker 0 does one-time setup: clear the id => screen name cache (the
# workers do not read it) and call User.seed (defined elsewhere -- confirm
# what it does) unless no handles were missing or we run in development.
# NOTE(review): missing_screen_names is always empty when no CSV existed at
# startup, so User.seed never runs on a first sync -- confirm intent.
if j.zero?
SCREEN_NAME_BY_ID.clear
User.seed unless missing_screen_names.empty? || RACK_ENV == "development"
end
# idx: 1-based count of user_timeline attempts by this worker, used to stop
#   at STATUS_LOOKUPS_PER_15_MIN_WIN per window (bumped once more after a
#   budget pause so the same multiple does not trigger twice).
# err_cnt: consecutive iterations that raised.
# bow: beginning of the current week in TZ; older tweets are skipped.
# forced_nap: set when the API answered TooManyRequests.
idx = 1
err_cnt = 0
bow = Time.use_zone(TZ) { Time.zone.now }.beginning_of_week
forced_nap = false
loop do
break if $shutdown
begin
User.db.transaction do
# Next user: never-checked users first, then least recently checked; ties
# favour the highest since_tweet_id.
# NOTE(review): `first` returns nil when no unlocked user row exists (empty
# table, or every row locked by other workers); the code below then raises
# NoMethodError, which the outer rescue counts as a worker error.
user = User.for_update.
skip_locked.
order(Sequel.asc(:last_timeline_check_at, nulls: :first),
Sequel.desc(:since_tweet_id, nulls: :last)).
first
break if $shutdown
done = false
# Re-read the same user's timeline until a round stores nothing (see `done`
# below) or shutdown is requested.
begin
if $shutdown
done = true
else
# Per-window budget reached: refresh bow and roll back the whole
# transaction (including anything stored for this user earlier in it);
# the pause happens after the transaction, below.
if (idx % STATUS_LOOKUPS_PER_15_MIN_WIN).zero?
bow = Time.use_zone(TZ) { Time.zone.now }.beginning_of_week
raise Sequel::Rollback
end
params = if user.since_tweet_id
USR_TL_PARAMS.merge(since_id: user.since_tweet_id)
else
USR_TL_PARAMS.dup
end
tweets = []
# Unauthorized leaves `tweets` empty; TooManyRequests requests a forced
# nap and rolls back. idx counts every attempt, successful or not.
begin
$logger.info "Reading #{user.screen_name}'s TL (since_id = #{user.since_tweet_id.inspect})"
# rest_cli.user_timeline => GET statuses/user_timeline
# Requests / 15-min window (app auth) 1500
tweets = rest_cli.user_timeline(user.id, params)
rescue Twitter::Error::Unauthorized => err
$logger.warn "Not authorized to access TL of user '#{user.screen_name}'"
rescue Twitter::Error::TooManyRequests => err
$logger.warn err.message
forced_nap = true
raise Sequel::Rollback
ensure
idx += 1
end
# Keep only tweets created this week that satisfy the URL policy
# (SAVE_ONLY_TWEETS_W_URLS true: only tweets with URLs; false: only
# tweets without) and match no BLACKLIST keyword.
tweets.select! do |tweet|
if tweet.created_at < bow
$logger.debug "Skipping tweet ##{tweet.id} because it was created before #{bow} [#{tweet.created_at}]"
next(false)
end
txt = tweet.text
tweet_contains_urls = tweet.urls? || (md = txt.match(URL_RE))
msg = nil
if SAVE_ONLY_TWEETS_W_URLS && !tweet_contains_urls
msg = "did not contain any URL"
elsif !SAVE_ONLY_TWEETS_W_URLS && tweet_contains_urls
msg = "contained URLs"
end
if msg
$logger.info "Skipping tweet ##{tweet.id} because it #{msg}. Tweet#urls? [#{tweet.urls?}] - match: #{md.to_s.inspect}"
next(false)
end
# md is the matching Regexp here, so the log shows the pattern.
if md = BLACKLIST.find { |re| txt.match(re) }
$logger.info "Skipping tweet ##{tweet.id} because it matched blacklisted keyword: '#{md}'"
next(false)
end
true
end
# While the user's TL has never been checked (last_timeline_check_at is
# nil), nothing is stored: only the status check below is recorded.
# Otherwise every kept tweet is stored, since_tweet_id advances to the
# newest kept one and the loop repeats; an empty kept batch ends it.
unless done = tweets.empty? || !user.last_timeline_check_at
tweets.each do |tweet|
$logger.debug "[#{tweet.created_at}] TWEET: #{tweet.text} - FAVS: #{tweet.favorite_count}, RETWEETS: #{tweet.retweet_count}"
begin
user.add_tweet(id: tweet.id, created_at: tweet.created_at,
favorite_count: tweet.favorite_count,
retweet_count: tweet.retweet_count, text: tweet.text)
rescue Sequel::ValidationFailed => err
# A lone "already taken" error on :id (tweet stored before) is ignored;
# anything else is logged.
# NOTE(review): if errors.on(:id) returns nil (no :id error), `.size`
# raises NoMethodError and aborts this transaction.
unless (id_errs = err.errors.on(:id)).size == 1 && id_errs.first =~ /already taken/
$logger.error "#{err.message}: #{err.errors.full_messages.inspect} -- #{err.model.inspect}"
end
end
end
user.update(since_tweet_id: tweets.max_by(&:id).id)
end
# Record this round's check (presumably this is what sets
# last_timeline_check_at -- confirm in the model).
user.add_tl_status_check(tweet_count: tweets.size)
end
end until done
end
# Outside the transaction: wait out the window when the per-window budget
# was reached or the API rate-limited us, then refresh bow.
if (idx % STATUS_LOOKUPS_PER_15_MIN_WIN).zero? || forced_nap
pause
if forced_nap
forced_nap = false
else
idx += 1
end
bow = Time.use_zone(TZ) { Time.zone.now }.beginning_of_week
end
# Any other error is logged; more than MAX_ERR_CNT consecutive errors stop
# this worker. A clean iteration resets the counter.
rescue => err
$logger.error err.message
$logger.error err.backtrace.join("\n")
err_cnt += 1
break if err_cnt > MAX_ERR_CNT
else
err_cnt = 0
end
end
end
}
# Main thread: poll the shutdown flag every MAIN_THR_SLEEP_TIME seconds. Once
# it is set, give each worker JOIN_WORKER_TIMEOUT seconds to finish and kill
# the ones still running.
sleep(MAIN_THR_SLEEP_TIME)
sleep(MAIN_THR_SLEEP_TIME) until $shutdown
workers.each do |thr|
  finished =
    begin
      thr.join(JOIN_WORKER_TIMEOUT)
    rescue => err
      # join re-raises the exception that terminated the worker; the thread
      # is already dead, so treat it as finished.
      $logger.error err.message
      $logger.error err.backtrace.join("\n")
      true
    end
  # join returns nil on timeout.
  thr.kill unless finished
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment