Skip to content

Instantly share code, notes, and snippets.

@dylanjcastillo
Last active August 17, 2020 11:09
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save dylanjcastillo/99a79d53a3d73d8eebf0a5c0b81531d1 to your computer and use it in GitHub Desktop.
Save dylanjcastillo/99a79d53a3d73d8eebf0a5c0b81531d1 to your computer and use it in GitHub Desktop.
"""
1. Setup a Twitter Developer account and create new App, get its consumer key and consumer secret and replace them below
2. Replace TWEETS_DB, QUERY, and LANGUAGE values
3. Install required packages: `pip install tweepy schedule`
4. Run process using `python tweets_to_db.py`
"""
import tweepy
import sqlite3
import datetime
import schedule
import time
TWEETS_DB = "tweets.db"
QUERY = "@realDonaldTrump -filter:retweets" # Example query
LANGUAGE = "en" # Choose language
consumer_key = "INSERT-YOUR-KEY-HERE"
consumer_secret = "INSERT-YOUR-SECRET-HERE"
auth = tweepy.AppAuthHandler(consumer_key, consumer_secret)
api = tweepy.API(auth)
def create_table():
"""Create Table in DB"""
conn = sqlite3.connect(TWEETS_DB)
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS TWEETS(
ID INTEGER PRIMARY KEY,
TWEET_ID INTEGER UNIQUE NOT NULL,
INSERT_TIMESTAMP TEXT,
TEXT TEXT,
FOLLOWERS_COUNT INTEGER,
FAVOURITES_COUNT INTEGER,
FRIENDS_COUNT INTEGER,
TWEETS_COUNT INTEGER
)
"""
)
conn.close()
return
def extract_tweets_data(response):
"""Get relevant data from tweets"""
output = []
timestamp = datetime.datetime.now()
for status in response:
elem = status._json
tweet_id = elem.get("id")
text = elem.get("full_text")
user = elem.get("user")
followers_count = user.get("followers_count")
favourites_count = user.get("favourites_count")
friends_count = user.get("friends_count")
tweets_count = user.get("statuses_count")
output.append(
(
tweet_id,
timestamp,
text,
followers_count,
favourites_count,
friends_count,
tweets_count,
)
)
return output
def insert_data(tweets):
"""Insert data into DB"""
conn = sqlite3.connect(TWEETS_DB)
insert_query = """
INSERT INTO TWEETS (
TWEET_ID,
INSERT_TIMESTAMP,
TEXT,
FOLLOWERS_COUNT,
FAVOURITES_COUNT,
FRIENDS_COUNT,
TWEETS_COUNT
)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""
conn.executemany(insert_query, tweets)
conn.commit()
conn.close()
return
def get_latest_tweets(since_id):
"""Get last 100 tweets"""
results = api.search(
q=QUERY,
lang=LANGUAGE,
result_type="recent",
count="100",
tweet_mode="extended",
since_id=since_id,
)
print(f"Got {len(results)} result(s) from the Twitter API")
search_limits = (
api.rate_limit_status()
.get("resources", {})
.get("search", {})
.get("/search/tweets", {})
)
print(f"Limits of API: {search_limits}")
return results
def download_tweets():
"""Download tweets from DB"""
print("Getting last TWEET_ID from database")
conn = sqlite3.connect(TWEETS_DB)
cur = conn.cursor()
cur.execute("SELECT MAX(TWEET_ID) FROM TWEETS")
last_id = cur.fetchone()[0]
print("Getting most recent tweets from API")
latest_tweets = get_latest_tweets(since_id=last_id)
processed_tweets = extract_tweets_data(latest_tweets)
print("Inserting tweets into DB")
insert_data(processed_tweets)
print("Finished execution")
def main():
"""Execute routines"""
print(f"Started new execution at: {datetime.datetime.now()}")
create_table()
download_tweets()
schedule.every(30).seconds.do(download_tweets)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment