Scrape the published results from the Beijing/Guangzhou license-plate lottery sites with Python, parse the PDFs into text, and load them into a database. Oracle is loaded with the sqlldr command; MySQL is loaded with direct inserts (executemany seemed to fail on very large batches, so the rows are inserted in small chunks).
beijing,personal,201310,1,6998102285168,刘宇宸
beijing,personal,201310,2,5491100633292,王良
beijing,personal,201310,3,3446103258689,王墨
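Each sample line above is one record: city, quota type, lottery term, sequence number, 13-digit application code, and applicant name. As a small illustration (added here, not part of the original gist) of how the loader script below turns such a line into the parameter tuple it hands to executemany, with the column names taken from the insert statement in load2mysql:

# -*- coding:utf8 -*-
row = 'beijing,personal,201310,1,6998102285168,刘宇宸\n'
# same transformation as load2mysql: strip the newline and split on commas
params = tuple(row.strip().split(','))
# params now has 6 fields matching (city, type, term, reqindex, reqcode, reqname)
print len(params)  # 6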
#!/usr/bin/env python
# -*- coding:utf8 -*-
# Scrape the published Beijing license-plate lottery results, convert them to
# formatted text, and load them into an Oracle database with sqlldr.
# Requires: beautifulsoup, pdfminer, MySQLdb
# Author: telen telen.telen@gmail.com
# Date: 2013-10-23
import os
import urllib
import urllib2
import subprocess
import MySQLdb  # used in __main__ for the direct-insert path; missing from the original imports
from bs4 import BeautifulSoup
from parsePdfViaPdfminer import PdfParser

urls = ['http://www.bjhjyd.gov.cn/jggb/index.html', 'http://jtzl.gzjt.gov.cn/index/gbl/']
visit = []    # detail-page URLs collected from the index pages
new_pdf = []  # paths of newly downloaded PDF files
pdf_md5 = {}  # maps PDF file name -> MD5 string published on the download page
dbname = 'db'
host = '10.2.134.58'
pwd = '123456'
user = 'user'

def scrapUrlsBeijing():
    # spoof a browser User-Agent
    headers = {'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'}
    req = urllib2.Request(urls[0], headers=headers)
    content = urllib2.urlopen(req).read()
    soup = BeautifulSoup(content)
    # collect links whose text mentions individual quotas (个人指标)
    for tag in soup.findAll('a', {"class": "text"}):
        #print 'tag text:' + tag.getText().encode('utf8')
        if '个人指标'.decode('utf8') in tag.getText():
            print tag.getText().strip().encode('utf8'), tag['href']
            visit.append(tag['href'])

def scrapUrlsGuangzhou():
    # spoof a browser User-Agent
    headers = {'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'}
    req = urllib2.Request(urls[1], headers=headers)
    content = urllib2.urlopen(req).read()
    soup = BeautifulSoup(content)
    # collect links whose text mentions quota allocation results (指标配置结果)
    for tag in soup.findAll('a', {"class": "text"}):
        #print 'tag text:' + tag.getText().encode('utf8')
        if '指标配置结果'.decode('utf8') in tag.getText():
            print tag.getText().strip().encode('utf8'), tag['href']
            visit.append(tag['href'])

# Parse the download page and fetch the PDF
# TODO: verify the MD5
def downloadPdf(url):
    loadsoup = BeautifulSoup(urllib2.urlopen(url))
    tags = loadsoup.find('div', {'class': 'details'})
    pdfurl = tags.find('a')['href']
    pdfname = pdfurl[pdfurl.rindex('/')+1:]
    # remember the MD5 string published on the page
    if pdfname not in pdf_md5:
        pdf_md5[pdfname] = tags.find('p').getText()[13:].encode('utf8')
    print pdfurl
    # download only if the file is new; skip files that already exist locally
    if pdfname not in [f for f in os.listdir('./files/bj/') if os.path.isfile(os.path.join('./files/bj/', f))]:
        print "new file, download..."
        urllib.urlretrieve(pdfurl, 'files/bj/' + pdfname)
        new_pdf.append('files/bj/' + pdfname)

def downloadPdfGuangzhou(url):
    loadsoup = BeautifulSoup(urllib2.urlopen(url))
    tags = loadsoup.find('div', {'class': 'details'})
    pdfurl = tags.findAll('a')
    for ourl in pdfurl:
        rurl = ourl['href']
        pdfname = rurl[rurl.rindex('/')+1:]
        print pdfname
        # download only if the file is new; skip files that already exist locally
        if pdfname not in [f for f in os.listdir('./files/gz/') if os.path.isfile(os.path.join('./files/gz/', f))]:
            print "new file, download..."
            urllib.urlretrieve(rurl, 'files/gz/' + pdfname)
            new_pdf.append('files/gz/' + pdfname)

def load2db(file):
    """Load a formatted file into an Oracle database via Oracle's sqlldr command."""
    # use the subprocess module to run the sqlldr command with its parameters
    print file
    subprocess.call(['sqlldr', 'userid=×××/×××@host/orcl',
                     'control=import.ctl', 'errors=9999', 'data=' + file])

def chunks(l, n):
    """Split list l into successive chunks of size n."""
    return [l[i:i+n] for i in range(0, len(l), n)]
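# Illustration of chunks() (example added, not from the original gist):
#   chunks(range(5), 2) -> [[0, 1], [2, 3], [4]]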

def load2mysql(conn, file_d):
    """Load a formatted data file into MySQL, inserting in chunks of 500 rows
    (executemany appeared to fail on very large batches)."""
    with open(file_d) as f:
        lines = f.readlines()
    for slict in chunks(lines, 500):
        try:
            conn.cursor().executemany(
                """insert into opg_navi_carlottery(city, type, term, reqindex, reqcode, reqname)
                values(%s, %s, %s, %s, %s, %s)""",
                map(tuple, map(lambda x: x.strip().split(","), slict)))
            conn.commit()
        except:
            conn.rollback()

if __name__ == '__main__':
    # unused template of the full sqlldr command line (kept from the original gist)
    sqlldr_cmd = 'sqlldr userid=×××/×××@host/orcl control=${LOGDIR1}/import.ctl errors=9999 \
log=${LOGDIR}/${DIR}/vf_tmp.log bad=${LOGDIR}/${DIR}/vf_tmp.bad data="${LOGDIR}/${DIR}/vf.log.${YEST}"'
    if not os.path.exists("files/bj"):
        os.makedirs("files/bj")
    if not os.path.exists("files/gz"):
        os.makedirs("files/gz")
    try:
        #scrapUrlsBeijing()
        scrapUrlsGuangzhou()
    except Exception, e:
        raise e
    for url in visit:
        #downloadPdf(url)
        downloadPdfGuangzhou(url)
    pdfParser = PdfParser()
    # loop over all pdf files in files/bj (disabled; only newly downloaded files are handled below)
    """
    for pdf_file in [f for f in os.listdir('./files/bj/') if os.path.isfile(os.path.join('./files/bj/', f))]:
        data_file = pdfParser.parsePdf("files/bj/" + pdf_file)
        print data_file
        #load2db(data_file)
    """
    conn = MySQLdb.connect(host, user, pwd, dbname, charset='utf8')
    # process the newly downloaded pdf files
    while len(new_pdf):
        data_file = pdfParser.parsePdf(new_pdf.pop())
        print data_file
        #load2mysql(conn, data_file)
        #load2db(data_file)
    conn.close()
    #print pdf_md5
    print "Fin."
#!/usr/bin/env python
# -*- coding:utf8 -*-
# parsePdfViaPdfminer: parse the published lottery-result PDFs into formatted
# text files ready for a sqlldr import into Oracle (used by the scraper above).
# Requires: pdfminer
# Author: telen telen.telen@gmail.com
# Date: 2013-10-23
import os
import re
# PDFTextExtractionNotAllowed was missing from the original imports; it is
# assumed to be importable from pdfminer.pdfparser in this (older) pdfminer API.
from pdfminer.pdfparser import PDFParser, PDFDocument, PDFTextExtractionNotAllowed
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.pdfdevice import PDFDevice
from pdfminer.layout import LAParams
from pdfminer.converter import PDFPageAggregator

class PdfParser(object):
    """Parse a lottery-result PDF file into a formatted text (.dat) file."""

    def __init__(self, data_dir="files/loadfiles", p=re.compile(r'\d+\s{6}\d{13}\s{6}\W+'),
                 city='beijing', q_type='personal'):
        self.data_dir = data_dir  # where the formatted .dat files are written
        self.p = p                # pattern that identifies one result row
        self.city = city
        self.q_type = q_type      # 'personal' or 'company'
    def parsePdf(self, pdf_file):
        """Parse a PDF file, format the result rows and write them to a text
        file ready for the sqlldr (Oracle) command."""
        # Open the PDF file.
        fb = open(pdf_file, 'rb')
        # Create a PDF parser object associated with the file object.
        parser = PDFParser(fb)
        # Create a PDF document object that stores the document structure.
        doc = PDFDocument()
        # Connect the parser and document objects.
        parser.set_document(doc)
        doc.set_parser(parser)
        # Supply the password for initialization
        # (if no password is set, give an empty string).
        doc.initialize('')
        # Check if the document allows text extraction. If not, abort.
        if not doc.is_extractable:
            raise PDFTextExtractionNotAllowed
        # Create a PDF resource manager object that stores shared resources.
        rsrcmgr = PDFResourceManager()
        # Create a PDF device object.
        #device = PDFDevice(rsrcmgr)
        # Set parameters for layout analysis.
        laparams = LAParams()
        # Create a PDF page aggregator object.
        device = PDFPageAggregator(rsrcmgr, laparams=laparams)
        # Create a PDF interpreter object.
        interpreter = PDFPageInterpreter(rsrcmgr, device)
        # First, create the output folder.
        if not os.path.exists(self.data_dir):
            os.makedirs(self.data_dir)
        f = None  # output file, opened once the term code is known
        term_code = ''
        detect_city = True
        # Process each page contained in the document.
        for page in doc.get_pages():
            #print page
            interpreter.process_page(page)
            # Receive the LTPage object for the page.
            layout = device.get_result()
            for pg_obj in layout:
                for box_obj in pg_obj:
                    line = box_obj.get_text().encode('utf8').strip()
                    # Detect the city from the page header.
                    if detect_city:
                        if '北京市' in line:
                            self.city = 'beijing'
                            detect_city = False
                        elif '广州市' in line:
                            self.city = 'guangzhou'
                            detect_city = False
                    # Detect the quota type (company vs. personal).
                    if '单位指标配置' in line or '单位普通指标配置' in line or '单位节能指标配置' in line:
                        self.q_type = 'company'
                    elif '个人指标配置' in line or '个人普通指标配置' in line or '个人节能指标配置' in line:
                        self.q_type = 'personal'
                    # The term-code line (分期编号) opens the output file.
                    if '分期编号' in line:
                        term_code = line[-6:]
                        f = open(self.data_dir + '/' + self.city + '_' + self.q_type + '_' + term_code + '.dat', 'w')
                        print 'term code:' + line[-6:]
                    # Quota result rows.
                    m = self.p.match(line)
                    if m:
                        formated_line = self.city + ',' + self.q_type + ',' + term_code + \
                            ',' + ','.join(m.group().split(' ')) + '\n'
                        #print formated_line
                        f.write(formated_line)
                    #print ','.join(line.split(" "))
                # only the first text box of each page is scanned before moving to the next page
                break
        fb.close()
        if f is not None:
            f.close()
        print "DONE."
        return self.data_dir + '/' + self.city + '_' + self.q_type + '_' + term_code + '.dat'

if __name__ == '__main__':
    pdfParser = PdfParser()
    pdfParser.parsePdf("files/1298687736510.pdf")
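As a quick sanity check of the default row pattern r'\d+\s{6}\d{13}\s{6}\W+' that parsePdf() uses to pick result rows out of the extracted text, the snippet below shows which lines it accepts (added for illustration; the sample lines are made up, and real PDFs may space the columns differently, which is why the pattern is a constructor argument):

# -*- coding:utf8 -*-
import re

p = re.compile(r'\d+\s{6}\d{13}\s{6}\W+')  # default pattern from PdfParser

result_row = '1      6998102285168      刘宇宸'  # sequence number, application code, name
header_row = '分期编号:201310'                   # header lines do not match

print bool(p.match(result_row))  # True  -> the row is written to the .dat file
print bool(p.match(header_row))  # False -> the line is ignored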