Skip to content

Instantly share code, notes, and snippets.

@dannguyen
Last active August 29, 2015 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dannguyen/c6635797f602a6615465 to your computer and use it in GitHub Desktop.
Save dannguyen/c6635797f602a6615465 to your computer and use it in GitHub Desktop.
Code for @BeholdEveryWord Twitter Bot: https://twitter.com/beholdeveryword
import json
import logging
import os.path
import tweepy
import requests
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('beholdeveryword.apis')
def tweepy_client(credsfile):
"""
Expects {credsfile} to be a filename for a JSON file in this format:
{
"consumer_key": "x",
"consumer_secret": "y",
"access_token": "z",
"access_token_secret": "aa"
}
Returns an authenticated tweepy.API object
"""
credsfile = os.path.expanduser(credsfile)
creds = json.load(open(credsfile))
# Get authentication token
auth = tweepy.OAuthHandler(consumer_key = creds['consumer_key'],
consumer_secret = creds['consumer_secret'])
auth.set_access_token(creds['access_token'],
creds['access_token_secret'])
# create an API handler
return tweepy.API(auth)
def get_latest_timeline_tweet_text(credsfile):
"""
Returns the latest text string from the authenticated user's
timeline, or None if no tweet yet exists
"""
t = tweepy_client(credsfile).user_timeline(count = 1, trim_user = True,
exclude_replies = True, include_rts = False
)
if t[0]:
return t[0].text
def send_tweet(txt, credsfile):
"""
Sends out {txt} as a new tweet with the
authenticated account from {credsfile}
Returns the tweepy Response object
"""
t = tweepy_client(credsfile)
resp = t.update_status(status = txt)
return resp._json
def get_wikipedia_url_for_word(word):
"""
contacts Wikipedia's API to see if an article with {word}.capitalize() exists
Returns wikipedia URL with {word} as title if it does exist
or None if not
note: this method is pretty sloppy and assumes word is just a single word with
all alphabet letters
"""
wend_point = "http://en.wikipedia.org/w/api.php?format=json&action=query&prop=info&titles="
title = word.capitalize()
resp = requests.get(wend_point + title).json()
if resp['query']['pages'].get('-1'):
return None
else:
return "http://en.wikipedia.org/wiki/%s" % title
def get_biblehub_url_for_word(word):
"""
Given {word} like "Abel"
creates the biblehub.com topic URL:
e.g. http://biblehub.com/topical/a/abel.html
and checks to see if it exists (i.e. has a HTTP status of 200)
Returns the biblehub.com URL or None
"""
slug = word.lower()
url = 'http://biblehub.com/topical/%s/%s.htm' % (slug[0], slug)
resp = requests.head(url)
return url if resp.status_code == 200 else None
from apis import send_tweet, get_latest_timeline_tweet_text
from apis import get_wikipedia_url_for_word, get_biblehub_url_for_word
from words import create_words_file, find_next_wordline
import json
import logging
import os.path
import re
TWITTER_CREDS = "~/.behold.twittercreds.json"
SOURCE_URL = 'http://www.gutenberg.org/cache/epub/10/pg10.txt'
WORDS_FILENAME = "/tmp/biblewords.txt"
TWEET_TEMPLATE = "Behold %s and its %s Biblical %s!\n👼🙏😇"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('beholdeveryword.every_word')
def create_next_tweet_text(tweet_txt, words_filename):
"""
Expects TWEET_TEMPLATE to be a String
{tweet_txt} is a String. A regex is used to extract a particular
{word}, and this is passed into words.find_next_wordline() to
get the next [word, word_count] (e.g. {seq}) to tweet
Returns a text string or None, depending on whether
find_next_wordline() returned {seq} or None
"""
new_tweet = None
mtch = re.search('(?<=Behold )[A-Z]+', tweet_txt)
word = mtch.group() if mtch else None
seq = find_next_wordline(current_word = word, words_filename = words_filename)
if seq is not None:
word = seq[0].upper()
tx = "appearances" if int(seq[1]) > 1 else 'appearance'
new_tweet = TWEET_TEMPLATE % (word, seq[1], tx)
## attempt to add BibleHub link
b_url = get_biblehub_url_for_word(word)
if b_url:
new_tweet += "\nBibleHub: " + b_url
## attempt to add Wikipedia link
w_url = get_wikipedia_url_for_word(word)
if w_url:
new_tweet += "\nWikipedia: " + w_url
return new_tweet
def dotweet(testing = False):
"""
Downloads and creates Biblical word count (WORDS_FILE) if necessary.
Gets latest tweet from a given account (TWITTER_SCREEN_NAME)
Uses Tweepy to send a "Behold..." tweet
Returns response object or None
"""
create_words_file(source_url = SOURCE_URL, words_filename = WORDS_FILENAME,
start_pt = '1:1', end_pt = 'END OF THE PROJECT GUTENBERG'
)
# Note: If the latest tweet isn't of the expected "Behold..." format,
# then the Twitter sequence will __start over__
tweet_text = get_latest_timeline_tweet_text(credsfile = TWITTER_CREDS)
logger.info("Latest tweet is: \"%s\"" % tweet_text)
# Now formulate the next tweet to send out
next_tweet = create_next_tweet_text(tweet_text, WORDS_FILENAME)
if next_tweet is None:
logger.warning("Nothing to tweet")
return None
else:
logger.info("Tweeting: \"%s\"" % next_tweet)
# send the tweet
if testing == True:
return next_tweet
else:
resp = send_tweet(next_tweet, credsfile = TWITTER_CREDS)
return resp
if __name__ == "__main__":
resp = dotweet()
if type(resp) is dict:
j = json.dumps(resp, indent = 2)
print(j)
else:
print(resp)
import os.path
import requests
import re
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('beholdeveryword.words')
def create_words_file(source_url, words_filename, start_pt = "", end_pt = "", do_refresh = False):
"""
Downloads latest Bible file and creates WORDS_FILE
"""
# if it already exists, and no need to refresh it, then
# just do nothing
if(os.path.exists(words_filename)
and os.path.getsize(words_filename) > 1000
and do_refresh == False
):
return
# else, re-download, and re-make
logger.info("Downloading text from %s" % source_url)
text = requests.get(source_url).text.upper()
a = text.index(start_pt)
b = text.index(end_pt)
words = re.findall("[A-Z]+", text[a:b])
wordcounts = {}
for word in words:
if word not in wordcounts:
wordcounts[word] = 1
else:
wordcounts[word] += 1
with open(words_filename, "w") as f:
for word, x in sorted(wordcounts.items()):
f.write('%s,%s\n' % (word, x))
logger.info("%s is %s bytes" % (words_filename, os.path.getsize(words_filename)))
# return the path to the file
return words_filename
def find_next_wordline(words_filename, current_word = None):
"""
Finds line after the first match of current_word
Returns a sequence/list, [word, word_count]
or,
if word is not found, or end of file,
returns None
"""
nextline = ''
with open(words_filename) as f:
if current_word is None:
nextline = f.readline()
else:
for line in f:
r = "^%s(?=,)" % current_word
s = re.search(r,line)
if s:
nextline = f.readline()
break
if nextline is '':
return None
else:
return nextline.strip().split(',')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment