import re
import csv
import sys
import requests
from requests.exceptions import RequestException
import threading
from subprocess import call
from bs4 import BeautifulSoup
from html.parser import HTMLParseError
from urllib.parse import urljoin
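
# download `url` to `filename`, retrying up to `retry` times on request errors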
def download(url, filename, session=requests, retry=2):
    for i in range(retry):
        try:
            response = session.get(url)
            break
        except RequestException:
            # print('retry to download "{}"...'.format(url))
            pass
    else:
        return False

    with open(filename, 'wb') as f:
        f.write(response.content)
    return True
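
# LoadUrl holds a shared requests session and a semaphore that limits how
# many threads may issue HTTP requests at the same time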
class LoadUrl:
    semaphore = None
    session = requests

    @classmethod
    def load(cls, url, encoding, retry=2) -> 'BeautifulSoup':
        """Return a BeautifulSoup instance for the page at url."""
        # print('loading "{}"...'.format(url))
        if cls.semaphore is None:
            raise ValueError('{}.semaphore not initialized'.format(
                cls.__name__
            ))

        with cls.semaphore:
            for i in range(retry):
                try:
                    response = cls.session.get(url)
                    break
                except RequestException:
                    # print('retry to load "{}"...'.format(url))
                    pass
            else:
                raise RequestException('failed to load: ' + url)

        response.encoding = encoding
        return BeautifulSoup(response.text)
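
# parse a material page and return its download link, title and tag text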
def load_material_info(url: 'HTML URL of material', encoding='utf-8', session=requests):
    dom = LoadUrl.load(url, encoding)

    title = dom.select('h2.name')
    if not title:
        raise HTMLParseError('title not found')
    title = title[0].get_text()

    tags = dom.select('div.txt p')
    if tags:
        tags = tags[0].get_text()
        # tags = tags.split(',')
    else:
        print('tags not found in url: ' + url, file=sys.stderr)

    links = dom.select('a.unnamed1')
    if not links:
        raise HTMLParseError('download link not found')
    link = urljoin(url, links[0].attrs['href'])

    # ext = re.search('\.\w+$', link)
    # if ext:
    #     title += ext.group(0)
    # print('downloading "{}" ({})...'.format(
    #     title, link
    # ))
    # success = download(link, title, session=session)
    # print('done' if success else 'fail')
    # output.write('{},{},{}\n'.format(title, str(tags), link))
    return link, title, tags
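
# one worker thread per material page: fetch the page, then write a
# (link, title, tags) row to the shared CSV writer under a lock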
class MaterialThread(threading.Thread):
    csv_writer = None
    csv_lock = threading.Lock()

    def __init__(self, url, encoding):
        super().__init__(name=url)
        self.url = url
        self.encoding = encoding

    def run(self):
        try:
            # fetch and parse the material page before taking the lock,
            # so network requests from different threads can overlap
            row = load_material_info(self.url, self.encoding)
            with self.csv_lock:
                self.csv_writer.writerow(row)
        except (RequestException, HTMLParseError) as e:
            message = 'failed to load material page "{}" ({})'.format(self.url, e)
            print(message)
            print(message, file=sys.stderr)
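
# main script: walk the sccnn.com vector material index page by page,
# spawning one thread per material link and collecting rows into list.csv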
# switch the Windows console to the UTF-8 code page
call(['chcp', '65001'], shell=True)
# sys.stderr = open('sccnn.log', 'w', encoding='utf-8')

url = 'http://www.sccnn.com/shiliangtuku/'
html_encoding = 'gb2312'
threads = []

output = open('list.csv', 'w', encoding='utf-8', newline='')
csv_writer = csv.writer(output)
MaterialThread.csv_writer = csv_writer

# allow at most 10 concurrent page loads
LoadUrl.semaphore = threading.Semaphore(10)
LoadUrl.session = requests.Session()
LoadUrl.session.headers.update({'referer': 'http://www.sccnn.com/'})
try:
    while True:
        print('loading "{}"...'.format(url))
        dom = LoadUrl.load(url, html_encoding)

        # find material links
        tables = dom.find_all('table', width='128')
        links = []
        for table in tables:
            a = table.select('a')
            if a:
                links.append(
                    urljoin(url, a[0].attrs['href'])
                )

        for link in links:
            thread = MaterialThread(link, html_encoding)
            thread.start()
            threads.append(thread)

        # next page
        next_page = dom.find('a', text='>')
        if not next_page:
            break
        url = urljoin(url, next_page.attrs['href'])
except RequestException as e:
    print('failed to load url "{}" ({})'.format(url, e))
except KeyboardInterrupt:
    print('canceled, current page: ' + url)
finally:
    print('waiting for threads to complete...')
    print('\n'.join(
        [t.name for t in threads if t.is_alive()]
    ))
    for thread in threads:
        thread.join()
    output.close()
    sys.stdout.close()