Created
March 27, 2019 11:47
-
-
Save cristianrasch/5a5d6c99886c616198254c3a8d1a7a18 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# sed 's/https:\/\/www.twitter.com\///' -i ~/Downloads/twitter_accounts.csv | |
require "csv" | |
require "English" | |
require "logger" | |
require "pathname" | |
require "set" | |
require_relative "../models" # also sets up Bundler | |
require_relative "../config/initializers/twitter" | |
require "active_support/time" | |
# --- Configuration ------------------------------------------------------
# Input list of Twitter screen names (one per line); the resolved
# id => handle cache is persisted next to it as a .csv with the same basename.
ACCOUNTS_TXT_PATH = Pathname(ENV.fetch("ACCOUNTS_TXT_PATH"))
ACCOUNTS_CSV_PATH = ACCOUNTS_TXT_PATH.sub_ext(".csv")
# Unary minus returns a frozen, deduplicated string literal.
ID_HEADER = -"id"
SCREEN_NAME_HEADER = -"screen_name"
ACCOUNTS_CSV_HEADERS = [ID_HEADER, SCREEN_NAME_HEADER].freeze
HANDLES_BY_USERS_LOOKUP_CALL = 100  # max handles per users/lookup request
SCREEN_NAME_BY_ID = {}              # user id (String) => screen name cache
# Twitter app-auth rate limits per 15-minute window.
USR_LOOKUPS_PER_15_MIN_WIN = 300
STATUS_LOOKUPS_PER_15_MIN_WIN = 1_500
SLEEP_TIME = 15.minutes.to_i  # one full rate-limit window, in seconds
MAX_USR_TL_STATUS_CNT = 200   # user_timeline hard maximum per request
USR_TL_PARAMS = { count: MAX_USR_TL_STATUS_CNT,
                  trim_user: true,
                  exclude_replies: true,
                  include_rts: false,
                  contributor_details: false }.freeze
TZ = ENV.fetch("AS_TZ")  # ActiveSupport time zone name
URL_RE = %r(https?://\S+)i
MAX_ERR_CNT = 5  # consecutive worker errors tolerated before a thread gives up
# Optional blacklist file (one word per line); tweets matching any word
# (whole-word, case-insensitive) are skipped.
BLACKLIST = if (bl_path = Pathname(ENV.fetch("BLACKLIST_PATH"))).exist?
              bl_path.readlines.
                      reject(&:blank?).
                      map! { |word| /\b#{Regexp.escape(word.chomp)}\b/i }.
                      freeze
            else
              [].freeze
            end
MAIN_THR_SLEEP_TIME = 15   # supervisor thread poll interval (seconds)
JOIN_WORKER_TIMEOUT = 10   # seconds granted to each worker during shutdown
RACK_ENV = ENV.fetch("RACK_ENV", "development").freeze
SAVE_ONLY_TWEETS_W_URLS = ENV["SAVE_ONLY_TWEETS_W_URLS"] == "true"
# --- Logging ------------------------------------------------------------
log_dir = Pathname("log")
log_dir.mkpath
$script_name = Pathname(__FILE__).basename
log_path = log_dir.join($script_name).sub_ext(".log")
file = File.open(log_path, File::WRONLY | File::APPEND | File::CREAT)
# Rotate: keep up to 7 log files of ~1 MiB each.
$logger = Logger.new(file, 7, 1024*1024)
# Verbosity is driven by RACK_ENV plus the VERBOSE / DEBUG env toggles.
$logger.level = case
                when RACK_ENV == "development" || ENV["VERBOSE"] == "true"
                  Logger::DEBUG
                when ENV["DEBUG"] == "true"
                  Logger::INFO
                else
                  Logger::ERROR
                end
# Records the current process ID under tmp/pids/<script>.pid so external
# tooling (init scripts, monitors) can locate and signal this daemon.
# Side effect: stores the PID file path in $pid_path for later cleanup
# by the signal/at_exit handlers.
def write_pid_file
  pid_dir = Pathname("tmp/pids")
  pid_dir.mkpath
  $pid_path = pid_dir.join($script_name).sub_ext(".pid")
  $pid_path.write(Process.pid.to_s)
end
write_pid_file
$shutdown = false  # cooperative shutdown flag polled by every worker loop
# Shared INT/TERM/at_exit handler: remove the PID file, log any pending
# exception ($ERROR_INFO is English's alias for $!), and ask workers to
# wind down by flipping $shutdown.
on_int_or_term_signal = proc {
  # signal handler may have already deleted the PIDFILE
  $pid_path.unlink if $pid_path.exist?
  if $ERROR_INFO
    # NOTE(review): Logger takes a Mutex internally; calling it from a
    # trap context can raise ThreadError on some Ruby versions — confirm.
    $logger.error "#{$ERROR_INFO} - POS: #{$ERROR_POSITION}"
  end
  $shutdown = true
}
Signal.trap("INT", &on_int_or_term_signal)
Signal.trap("TERM", &on_int_or_term_signal)
# Same handler runs on normal exit, so cleanup happens on every path.
at_exit(&on_int_or_term_signal)
# Appends (user id, lower-cased screen name) rows to ACCOUNTS_CSV_PATH,
# creating the file with its header row first when it does not exist yet.
#
# screen_name_by_id - Hash of user id => screen name to persist
#                     (defaults to the global SCREEN_NAME_BY_ID cache).
def write_user_ids_to_csv_file(screen_name_by_id = SCREEN_NAME_BY_ID)
  unless ACCOUNTS_CSV_PATH.exist?
    # Write the header as a single CSV row. (IO#puts with an Array emits
    # one element per line, which breaks CSV.foreach(headers: true).)
    CSV.open(ACCOUNTS_CSV_PATH, "wb") do |csv|
      csv << ACCOUNTS_CSV_HEADERS
    end
  end
  CSV.open(ACCOUNTS_CSV_PATH, "ab") do |csv|
    # Iterate the argument, not the global: callers pass per-slice hashes
    # (see the initial-import path) whose rows must be the ones written.
    screen_name_by_id.each do |user_id, screen_name|
      csv << [user_id, screen_name.downcase]
    end
  end
end
# Sleeps out one full Twitter rate-limit window (SLEEP_TIME seconds),
# logging a warning first so operators can tell the script is idle, not hung.
def pause
  minutes = SLEEP_TIME / 60
  $logger.warn "Sleeping for #{minutes} minutes"
  sleep(SLEEP_TIME)
end
# rest_client.users => GET users/lookup
# Requests / 15-min window (app auth) 300
#
# Resolves up to HANDLES_BY_USERS_LOOKUP_CALL screen names to Twitter user
# objects and yields the resulting array (possibly empty) to the block.
#
# A client is borrowed from the $rest_clients pool and returned via the
# inner ensure BEFORE any rescue runs. The original code had the push-back
# in an outer ensure, which `retry` skips — each rate-limit retry leaked a
# client and eventually shifted nil from the empty pool.
def users_lookup(handles, &block)
  users = []
  begin
    rest_cli = $rest_clients.shift
    begin
      users = rest_cli.users(handles.join(","), include_entities: false,
                             tweet_mode: :extended)
    ensure
      # Always return the borrowed client, even when the call raised.
      $rest_clients << rest_cli
    end
  rescue Twitter::Error::NotFound => err
    # None of the handles resolve any more; treat as an empty result.
    $logger.warn err.message
    users = []
  rescue Twitter::Error::TooManyRequests => err
    # Rate limited: sleep out the window, then retry from the top
    # (the client was already returned to the pool above).
    $logger.warn err.message
    pause
    retry
  end
  block.call(users)
end
# Screen names from the input file, lower-cased for comparison against the
# CSV cache. Use non-bang chomp: chomp! returns nil when nothing was
# removed (e.g. a final line with no trailing newline), which would crash
# on the chained .downcase.
new_screen_names = ACCOUNTS_TXT_PATH.readlines.map! { |screen_name|
  screen_name.chomp.downcase
}
missing_screen_names = []
if ACCOUNTS_CSV_PATH.exist?
  # Warm the in-memory id => screen_name cache from the CSV written on a
  # previous run.
  CSV.foreach(ACCOUNTS_CSV_PATH, headers: true) do |row|
    SCREEN_NAME_BY_ID[row[ID_HEADER]] = row[SCREEN_NAME_HEADER]
  end
  existing_screen_names = SCREEN_NAME_BY_ID.values
  missing_screen_names = new_screen_names - existing_screen_names
  unless missing_screen_names.empty?
    $logger.debug "MISSING_SCREEN_NAMES: #{missing_screen_names.size} -- #{missing_screen_names.inspect}"
    # Resolve newly-added handles to user ids in API-sized batches,
    # napping whenever a full rate-limit window's worth of calls is spent.
    missing_screen_names.each_slice(HANDLES_BY_USERS_LOOKUP_CALL).
                         with_index do |handles, i|
      idx = i + 1
      pause if (idx % USR_LOOKUPS_PER_15_MIN_WIN).zero?
      $logger.debug "SLICE ##{idx}: #{handles.size} handles"
      CSV.open(ACCOUNTS_CSV_PATH, "ab") do |csv|
        users_lookup(handles) do |users|
          users.each do |user|
            # Persist first; cache and append a CSV row only for users
            # that were actually created.
            if User.create(id: user.id, screen_name: user.screen_name)
              SCREEN_NAME_BY_ID[String(user.id)] = user.screen_name
              csv << [user.id, user.screen_name.downcase]
            end
          end
        end
      end
    end
  end
  # Handles removed from the input file: purge them from the DB, the
  # in-memory cache, and the CSV.
  legacy_screen_names = existing_screen_names - new_screen_names
  unless legacy_screen_names.empty?
    User.where(screen_name: legacy_screen_names).delete
    legacy_screen_names = Set.new(legacy_screen_names) # O(1) membership below
    SCREEN_NAME_BY_ID.delete_if { |_, screen_name| legacy_screen_names.include?(screen_name) }
    # Rewrite the CSV from the pruned cache.
    ACCOUNTS_CSV_PATH.unlink
    write_user_ids_to_csv_file
  end
else
  # First run: no CSV cache yet — look every handle up and build the cache
  # slice by slice so partial progress survives a crash.
  new_screen_names.each_slice(HANDLES_BY_USERS_LOOKUP_CALL).
                   with_index do |handles, i|
    idx = i + 1
    pause if (idx % USR_LOOKUPS_PER_15_MIN_WIN).zero?
    $logger.debug "SLICE ##{idx}: #{handles.size} handles"
    screen_name_by_id = {}
    users_lookup(handles) do |users|
      users.each do |user|
        if User.create(id: user.id, screen_name: user.screen_name)
          screen_name_by_id[String(user.id)] = user.screen_name
        end
      end
    end
    write_user_ids_to_csv_file(screen_name_by_id)
    SCREEN_NAME_BY_ID.merge!(screen_name_by_id)
  end
end
# --- Timeline ingestion workers -----------------------------------------
# One thread per REST client. Each worker repeatedly claims the user whose
# timeline was checked longest ago (SKIP LOCKED keeps workers from
# colliding on the same row) and stores that user's new tweets.
workers = $rest_clients.map.with_index { |rest_client, i|
  Thread.new(rest_client, i) do |rest_cli, j|
    if j.zero?
      # One-off bootstrap duties run on the first worker only.
      SCREEN_NAME_BY_ID.clear
      User.seed unless missing_screen_names.empty? || RACK_ENV == "development"
    end
    idx = 1        # timeline requests issued in the current 15-min window
    err_cnt = 0    # consecutive unexpected errors; thread exits past MAX_ERR_CNT
    # Beginning of the current week in the configured zone; tweets created
    # before this boundary are skipped.
    bow = Time.use_zone(TZ) { Time.zone.now }.beginning_of_week
    forced_nap = false # set when Twitter returns 429 inside the transaction
    loop do
      break if $shutdown
      begin
        User.db.transaction do
          # Claim the stalest user row. Never-checked users
          # (NULL last_timeline_check_at) sort first.
          user = User.for_update.
                      skip_locked.
                      order(Sequel.asc(:last_timeline_check_at, nulls: :first),
                            Sequel.desc(:since_tweet_id, nulls: :last)).
                      first
          break if $shutdown
          done = false
          begin
            if $shutdown
              done = true
            else
              if (idx % STATUS_LOOKUPS_PER_15_MIN_WIN).zero?
                # Window exhausted: refresh the week boundary and roll the
                # transaction back so the nap happens outside the txn.
                bow = Time.use_zone(TZ) { Time.zone.now }.beginning_of_week
                raise Sequel::Rollback
              end
              # Request only tweets newer than the last one stored.
              params = if user.since_tweet_id
                         USR_TL_PARAMS.merge(since_id: user.since_tweet_id)
                       else
                         USR_TL_PARAMS.dup
                       end
              tweets = []
              begin
                $logger.info "Reading #{user.screen_name}'s TL (since_id = #{user.since_tweet_id.inspect})"
                # rest_cli.user_timeline => GET statuses/user_timeline
                # Requests / 15-min window (app auth) 1500
                tweets = rest_cli.user_timeline(user.id, params)
              rescue Twitter::Error::Unauthorized => err
                # Protected/suspended account: skip it this round.
                $logger.warn "Not authorized to access TL of user '#{user.screen_name}'"
              rescue Twitter::Error::TooManyRequests => err
                $logger.warn err.message
                forced_nap = true
                raise Sequel::Rollback
              ensure
                # Count the request even when it failed.
                idx += 1
              end
              # Filter: drop tweets from before this week, tweets failing
              # the URL policy, and tweets matching a blacklisted word.
              tweets.select! do |tweet|
                if tweet.created_at < bow
                  $logger.debug "Skipping tweet ##{tweet.id} because it was created before #{bow} [#{tweet.created_at}]"
                  next(false)
                end
                txt = tweet.text
                tweet_contains_urls = tweet.urls? || (md = txt.match(URL_RE))
                msg = nil
                # The URL policy cuts both ways: one mode keeps only
                # tweets WITH URLs, the other only tweets WITHOUT them.
                if SAVE_ONLY_TWEETS_W_URLS && !tweet_contains_urls
                  msg = "did not contain any URL"
                elsif !SAVE_ONLY_TWEETS_W_URLS && tweet_contains_urls
                  msg = "contained URLs"
                end
                if msg
                  $logger.info "Skipping tweet ##{tweet.id} because it #{msg}. Tweet#urls? [#{tweet.urls?}] - match: #{md.to_s.inspect}"
                  next(false)
                end
                if md = BLACKLIST.find { |re| txt.match(re) }
                  $logger.info "Skipping tweet ##{tweet.id} because it matched blacklisted keyword: '#{md}'"
                  next(false)
                end
                true
              end
              # Only fetch the MAX_USR_TL_STATUS_CNT most recent tweets
              # the 1st time a new user's TL is checked
              unless done = tweets.empty? || !user.last_timeline_check_at
                tweets.each do |tweet|
                  $logger.debug "[#{tweet.created_at}] TWEET: #{tweet.text} - FAVS: #{tweet.favorite_count}, RETWEETS: #{tweet.retweet_count}"
                  begin
                    user.add_tweet(id: tweet.id, created_at: tweet.created_at,
                                   favorite_count: tweet.favorite_count,
                                   retweet_count: tweet.retweet_count, text: tweet.text)
                  rescue Sequel::ValidationFailed => err
                    # Duplicate ids are expected (overlapping pages);
                    # log every other validation failure.
                    unless (id_errs = err.errors.on(:id)).size == 1 && id_errs.first =~ /already taken/
                      $logger.error "#{err.message}: #{err.errors.full_messages.inspect} -- #{err.model.inspect}"
                    end
                  end
                end
                # Remember the newest stored tweet as the next since_id.
                user.update(since_tweet_id: tweets.max_by(&:id).id)
              end
              # Audit record of this check, even when nothing was saved.
              user.add_tl_status_check(tweet_count: tweets.size)
            end
          end until done # do-while: keep paging this user until done
        end
        # Outside the transaction: nap when the window is spent or when
        # Twitter forced one via 429.
        if (idx % STATUS_LOOKUPS_PER_15_MIN_WIN).zero? || forced_nap
          pause
          if forced_nap
            forced_nap = false
          else
            # Bump idx so the zero-modulo branch doesn't re-trigger.
            idx += 1
          end
          bow = Time.use_zone(TZ) { Time.zone.now }.beginning_of_week
        end
      rescue => err
        $logger.error err.message
        $logger.error err.backtrace.join("\n")
        err_cnt += 1
        break if err_cnt > MAX_ERR_CNT
      else
        # Any clean iteration resets the consecutive-error counter.
        err_cnt = 0
      end
    end
  end
}
# Supervisor: idle until a signal handler flips $shutdown, then give each
# worker JOIN_WORKER_TIMEOUT seconds to finish before killing it.
loop do
  sleep(MAIN_THR_SLEEP_TIME)
  if $shutdown
    workers.each do |worker|
      # Thread#join returns the thread on success and nil on timeout;
      # an exception re-raised from the worker is logged and the worker
      # treated as already finished.
      res = begin
        worker.join(JOIN_WORKER_TIMEOUT)
      rescue => err
        $logger.error err.message
        $logger.error err.backtrace.join("\n")
        true # thread already killed
      end
      Thread.kill(worker) unless res
    end
    break
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment