Skip to content

Instantly share code, notes, and snippets.

@bojanbjelic
Forked from gka/alfred-script.sh
Created April 11, 2012 08:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bojanbjelic/2357799 to your computer and use it in GitHub Desktop.
Save bojanbjelic/2357799 to your computer and use it in GitHub Desktop.
Local full-text search on tweets and favorites
# Alfred shell action: run a local tweet search and open the result page.
# {query} is substituted by Alfred before the script runs; it is deliberately
# left unquoted so that a query such as "jan 2012" reaches search.py as two
# separate arguments.
# NOTE(review): search.py must be executable and reachable via PATH for the
# bare command below to resolve -- confirm for your setup.
cd /your/local/path/to/tweet-search/ || exit 1
# -f: do not complain when no previous result page exists.
rm -f last-results.html
# Only open the page when the search actually produced one.
search.py {query} && open last-results.html
#!/usr/bin/env python
"""
Downloads tweets to a local sqlite database.

Fetches a user timeline and favorites through the Twitter API and stores
them in the ``tweets`` table of ``db_file``. Targets Python 2: the script
imports the Python 2-only ``rfc822`` module.
"""
# Twitter handle whose timeline is requested; it is also stored as the author
# of every timeline tweet.
screen_name = 'YOUR_TWITTER_NAME'
# OAuth credentials handed to twitter.OAuth below -- replace the placeholders.
consumer_key = '...'
consumer_secret = '...'
auth_token = '...'
auth_token_secret = '...'
# Path of the sqlite database file the tweets are written to.
db_file = 'tweets.db'
from twitter import Twitter, OAuth
from twitter.api import TwitterHTTPError
import sys
import sqlite3
from time import sleep
import rfc822
# Open (or create) the local tweet store. The connection and cursor are shared
# by the download functions defined further down.
conn = sqlite3.connect(db_file)
c = conn.cursor()
c.execute('create table if not exists tweets (id integer unique, user, text, tstamp)')
c.execute('create index if not exists tweet_index on tweets (text)')

# Resume point: the highest tweet id already stored. max() yields NULL (None)
# on an empty table, which means "no lower bound" for the API requests.
# Checking for None explicitly replaces the former bare ``except:``, which
# would also have hidden genuine database errors.
c.execute('select max(id) from tweets')
last_id = c.fetchone()[0]
if last_id is None:
    since_id = None
else:
    since_id = int(last_id)
    print('since_id %s' % since_id)

# Authenticated API client used by the download functions.
t = Twitter(auth=OAuth(auth_token, auth_token_secret, consumer_key, consumer_secret))
def parse_date(rfcdate):
    """Convert an RFC 822-style date string to ``YYYY-MM-DD HH:MM:SS`` in UTC.

    Args:
        rfcdate: date string such as ``'Wed Apr 11 08:10:00 +0000 2012'``.

    Returns:
        The timestamp as a string, normalised to UTC when the input carries a
        non-zero timezone offset (inputs at ``+0000`` are returned unchanged).

    Raises:
        ValueError: if the string cannot be parsed as a date.
    """
    # Local imports keep the block self-contained; email.utils supersedes the
    # deprecated rfc822 module and offers the same parser.
    from datetime import datetime, timedelta
    from email.utils import parsedate_tz

    parsed = parsedate_tz(rfcdate)
    if parsed is None:
        # Previously this surfaced as an opaque TypeError from slicing None.
        raise ValueError('unparsable date: %r' % (rfcdate,))
    stamp = datetime(*parsed[:6])
    # Element 9 is the offset from UTC in seconds (None when unknown).
    if parsed[9]:
        stamp -= timedelta(seconds=parsed[9])
    return '%d-%02d-%02d %02d:%02d:%02d' % stamp.timetuple()[:6]
def load_tweets(max_id=None, recursive=True, max_retries=10):
    """Download the timeline of ``screen_name`` into the ``tweets`` table.

    Pages backwards through ``statuses.user_timeline`` 200 tweets at a time,
    replacing each shortened URL by its expanded form, and commits after every
    page. Rows whose id is already stored are skipped by ``insert or ignore``.

    Args:
        max_id: only request tweets with an id up to this value; ``None``
            starts at the newest tweet.
        recursive: when true, keep requesting older pages until an empty page
            is returned; when false, fetch a single page.
        max_retries: number of consecutive failed requests tolerated before
            the ``TwitterHTTPError`` is re-raised.

    Raises:
        TwitterHTTPError: when a request keeps failing after ``max_retries``
            retries.
    """
    failures = 0
    # A loop replaces the former self-recursion, so long timelines and long
    # outages can no longer run into the interpreter's recursion limit.
    while True:
        args = dict(screen_name=screen_name, count=200, include_rts=True,
                    trim_user=True, include_entities=True)
        if max_id:
            args['max_id'] = max_id
            print('max_id = %s' % max_id)
        if since_id:
            args['since_id'] = since_id
        try:
            res = t.statuses.user_timeline(**args)
        except TwitterHTTPError as err:
            # Report the actual error, not the exception class.
            print(err)
            failures += 1
            if failures > max_retries:
                raise
            print("Twitter needs some more time, let's wait 5 secs")
            sleep(5)
            continue
        failures = 0
        if not res:
            return
        for tweet in res:
            text = tweet['text']
            if 'entities' in tweet:
                for url in tweet['entities']['urls']:
                    if url['expanded_url'] is not None:
                        text = text.replace(url['url'], url['expanded_url'])
            created_at = parse_date(tweet['created_at'])
            # trim_user=True strips the author details, so the configured
            # handle is stored instead.
            c.execute('insert or ignore into tweets values (?, ?, ?, ?) ',
                      (tweet['id'], screen_name, text, created_at))
        conn.commit()
        if not recursive:
            return
        # Continue just below the oldest tweet of this page.
        max_id = min(tweet['id'] for tweet in res) - 1


load_tweets()
def load_favorites(max_id=None, recursive=True, max_retries=10):
    """Download favorited tweets into the ``tweets`` table.

    Pages backwards through the favorites endpoint 200 tweets at a time,
    replacing each shortened URL by its expanded form, and commits after every
    page. Each row is stored under the screen name of the tweet's author.

    Args:
        max_id: only request favorites with an id up to this value; ``None``
            starts at the newest one.
        recursive: when true, keep requesting older pages until an empty page
            is returned; when false, fetch a single page.
        max_retries: number of consecutive failed requests tolerated before
            the ``TwitterHTTPError`` is re-raised.

    Raises:
        TwitterHTTPError: when a request keeps failing after ``max_retries``
            retries.
    """
    # NOTE(review): since_id is the highest id stored in the table, so a
    # favorite older than the newest stored tweet looks like it will be
    # skipped on later runs -- confirm whether that is intended.
    failures = 0
    # Paging stays inside this function; the earlier version continued with
    # load_tweets(), i.e. it fetched the user's own timeline instead of the
    # next page of favorites.
    while True:
        args = dict(count=200, include_entities=True)
        if max_id:
            args['max_id'] = max_id
        if since_id:
            args['since_id'] = since_id
        try:
            res = t.favorites(**args)
        except TwitterHTTPError as err:
            # Report the actual error, not the exception class.
            print(err)
            failures += 1
            if failures > max_retries:
                raise
            print("Twitter needs some more time, let's wait 5 secs")
            sleep(5)
            continue
        failures = 0
        if not res:
            return
        for tweet in res:
            text = tweet['text']
            if 'entities' in tweet:
                for url in tweet['entities']['urls']:
                    if url['expanded_url'] is not None:
                        text = text.replace(url['url'], url['expanded_url'])
            user = tweet['user']['screen_name']
            created_at = parse_date(tweet['created_at'])
            c.execute('insert or ignore into tweets values (?, ?, ?, ?) ',
                      (tweet['id'], user, text, created_at))
        conn.commit()
        if not recursive:
            return
        # Continue just below the oldest favorite of this page.
        max_id = min(tweet['id'] for tweet in res) - 1


load_favorites()
#!/usr/bin/env python
"""
Performs full-text searches on the local tweet database and outputs
the results as HTML. Links, @names and #hashtags are converted
into HTML links.

Usage: search.py QUERY -- the result page is written to last-results.html.
"""
# sqlite database file that is searched.
db_file = 'tweets.db'
import sqlite3
import sys
import re
from datetime import datetime
# Ordered (compiled pattern, replacement) pairs, applied one after another to
# the text of each tweet.
#  * URLs come first and now cover https:// as well as http://.
#  * The @name and #hashtag classes exclude "!" so that the "#!" inside the
#    generated twitter.com links is not picked up again as a hashtag.
#  * Newlines are converted last: "<br/>" contains no whitespace, so when it
#    was inserted first it got swallowed by a URL, @name or #hashtag that sat
#    directly in front of a line break.
# NOTE(review): an "@" or "#" inside a URL is still re-matched by the later
# patterns; fixing that needs a single-pass tokenizer.
link_patterns = [
    (re.compile(r"https?://[^\s\)]+", re.I), r'<a href="\g<0>">\g<0></a>'),
    (re.compile(r"@([^-!\s\)]+)", re.I), r'<a href="https://twitter.com/#!/\1">\g<0></a>'),
    (re.compile(r"#([^!\s\)]+)", re.I), r'<a href="https://twitter.com/#!/search/%23\1">\g<0></a>'),
    (re.compile(r"\n"), r'<br/>'),
]
# --- argument handling and query dispatch -----------------------------------
from xml.sax.saxutils import escape

if len(sys.argv) < 2:
    print("usage: search.py QUERY")
    sys.exit(-1)

# Python 2 delivers argv as byte strings; decode them so that non-ASCII
# queries reach sqlite and the HTML output as text. Python 3 already
# supplies text, which is passed through unchanged.
query_parts = [part.decode('utf-8', 'replace') if isinstance(part, bytes) else part
               for part in sys.argv[1:]]
query = ' '.join(query_parts)

conn = sqlite3.connect(db_file)
months = 'jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec'.split(',')
# Accepted years run from 2005 up to the current year instead of a fixed list
# that stopped at 2015.
years = [str(year) for year in range(2005, datetime.today().year + 1)]
c = conn.cursor()

# Handle whose own tweets are excluded from the "favs" listing.
# NOTE(review): this looks like the original author's account -- set it to the
# screen_name used by the download script.
own_screen_name = 'driven_by_data'

if query == 'favs':
    c.execute('select user, text, tstamp from tweets where user != ? order by tstamp desc',
              (own_screen_name,))
    out = '<h2>Favorite Tweets</h2>'
# ``elif``: with a plain ``if`` here the "favs" result was always replaced by
# the full-text search in the final branch.
elif len(query_parts) == 2 and query_parts[0][:3].lower() in months and query_parts[1] in years:
    # list.index() is zero-based, calendar months are one-based.
    m = months.index(query_parts[0][:3].lower()) + 1
    c.execute("select user, text, tstamp from tweets where strftime('%Y-%m', tstamp) = ? order by tstamp desc",
              ('%s-%02d' % (query_parts[1], m),))
    out = '<h2>Tweets and Favorites from %s</h2>' % escape(query)
else:
    c.execute('select user, text, tstamp from tweets where text like ? order by tstamp desc',
              ("%" + query + "%",))
    # The query is user input; escape it before embedding it in HTML.
    out = '<h2>Search Results for "%s"</h2>' % escape(query)

# Twitter reports created_at in UTC and the stored timestamps keep those
# wall-clock values, so ages must be measured against UTC, not local time.
now = datetime.utcnow()
def format_delta(delta):
    """Describe a ``timedelta`` as a coarse English duration.

    Args:
        delta: the elapsed time as a ``datetime.timedelta``.

    Returns:
        A string such as ``'zero seconds'``, ``'one minute'``, ``'two weeks'``
        or ``'10 months'``. Counts below three are spelled out. Negative
        deltas (timestamps in the future) are reported as ``'zero seconds'``.
    """

    def format_unit(quantity, unit):
        """Join a count and a unit name, pluralising and spelling out 0-2."""
        if quantity != 1:
            unit += 's'
        if quantity < 3:
            quantity = ('zero', 'one', 'two')[quantity]
        return '%s %s' % (quantity, unit)

    # A negative count would index the word tuple from the end.
    if delta.days < 0:
        return format_unit(0, 'second')

    # ``//`` keeps every count an integer regardless of the division mode.
    if delta.days < 1:
        if delta.seconds < 60:
            return format_unit(delta.seconds, 'second')
        if delta.seconds < 3600:
            return format_unit(delta.seconds // 60, 'minute')
        return format_unit(delta.seconds // 3600, 'hour')
    if delta.days < 7:
        return format_unit(delta.days, 'day')
    if delta.days < 6 * 7:
        return format_unit(delta.days // 7, 'week')
    # Months run up to a full year; the former 300-day cut-off turned ages of
    # 300-364 days into "zero years".
    if delta.days < 365:
        return format_unit(delta.days // 30, 'month')
    return format_unit(delta.days // 365, 'year')
# Render every matching row as a list item: author, relative age, linked text.
items = []
for row in c:
    user, text, tstamp = row[0], row[1], row[2]
    created_at = datetime.strptime(tstamp, '%Y-%m-%d %H:%M:%S')
    age = format_delta(now - created_at) + ' ago'
    # Turn URLs, @names, #hashtags and newlines into HTML, in pattern order.
    for reg, repl in link_patterns:
        text = reg.sub(repl, text)
    items.append('<li><strong>' + user + '</strong> <span>' + age + '</span><br />' + text + '</li>')
# The items are wrapped in the <ul> that the page's stylesheet already styles;
# they were previously emitted without any enclosing list element.
out += '<ul>' + ''.join(items) + '</ul>'
# Assemble the result page: the query goes into the title, the rendered
# heading and result markup into the body.
from xml.sax.saxutils import escape

html = '''
<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>q: %s</title>
<style>
body { font-family: Helvetica Neue; font-weight: 300; font-size: 15px; line-height: 22px; background: #3C8FC9; }
div { width: 600px; margin: 0 auto; }
h2 { color: rgba(255,255,255,.75);font-weight:300;}
li {
list-style: none;
margin-bottom: 15px;
background: #fff;
padding: 10px;
box-shadow: 1px 0px 5px 0px rgba(0,0,0,.4);
border-radius:10px; }
ul { margin: 0; padding: 0; }
strong { font-weight: bold; }
span { color: #888 }
</style>
</head>
<body>
<div>
%s
</div>
</body>
</html>''' % (escape(query), out)  # the query is user input: escape it for <title>

# Binary mode because encoded bytes are written; ``with`` closes the file
# deterministically instead of leaving the handle to the garbage collector.
with open('last-results.html', 'wb') as result_file:
    result_file.write(html.encode('utf-8'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment