|
from time import sleep |
|
from datetime import datetime, timedelta |
|
|
|
import os |
|
import sys |
|
import calendar |
|
import subprocess |
|
|
|
|
|
def get_twitter_token():
    """Return the Twitter auth token, or ``None`` when none is configured.

    The ``TWITTER_TOKEN`` environment variable takes precedence.  When it is
    unset or blank, the first non-empty ``TWITTER_TOKEN=...`` entry of a
    ``.env`` file in the current working directory is used instead.

    Returns:
        str | None: The token with surrounding whitespace (and, for ``.env``
        values, one pair of matching surrounding quotes) removed, or ``None``
        if neither source provides a non-empty value.
    """
    # A blank variable is treated as "not set" so that the caller's
    # ``is None`` check cannot be bypassed by an empty string.
    twitter_token = os.environ.get("TWITTER_TOKEN", "").strip()
    if twitter_token:
        return twitter_token

    # If the environment variable is not set, try reading from .env file
    try:
        with open(".env", "r", encoding="utf-8") as env_file:
            for line in env_file:
                key, separator, value = line.strip().partition("=")
                if not separator or key.strip() != "TWITTER_TOKEN":
                    continue

                value = value.strip()
                # .env files commonly quote values; drop one matching pair.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                if value:
                    return value
    except FileNotFoundError:
        # A missing .env file simply means there is no fallback token.
        pass

    return None
|
|
|
|
|
def get_last_date_of_month(year, month):
    """Return the day number of the final day of ``month`` in ``year``."""
    # calendar.monthrange() yields (weekday_of_first_day, days_in_month);
    # only the second element is needed here.
    _first_weekday, days_in_month = calendar.monthrange(year, month)
    return days_in_month
|
|
|
|
|
def get_date_range(date, month, year):
    """Build consecutive one-day ``(from, to)`` windows up to the month's end.

    Starting at day ``date`` of the given month, every tuple holds a day and
    the day after it, both formatted as ``DD-MM-YYYY``.  The final window
    starts on the last day of the month, so its end is the first day of the
    following month.

    Args:
        date: Day of the month to start from.
        month: Month number.
        year: Four-digit year.

    Returns:
        list[tuple[str, str]]: One ``(from, to)`` pair per day.
    """
    month_end_day = get_last_date_of_month(year, month)
    if month_end_day is None:
        return []

    day_format = '%d-%m-%Y'
    one_day = timedelta(days=1)

    first_day = datetime(year, month, date)
    final_day = datetime(year, month, month_end_day)
    window_count = (final_day - first_day).days + 1

    return [
        ((first_day + one_day * offset).strftime(day_format),
         (first_day + one_day * (offset + 1)).strftime(day_format))
        for offset in range(window_count)
    ]
|
|
|
|
|
def to_yyyy_mm_dd(input_date):
    """Convert a ``DD-MM-YYYY`` date string into ``YYYY-MM-DD`` form.

    Raises:
        ValueError: If ``input_date`` does not match ``DD-MM-YYYY``.
    """
    return datetime.strptime(input_date, "%d-%m-%Y").strftime("%Y-%m-%d")
|
|
|
|
|
def _stop_process(process, grace_seconds=10):
    """Terminate ``process`` and reap it, escalating to kill if it lingers."""
    process.terminate()
    try:
        process.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()


def run_tweet_harvest(twitter_token, date_range, tweet_limit, search_keyword):
    """Run ``tweet-harvest`` for one date window and watch its output.

    The child's stdout is read line by line and the run is aborted as soon
    as one of the stop conditions below is seen.

    Args:
        twitter_token: Value passed to the ``--token`` option.
        date_range: ``(from, to)`` pair of ``DD-MM-YYYY`` strings passed to
            ``-f`` and ``-t``.
        tweet_limit: Value passed to ``-l``; also the minimum count expected
            on an "Already got" line.
        search_keyword: Value passed to ``-s`` and used as the prefix of the
            ``-o`` output name.

    Returns:
        int: ``0`` when the child's output ended without hitting a stop
        condition, ``-1`` when the run was aborted.

    Raises:
        FileNotFoundError: If the ``tweet-harvest`` executable is missing.
    """
    command = "tweet-harvest"
    # yapf: disable
    args = [
        "--token", twitter_token,
        "-f", str(date_range[0]),
        "-t", str(date_range[1]),
        "-l", str(tweet_limit),
        "-s", search_keyword,
        "-o", f"{search_keyword}-{to_yyyy_mm_dd(date_range[0])}"
    ]
    # yapf: enable

    # stderr is discarded rather than piped: it was never read, and an
    # undrained pipe can fill up and block the child forever.  The context
    # manager closes the stdout pipe on every exit path.
    with subprocess.Popen([command] + args,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.DEVNULL) as p:
        try:
            n_scrolling = 0  # Count the numbers of scrolling
            current_n = 0  # Highest "Total tweets saved" count seen so far

            for line in p.stdout:
                # Undecodable bytes must not crash the monitoring loop.
                decoded = line.decode("utf-8", errors="replace").strip()

                if "Total tweets saved:" in decoded:
                    n = int(decoded.split(":")[1].strip())
                    if n > current_n:
                        current_n = n
                        print(f"Found {n} tweets")

                if "Scrolling more..." in decoded:
                    n_scrolling += 1

                # Any line mentioning "limit" (presumably a rate-limit
                # notice -- TODO confirm) or a fourth scroll aborts the run.
                if "limit" in decoded or n_scrolling == 4:
                    _stop_process(p)
                    return -1

                if "Already got" in decoded:
                    # The count is the third whitespace-separated token.
                    n = int(decoded.split()[2].strip())
                    if n < tweet_limit:
                        _stop_process(p)
                        print(
                            f"Expected {tweet_limit} tweets, but only found {current_n}"
                        )
                        return -1

            # NOTE(review): the child's exit status is not inspected, so a
            # failed run that prints none of the markers still returns 0.
            p.wait()
            return 0
        finally:
            # Never leave the child running if parsing raised an exception.
            if p.poll() is None:
                _stop_process(p)
|
|
|
|
|
def main():
    """Harvest tweets day by day from the command-line arguments.

    Expected arguments, in order: ``year month start_date tweet_limit
    search_keyword``.  For every one-day window from ``start_date`` to the
    end of the month, ``run_tweet_harvest`` is retried until it returns 0.

    Exits with status 1 when the token is missing or the arguments are
    invalid.
    """
    # Get twitter token from environment variable (or the .env fallback)
    twitter_token = get_twitter_token()
    if twitter_token is None:
        print("ERROR: TWITTER_TOKEN environment variable not found")
        sys.exit(1)

    if len(sys.argv) != 6:
        print(f"ERROR: Expected 5 arguments, got {len(sys.argv) - 1}")
        sys.exit(1)

    try:
        year, month, start_date, tweet_limit = (int(arg)
                                                for arg in sys.argv[1:5])
    except ValueError:
        print("ERROR: year, month, start date and tweet limit must be integers")
        sys.exit(1)
    search_keyword = sys.argv[5]

    try:
        date_ranges = get_date_range(start_date, month, year)
    except ValueError as err:
        # Raised for an out-of-range month or day of month.
        print(f"ERROR: Invalid date arguments: {err}")
        sys.exit(1)

    for date_range in date_ranges:
        nice_date_format = datetime.strptime(date_range[0],
                                             "%d-%m-%Y").strftime("%B %d, %Y")
        while True:
            print("===== HARVESTING STARTED =====")
            print(f"Finding tweets from {nice_date_format}")
            print(f"Minimum tweets is {tweet_limit}")
            print(f"Search keyword is '{search_keyword}'")

            status = run_tweet_harvest(twitter_token, date_range, tweet_limit,
                                       search_keyword)

            if status == 0:
                print("===== HARVESTING FINISHED =====")
                print("Now resting for 20 seconds")
                sleep(20)
                break  # Move on to the next day once harvesting succeeded

            # NOTE(review): retries are unbounded; a day that never yields
            # enough tweets keeps this loop running indefinitely.
            print("===== HARVESTING TERMINATED =====")
            print("Possibly rate limited, re-attempting in 2 minutes")
            sleep(120)  # Wait for 2 minutes before retrying
|
|
|
|
|
# Run the harvester only when executed as a script, not when imported.
if __name__ == "__main__":
    main()