Skip to content

Instantly share code, notes, and snippets.

@maliqq
Created March 15, 2010 18:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maliqq/333130 to your computer and use it in GitHub Desktop.
Save maliqq/333130 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import feedparser
import re
import urllib, urllib2
import datetime, time
import os
import sqlite3
# Dry-run switch. While it is true, TwitterClient.post only prints the message
# and returns -1 instead of sending anything over the network. The script
# entry point at the bottom of the file may switch it to False.
DEBUG = True
class UpdatesHistory:
    """SQLite-backed record of the feed entries that were already posted.

    Rows are kept in the ``updates`` table::

        CREATE TABLE updates (message TEXT, dt DATETIME,
                              guid VARCHAR(255) UNIQUE, message_id LONG);

    The table is created on construction when it does not exist yet, so a
    fresh database file works without any manual setup.
    """

    # Timestamps are stored as text with minute precision. This layout sorts
    # lexicographically in chronological order, which MAX(dt) relies on.
    DATETIME_FORMAT = "%Y-%m-%d %H:%M"

    def __init__(self, db_path="./kinomotor_updates.sqlite3"):
        """Open (or create) the history database.

        db_path -- path handed to sqlite3.connect(); defaults to the file
                   the script has always used.
        """
        self.db = sqlite3.connect(db_path)
        self.cursor = self.db.cursor()
        self.cursor.execute(
            "CREATE TABLE IF NOT EXISTS updates ("
            "message TEXT, dt DATETIME, guid VARCHAR(255) UNIQUE, "
            "message_id LONG)"
        )
        self.db.commit()

    @property
    def latest_update_datetime(self):
        """Return the newest stored ``dt`` as a datetime, or None if empty."""
        self.cursor.execute("SELECT MAX(dt) FROM updates")
        row = self.cursor.fetchone()
        # MAX() over an empty table yields a single row holding NULL.
        if row is None or row[0] is None:
            return None
        return datetime.datetime.strptime(row[0], self.DATETIME_FORMAT)

    def guid_exist(self, guid):
        """Return True when an update with this guid is already recorded."""
        self.cursor.execute(
            "SELECT 1 FROM updates WHERE guid = ? LIMIT 1", (guid,)
        )
        return self.cursor.fetchone() is not None

    def register_update(self, entry):
        """Store one posted entry and commit immediately.

        entry -- mapping with the keys 'guid', 'dt' (a datetime), 'message'
                 and 'message_id'.

        With the schema above, inserting a guid that is already present
        raises sqlite3.IntegrityError because of the UNIQUE constraint.
        """
        self.db.execute(
            "INSERT INTO updates (guid, dt, message, message_id) "
            "VALUES (?, ?, ?, ?)",
            (
                entry['guid'],
                entry['dt'].strftime(self.DATETIME_FORMAT),
                entry['message'],
                entry['message_id'],
            ),
        )
        self.db.commit()

    def close(self):
        """Release the underlying SQLite connection."""
        self.db.close()
class TwitterClient:
    """Posts status updates to twitter.com using HTTP Basic authentication.

    NOTE(review): this targets the legacy XML endpoint with Basic auth, which
    presumably no longer works against the live service -- confirm before
    relying on it.
    """

    POST_URL = "http://twitter.com/statuses/update.xml"

    def __init__(self, username, password):
        """Register the credentials with urllib2 for http://twitter.com/."""
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, "http://twitter.com/", username, password)
        handler = urllib2.HTTPBasicAuthHandler(password_mgr)
        # install_opener() is process-wide: every later urllib2.urlopen()
        # call goes through this opener, not only the ones made by post().
        urllib2.install_opener(urllib2.build_opener(handler))

    def post(self, message):
        """Send ``message`` as a status update.

        Returns -1 in DEBUG mode (the message is only printed), the numeric
        status id parsed from the response on success, or None when the
        request fails or the response carries no ``<id>`` element.
        """
        if DEBUG:
            # Parenthesised so the line parses the same with or without the
            # print statement; the output is unchanged.
            print(message + "\n")
            return -1

        payload = urllib.urlencode({"status": message.encode("utf-8")})
        req = urllib2.Request(self.POST_URL, payload)
        try:
            resp = urllib2.urlopen(req)
            try:
                content = resp.read()
            finally:
                resp.close()
        except Exception as exc:
            # Posting is best effort: report the failure and let the caller
            # carry on. Unlike a bare ``except``, this no longer swallows
            # KeyboardInterrupt or SystemExit.
            sys.stderr.write("TwitterClient.post failed: %r\n" % (exc,))
            return None

        match = re.search(r"<id>(\d+)</id>", content)
        if match is None:
            return None
        return int(match.group(1))
class FeedFetcher:
    """Reads the kino.motor.kg RSS feed and builds one status text per entry."""

    FEED_URL = "http://kino.motor.kg/Rss.aspx"

    # One row of the HTML table inside an entry description: a bold label
    # cell followed by the value cell. The first %s is the label, the second
    # the sub-pattern that captures the value.
    _ROW_TEMPLATE = (
        u'<tr><td valign="top" align="right"><b>%s:</b></td>'
        u'<td valign="top">%s</td></tr>'
    )
    # Compiled once at class creation instead of on every entry.
    _YEAR_RE = re.compile(
        _ROW_TEMPLATE % (u"Дата&nbsp;выхода", u".*?([0-9]{4})")
    )
    _GENRE_RE = re.compile(_ROW_TEMPLATE % (u"Жанр", u"(.*?)"))
    _IMDB_RE = re.compile(_ROW_TEMPLATE % (u"Рейтинг&nbsp;IMDB", u"(.*?)"))
    _TAG_RE = re.compile(u"<[^>]*>")

    def get_entries(self):
        """Download FEED_URL and return a list of get_entry() dicts."""
        parsed = feedparser.parse(self.FEED_URL)
        return [self.get_entry(entry) for entry in parsed['entries']]

    def _row_value(self, pattern, description):
        """Return the cleaned cell text matched by ``pattern``, or None.

        Markup inside the captured cell (links, formatting tags) is removed
        and surrounding whitespace trimmed so no HTML leaks into the message.
        """
        match = pattern.search(description)
        if match is None:
            return None
        return self._TAG_RE.sub(u"", match.group(1)).strip()

    def get_entry(self, entry):
        """Convert one parsed feed entry into a dict.

        Reads ``updated_parsed``, ``description``, ``title``, ``link`` and
        ``guid`` from ``entry`` and returns a dict with:

        message -- title, then " (year)", " [genre]" and " IMDB: rating" for
                   each row found (and non-empty) in the description,
                   followed by the entry link
        dt      -- datetime built from the first six fields of
                   ``entry.updated_parsed``
        guid    -- the entry guid, unchanged
        """
        dt = datetime.datetime(*(entry.updated_parsed[0:6]))
        description = entry.description
        message = entry.title

        year = self._row_value(self._YEAR_RE, description)
        if year:
            message += " (" + year + ")"

        genre = self._row_value(self._GENRE_RE, description)
        if genre:
            message += " [" + genre + "]"

        imdb = self._row_value(self._IMDB_RE, description)
        if imdb:
            message += " IMDB: " + imdb

        message += " " + entry.link
        return dict(message=message, dt=dt, guid=entry.guid)
class KinoMotorUpdater:
    """Posts feed entries that are not yet in the history and records them."""

    def __init__(self, username, password):
        """Create the posting client with the given account credentials."""
        self.bot = TwitterClient(username, password)

    def update(self):
        """Run one pass over the feed.

        An entry is skipped when its timestamp is not newer than the latest
        recorded one, or when its guid is already recorded. Every other entry
        is posted and, if the post returned a message id, written to the
        history. The history connection is closed when the pass ends, even
        if an error interrupts it.
        """
        feed = FeedFetcher()
        history = UpdatesHistory()
        try:
            latest_update_datetime = history.latest_update_datetime
            for entry in feed.get_entries():
                # TODO check HTML code presence in message
                if latest_update_datetime:
                    if entry['dt'] <= latest_update_datetime:
                        continue
                if history.guid_exist(entry['guid']):
                    continue

                message_id = self.bot.post(entry['message'])
                entry['message_id'] = message_id
                # post() signals failure with None; -1 (dry run) is recorded.
                if message_id is not None:
                    history.register_update(entry)

                # NOTE(review): with an empty history, stop after the first
                # candidate so the whole feed backlog is not posted at once
                # -- confirm this is the intended first-run behaviour.
                if not latest_update_datetime:
                    break
        finally:
            history.db.close()
import sys
if __name__ == "__main__":
    # Usage: <script> <username> <password>
    # With both arguments the script leaves dry-run mode and posts for real;
    # otherwise it keeps the placeholder credentials and DEBUG stays as set
    # at the top of the file.
    cli_args = sys.argv[1:]
    if len(cli_args) >= 2:
        username, password = cli_args[0], cli_args[1]
        DEBUG = False
    else:
        username = password = "test"

    kinomotor_updater = KinoMotorUpdater(username, password)
    kinomotor_updater.update()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment