Skip to content

Instantly share code, notes, and snippets.

@myles
Created November 27, 2009 15:28
Show Gist options
  • Save myles/244064 to your computer and use it in GitHub Desktop.
Save myles/244064 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# encoding: utf-8
HASTTAGS_GROUPS = {
'gtalug': 'GTALUG',
'pygta': 'PyGTA',
'asgardproject': 'AsgardProject',
'asgard': 'AsgardProject',
'http': 'HTTP',
'html': 'HTML',
'xmpp': 'XMPP',
'jabber': 'XMPP',
'python': 'Python',
'linuxcaffe': 'Linuxcaffe',
'django': 'Django',
}
import re
import sqlite3
import logging
from urllib import urlencode
import twitter
import httplib2
import simplejson
TWITTER_USERNAME = ''
TWITTER_PASSWORD = ''
STATUSNET_API = '' # Identi.ca is http://identi.ca/api/
STATUSNET_USERNAME = ''
STATUSNET_PASSWORD = ''
logging.basicConfig(level=logging.INFO)
def main():
db = sqlite3.connect("tweets.db")
c = db.cursor()
# Create the database Table and Index
c.execute("""CREATE TABLE IF NOT EXISTS status (
id text, status text, seen boolean)""")
c.execute("""CREATE UNIQUE INDEX IF NOT EXISTS
id on status (id ASC)""")
db.commit()
# Connect to Twitter and download the latest tweets.
api = twitter.Api(TWITTER_USERNAME, TWITTER_PASSWORD,
input_encoding=None)
tweets = api.GetUserTimeline(TWITTER_USERNAME)
# Insert tweets into the database.
for i in tweets:
c.execute("""INSERT OR IGNORE INTO status (id, status)
VALUES (?, ?)""", (i.id, i.text))
db.commit()
# Get all the tweets that are not seen.
c.execute("""SELECT * FROM status WHERE seen IS NULL
ORDER BY id ASC""")
queue = []
for row in c:
(key, status, state) = row
queue.append((key, status))
for i in queue:
(key, status) = i
logging.info('[Original] %s: %s' % (key, status))
# Check to make sure it isn't a reply or public direct.
if status.startswith('@'):
c.execute("""UPDATE OR REPLACE status
SET seen=? WHERE id=?""", ('1', key))
db.commit()
break
hashtag_ref_re = re.compile(r"(\A|\W)#(?P<tag>\w+)(\Z|\W)",
re.IGNORECASE)
# Check to make sure that the status update contains
# a hashtag.
if hashtag_ref_re.search(status):
for tag in hashtag_ref_re.finditer(status):
tag_name = tag.group().strip().strip('#')
# Checkes if the hasttag matches a group.
try:
group = HASTTAGS_GROUPS[tag_name.lower()]
status = status.replace(tag.group().strip(),
'!%s' % group)
except KeyError:
pass
logging.info('[Processed] %s: %s' % (key, status))
# Connecting to Identi.ca
h = httplib2.Http()
h.add_credentials(STATUSNET_USERNAME, STATUSNET_PASSWORD,
'identi.ca')
hrs = {'Content-Type': 'application/x-www-form-urlencoded'}
# Posting the status.
resp, content = h.request(
STATUSNET_API + 'statuses/update.json',
'POST', urlencode({'status': status}),
headers=hrs)
if resp['status'] == '200':
# If posting the status worked mark update as seen
c.execute("""UPDATE OR REPLACE status
SET seen=? WHERE id=?""", ('1', key))
db.commit()
return
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment