import os
from datetime import datetime
from os import getenv
from typing import Dict, List, Tuple, Union, Any, Optional

import tweepy
from google.cloud import bigquery
# App-level OAuth handler whose keys come from the environment, so no secrets
# are committed to source. No access token is attached here; it is only used
# to build a fallback API client when a caller does not supply one.
auth: tweepy.OAuthHandler = tweepy.OAuthHandler(
    os.getenv("consumer_key"),
    os.getenv("consumer_secret"),
)

# Switches consulted by the ``timeit`` decorator on every wrapped call.
print_start: bool = True  # checked before the wrapped call runs
print_time: bool = True  # checked after the wrapped call returns
def timeit(method):
    """Wrap ``method`` so each call reports its name and wall-clock duration.

    The module-level flags ``print_start`` and ``print_time`` are looked up on
    every call, so output can be toggled at runtime without re-decorating.

    Args:
        method: The function to be timed.

    Returns:
        A wrapper that preserves ``method``'s name/docstring (via
        ``functools.wraps``) and returns whatever ``method`` returns.
    """
    import functools

    @functools.wraps(method)
    def timed(*args, **kw):
        """Run the wrapped function, printing its name and elapsed time per the global flags."""
        if print_start:
            print(f"<<{method.__name__}>> started")
        ts: datetime = datetime.now()
        result = method(*args, **kw)
        te: datetime = datetime.now()
        if print_time:
            print(f"<<{method.__name__}>> time to execute: {te - ts}\n-----------------------------------------")
        return result

    return timed
def get_recent_updates(api: object, threshold_time: datetime) -> Tuple[Dict[str, Dict[str, List[str]]], List[str]]:
    """Collect follow/unfollow requests from direct messages newer than ``threshold_time``.

    Every recent DM that mentions users is a request from its sender. If the
    lower-cased text contains "stop" or "remove", the sender is queued for
    removal from each mentioned screen name. Otherwise the sender is queued to
    follow it. A confirmation DM is sent to the sender for every mention.

    Args:
        api: Twitter API object exposing ``list_direct_messages()``.
        threshold_time: Only DMs created strictly after this naive (local-time)
            datetime are processed.

    Returns:
        ``(usernames_to_update, changing_usernames)``. The first maps each
        lower-cased screen name to ``{"add": [sender ids], "remove": [sender ids]}``.
        The second lists those screen names.
    """
    usernames_to_update: Dict[str, Dict[str, List[str]]] = {}

    for dm in api.list_direct_messages():
        # created_timestamp is milliseconds since the epoch; fromtimestamp wants seconds.
        sent_at = datetime.fromtimestamp(int(dm.created_timestamp) / 1000)
        if sent_at <= threshold_time:
            continue

        message_body = dm.message_create
        sender_id = message_body["sender_id"]
        # NOTE(review): "id_to_remove" looks like a placeholder for an account id
        # whose DMs must be ignored (e.g. the bot itself) - confirm.
        if sender_id == "id_to_remove":
            continue

        message_contents = message_body["message_data"]["text"].lower()
        user_mentions = message_body["message_data"]["entities"]["user_mentions"]
        wants_removal = "stop" in message_contents or "remove" in message_contents

        for user_mention in user_mentions or []:
            screen_name = user_mention["screen_name"].lower()
            actions = usernames_to_update.setdefault(screen_name, {"add": [], "remove": []})
            if wants_removal:
                actions["remove"] = list(set(actions["remove"] + [sender_id]))
                message_to_user = f"Thanks, I've got your request to unfollow {screen_name}."
            else:
                actions["add"] = list(set(actions["add"] + [sender_id]))
                message_to_user = (
                    f"Thanks, I've got your request to follow {screen_name}. You can follow a maximum of 20.\n"
                    f"If you'd like to stop following that name, just send another message containing \"stop following @{screen_name}\""
                )
            message_user(sender_id, message_to_user, api)

    changing_usernames = list(usernames_to_update.keys())
    return usernames_to_update, changing_usernames
def get_records(client: object, table_name: str, identifier_column: str) -> Tuple[Dict[str, Dict[str, Any]], List[str]]:
    """Load every row of ``table_name`` keyed by ``identifier_column``.

    Args:
        client: BigQuery client used to run the query.
        table_name: Table to read. It is interpolated into the SQL because
            identifiers cannot be query parameters, so it must come from
            trusted configuration.
        identifier_column: Column whose value becomes the dictionary key.

    Returns:
        ``(results_dict, names_list)``. ``results_dict`` maps each identifier
        to a dict of that row's remaining columns. ``names_list`` lists the
        identifiers in row order, without duplicates.
    """
    query = f"""
    SELECT *
    FROM {table_name}
    """
    results = client.query(query).result()  # Waits for job to complete.

    results_dict: Dict[str, Dict[str, Any]] = {}
    for row in results:
        row_dict = dict(row)
        col_key = row_dict[identifier_column]
        results_dict[col_key] = {
            key: value for key, value in row_dict.items() if key != identifier_column
        }

    # Previously returned empty; derive it from the keys actually loaded.
    names_list: List[str] = list(results_dict)
    return results_dict, names_list
def get_to_change(columns_to_check: str, usernames_tuple: Tuple[str, ...], client: Any = None) -> List[Dict[str, Any]]:
    """Fetch rows of ``twitter_name_change.test_table`` whose column matches any username.

    Args:
        columns_to_check: Column compared against the usernames. It is an
            identifier, so it is interpolated and must be trusted.
        usernames_tuple: Usernames to match. They are bound as a STRING array
            parameter, so quotes or injection cannot break the query.
        client: Optional BigQuery client. A new ``bigquery.Client()`` is
            created when omitted, because no module-level client exists.

    Returns:
        A list of matching rows as dicts. It is empty when no usernames are given.
    """
    usernames = [str(name) for name in usernames_tuple]
    if not usernames:
        # `IN ()` would be a SQL error; nothing can match anyway.
        return []
    if client is None:
        client = bigquery.Client()

    query = f"""
    SELECT *
    FROM twitter_name_change.test_table
    WHERE {columns_to_check} IN UNNEST(@usernames)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ArrayQueryParameter("usernames", "STRING", usernames)]
    )
    results = client.query(query, job_config=job_config).result()  # Waits for job to complete.
    return [dict(row) for row in results]
def get_dm(id_value: str, username: str, previous_username: str, profile_image: str, previous_display_url: str) -> str:
    """Build the DM text describing how a watched account's name/picture changed.

    Args:
        id_value: The account's handle (shown lower-cased).
        username: Current display name.
        previous_username: Display name stored before this check.
        profile_image: Current profile image URL.
        previous_display_url: Profile image URL stored before this check.

    Returns:
        One sentence about the name, followed by one about the profile picture.
    """
    if username != previous_username:
        message = f"{id_value.lower()} was {previous_username} and is now {username}. T"
    else:
        message = f"{username} hasn't changed name but t"

    # Both prefixes end with the first letter of "their", so the clause continues the word.
    if profile_image != previous_display_url:
        message += f"heir profile picture changed from {previous_display_url} to {profile_image}"
    else:
        message += "heir profile picture hasn't changed."
    return message
def get_tweet(id_value: str, username: str, previous_username: str, profile_image: str, previous_display_url: str, user_length: Union[int, str]) -> str:
    """Build the public tweet announcing that followers were DM'd about a change.

    Args:
        id_value: The account's handle (shown lower-cased).
        username: Current display name.
        previous_username: Display name stored before this check.
        profile_image: Current profile image URL.
        previous_display_url: Profile image URL stored before this check.
        user_length: Number of users DM'd. Numeric strings are accepted, since
            callers have passed ``str(len(...))``.

    Returns:
        The tweet text.
    """
    # int() so a numeric string compares correctly instead of raising TypeError.
    users_or_user = "users" if int(user_length) > 1 else "user"

    if username != previous_username:
        message = f"I just DMd {user_length} {users_or_user} about {id_value.lower()} changing username from {previous_username} to {username}."
    else:
        message = f"I just DMd {user_length} {users_or_user} about {id_value.lower()}."

    if profile_image != previous_display_url:
        message += f" Their profile picture changed from {previous_display_url} to {profile_image}."
    return message
def tweet_message(message: str, api: Any) -> None:
    """Post ``message`` as a tweet, best-effort.

    A failure is reported on stdout and not raised, so one failed tweet does
    not abort the rest of a run.

    Args:
        message: Text to tweet.
        api: Tweepy API instance (``update_status`` is called on it).
    """
    try:
        api.update_status(message)
    except Exception as err:
        print(f"Error posting message: {err!r}")
def message_user(user: str, message: str, api: Any = None) -> None:
    """Send a direct message, best-effort.

    Args:
        user: Recipient's numeric Twitter id (as a string or int).
        message: Text to send.
        api: Tweepy API instance. When omitted, one is built from the
            module-level ``auth`` at call time (not at import time).
    """
    if api is None:
        api = tweepy.API(auth)
    try:
        api.send_direct_message(int(user), text=message)
    except Exception as err:
        # Report rather than raise, so one bad recipient does not stop the others.
        print(f"Failed to message user: {type(user)} -- {user} ({err!r})")
from typing import Dict, Tuple, Union, Optional
def check_on_twitter(record_to_check_key: str,
                     record_to_check_values: Dict[str, Union[str, None]],
                     username_col: str,
                     display_img_col: str,
                     api: Optional["tweepy.API"] = None) -> Tuple[Optional[Dict[str, Union[str, None]]], Optional[bool], Optional[bool]]:
    """Compare a stored record with the account's live Twitter profile.

    If the stored name and image are both known and either has changed, every
    interested follower is DM'd and a summary tweet is posted. Whenever the
    profile differs from what is stored, the record is updated in place with
    the live values.

    Args:
        record_to_check_key: Handle passed to ``api.get_user``.
        record_to_check_values: Stored columns for that handle. It is mutated
            in place when a change is found.
        username_col: Key holding the stored display name.
        display_img_col: Key holding the stored profile image URL.
        api: Tweepy API instance. When omitted, one is built from the
            module-level ``auth`` at call time.

    Returns:
        ``(record, changed, isnew)``. ``isnew`` is True when the record had no
        stored name/image yet, so no one was notified.
        ``(None, None, None)`` is returned when the Twitter lookup fails.
    """
    if api is None:
        api = tweepy.API(auth)
    id_value = record_to_check_key

    # NOTE(review): positional get_user(id) matches tweepy 3.x; tweepy 4.x
    # requires user_id=/screen_name= keywords - confirm the pinned version.
    try:
        new_record = api.get_user(id_value)
    except Exception:
        print("username {} retrieval failed".format(id_value))
        return None, None, None

    user_info = new_record._json
    username = user_info["name"]
    profile_image = user_info["profile_image_url_https"]

    # Missing columns mean the handle has never been checked before.
    previous_username = record_to_check_values.get(username_col)
    previous_display_url = record_to_check_values.get(display_img_col)

    if username == previous_username and profile_image == previous_display_url:
        return record_to_check_values, False, False

    if previous_username and previous_display_url:
        dm = get_dm(id_value, username, previous_username, profile_image, previous_display_url)
        interested_handles = record_to_check_values.get("interested_handles")
        print("Interested handles: {}".format(interested_handles))
        if isinstance(interested_handles, str):
            users_to_message_list = interested_handles.split(",")
        else:
            # Handles may be held as a list in memory (or be missing entirely).
            users_to_message_list = [str(user) for user in (interested_handles or [])]
        print("users_to_message_list: {}".format(users_to_message_list))

        # De-duplicate and drop blanks, then tweet the number actually DM'd.
        recipients = [user for user in dict.fromkeys(users_to_message_list) if user != ""]
        for user in recipients:
            print("User:{}".format(user))
            message_user(user, dm, api)
        if recipients:
            tweet_to_send = get_tweet(id_value, username, previous_username, profile_image, previous_display_url, len(recipients))
            tweet_message(tweet_to_send, api)
        isnew = False
    else:
        isnew = True

    record_to_check_values[username_col] = username
    record_to_check_values[display_img_col] = profile_image
    return record_to_check_values, True, isnew
def make_updates(client: Optional["bigquery.Client"] = None,
                 results_dict: Optional[Dict[str, Dict[str, Union[str, List[str]]]]] = None,
                 changing_usernames: Optional[List[str]] = None,
                 table_name: Optional[str] = None) -> None:
    """UPDATE the stored row of every handle in ``changing_usernames``.

    Values are bound as query parameters, because display names come from
    Twitter and may contain quotes. Only the table name is interpolated, since
    identifiers cannot be parameters.

    Args:
        client: BigQuery client. A new one is created lazily when needed.
        results_dict: Handle -> record with ``interested_handles``,
            ``display_name`` and ``display_image``.
        changing_usernames: Handles to write back. Any container supporting
            iteration works (list, dict, set).
        table_name: Target table. Defaults to the ``table_name`` environment
            variable, read at call time.
    """
    wanted = set(changing_usernames or [])
    if not results_dict or not wanted:
        return
    if table_name is None:
        table_name = os.getenv("table_name")

    dml_statement = f"""UPDATE {table_name}
    SET interested_handles = @interested_handles,
        display_name = @display_name,
        display_image = @display_image
    WHERE handle = @handle"""

    for handle, value in results_dict.items():
        if handle not in wanted:
            continue
        interested_handles = value["interested_handles"]
        if isinstance(interested_handles, list):
            # Store the same comma-separated form the rest of the code splits on.
            interested_handles = ",".join(dict.fromkeys(str(item) for item in interested_handles))
        if client is None:
            client = bigquery.Client()
        job_config = bigquery.QueryJobConfig(query_parameters=[
            bigquery.ScalarQueryParameter("interested_handles", "STRING", interested_handles),
            bigquery.ScalarQueryParameter("display_name", "STRING", value["display_name"]),
            bigquery.ScalarQueryParameter("display_image", "STRING", value["display_image"]),
            bigquery.ScalarQueryParameter("handle", "STRING", handle),
        ])
        client.query(dml_statement, job_config=job_config).result()  # Waits for statement to finish
def add_names(client: Optional["bigquery.Client"] = None,
              results_dict: Optional[Dict[str, Dict[str, str]]] = None,
              new_usernames: Optional[List[str]] = None,
              table_name: Optional[str] = None) -> None:
    """INSERT a row for every handle in ``new_usernames``.

    Columns are listed explicitly, so the VALUES order always matches them
    regardless of the keys or key order in the in-memory record. Values are
    bound as query parameters.

    Args:
        client: BigQuery client. A new one is created lazily when needed.
        results_dict: Handle -> record with ``interested_handles``,
            ``display_name`` and ``display_image``.
        new_usernames: Handles to insert. Any container supporting iteration
            works (list, dict, set).
        table_name: Target table. Defaults to the ``table_name`` environment
            variable, read at call time.
    """
    wanted = set(new_usernames or [])
    if not results_dict or not wanted:
        return
    if table_name is None:
        table_name = os.getenv("table_name")

    dml_statement = f"""INSERT {table_name} (handle, interested_handles, display_name, display_image)
    VALUES (@handle, @interested_handles, @display_name, @display_image)"""

    for handle, value in results_dict.items():
        if handle not in wanted:
            continue
        interested_handles = value["interested_handles"]
        if isinstance(interested_handles, list):
            interested_handles = ",".join(dict.fromkeys(str(item) for item in interested_handles))
        if client is None:
            client = bigquery.Client()
        job_config = bigquery.QueryJobConfig(query_parameters=[
            bigquery.ScalarQueryParameter("handle", "STRING", handle),
            bigquery.ScalarQueryParameter("interested_handles", "STRING", interested_handles),
            bigquery.ScalarQueryParameter("display_name", "STRING", value["display_name"]),
            bigquery.ScalarQueryParameter("display_image", "STRING", value["display_image"]),
        ])
        client.query(dml_statement, job_config=job_config).result()  # Waits for statement to finish
def remove_from_bgq(table_name: str,
                    column_to_change: str,
                    criteria_list: List[str],
                    client: Optional["bigquery.Client"] = None) -> None:
    """DELETE rows whose ``column_to_change`` value is in ``criteria_list``.

    Args:
        table_name: Table to delete from. It is an identifier, so it is
            interpolated and must be trusted.
        column_to_change: Column matched against the criteria. It is an
            identifier, so it is interpolated and must be trusted.
        criteria_list: Values to delete. They are bound as a STRING array
            parameter. Nothing is done when the list is empty.
        client: BigQuery client. A new one is created when omitted.
    """
    criteria = [str(value) for value in criteria_list]
    if not criteria:
        return
    if client is None:
        client = bigquery.Client()

    query = f"""DELETE FROM {table_name}
    WHERE {column_to_change} IN UNNEST(@criteria_list)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ArrayQueryParameter("criteria_list", "STRING", criteria)]
    )
    client.query(query, job_config=job_config).result()  # Waits for statement to finish
def _split_handles(value: Any) -> List[str]:
    """Return a stored ``interested_handles`` value (comma string, list or None) as non-empty ids."""
    if value is None:
        return []
    if isinstance(value, (list, tuple, set)):
        items = [str(item) for item in value]
    else:
        items = str(value).split(",")
    return [item for item in items if item != ""]


def make_changes(update_dict: dict,
                 usernames_tuple: tuple,
                 results_dict: dict,
                 names_returned: list,
                 id_column: str = "display_name",
                 column_to_change: str = "interested_handles") -> Tuple[dict, bool, list]:
    """Apply DM follow/unfollow requests to the in-memory records.

    For each watched handle, the requesting ids in ``"add"`` are appended to
    its ``interested_handles``. An id is skipped if it already follows 20
    handles. Handles not yet in ``results_dict`` get a new record whose other
    columns are None. Ids in ``"remove"`` are then dropped, and a handle left
    with no followers is reported for deletion.

    Args:
        update_dict: Handle -> ``{"add": [ids], "remove": [ids]}``.
        usernames_tuple: Not read; kept for interface compatibility.
        results_dict: Handle -> record. It is mutated in place and returned.
        names_returned: Not read; kept for interface compatibility.
        id_column: Not read; kept for interface compatibility.
        column_to_change: Not read; kept for interface compatibility.

    Returns:
        ``(results_dict, removing_any, to_remove)``. ``to_remove`` lists the
        handles whose follower list became empty.
    """
    print("Update dict: {}".format(update_dict))
    to_remove = []
    removing_any = False
    max_follows = 20  # the limit quoted to users in the follow confirmation DM

    for key, value in update_dict.items():
        for raw_name in value["add"]:
            name = str(raw_name)
            existing_requests = sum(
                name in _split_handles(record.get("interested_handles"))
                for record in results_dict.values()
            )
            if existing_requests >= max_follows:
                continue

            if key in results_dict:
                record = results_dict[key]
                current = _split_handles(record.get("interested_handles"))
                if name not in current:
                    record["interested_handles"] = ",".join(current + [name])
            else:
                # Copy the column layout of an existing record before inserting this one.
                template = next(iter(results_dict.values()), {})
                new_record = {"interested_handles": name}
                for column in template:
                    if column != "interested_handles":
                        new_record[column] = None
                results_dict[key] = new_record

        # Exact id matching; the old substring test on str(list) removed "1" along with "12".
        names_to_remove = {str(name) for name in value["remove"]}
        if key in results_dict:
            remaining = [
                handle for handle in _split_handles(results_dict[key].get("interested_handles"))
                if handle not in names_to_remove
            ]
            if not remaining:
                removing_any = True
                to_remove.append(key)
            results_dict[key]["interested_handles"] = ",".join(remaining)

    print("Removing: {}".format(to_remove))
    return results_dict, removing_any, to_remove
def query_bq(request: dict,
             id_column: Optional[str] = None,
             column_to_change: Optional[str] = None) -> None:
    """Run one sync cycle: apply recent DM requests, then check every handle on Twitter.

    Steps:
    1. Read DMs newer than ``threshold`` minutes and apply follow/unfollow
       requests to the stored records.
    2. Delete handles that have no followers left.
    3. Check every remaining handle on Twitter.
    4. UPDATE existing rows whose profile or followers changed, and INSERT
       rows for handles not yet stored.

    Args:
        request: Not read. Presumably present for an entry-point signature
            (e.g. a Cloud Function) - confirm.
        id_column: Column used as the record key. Defaults to the ``id_col``
            environment variable, read at call time.
        column_to_change: Forwarded to ``make_changes``. Defaults to the
            ``column_to_change`` environment variable, read at call time.
    """
    from datetime import timedelta

    if id_column is None:
        id_column = os.getenv("id_col")
    if column_to_change is None:
        column_to_change = os.getenv("column_to_change")

    consumer_key = os.getenv("consumer_key")
    consumer_secret = os.getenv("consumer_secret")
    access_token = os.getenv("access_token")
    access_secret = os.getenv("access_secret")

    # Only DMs newer than `threshold` minutes before this run are treated as new requests.
    start_time = datetime.now()
    threshold = int(os.getenv("threshold"))
    threshold_time = start_time - timedelta(minutes=threshold)

    client = bigquery.Client()
    table_name = os.getenv("table_name")

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    api = tweepy.API(auth)

    to_update, requested_usernames = get_recent_updates(api, threshold_time)

    if to_update:
        results_dict, names_list = get_records(client, table_name, id_column)
        stored_keys = set(results_dict)  # handles already present in the table
        updated_dict, removing_any, to_remove = make_changes(
            to_update, tuple(requested_usernames), results_dict, names_list,
            id_column=id_column, column_to_change=column_to_change)
        if removing_any:
            remove_from_bgq(table_name, column_to_change="handle", criteria_list=to_remove, client=client)
            # Deleted rows must not be checked, updated or re-inserted below.
            for key in to_remove:
                updated_dict.pop(key, None)
        # Stored rows whose followers changed must be written back even when
        # their Twitter profile is unchanged.
        interest_changed = {key for key in to_update if key in stored_keys and key in updated_dict}
    else:
        updated_dict, names_list = get_records(client, table_name, id_column)
        stored_keys = set(updated_dict)
        interest_changed = set()

    changing_usernames = {}
    new_usernames = {}
    for key, value in updated_dict.items():
        checked_value, changed, isnew = check_on_twitter(
            key, value, "display_name", "display_image", api)
        if checked_value is None:
            continue  # lookup failed; leave this handle untouched
        if changed:
            # Rows already in the table are UPDATEd, never INSERTed again.
            if isnew and key not in stored_keys:
                new_usernames[key] = checked_value
            else:
                changing_usernames[key] = checked_value

    rows_to_update = sorted(set(changing_usernames) | interest_changed)
    make_updates(client, results_dict=updated_dict,
                 changing_usernames=rows_to_update, table_name=table_name)
    add_names(client, results_dict=updated_dict,
              new_usernames=list(new_usernames), table_name=table_name)
