Skip to content

Instantly share code, notes, and snippets.

@wenLiangcan
Last active December 10, 2015 01:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wenLiangcan/4362723 to your computer and use it in GitHub Desktop.
Save wenLiangcan/4362723 to your computer and use it in GitHub Desktop.
“一个-韩寒”的rss 采集
# coding: utf-8
from google.appengine.ext import db
from google.appengine.api import urlfetch
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
import urllib, urllib2, BeautifulSoup, re, datetime
#class Conf(db.Model):
#last_mark = db.TextProperty()
#next_count = db.IntegerProperty()
class Article(db.Model):
    """Datastore entity for one scraped entry that the RSS feed serves."""

    # URL of the page the entry was scraped from.
    link = db.StringProperty()
    # Headline, emitted as the RSS item title.
    title = db.StringProperty()
    # Author name, emitted as the RSS item author.
    author = db.TextProperty()
    # Body markup, emitted as the RSS item description.
    content = db.TextProperty()
    # Timestamp assigned when the entry was built; the feed sorts on it.
    datetime = db.DateTimeProperty()
# Feed envelope. Placeholders, in order: the channel title and the
# already-rendered <item> elements joined into a single string.
rss='''
<rss version="2.0">
<channel>
<title>%s</title>
<link>http://hanhan.qq.com/hanhan/one</link>
%s
</channel>
</rss>'''
# A single feed entry. Placeholders, in order: link, title, author,
# description body and pubDate. Title, author and description are wrapped
# in CDATA sections, so the values are inserted without XML escaping.
item='''
<item>
<link>%s</link>
<title><![CDATA[%s]]></title>
<author><![CDATA[%s]]></author>
<description>
<![CDATA[%s]]>
</description>
<pubDate>%s</pubDate>
</item>
'''
# Keep a handle on the builtin: the name ``unicode`` is rebound just below
# and the rest of this module calls the replacement.
old_unicode=unicode
def unicode(soup):
    """Render a BeautifulSoup node as a unicode string.

    The node is pretty-printed to gb18030 bytes and decoded back with the
    same codec.

    ``soup`` may be None, which is what ``find`` returns when nothing
    matches; an empty string is returned in that case so that one missing
    element does not abort the whole scrape with an AttributeError.
    """
    if soup is None:
        return u''
    return old_unicode(soup.prettify(encoding='gb18030'), encoding='gb18030')
# Non-greedy and DOTALL: each comment is removed on its own, including
# comments that span several lines, without eating the text between two
# separate comments.
_COMMENT_RE = re.compile(r'<!--.*?-->', re.DOTALL)
_TAG_RE = re.compile(r'<[^>]*>')


def sanitize(s):
    """Return *s* with HTML comments and tags replaced by spaces.

    Comments are removed first so that a ``>`` inside a comment cannot be
    mistaken for the end of a tag. Leading and trailing whitespace is
    stripped from the result.
    """
    without_comments = _COMMENT_RE.sub(' ', s)
    return _TAG_RE.sub(' ', without_comments).strip()
def child_div(soup,classname):
    """Return the first ``<div>`` with CSS class *classname*, as unicode markup."""
    match = soup.find('div', {'class': classname})
    return unicode(match)
def one_to_article(one,link):
    """Build the ``Article`` field dict for one essay block.

    one  -- BeautifulSoup node holding the essay
    link -- URL of the page the node came from
    """
    heading = one.find('h1')
    # 'auther' (sic) is the class name looked up in the page markup.
    byline = one.find('p', {'class': 'auther'})

    article = {'link': link}
    article['title'] = sanitize(unicode(heading))
    article['author'] = sanitize(unicode(byline))
    article['content'] = child_div(one, 'neirong')
    article['datetime'] = datetime.datetime.now()
    return article
def question_to_article(q,link):
    """Build the ``Article`` field dict for a question-and-answer block.

    q    -- BeautifulSoup node containing the 'question' and 'answer' divs
    link -- URL of the page the node came from
    """
    question_html = child_div(q, 'question')
    answer_html = child_div(q, 'answer')
    return dict(
        link=link,
        title=u'\u4e00\u4e2a\u95ee\u9898',
        author='everyone',
        # Question first, then the answer, separated by a line break.
        content='<br/>'.join((question_html, answer_html)),
        datetime=datetime.datetime.now())
def topbox_to_article(t,link):
    """Build the ``Article`` field dict for the top box of the page.

    t    -- BeautifulSoup node for the top box
    link -- URL of the page the node came from
    """
    credit = sanitize(child_div(t, 'intename'))
    # The body is the markup of the div with id 'tips' followed by the
    # plain text of the 'intext' paragraph. (An earlier variant used the
    # <img> inside p.bigPic instead of the tips div.)
    tips_html = unicode(t.find('div', {'id': 'tips'}))
    caption = sanitize(unicode(t.find('p', {'class': 'intext'})))
    return dict(
        link=link,
        title=u'\u4e00\u4e2a\u89c6\u89d2',
        author=credit,
        content=tips_html + u'<br/>' + caption,
        datetime=datetime.datetime.now())
def _cdata_safe(text):
    """Split any ``]]>`` in *text* so it cannot close the enclosing CDATA section."""
    return text.replace(']]>', ']]]]><![CDATA[>')


class MainPage(webapp.RequestHandler):
    """Serve the most recently stored articles as an RSS 2.0 document."""

    def get(self):
        """Render the 12 newest ``Article`` entities and write the feed."""
        newest = db.Query(Article).order('-datetime').fetch(12)
        items = [
            item % (
                a.link.encode('utf-8'),
                # Scraped text goes inside CDATA sections; neutralise any
                # terminator so the feed stays well-formed XML.
                _cdata_safe(a.title).encode('utf-8'),
                _cdata_safe(a.author).encode('utf-8'),
                _cdata_safe(a.content).encode('utf-8'),
                # Explicit fields instead of the locale-dependent %X, so
                # the time part always has the HH:MM:SS form RSS expects.
                a.datetime.strftime('%a, %d %b %Y %H:%M:%S GMT'),
            )
            for a in newest
        ]
        # The body is written as UTF-8 bytes; say so in the header.
        self.response.headers['Content-Type'] = 'application/rss+xml; charset=utf-8'
        self.response.out.write(rss % (u'\u4e00\u4e2a'.encode('utf-8'), '\n'.join(items)))
class Update(webapp.RequestHandler):
    """Scrape one issue page and store its articles in the datastore."""

    def get(self):
        """Fetch the issue page, extract its articles and save them.

        Query parameter ``days`` selects the issue number. When it is
        absent or 0, the number is derived from the current date. A
        plain-text status line is written to the response in every case.
        """
        out = self.response.out
        self.response.headers['Content-Type'] = 'text/plain'

        # A malformed ``days`` value is a client error, not a server crash.
        try:
            days = int(self.request.get('days', '0'))
        except ValueError:
            self.response.set_status(400)
            out.write('Invalid days parameter!')
            return

        if not days:
            # Issue number = whole days elapsed since 2012-06-11, plus one.
            # NOTE(review): the 8-hour shift presumably converts the server
            # clock to China Standard Time - confirm.
            diff = (datetime.datetime.now() + datetime.timedelta(hours=8)
                    - datetime.datetime(2012, 6, 11))
            out.write(str(diff) + '\n')
            days = diff.days + 1

        link = 'http://hanhan.qq.com/hanhan/one/one%d.htm' % days

        # Report fetch errors the same way as a non-200 response.
        try:
            result = urlfetch.fetch(link)
        except urlfetch.Error:
            out.write('FAILED!')
            return
        if result.status_code != 200:
            out.write('FAILED!')
            return

        soup = BeautifulSoup.BeautifulSoup(result.content, fromEncoding='gb18030')
        # Drop every <b class="fr"> element before any text is extracted.
        for b in soup.findAll('b', {'class': 'fr'}):
            b.extract()

        topbox = soup.find('div', {'class': 'topBoxIn'})
        if not topbox:  # no content today
            out.write('No content for %s !' % link)
            return

        # Skip the update when the top box matches the last one stored.
        top = topbox_to_article(topbox, link)
        old_top = (db.Query(Article)
                   .order('-datetime')
                   .filter('title =', u'\u4e00\u4e2a\u89c6\u89d2')
                   .fetch(1))
        if old_top and old_top[0].content == top['content']:
            out.write('No change! for %s' % link)
            return

        # 'ones' divs hold either an essay (marked by h1.tit) or a
        # question/answer pair (marked by div.question).
        all_ones = soup.findAll('div', {'class': 'ones'})
        ones = [one_to_article(o, link)
                for o in all_ones if o.find('h1', {'class': 'tit'})]
        qs = [question_to_article(q, link)
              for q in all_ones if q.find('div', {'class': 'question'})]

        articles = ones + qs + [top]
        for a in articles:
            Article(**a).put()
        out.write(str(len(articles)) + ' new articles for ' + link + ' !')
# URL routing: '/' serves the feed, '/update' triggers a scrape.
application = webapp.WSGIApplication(
    [
        ('/', MainPage),
        ('/update', Update),
    ],
    debug=True)


def main():
    """Run the WSGI application through ``run_wsgi_app``."""
    run_wsgi_app(application)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment