Skip to content

Instantly share code, notes, and snippets.

Last active January 29, 2021 04:39
Show Gist options
  • Save JBGruber/dee4c44e7d38d537426f57ba1e4f84ab to your computer and use it in GitHub Desktop.
Save JBGruber/dee4c44e7d38d537426f57ba1e4f84ab to your computer and use it in GitHub Desktop.
Recovers damaged Twitter stream data (JSON file from rtweet) into parsed data frame.
#' Recovers Twitter damaged stream data (JSON file) into parsed data frame.
#' @param path Character, name of JSON file with data collected by
#' \code{\link{stream_tweets}}.
#' @param dir Character, name of a directory where intermediate files are
#' stored.
#' @param verbose Logical, should progress be displayed?
#' @family stream tweets
recover_stream <- function(path, dir = NULL, verbose = TRUE) {
# read file and split to tweets
lines <- readChar(path,$size, useBytes = TRUE)
tweets <- stringi::stri_split_fixed(lines, "\n{")[[1]]
tweets[-1] <- paste0("{", tweets[-1])
tweets <- tweets[!(tweets == "" | tweets == "{")]
# remove misbehaving characters
tweets <- gsub("\r", "", tweets, fixed = TRUE)
tweets <- gsub("\n", "", tweets, fixed = TRUE)
# write tweets to disk and try to read them in individually
if (is.null(dir)) {
dir <- paste0(tempdir(), "/tweets/")
dir.create(dir, showWarnings = FALSE)
if (verbose) {
pb <- progress::progress_bar$new(
format = "Processing tweets [:bar] :percent, :eta remaining",
total = length(tweets), clear = FALSE
tweets_l <- lapply(tweets, function(t) {
id <- unlist(stringi::stri_extract_first_regex(t, "(?<=id\":)\\d+(?=,)"))[1]
f <- paste0(dir, id, ".json")
writeLines(t, f, useBytes = TRUE)
out <- tryCatch(rtweet::parse_stream(f),
error = function(e) {})
if ("tbl_df" %in% class(out)) {
} else {
# test which ones failed
test <- vapply(tweets_l, is.character, FUN.VALUE = logical(1L))
bad_files <- unlist(tweets_l[test])
# Let user decide what to do
if (length(bad_files) > 0) {
message("There were ", length(bad_files),
" tweets with problems. Should they be copied to your working directory?")
sel <- menu(c("no", "yes", "copy a list with status_ids"))
if (sel == 2) {
dir.create(paste0(getwd(), "/broken_tweets/"), showWarnings = FALSE)
from = paste0(dir, bad_files, ".json"),
to = paste0(getwd(), "/broken_tweets/", bad_files, ".json")
} else if (sel == 3) {
writeLines(bad_files, "broken_tweets.txt")
# clean up
unlink(dir, recursive = TRUE)
# return good tweets
Copy link

Is there anyway to parallelize this and make it faster?

Copy link

JBGruber commented Aug 7, 2020

The slow part is basically the lapply loop. You could replace this with, for example, with pbapply::pblapply:

recover_stream <- function(path, dir = NULL, verbose = TRUE, cores = 1) {
  # read file and split to tweets
  lines <- readChar(path,$size, useBytes = TRUE)
  tweets <- stringi::stri_split_fixed(lines, "\n{")[[1]]
  tweets[-1] <- paste0("{", tweets[-1])
  tweets <- tweets[!(tweets == "" | tweets == "{")]
  # remove misbehaving characters
  tweets <- gsub("\r", "", tweets, fixed = TRUE)
  tweets <- gsub("\n", "", tweets, fixed = TRUE)
  # write tweets to disk and try to read them in individually
  if (is.null(dir)) {
    dir <- paste0(tempdir(), "/tweets/")
    dir.create(dir, showWarnings = FALSE)
  tweets_l <- pbapply::pblapply(tweets, function(t) {
    id <- unlist(stringi::stri_extract_first_regex(t, "(?<=id\":)\\d+(?=,)"))[1]
    f <- paste0(dir, id, ".json")
    writeLines(t, f, useBytes = TRUE)
    out <- tryCatch(rtweet::parse_stream(f),
                    error = function(e) {})
    if ("tbl_df" %in% class(out)) {
    } else {
  }, cl = cores)
  # test which ones failed
  test <- vapply(tweets_l, is.character, FUN.VALUE = logical(1L))
  bad_files <- unlist(tweets_l[test])
  # Let user decide what to do
  if (length(bad_files) > 0) {
    message("There were ", length(bad_files),
            " tweets with problems. Should they be copied to your working directory?")
    sel <- menu(c("no", "yes", "copy a list with status_ids"))
    if (sel == 2) {
      dir.create(paste0(getwd(), "/broken_tweets/"), showWarnings = FALSE)
        from = paste0(dir, bad_files, ".json"),
        to = paste0(getwd(), "/broken_tweets/", bad_files, ".json")
    } else if (sel == 3) {
      writeLines(bad_files, "broken_tweets.txt")
  # clean up
  unlink(dir, recursive = TRUE)
  # return good tweets

Use it like this:

recover_stream(file, cores = 3)

A disclaimer though: I tried to further develop this and I couldn't get it to work consistently. Often it works and recovers most or even all tweets. But sometimes it doesn't and I don't know why. Since my Twitter project came to an end, I eventually gave up. The respective issue is still open: ropensci/rtweet#354

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment