Skip to content

Instantly share code, notes, and snippets.

@ghifarit53
Created June 14, 2024 12:21
Show Gist options
  • Save ghifarit53/af81af010a8b18aecfa880055ea567af to your computer and use it in GitHub Desktop.
Save ghifarit53/af81af010a8b18aecfa880055ea567af to your computer and use it in GitHub Desktop.

Wrapper around https://github.com/helmisatria/tweet-harvest

Usage:

$ python3 harvest.py yyyy mm dd n keyword
  • yyyy: year (e.g. 2018)
  • mm: month (e.g. 08)
  • dd: day of the month to start from (e.g. 20); harvesting continues day by day until the end of that month
  • n: minimum number of tweets to scrape (e.g. 30)
  • keyword: keyword to search (e.g. 'gempa bumi')

The keyword needs to be wrapped in quotes (' or ") if it contains a space.

from time import sleep
from datetime import datetime, timedelta
import os
import sys
import calendar
import subprocess
def get_twitter_token():
    """Return the Twitter auth token, or ``None`` when none is configured.

    Lookup order:

    1. the ``TWITTER_TOKEN`` environment variable;
    2. the first non-empty ``TWITTER_TOKEN=...`` line of a ``.env`` file in
       the current working directory.

    Empty values are treated as "not set" so that an exported-but-blank
    variable does not shadow a usable ``.env`` entry. One pair of matching
    surrounding quotes is removed from the ``.env`` value, since
    ``TWITTER_TOKEN="abc"`` is a common way to write such files.

    Returns:
        The token string, or ``None`` if neither source provides one.
    """
    env_token = os.environ.get("TWITTER_TOKEN", "").strip()
    if env_token:
        return env_token

    # Fall back to a .env file next to where the script is run from.
    try:
        with open(".env", "r", encoding="utf-8") as env_file:
            for line in env_file:
                entry = line.strip()
                if not entry.startswith("TWITTER_TOKEN="):
                    continue
                value = entry.split("=", 1)[1].strip()
                is_quoted = (len(value) >= 2 and value[0] == value[-1]
                             and value[0] in "'\"")
                if is_quoted:
                    value = value[1:-1]
                if value:
                    return value
    except FileNotFoundError:
        # A missing .env file simply means there is no fallback.
        pass
    return None
def get_last_date_of_month(year, month):
    """Return the day number of the final day of ``month`` in ``year``."""
    # monthrange() yields (weekday of the 1st, number of days in the month);
    # only the second element is needed here.
    _first_weekday, days_in_month = calendar.monthrange(year, month)
    return days_in_month
def get_date_range(date, month, year):
    """Build one ``(from, until)`` pair per day from ``date`` to month end.

    Each pair holds two ``DD-MM-YYYY`` strings: a day and the day after it.
    The pair for the last day of the month therefore ends on the first day
    of the following month.

    Args:
        date: Day of the month to start from (1-based).
        month: Month number, 1-12.
        year: Four-digit year.

    Returns:
        A list of ``(day, next_day)`` string tuples in chronological order.

    Raises:
        ValueError: If the year/month/day combination is not a valid date.
    """
    date_format = '%d-%m-%Y'
    one_day = timedelta(days=1)

    # monthrange() returns (weekday of the 1st, days in month) and never
    # None, so no "is not None" guard is needed on the result.
    days_in_month = calendar.monthrange(year, month)[1]
    first_day = datetime(year, month, date)

    days = (first_day + timedelta(days=offset)
            for offset in range(days_in_month - date + 1))
    return [(day.strftime(date_format), (day + one_day).strftime(date_format))
            for day in days]
def to_yyyy_mm_dd(input_date):
    """Convert a ``DD-MM-YYYY`` date string into ``YYYY-MM-DD`` form."""
    return datetime.strptime(input_date, "%d-%m-%Y").strftime("%Y-%m-%d")
def _leading_int(text):
    """Return the first whitespace-separated token of ``text`` as an int.

    Returns ``None`` when there is no token or it is not an integer, so an
    unexpected log line is skipped instead of crashing the harvest.
    """
    tokens = text.split()
    if not tokens:
        return None
    try:
        return int(tokens[0])
    except ValueError:
        return None


def _stop_process(process):
    """Terminate ``process`` and reap it, killing it if it does not exit."""
    process.terminate()
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        process.kill()


def run_tweet_harvest(twitter_token, date_range, tweet_limit, search_keyword):
    """Run the ``tweet-harvest`` CLI for one date range and watch its output.

    The tool's stdout is read line by line. Progress is printed whenever the
    reported number of saved tweets grows, and the run is aborted when the
    output suggests it will not reach ``tweet_limit``.

    Args:
        twitter_token: Token passed to the tool via ``--token``.
        date_range: ``(from_date, until_date)`` pair of ``DD-MM-YYYY``
            strings, passed as ``-f`` and ``-t``.
        tweet_limit: Number of tweets wanted; passed as ``-l`` and used as
            the minimum when the tool reports how many it already got.
        search_keyword: Search phrase passed as ``-s``; also used as the
            prefix of the ``-o`` output name.

    Returns:
        ``0`` if the tool ran to the end of its output, ``-1`` if the run
        was aborted. A run is aborted when a line contains "limit", when
        four "Scrolling more..." lines have been seen, or when an
        "Already got N" line reports fewer than ``tweet_limit`` tweets.
    """
    max_scrolls = 4  # give up after this many "Scrolling more..." lines

    # yapf: disable
    command = [
        "tweet-harvest",
        "--token", twitter_token,
        "-f", str(date_range[0]),
        "-t", str(date_range[1]),
        "-l", str(tweet_limit),
        "-s", search_keyword,
        "-o", f"{search_keyword}-{to_yyyy_mm_dd(date_range[0])}"
    ]
    # yapf: enable

    # stderr is discarded rather than piped: nothing reads it while stdout
    # is being consumed, and an undrained pipe can fill up and block the
    # child. The context manager closes stdout and reaps the child on every
    # exit path, including the early returns below.
    with subprocess.Popen(command,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.DEVNULL) as process:
        scroll_count = 0
        highest_saved = 0

        for raw_line in process.stdout:
            # Replace undecodable bytes instead of raising mid-harvest.
            decoded = raw_line.decode("utf-8", errors="replace").strip()

            if "Total tweets saved:" in decoded:
                saved = _leading_int(
                    decoded.split("Total tweets saved:", 1)[1])
                if saved is not None and saved > highest_saved:
                    highest_saved = saved
                    print(f"Found {saved} tweets")

            if "Scrolling more..." in decoded:
                scroll_count += 1

            if "limit" in decoded or scroll_count == max_scrolls:
                _stop_process(process)
                return -1

            if "Already got" in decoded:
                already_got = _leading_int(decoded.split("Already got", 1)[1])
                if already_got is not None and already_got < tweet_limit:
                    _stop_process(process)
                    print(
                        f"Expected {tweet_limit} tweets, but only found {highest_saved}"
                    )
                    return -1

        process.wait()
    return 0
def main():
    """Harvest tweets day by day from the given date to the end of its month.

    Command-line arguments (``sys.argv[1:]``): year, month, starting day,
    minimum number of tweets per day, and the search keyword.

    For every day, ``run_tweet_harvest`` is retried until it reports success:
    a failed attempt is followed by a 2-minute pause, a successful one by a
    20-second pause before moving on to the next day. Note that a day which
    never yields enough tweets is retried indefinitely.

    Exits with status 1 when the token is missing or the arguments are
    invalid.
    """
    twitter_token = get_twitter_token()
    if twitter_token is None:
        print("ERROR: TWITTER_TOKEN environment variable not found")
        sys.exit(1)

    if len(sys.argv) != 6:
        print(f"ERROR: Expected 5 arguments, got {len(sys.argv) - 1}")
        sys.exit(1)

    try:
        year, month, start_date, tweet_limit = (int(arg)
                                                for arg in sys.argv[1:5])
    except ValueError:
        print("ERROR: year, month, date and n must be integers")
        sys.exit(1)
    search_keyword = sys.argv[5]

    try:
        date_ranges = get_date_range(start_date, month, year)
    except ValueError as err:
        # Raised for impossible dates such as month 13 or April 31.
        print(f"ERROR: Invalid date: {err}")
        sys.exit(1)

    for date_range in date_ranges:
        nice_date_format = datetime.strptime(date_range[0],
                                             "%d-%m-%Y").strftime("%B %d, %Y")
        while True:
            print("===== HARVESTING STARTED =====")
            print(f"Finding tweets from {nice_date_format}")
            print(f"Minimum tweets is {tweet_limit}")
            print(f"Search keyword is '{search_keyword}'")
            status = run_tweet_harvest(twitter_token, date_range, tweet_limit,
                                       search_keyword)
            if status == -1:
                print("===== HARVESTING TERMINATED =====")
                print("Possibly rate limited, re-attempting in 2 minutes")
                sleep(120)  # 2 minutes, matching the message above
            elif status == 0:
                print("===== HARVESTING FINISHED =====")
                print("Now resting for 20 seconds")
                sleep(20)
                break  # this day is done; move on to the next one
# Run the harvester only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment