-
-
Save anonymous/07eeb33982affbfc1659f8edd160a5f0 to your computer and use it in GitHub Desktop.
R-Code zum Abrufen aller Twitter-Nachrichten aller Twitter-Konten aus Bruno Wüests Schweizerpolitik-Datensatz. Erstellt für den Blogeintrag "Twitter vs. Realwelt: Wo erzeugen welche Volksinitiativen die meiste Resonanz?" im UZH-Seminar "Politischer Datenjournalismus" 2016: http://pwipdm.uzh.ch/wordpress/?p=6504
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (C) 2016 Salim Brüggemann | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# To get a copy of the GNU General Public License see <http://www.gnu.org/licenses/>. | |
############################################################################### | |
# Alle Twitter-Nachrichten aller Twitter-Konten aus Bruno Wüests Schweizerpolitik-Datensatz herunterladen | |
# Autor: Salim Brüggemann (gestützt auf Code-Beispiele von Bruno Wüest) | |
############################################################################### | |
rm(list = ls(all = TRUE)) | |
#?????????????????????????????????????????????????????????????????????????????? | |
# Host und Anmeldedaten für MySQL-Server und Twitter definieren | |
mysql_host <- "localhost" | |
mysql_username <- "?" | |
mysql_password <- "?" | |
twitter_consumer_key <- "?" | |
twitter_consumer_secret <- "?" | |
twitter_access_token <- "?" | |
twitter_access_secret <- "?" | |
#?????????????????????????????????????????????????????????????????????????????? | |
# Relevante R-Pakete laden | |
library(RMySQL) | |
library(twitteR) | |
# Brunos Schweizerpolitik-Twitterkonten-Datensatz laden | |
conn <- dbConnect(MySQL(), | |
user = mysql_username, | |
pass = mysql_password, | |
host = mysql_host, | |
dbname = "twitter") | |
dbGetQuery(conn, | |
"set names utf8") | |
twitter_accounts_bruno <- dbGetQuery(conn, | |
"select * from `accounts`") | |
dbDisconnect(conn) | |
#?????????????????????????????????????????????????????????????????????????????? | |
# Twitter-Konten definieren, deren Tweets abgerufen werden sollen | |
target_IDs <- twitter_accounts_bruno$twitterID | |
#?????????????????????????????????????????????????????????????????????????????? | |
# Bereits abgefragte Twitter-Konten laden, um diese zu überspringen | |
if ( file.exists("twitter_scrape_job.Rda") ) { | |
load(file = "twitter_scrape_job.Rda") | |
} else { | |
tweets <- data.frame() | |
successfully_fetched_IDs <- c() | |
unfetchable_IDs <- data.frame("ID" = numeric(), | |
"message" = character(), | |
stringsAsFactors = FALSE) | |
protected_IDs <- c() | |
lazy_IDs <- c() | |
busy_IDs <- c() | |
} | |
# Noch abzurufende Twitter-Konten definieren (Mengendifferenz zwischen allen gewünschten und bereits abgerufenen Konten) | |
to_fetch <- setdiff(setdiff(setdiff(target_IDs, | |
successfully_fetched_IDs), | |
unfetchable_IDs$ID), | |
protected_IDs) | |
## Tweets abrufen | |
# Funktion definieren zur Twitter-Authentifizierung | |
twitter_authentication <- function() { | |
setup_twitter_oauth(twitter_consumer_key, | |
twitter_consumer_secret, | |
twitter_access_token, | |
twitter_access_secret) | |
} | |
# Funktionen definieren zum Abruf der rate limits | |
get_rate_limits_helper <- function() { | |
rate_limit_status <- tryCatch( | |
{ | |
getCurRateLimitInfo() | |
}, | |
warning = function(w) { | |
print(paste0("WARNING: ", as.character(w))) | |
return(-2) | |
}, | |
error = function(e) { | |
print(paste0("ERROR: ", as.character(e))) | |
return(-1) | |
} | |
) | |
return(rate_limit_status) | |
} | |
get_rate_limits <- function() { | |
rate_limit_status <- get_rate_limits_helper() | |
retry_count <- 0 | |
while ( length(rate_limit_status) == 1 ) { | |
if ( retry_count == 10 ) { | |
print("retrieval of rate limits failed repeatedly; retrying in 15 minutes") | |
Sys.sleep(901) | |
} else if ( retry_count > 20 ) stop("retrieval of rate limits still failing; aborting processing of further twitter accounts") | |
twitter_authentication() | |
rate_limit_status <- get_rate_limits_helper() | |
retry_count <- retry_count + 1 | |
} | |
return(rate_limit_status) | |
} | |
# Funktion definieren zum Abruf der Tweets einer bestimmenten Twitter-ID | |
get_tweets <- function(twitter_ID) { | |
tweets <- tryCatch( | |
{ | |
twitter_user <- getUser(twitter_ID) | |
if ( twitter_user$statusesCount > 3200 ) { | |
busy_IDs <<- union(busy_IDs, twitter_ID) | |
tweet_count <- 3200 | |
} else { | |
tweet_count <- twitter_user$statusesCount | |
} | |
# Prüfen, ob rate limit für die Twitter-API-Methode "GET statuses/user_timeline" ausreicht für alle Tweets der aktuellen Twitter-ID; Fehlertyp 4 zurückgeben, falls nicht | |
rate_limit_status <- get_rate_limits() | |
if ( as.numeric(rate_limit_status$remaining[rate_limit_status$resource=="/statuses/user_timeline"]) < ceiling(tweet_count/100) ) { | |
remaining_points <- as.numeric(rate_limit_status$remaining[rate_limit_status$resource=="/statuses/user_timeline"]) | |
list(-4, | |
paste0("can't fetch twitter account ", which(to_fetch == twitter_ID), " of ", length(to_fetch), " right now: ", twitter_user$name, " (screen name: ", twitter_user$screenName, " | ID: ", twitter_user$id, ")"), | |
paste0("account comprises ", tweet_count, " tweets but rate limit only allows for fetching ", remaining_points * 100, ifelse(remaining_points == 1, " more tweet", " more tweets"))) | |
} else { | |
# Tweets für aktuelle ID abrufen, sofern diese nicht protected ist | |
if ( !twitter_user$protected ) { | |
if ( tweet_count > 0 ) { | |
print(paste0("fetching up to ", tweet_count, " tweets from twitter account ", which(to_fetch == twitter_ID), " of ", length(to_fetch), ": ", twitter_user$name, " (screen name: ", twitter_user$screenName, " | ID: ", twitter_user$id, ")")) | |
userTimeline(twitter_user, n = tweet_count, includeRts = TRUE) | |
} else { | |
print(paste0("there are no tweets to fetch from twitter account ", which(to_fetch == twitter_ID), " of ", length(to_fetch), ": ", twitter_user$name, " (screen name: ", twitter_user$screenName, " | ID: ", twitter_user$id, ")")) | |
list() | |
} | |
# Fehlertyp 3 zurückgeben, falls die aktuelle Twitter-ID protected ist | |
} else return(list(-3, paste0("twitter account ", which(to_fetch == twitter_ID), " of ", length(to_fetch), " is protected: ", twitter_user$name, " (screen name: ", twitter_user$screenName, " | ID: ", twitter_user$id, ")"))) | |
} | |
}, | |
# Bei Auftreten einer Warnung, diese anzeigen und Fehlertyp 2 zurückgeben | |
warning = function(w) { | |
print(paste0("WARNING: ", as.character(w))) | |
return(list(-2, as.character(w))) | |
}, | |
# Bei Auftreten eines Fehlers, diesen anzeigen und Fehlertyp 1 zurückgeben | |
error = function(e) { | |
print(paste0("ERROR: ", as.character(e))) | |
return(list(-1, as.character(e))) | |
} | |
) | |
return(tweets) | |
} | |
# Funktion definieren zum Verarbeiten einer bestimmenten Twitter-ID (Speichern der Resultate, Fehlerbehandlung, Zuordnung zu den verschiedenen Gruppen (successfully_fetched, unfetchable, protected, lazy) etc.) | |
process_twitter_ID <- function(twitter_ID) { | |
error_count <- 0 | |
warnings_count <- 0 | |
new_tweets <- get_tweets(twitter_ID) | |
# Wenn von get_tweets() zurückgegebene Liste nicht leer ist, wurden die Tweets entweder erfolgreich abgerufen oder es ist ein Fehler aufgetreten | |
if ( length(new_tweets) > 0 ) { | |
# Negative Zahl als erstes Element von new_tweets bedeutet grundsätzlich Fehler | |
# Behandlung Fehlertyp 4: rate limit nicht ausreichend für aktuelle ID | |
if ( is.numeric(new_tweets[[1]]) ) { | |
if ( new_tweets[[1]] == -4 ) { | |
print(new_tweets[[2]]) | |
print(new_tweets[[3]]) | |
return(FALSE) | |
# Behandlung Fehlertyp 3: Aktuelle ID ist protected; als protected markieren und überspringen | |
} else if ( new_tweets[[1]] == -3 ) { | |
print(new_tweets[[2]]) | |
print("therefore skipping and marking it as protected") | |
protected_IDs <<- c(protected_IDs, twitter_ID) | |
return(TRUE) | |
} | |
} | |
while ( is.numeric(new_tweets[[1]]) && (new_tweets[[1]] == -1 || new_tweets[[1]] == -2) ) { | |
# Behandlung Fehlertyp 2: Bei Warnmeldung OAuth-Authentifikation wiederholen und nochmals versuchen | |
if ( new_tweets[[1]] == -2 && warnings_count < 3 ) { | |
warnings_count <- warnings_count + 1 | |
print("trying to re-register with OAuth credentials") | |
twitter_authentication() | |
new_tweets <- get_tweets(twitter_ID) | |
# Behandlung Fehlertyp 1: Bei Fehlermeldung OAuth-Authentifikation wiederholen und nochmals versuchen | |
} else if ( new_tweets[[1]] == -1 && error_count < 3 ) { | |
error_count <- error_count + 1 | |
print("trying to re-register with OAuth credentials") | |
twitter_authentication() | |
new_tweets <- get_tweets(twitter_ID) | |
} | |
# Nach 3 wiederholten Warn- bzw. Fehlermeldungen aktuelle ID als unfetchable markieren und überspringen | |
if ( error_count == 3 || warnings_count == 3 ) { | |
print(paste0("can't fetch twitter account with ID ", twitter_ID, " due to persistent failures")) | |
print(paste0("last warning/error was: ", new_tweets[[2]])) | |
print("skipping this twitter account and marking it as unfetchable") | |
unfetchable_IDs <<- rbind(unfetchable_IDs, data.frame("ID" = twitter_ID, "message" = new_tweets[[2]])) | |
return(TRUE) | |
} | |
} | |
# Ist die ID weder protected noch unfetchable, bedeutet das einen erfolgreichen Abruf der Tweets | |
if ( !is.numeric(new_tweets[[1]]) ) { | |
successfully_fetched_IDs <<- c(successfully_fetched_IDs, twitter_ID) | |
new_tweets <- twListToDF(new_tweets) | |
new_tweets$twitterUserID <- twitter_ID | |
tweets <<- rbind(tweets, new_tweets) | |
return(TRUE) | |
} | |
# Leere new_tweets bedeutet, es existieren keine Tweets von der aktuellen ID | |
} else { | |
successfully_fetched_IDs <<- c(successfully_fetched_IDs, twitter_ID) | |
lazy_IDs <<- c(lazy_IDs, twitter_ID) | |
return(TRUE) | |
} | |
} | |
# Bei Twitter authentifizieren | |
twitter_authentication() | |
# Ausstehende Twitter-Konten verarbeiten | |
start_time <- Sys.time() | |
fetched_ahead_IDs <- c() | |
omitted_IDs <- c() | |
for ( ID in to_fetch ) { | |
# Zuerst übersprungene IDs verarbeiten, sofern vorhanden | |
if ( length(omitted_IDs) > 0 ) { | |
for (omitted_ID in omitted_IDs) { | |
processed <- process_twitter_ID(omitted_ID) | |
# processed = FALSE bedeutet, verbleibendes rate limit reichte nicht aus zur Verarbeitung der aktuellen ID | |
if ( isTRUE(processed) ) omitted_IDs <- omitted_IDs[omitted_IDs != omitted_ID] | |
} | |
} | |
# ID nur verarbeiten, wenn nicht bereits zuvor geschehen | |
if ( !(ID %in% fetched_ahead_IDs) ) { | |
processed <- process_twitter_ID(ID) | |
} else { | |
fetched_ahead_IDs <- fetched_ahead_IDs[fetched_ahead_IDs != ID] | |
processed <- TRUE | |
} | |
# processed = FALSE bedeutet, verbleibendes rate limit reichte nicht aus zur Verarbeitung der aktuellen ID | |
if ( !isTRUE(processed) ) { | |
omitted_IDs <- c(omitted_IDs, ID) | |
# Versuchen, weitere Twitter-Konten zu verarbeiten, so lange rate limits nicht gänzlich erschöpft sind | |
next_ID <- ID | |
first_try <- TRUE | |
rate_limit_status <- get_rate_limits() | |
rate_limit_exploited <- as.numeric(rate_limit_status$remaining[rate_limit_status$resource=="/statuses/user_timeline"]) == 0 | |
while ( !rate_limit_exploited ) { | |
ID_position <- which(to_fetch == next_ID) | |
if ( (as.numeric(rate_limit_status$remaining[rate_limit_status$resource=="/users/show/:id"]) > 0) && (ID_position + 1 < length(to_fetch)) ) { | |
if ( first_try ) print(paste0("postponing processing of twitter account ", ID_position, " of ", length(to_fetch), " and trying with subsequent accounts")) | |
first_try <- FALSE | |
next_ID <- to_fetch[ID_position + 1] | |
if ( !(next_ID %in% fetched_ahead_IDs) ) { | |
processed <- process_twitter_ID(next_ID) | |
if ( isTRUE(processed) ) fetched_ahead_IDs <- c(fetched_ahead_IDs, next_ID) | |
} | |
} else rate_limit_exploited <- TRUE | |
if ( !rate_limit_exploited ) { | |
rate_limit_status <- get_rate_limits() | |
rate_limit_exploited <- as.numeric(rate_limit_status$remaining[rate_limit_status$resource=="/statuses/user_timeline"]) == 0 | |
if ( !rate_limit_exploited ) rate_limit_exploited <- as.numeric(rate_limit_status$remaining[rate_limit_status$resource=="/application/rate_limit_status"]) < 3 | |
} | |
} | |
# Warten bis das rate limit für die Twitter-API-Methode "GET statuses/user_timeline" zurückgesetzt ist | |
first_wait <- TRUE | |
while ( rate_limit_exploited ) { | |
time_to_wait <- as.numeric(rate_limit_status$reset[rate_limit_status$resource=="/statuses/user_timeline"]) - (as.numeric(max(rate_limit_status$reset)) - 901) | |
mins_to_wait <- time_to_wait %/% 60 | |
secs_to_waits <- time_to_wait %% 60 | |
print(paste0(ifelse(first_wait, | |
paste0("rate limit reached; must wait for ", mins_to_wait, ifelse(mins_to_wait == 1, " minute and ", " minutes and "), secs_to_waits, ifelse(secs_to_waits == 1, " second", " seconds")), | |
"rate limit not yet reset; waiting for another 30 seconds"))) | |
if ( first_wait ) Sys.sleep(time_to_wait) else Sys.sleep(30) | |
rate_limit_status <- get_rate_limits() | |
rate_limit_exploited <- as.numeric(rate_limit_status$remaining[rate_limit_status$resource=="/statuses/user_timeline"]) < 180 | |
first_wait <- FALSE | |
} | |
} | |
} | |
# für den Abruf der Twitter-Konten benötigte Zeit ausgeben | |
end_time <- Sys.time() | |
secs_lasted <- as.numeric(difftime(end_time, start_time, units = "secs")) | |
hours_lasted <- secs_lasted %/% 3600 | |
mins_lasted <- (secs_lasted %/% 60) - (hours_lasted * 60) | |
print(paste0("this twitter scrape job took ", ifelse(hours_lasted > 0, paste0(hours_lasted, " hours, "), ""), ifelse(mins_lasted > 0, paste0(mins_lasted, " minutes and "), ""), round(secs_lasted %% 60, digits = 0), " seconds")) | |
# Ergebnis in lokale .Rda-Datei speichern | |
save(tweets, | |
successfully_fetched_IDs, | |
unfetchable_IDs, | |
protected_IDs, | |
lazy_IDs, | |
busy_IDs, | |
file = "twitter_scrape_job.Rda", | |
compress = "xz", | |
compression_level = 9) | |
############################################################################### | |
## Diverse Checks | |
# Wurden alle Twitter-Konten während der Verarbeitung berücksichtigt? | |
all_processed <- setequal(target_IDs, union(successfully_fetched_IDs, union(unfetchable_IDs$ID, protected_IDs))) | |
all_processed | |
if ( !all_processed ) setdiff(target_IDs, union(successfully_fetched_IDs, union(unfetchable_IDs$ID, protected_IDs))) | |
# Wurden alle Twitter-Konten korrekt verarbeitet? Falls nicht, welche nicht? Wie lautete jeweils die letzte Warn-/Fehlermeldung? | |
if ( isTRUE(length(unfetchable_IDs$ID) > 0) ) { | |
print(FALSE) | |
unfetchable_IDs | |
} else print(TRUE) | |
# Wurden alle Twitter-Konten jeweils nur einmal verarbeitet (keine Duplikate)? Falls nicht, wo finden sich welche Duplikate? | |
no_duplicates <- data.frame("variable" = c(deparse(substitute(successfully_fetched_IDs)), | |
deparse(substitute(unfetchable_IDs)), | |
deparse(substitute(protected_IDs)), | |
deparse(substitute(lazy_IDs))), | |
"processed_once" = c(rep(FALSE, 4)), | |
stringsAsFactors = FALSE) | |
for ( variable in no_duplicates$variable ) no_duplicates$processed_once[no_duplicates$variable==variable] <- isTRUE(length(get(variable)) == length(unique(get(variable)))) | |
no_duplicates | |
for ( variable in no_duplicates$variable[no_duplicates$processed_once == FALSE] ) { | |
print(variable) | |
print(table(get(variable))[table(get(variable)) > 1]) | |
} | |
# Wurden die Twitter-Konten alle korrekt den verschiedenen Gruppen (successfully_fetched, unfetchable, protected, lazy) zugeordnet (keine unzulässigen Überschneidungen)? | |
print(paste(length(intersect(unfetchable_IDs, successfully_fetched_IDs)) == 0, "for", no_duplicates$variable[2], "vs.", no_duplicates$variable[1])) | |
print(paste(length(intersect(protected_IDs, successfully_fetched_IDs)) == 0, "for", no_duplicates$variable[3], "vs.", no_duplicates$variable[1])) | |
print(paste(length(intersect(unfetchable_IDs, protected_IDs)) == 0, "for", no_duplicates$variable[2], "vs.", no_duplicates$variable[3])) | |
print(paste(length(setdiff(lazy_IDs, successfully_fetched_IDs)) == 0, "for", no_duplicates$variable[4], "vs.", no_duplicates$variable[1])) | |
# Von wievielen und welchen Twitter-Konten konnten nicht alle Tweets gefetcht werden aufgrund des API-Limits (die 3200 neusten Tweets)? | |
length(busy_IDs) | |
busy_IDs | |
# Welche Twitter-Konten in Brunos Datensatz mit TweetCount = 0 haben zwischenzeitlich getweetet? | |
lazy_IDs_old <- subset(twitter_accounts_bruno, tweetCount == 0)$twitterID | |
setdiff(lazy_IDs_old, lazy_IDs) | |
# Welche Twitter-Konten sind in Brunos Datensatz als protected markiert, mittlerweile jedoch unprotected? | |
protected_IDs_old <- subset(twitter_accounts_bruno, protected == 1)$twitterID | |
setdiff(protected_IDs_old, protected_IDs) | |
# Welche Twitter-Konten sind in Brunos Datensatz nicht als protected markiert, mittlerweile jedoch protected? | |
setdiff(protected_IDs, protected_IDs_old) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment