Skip to content

Instantly share code, notes, and snippets.

@Robin-Lord
Created October 8, 2023 16:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Robin-Lord/fda8ac325879198903143af461e60363 to your computer and use it in GitHub Desktop.
Save Robin-Lord/fda8ac325879198903143af461e60363 to your computer and use it in GitHub Desktop.
import tweepy
from google.cloud import bigquery
import os
from datetime import datetime
from typing import Dict, List, Tuple, Union, Any, Optional
# Load auth from env for security
# Module-level OAuth handler built only from the consumer key/secret env vars (no access
# token is set here). It is the default used by tweepy.API(auth) in message_user and
# check_on_twitter; query_bq builds its own fully-authorised handler.
auth: tweepy.OAuthHandler = tweepy.OAuthHandler(os.getenv("consumer_key"), os.getenv("consumer_secret"))
# Global flags to control printing (read by timeit on every decorated call)
# print_start: print "<<function_name>>" before the call.
# print_time: print the elapsed time after the call.
print_start: bool = True
print_time: bool = True
def timeit(method):
    """
    Decorator that prints the wrapped function's name when it is called and how long it took.

    Printing is controlled by the module-level flags ``print_start`` and ``print_time``.
    They are looked up on every call, so they can be toggled at runtime.

    Args:
        method (function): The function to be timed and printed.
    Returns:
        function: Wrapped function that prints its name and execution time and returns
        whatever ``method`` returns.
    """
    from functools import wraps

    # wraps() copies __name__/__doc__/__wrapped__ onto the wrapper. Without it, every
    # decorated function in this module would introspect as "timed".
    @wraps(method)
    def timed(*args, **kw):
        """Print the start marker, run ``method``, then print the elapsed wall-clock time."""
        if print_start:
            print(f"<<{method.__name__}>>")
        ts: datetime = datetime.now()
        result = method(*args, **kw)
        te: datetime = datetime.now()
        if print_time:
            print(f"<<{method.__name__}>> time to execute: {te - ts}\n-----------------------------------------")
        return result

    return timed
@timeit
def get_recent_updates(api: object, threshold_time: datetime) -> Tuple[Dict[str, Dict[str, List[str]]], List[str]]:
    """
    Collect follow/unfollow requests from direct messages newer than ``threshold_time``.

    Each mentioned screen name (lower-cased) in a recent DM becomes a key. The sender id
    goes into its "remove" list when the message text contains "stop" or "remove", and
    into its "add" list otherwise. The sender is sent an acknowledgement DM for every
    mention.

    Args:
        api (object): Tweepy API instance (uses ``list_direct_messages``; passed to ``message_user``).
        threshold_time (datetime): Only DMs created strictly after this naive local time are used.
    Returns:
        Tuple[Dict[str, Dict[str, List[str]]], List[str]]:
            - Mapping of screen name -> {"add": [sender ids], "remove": [sender ids]}.
            - The list of screen names that have requests.
    """
    dms = api.list_direct_messages()
    # created_timestamp is milliseconds since the epoch. ``datetime`` is the class imported
    # with ``from datetime import datetime``, so fromtimestamp is called on it directly;
    # ``datetime.datetime`` would raise AttributeError.
    recent_dms = [
        dm for dm in dms
        if datetime.fromtimestamp(int(dm.created_timestamp) / 1000) > threshold_time
    ]

    usernames_to_update: Dict[str, Dict[str, List[str]]] = {}
    for dm in recent_dms:
        print(">>>")
        message_body = dm.message_create
        sender_id = message_body["sender_id"]
        # NOTE(review): "id_to_remove" looks like a placeholder for an account id whose
        # messages should be ignored (e.g. the bot itself). Confirm the real value.
        if sender_id == "id_to_remove":
            continue

        message_contents = message_body["message_data"]["text"].lower()
        print(message_contents)
        user_mentions = message_body["message_data"]["entities"]["user_mentions"]

        for user_mention in user_mentions or []:
            screen_name = user_mention["screen_name"].lower()
            actions = usernames_to_update.setdefault(screen_name, {"add": [], "remove": []})
            if "stop" in message_contents or "remove" in message_contents:
                actions["remove"] = list(set(actions["remove"] + [sender_id]))
                message_to_user = f"Thanks, I've got your request to unfollow {screen_name}."
            else:
                actions["add"] = list(set(actions["add"] + [sender_id]))
                message_to_user = (
                    f"Thanks, I've got your request to follow {screen_name}. You can follow a maximum of 20.\n"
                    f"If you'd like to stop following that name, just send another message containing \"stop following @{screen_name}\""
                )
            message_user(sender_id, message_to_user, api)

    changing_usernames = list(usernames_to_update.keys())
    return usernames_to_update, changing_usernames
@timeit
def get_records(client: object, table_name: str, identifier_column: str) -> Tuple[Dict[str, Dict[str, Any]], List[str]]:
    """
    Read every row of a BigQuery table into a dict keyed by ``identifier_column``.

    Args:
        client (object): BigQuery client used to run the query.
        table_name (str): Table to read. It is interpolated into the SQL as an identifier,
            so it must come from trusted configuration.
        identifier_column (str): Column whose value becomes the dict key. That column is
            excluded from each record.
    Returns:
        Tuple[Dict[str, Dict[str, Any]], List[str]]:
            - Mapping of identifier value -> {other column: value}.
            - The identifier values in row order.
    Raises:
        Any exception raised while reading rows (e.g. KeyError for a missing
        ``identifier_column``) is re-raised after printing the table name and the row
        being processed.
    """
    query = f"""
        SELECT *
        FROM {table_name}
    """
    query_job = client.query(query)
    results = query_job.result()  # Waits for job to complete.

    results_dict: Dict[str, Dict[str, Any]] = {}
    names_list: List[str] = []
    # Initialised up front so the error handler can print it even when the very first
    # row fails. Previously that case raised UnboundLocalError and hid the real error.
    row_dict: Optional[Dict[str, Any]] = None
    try:
        for row in results:
            row_dict = dict(row)
            col_key = row_dict[identifier_column]
            names_list.append(col_key)
            results_dict[col_key] = {
                key: value for key, value in row_dict.items() if key != identifier_column
            }
    except Exception:
        print(table_name)
        print(row_dict)
        raise  # bare raise keeps the original traceback
    print(results_dict)
    return results_dict, names_list
@timeit
def get_to_change(columns_to_check: str,
                  usernames_tuple: Tuple[str, ...],
                  client: Optional[bigquery.Client] = None) -> List[Dict[str, Any]]:
    """
    Fetch rows of ``twitter_name_change.test_table`` whose ``columns_to_check`` value is in ``usernames_tuple``.

    Args:
        columns_to_check (str): Column to filter on. It is a SQL identifier, so it must
            come from trusted code, not user input.
        usernames_tuple (Tuple[str, ...]): Values to match. They are bound as an
            ARRAY<STRING> query parameter.
        client (Optional[bigquery.Client]): Client to use. A new ``bigquery.Client()`` is
            created when omitted. Previously this read an undefined module-level ``client``.
    Returns:
        List[Dict[str, Any]]: Matching rows as dicts. Empty when ``usernames_tuple`` is empty.
    """
    if not usernames_tuple:
        # "IN ()" is not valid SQL, and there is nothing to match anyway.
        return []
    if client is None:
        client = bigquery.Client()

    # Binding the values as a parameter, rather than formatting the tuple into the SQL,
    # avoids injection and the invalid "('x',)" rendering of a one-element tuple.
    query = f"""
        SELECT *
        FROM twitter_name_change.test_table
        WHERE {columns_to_check} IN UNNEST(@usernames)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("usernames", "STRING", list(usernames_tuple)),
        ]
    )
    query_job = client.query(query, job_config=job_config)
    results = query_job.result()  # Waits for job to complete.
    return [dict(row) for row in results]
@timeit
def get_dm(id_value: str, username: str, previous_username: str, profile_image: str, previous_display_url: str) -> str:
    """
    Compose the DM sent to followers describing an account's name/picture change.

    Args:
        id_value (str): The account's handle (shown lower-cased when the name changed).
        username (str): Current display name.
        previous_username (str): Previously stored display name.
        profile_image (str): Current profile image URL.
        previous_display_url (str): Previously stored profile image URL.
    Returns:
        str: One sentence about the name, followed by one about the profile picture.
    """
    name_changed = username != previous_username
    image_changed = profile_image != previous_display_url

    # The opening fragment ends with "T"/"t" and the closing one starts with "heir",
    # so the two halves join as "...T" + "heir..." -> "Their".
    if name_changed:
        opening = f"{id_value.lower()} was {previous_username} and is now {username}. T"
    else:
        opening = f"{username} hasn't changed name but t"

    closing = (
        f"heir profile picture changed from {previous_display_url} to {profile_image}"
        if image_changed
        else "heir profile picture hasn't changed."
    )
    return opening + closing
@timeit
def get_tweet(id_value: str, username: str, previous_username: str, profile_image: str, previous_display_url: str, user_length: int) -> str:
    """
    Compose the public tweet announcing how many users were DMed about an account's change.

    Args:
        id_value (str): The account's handle (shown lower-cased).
        username (str): Current display name.
        previous_username (str): Previously stored display name.
        profile_image (str): Current profile image URL.
        previous_display_url (str): Previously stored profile image URL.
        user_length (int): Number of users DMed. A numeric string is also accepted and
            converted with ``int()``.
    Returns:
        str: Generated tweet message.
    Raises:
        ValueError: if ``user_length`` is a non-numeric string.
    """
    # check_on_twitter historically passed str(len(...)). Comparing that str with an int
    # raised TypeError in Python 3, so coerce here.
    user_count = int(user_length)
    # Singular only for exactly one recipient ("0 users", "2 users").
    users_or_user = "user" if user_count == 1 else "users"

    if username != previous_username:
        message = f"I just DMd {user_count} {users_or_user} about {id_value.lower()} changing username from {previous_username} to {username}."
    else:
        message = f"I just DMd {user_count} {users_or_user} about {id_value.lower()}."

    # Mention the profile picture only when it actually changed.
    if profile_image != previous_display_url:
        message += f" Their profile picture changed from {previous_display_url} to {profile_image}."
    return message
@timeit
def tweet_message(message: str, api: Any) -> None:
    """
    Post ``message`` as a tweet via ``api.update_status``.

    This is best-effort: a failure is printed and swallowed, so one failed tweet does not
    abort the run.

    Args:
        message (str): Message to be tweeted.
        api (Any): Tweepy API instance.
    Returns:
        None
    """
    try:
        api.update_status(message)
    except Exception as e:
        # Catch Exception rather than using a bare except, so KeyboardInterrupt/SystemExit
        # still propagate. Include the cause so failures are diagnosable.
        print(f"Error posting message: {type(e).__name__}: {e}")
@timeit
def message_user(user: str, message: str, api: Any = tweepy.API(auth)) -> None:
    """
    Send ``message`` as a direct message to the account with id ``user``.

    This is best-effort: any failure (including a non-numeric ``user``) is printed and
    swallowed.

    Args:
        user (str): Recipient's numeric account id (converted with ``int()``).
        message (str): Message to be sent.
        api (Any): Tweepy API instance. The default is built once, at definition time,
            from the module-level ``auth``.
    Returns:
        None
    """
    try:
        api.send_direct_message(int(user), text=message)
    except Exception as e:
        # Keep the original diagnostic, and add the cause that was previously discarded.
        print(f"Failed to message user: {type(user)} -- {user} ({type(e).__name__}: {e})")
from typing import Dict, Tuple, Union, Optional
@timeit
def check_on_twitter(record_to_check_key: str,
                     record_to_check_values: Dict[str, Union[str, None]],
                     username_col: str,
                     display_img_col: str,
                     api: Optional[tweepy.API] = tweepy.API(auth)) -> Tuple[Optional[Dict[str, Union[str, None]]], Optional[bool], Optional[bool]]:
    """
    Look an account up on Twitter and compare it with the stored record.

    - If the lookup fails, prints a message and returns ``(None, None, None)``.
    - If name and profile image are unchanged, returns ``(record, False, False)``.
    - If they changed and both previous values were present, DMs every id in
      ``record["interested_handles"]``, tweets a summary, updates the record and returns
      ``(record, True, False)``.
    - If they changed but a previous value was missing/falsy, the record is treated as
      new: no messages are sent, the record is updated, and ``(record, True, True)`` is
      returned.

    Args:
        record_to_check_key: Handle to look up with ``api.get_user``.
        record_to_check_values: Stored record. It is mutated in place when a change is found.
        username_col: Key in the record holding the display name.
        display_img_col: Key in the record holding the profile image URL.
        api: Tweepy API instance used for the lookup, DMs and tweet.
    Returns:
        ``(record, changed, isnew)`` as described above.
    """
    id_value = record_to_check_key

    try:
        # NOTE(review): tweepy v4 made API.get_user keyword-only (user_id=/screen_name=).
        # A positional call would fail there and be reported as a retrieval failure.
        # Confirm the tweepy version in use.
        new_record = api.get_user(id_value)
    except Exception:
        print("username {} retrieval failed".format(id_value))
        return None, None, None

    user_info = new_record._json
    username = user_info["name"]
    profile_image = user_info["profile_image_url_https"]

    # A missing key means nothing is stored yet (treated as a new record below).
    previous_username = record_to_check_values.get(username_col)
    previous_display_url = record_to_check_values.get(display_img_col)

    if username == previous_username and profile_image == previous_display_url:
        return record_to_check_values, False, False

    if previous_username and previous_display_url:
        dm = get_dm(id_value, username, previous_username, profile_image, previous_display_url)
        interested_handles = record_to_check_values["interested_handles"]
        print("Interested handles: {}".format(interested_handles))

        # interested_handles is normally a comma-separated string, but make_changes can
        # store a list. Accept both, and treat None as "nobody".
        if interested_handles is None:
            users_to_message_list: List[str] = []
        elif isinstance(interested_handles, (list, tuple, set)):
            users_to_message_list = [str(h) for h in interested_handles]
        else:
            users_to_message_list = str(interested_handles).split(",")
        print("users_to_message_list: {}".format(users_to_message_list))

        # De-duplicate and drop blanks (e.g. from a leading comma) so the tweeted count
        # matches the number of DMs actually sent.
        recipients = list(dict.fromkeys(h.strip() for h in users_to_message_list if h.strip()))
        for user in recipients:
            print("User:{}".format(user))
            message_user(user, dm, api)

        # Pass an int: get_tweet compares the count with 1.
        tweet_to_send = get_tweet(id_value, username, previous_username, profile_image, previous_display_url, len(recipients))
        tweet_message(tweet_to_send, api)
        isnew = False
    else:
        isnew = True

    record_to_check_values[username_col] = username
    record_to_check_values[display_img_col] = profile_image
    return record_to_check_values, True, isnew
@timeit
def make_updates(client: Optional[bigquery.Client] = None,
                 results_dict: Optional[Dict[str, Dict[str, Union[str, List[str]]]]] = None,
                 changing_usernames: Optional[List[str]] = None,
                 table_name: Optional[str] = os.getenv("table_name")) -> None:
    """
    UPDATE the stored rows for handles whose details changed.

    For every key of ``results_dict`` that is also in ``changing_usernames``, runs one
    parameterised UPDATE that sets ``interested_handles``, ``display_name`` and
    ``display_image`` on the row whose ``handle`` equals the key.

    Args:
        client: BigQuery client. When omitted, ``bigquery.Client()`` is created on first
            use instead of at import time.
        results_dict: Mapping of handle -> record containing ``interested_handles``,
            ``display_name`` and ``display_image``.
        changing_usernames: Handles to update (any container supporting ``in``).
        table_name: Table to update. It is a SQL identifier, taken from the ``table_name``
            env var by default and read once when the module is imported.
    Returns:
        None
    """
    def _as_text(value):
        # NULL stays NULL; anything else is bound as a STRING parameter. The original SQL
        # quoted every value as a string literal.
        return None if value is None else str(value)

    def _handles_csv(handles):
        # Store the comma-separated, de-duplicated form that the rest of the pipeline
        # splits on, not a Python list repr such as "['1', '2']".
        if isinstance(handles, (list, tuple, set)):
            return ",".join(dict.fromkeys(str(h) for h in handles if str(h) != ""))
        return _as_text(handles)

    if not results_dict or not changing_usernames:
        return

    for handle, value in results_dict.items():
        if handle not in changing_usernames:
            continue
        if client is None:
            client = bigquery.Client()

        # Values are bound as parameters, so a display name containing "'" can no longer
        # break (or inject into) the statement. The table name cannot be parameterised.
        dml_statement = f"""UPDATE {table_name}
            SET interested_handles = @interested_handles,
                display_name = @display_name,
                display_image = @display_image
            WHERE handle = @handle"""
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("interested_handles", "STRING", _handles_csv(value["interested_handles"])),
                bigquery.ScalarQueryParameter("display_name", "STRING", _as_text(value["display_name"])),
                bigquery.ScalarQueryParameter("display_image", "STRING", _as_text(value["display_image"])),
                bigquery.ScalarQueryParameter("handle", "STRING", _as_text(handle)),
            ]
        )
        query_job = client.query(dml_statement, job_config=job_config)  # API request
        query_job.result()  # Waits for statement to finish
@timeit
def add_names(client: Optional[bigquery.Client] = None,
              results_dict: Optional[Dict[str, Dict[str, str]]] = None,
              new_usernames: Optional[List[str]] = None,
              table_name: Optional[str] = os.getenv("table_name")) -> None:
    """
    INSERT a row for every new handle.

    For every key of ``results_dict`` that is also in ``new_usernames``, inserts
    ``(handle, interested_handles, display_name, display_image)`` with a parameterised
    statement. Missing record fields are inserted as NULL.

    Args:
        client: BigQuery client. When omitted, ``bigquery.Client()`` is created on first
            use instead of at import time.
        results_dict: Mapping of handle -> record.
        new_usernames: Handles to insert (any container supporting ``in``).
        table_name: Table to insert into. It is a SQL identifier, taken from the
            ``table_name`` env var by default and read once when the module is imported.
    Returns:
        None
    """
    def _as_text(value):
        # NULL stays NULL; anything else is bound as a STRING parameter.
        return None if value is None else str(value)

    def _handles_csv(handles):
        # Lists are stored as a de-duplicated comma-separated string, not a Python repr.
        if isinstance(handles, (list, tuple, set)):
            return ",".join(dict.fromkeys(str(h) for h in handles if str(h) != ""))
        return _as_text(handles)

    if not results_dict or not new_usernames:
        return

    for handle, value in results_dict.items():
        if handle not in new_usernames:
            continue
        if client is None:
            client = bigquery.Client()

        # The column list is fixed so it always lines up with the VALUES list. It was
        # previously built from value.keys(), whose order (and extra keys) could mismatch.
        dml_statement = f"""INSERT INTO {table_name} (handle, interested_handles, display_name, display_image)
            VALUES (@handle, @interested_handles, @display_name, @display_image)"""
        print(dml_statement)
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("handle", "STRING", _as_text(handle)),
                bigquery.ScalarQueryParameter("interested_handles", "STRING", _handles_csv(value.get("interested_handles"))),
                bigquery.ScalarQueryParameter("display_name", "STRING", _as_text(value.get("display_name"))),
                bigquery.ScalarQueryParameter("display_image", "STRING", _as_text(value.get("display_image"))),
            ]
        )
        query_job = client.query(dml_statement, job_config=job_config)  # API request
        query_job.result()  # Waits for statement to finish
@timeit
def remove_from_bgq(table_name: str,
                    column_to_change: str,
                    criteria_list: List[str],
                    client: Optional[bigquery.Client] = None) -> None:
    """
    DELETE rows of ``table_name`` whose ``column_to_change`` value is in ``criteria_list``.

    Args:
        table_name: Table to delete from (a SQL identifier from trusted configuration).
        column_to_change: Column compared against ``criteria_list`` (a SQL identifier).
        criteria_list: Values to delete. They are bound as an ARRAY<STRING> parameter.
            Nothing is run when the list is empty.
        client: BigQuery client. When omitted, ``bigquery.Client()`` is created at call
            time instead of at import time.
    Returns:
        None
    """
    if not criteria_list:
        return  # nothing to delete; skip the round trip
    if client is None:
        client = bigquery.Client()

    # The values are bound as a parameter instead of formatting the Python list repr into
    # the SQL, so quotes in a value cannot break or inject into the statement.
    query = f"""DELETE FROM
    {table_name}
    WHERE {column_to_change} IN UNNEST(@criteria_list)
    """
    print(query)
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("criteria_list", "STRING", [str(c) for c in criteria_list]),
        ]
    )
    query_job = client.query(query, job_config=job_config)  # API request
    query_job.result()  # Waits for statement to finish
@timeit
def make_changes(update_dict: dict,
                 usernames_tuple: tuple,
                 results_dict: dict,
                 names_returned: list,
                 id_column: str = "display_name",
                 column_to_change: str = "interested_handles") -> Tuple[dict, bool, list]:
    """
    Apply follow ("add") and unfollow ("remove") requests to ``results_dict`` in place.

    For each handle in ``update_dict``:
      * each requesting user id in "add" is appended to the handle's
        ``interested_handles``, unless that user already appears in 20 or more records'
        ``interested_handles``. Ids already present are not added twice. Unknown handles
        get a new record whose other fields (copied from the first existing record's
        keys) are None.
      * each user id in "remove" is dropped from the handle's ``interested_handles``.
        A handle left with nobody interested is added to ``to_remove``.
    ``interested_handles`` is always written back as a comma-separated string.

    Args:
        update_dict: handle -> {"add": [user ids], "remove": [user ids]}.
        usernames_tuple: Not read by this function (kept for interface compatibility).
        results_dict: handle -> record, as returned by get_records. Mutated in place.
        names_returned: Not read by this function.
        id_column: Not read by this function.
        column_to_change: Not read; the column edited is always "interested_handles".
    Returns:
        (results_dict, removing_any, to_remove): the updated dict, whether any handle
        lost all interested users, and the list of those handles.
    """
    print("Update dict: {}".format(update_dict))
    to_remove = []
    removing_any = False

    def _split(handles) -> List[str]:
        """Return the non-empty user ids stored in an interested_handles value (CSV str or list)."""
        if handles is None:
            return []
        items = [str(h) for h in handles] if isinstance(handles, (list, tuple, set)) else str(handles).split(",")
        return [h.strip() for h in items if h.strip()]

    for key, value in update_dict.items():
        # --- additions ---
        for name in value["add"]:
            name = str(name)
            # How many tracked accounts this user already follows (limit: 20).
            existing_requests = sum(
                1 for res_val in results_dict.values()
                if name in _split(res_val.get("interested_handles"))
            )
            if existing_requests > 19:
                continue

            if key in results_dict:
                current = _split(results_dict[key].get("interested_handles"))
                if name not in current:
                    current.append(name)
                results_dict[key]["interested_handles"] = ",".join(current)
            else:
                # New handle: copy the column layout of an existing record, with every
                # other field set to None.
                template = next(iter(results_dict.values()), {})
                new_record = {"interested_handles": name}
                for col in template:
                    if col != "interested_handles":
                        new_record[col] = None
                results_dict[key] = new_record

        # --- removals ---
        # Exact id matching. The old substring test against str(list) also removed
        # e.g. "123" when "1234" unsubscribed.
        names_to_remove = {str(n) for n in value["remove"]}
        if key in results_dict:
            remaining = [
                h for h in _split(results_dict[key].get("interested_handles"))
                if h not in names_to_remove
            ]
            if not remaining:
                removing_any = True
                to_remove.append(key)
            results_dict[key]["interested_handles"] = ",".join(remaining)

    print("Removing: {}".format(to_remove))
    return results_dict, removing_any, to_remove
@timeit
def query_bq(request: dict,
             id_column: str = os.getenv("id_col"),
             column_to_change: str = os.getenv("column_to_change")) -> None:
    """
    Entry point: apply recent DM requests, then re-check every tracked account on Twitter.

    Steps:
        1. Read DMs from the last ``threshold`` minutes (env var) into add/remove requests.
        2. If there are any, apply them to the stored records, DELETE rows nobody is
           interested in any more, and drop those rows from the in-memory data.
        3. Check every remaining record on Twitter (check_on_twitter DMs/tweets about
           changes) and classify it as changed or new.
        4. UPDATE changed rows and INSERT new ones.

    Args:
        request: Incoming request object; not read by this function.
        id_column: Identifier column passed to get_records (env ``id_col``, read at import).
        column_to_change: Passed through to make_changes (env ``column_to_change``, read at import).
    Returns:
        None
    Raises:
        TypeError / ValueError: if the ``threshold`` env var is missing or not an integer.
    """
    # The module imports the ``datetime`` class, so ``datetime.datetime`` and
    # ``datetime.timedelta`` do not exist. Import timedelta explicitly.
    from datetime import timedelta

    consumer_key = os.getenv("consumer_key")
    consumer_secret = os.getenv("consumer_secret")
    access_token = os.getenv("access_token")
    access_secret = os.getenv("access_secret")

    start_time = datetime.now()
    # How far back (in minutes) to look for new DM requests.
    threshold = int(os.getenv("threshold"))
    threshold_time = start_time - timedelta(minutes=threshold)

    client = bigquery.Client()
    table_name = os.getenv("table_name")

    # A fully authorised API (with access token), unlike the module-level consumer-only auth.
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    api = tweepy.API(auth)

    to_update, requested_usernames = get_recent_updates(api, threshold_time)

    if to_update:
        results_dict, names_list = get_records(client, table_name, id_column)
        updated_dict, removing_any, to_remove = make_changes(
            update_dict=to_update,
            usernames_tuple=requested_usernames,
            results_dict=results_dict,
            names_returned=names_list,
            id_column=id_column,
            column_to_change=column_to_change)
        if removing_any:
            remove_from_bgq(table_name, column_to_change="handle", criteria_list=to_remove, client=client)
            # Rows just deleted must not be re-checked below. Otherwise a name change on
            # an abandoned account would still trigger a "DMd 0 users" tweet and a no-op UPDATE.
            for handle in to_remove:
                updated_dict.pop(handle, None)
    else:
        updated_dict, names_list = get_records(client, table_name, id_column)

    changed_records = {}
    new_records = {}
    for key, value in updated_dict.items():
        checked_value, changed, isnew = check_on_twitter(
            record_to_check_key=key,
            record_to_check_values=value,
            username_col="display_name",
            display_img_col="display_image",
            api=api)
        if checked_value is None:
            continue  # Twitter lookup failed; leave the stored row untouched
        if changed:
            if isnew:
                new_records[key] = checked_value
            else:
                changed_records[key] = checked_value

    make_updates(client, results_dict=updated_dict,
                 changing_usernames=changed_records, table_name=table_name)
    add_names(client, results_dict=updated_dict,
              new_usernames=new_records, table_name=table_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment