Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Sample scripts for blog post "Robust data collection via web scraping and web APIs".
"""
Sample scripts for blog post "Robust data collection via web scraping and web APIs"
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/).
Script 1. Starting point – baseline (unreliable) web scraping script.
December 2020, Markus Konrad <markus.konrad@wzb.eu>
"""
from datetime import datetime, timedelta
from collections import defaultdict
import json
import requests
from bs4 import BeautifulSoup
#%%
# URL template of the daily archive page; the placeholders are filled with day, month and year
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html'

# first day of the timespan for which the archive is retrieved
START_DATE = datetime(year=2020, month=11, day=1)

# last day of the timespan for which the archive is retrieved
END_DATE = datetime(year=2020, month=11, day=7)

# name of the JSON result file; it carries the ISO formatted start and end date
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json'
#%%
# maps an ISO formatted date string to the list of article metadata dicts collected for that day
archive_rows = defaultdict(list)

duration = END_DATE - START_DATE   # timedelta

# teasers that carry one of these "data-conditional-flag" values are skipped
skip_flags = ('gallery', 'video', 'audio', 'paid')

# loop through the days in the specified timespan (END_DATE is included)
for day in range(duration.days + 1):
    fetch_date = START_DATE + timedelta(days=day)
    fetch_date_str = fetch_date.date().isoformat()
    archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year)

    print(f'day {day+1}: {fetch_date_str} from {archive_url}')

    # fetch HTML from archive URL
    resp = requests.get(archive_url)

    if not resp.ok:
        # report the failure instead of silently dropping this day
        print(f'> request failed with HTTP status {resp.status_code} – skipping')
        continue

    # parse page
    soup = BeautifulSoup(resp.content, 'html.parser')
    container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'})

    if not container:
        # without this guard, `container[0]` raises an IndexError for pages without teaser list
        print('> no article teaser list found on this page – skipping')
        continue

    for hcont in container[0].select('article'):   # iterate through article teasers
        # skip gallery, video, audio, paid content or ads
        is_flagged = any(hcont.find_all('span', attrs={'data-conditional-flag': k}) for k in skip_flags)
        if is_flagged or 'ANZEIGE' in hcont.text:
            continue

        # get the URL to the full article
        title_elem = hcont.select_one('h2 a')
        if title_elem is None:
            continue

        # a missing "href" yields '' (the default passed to `get`), never None, hence the test
        # for an empty value
        url = title_elem.attrs.get('href', '')
        if not url:
            continue

        # get headline
        headline = title_elem.attrs.get('title', '')
        if not headline:
            continue

        headline = headline.replace('\xa0', ' ')   # non-breaking space -> regular space

        # add all fetched metadata for this article at this date
        archive_rows[fetch_date_str].append({
            'archive_headline': headline,
            'url': url,
            'archive_retrieved': datetime.today().isoformat(timespec='seconds'),
            'pub_date': fetch_date_str,
        })

#%%

print(f'storing fetched data to {OUTPUT_JSON}...')

with open(OUTPUT_JSON, 'w') as f:
    json.dump(archive_rows, f, indent=2)

print('done.')
"""
Sample scripts for blog post "Robust data collection via web scraping and web APIs"
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/).
Script 2. Improvement: storing intermediate results to a cache file after every web scraping iteration.
December 2020, Markus Konrad <markus.konrad@wzb.eu>
"""
from datetime import datetime, timedelta
from collections import defaultdict
import json
import os
import pickle
import requests
from bs4 import BeautifulSoup
#%%
# URL template of the daily archive page; the placeholders are filled with day, month and year
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html'

# first day of the timespan for which the archive is retrieved
START_DATE = datetime(year=2020, month=11, day=1)

# last day of the timespan for which the archive is retrieved
END_DATE = datetime(year=2020, month=11, day=7)

# file that holds the pickled intermediate results
CACHEFILE = 'cache.pickle'

# name of the JSON result file; it carries the ISO formatted start and end date
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json'
#%% helper functions
def load_cache(init_with):
    """
    Return the dataset stored in the cache file or a fallback value.

    If the file `CACHEFILE` exists, its unpickled content is returned. Otherwise `init_with` is
    returned as is.
    """
    if not os.path.exists(CACHEFILE):
        print('initializing with empty dataset')
        return init_with

    print('loading existing data from %s' % CACHEFILE)

    with open(CACHEFILE, 'rb') as cache_handle:
        cached_data = pickle.load(cache_handle)

    return cached_data
def store_cache(data):
    """
    Pickle `data` to the file `CACHEFILE`.

    The data is written to a temporary file next to `CACHEFILE` first, which then replaces the
    cache file in a single `os.replace` step. Writing directly to `CACHEFILE` would leave a
    truncated file behind if the script gets interrupted while writing, and such a file fails to
    load on the next run.
    """
    tmpfile = CACHEFILE + '.tmp'

    with open(tmpfile, 'wb') as f:
        pickle.dump(data, f)

    # only now that the data is written completely the previous cache file is replaced
    os.replace(tmpfile, CACHEFILE)
#%%
# maps an ISO formatted date string to the list of article metadata dicts collected for that day;
# continues with the data of a previous run if a cache file exists
archive_rows = load_cache(init_with=defaultdict(list))

duration = END_DATE - START_DATE   # timedelta

# teasers that carry one of these "data-conditional-flag" values are skipped
skip_flags = ('gallery', 'video', 'audio', 'paid')

# loop through the days in the specified timespan (END_DATE is included)
for day in range(duration.days + 1):
    fetch_date = START_DATE + timedelta(days=day)
    fetch_date_str = fetch_date.date().isoformat()
    archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year)

    print(f'day {day+1}: {fetch_date_str} from {archive_url}')

    # check if data already exists
    if fetch_date_str in archive_rows:
        print('> already fetched this date – skipping')
        continue

    # fetch HTML from archive URL
    resp = requests.get(archive_url)

    if not resp.ok:
        # report the failure instead of silently dropping this day
        print(f'> request failed with HTTP status {resp.status_code} – skipping')
        continue

    # parse page
    soup = BeautifulSoup(resp.content, 'html.parser')
    container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'})

    if not container:
        # without this guard, `container[0]` raises an IndexError for pages without teaser list
        print('> no article teaser list found on this page – skipping')
        continue

    for hcont in container[0].select('article'):   # iterate through article teasers
        # skip gallery, video, audio, paid content or ads
        is_flagged = any(hcont.find_all('span', attrs={'data-conditional-flag': k}) for k in skip_flags)
        if is_flagged or 'ANZEIGE' in hcont.text:
            continue

        # get the URL to the full article
        title_elem = hcont.select_one('h2 a')
        if title_elem is None:
            continue

        # a missing "href" yields '' (the default passed to `get`), never None, hence the test
        # for an empty value
        url = title_elem.attrs.get('href', '')
        if not url:
            continue

        # get headline
        headline = title_elem.attrs.get('title', '')
        if not headline:
            continue

        headline = headline.replace('\xa0', ' ')   # non-breaking space -> regular space

        # add all fetched metadata for this article at this date
        archive_rows[fetch_date_str].append({
            'archive_headline': headline,
            'url': url,
            'archive_retrieved': datetime.today().isoformat(timespec='seconds'),
            'pub_date': fetch_date_str,
        })

    # store the intermediate results after each successfully parsed day
    store_cache(archive_rows)

#%%

print(f'storing fetched data to {OUTPUT_JSON}...')

with open(OUTPUT_JSON, 'w') as f:
    json.dump(archive_rows, f, indent=2)

print('done.')
"""
Sample scripts for blog post "Robust data collection via web scraping and web APIs"
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/).
Script 3. Improvement: set timeout, handle exceptions, define retry strategy.
December 2020, Markus Konrad <markus.konrad@wzb.eu>
"""
from datetime import datetime, timedelta
from collections import defaultdict
import json
import os
import pickle
import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry
from bs4 import BeautifulSoup
#%%
# URL template of the daily archive page; the placeholders are filled with day, month and year
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html'

# first day of the timespan for which the archive is retrieved
START_DATE = datetime(year=2020, month=11, day=1)

# last day of the timespan for which the archive is retrieved
END_DATE = datetime(year=2020, month=11, day=7)

# file that holds the pickled intermediate results
CACHEFILE = 'cache.pickle'

# name of the JSON result file; it carries the ISO formatted start and end date
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json'
#%% helper functions
def load_cache(init_with):
    """
    Provide the initial dataset for the script.

    When the file `CACHEFILE` is present, the object pickled in it is returned. When it is not,
    `init_with` is handed back unchanged.
    """
    cache_available = os.path.exists(CACHEFILE)

    if not cache_available:
        print('initializing with empty dataset')
        return init_with

    print('loading existing data from %s' % CACHEFILE)

    with open(CACHEFILE, 'rb') as cache_handle:
        return pickle.load(cache_handle)
def store_cache(data):
    """
    Pickle `data` to the file `CACHEFILE`.

    The data is written to a temporary file next to `CACHEFILE` first, which then replaces the
    cache file in a single `os.replace` step. Writing directly to `CACHEFILE` would leave a
    truncated file behind if the script gets interrupted while writing, and such a file fails to
    load on the next run.
    """
    tmpfile = CACHEFILE + '.tmp'

    with open(tmpfile, 'wb') as f:
        pickle.dump(data, f)

    # only now that the data is written completely the previous cache file is replaced
    os.replace(tmpfile, CACHEFILE)
#%%
# HTTP session that retries failed requests up to 3 times; `backoff_factor` makes urllib3 wait
# between the retries (see the urllib3 `Retry` documentation for the exact delays)
retryadapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1))
httpsess = requests.Session()
httpsess.mount('https://', retryadapter)

# 0-based index of the day for which a timeout error is provoked on purpose in order to try out
# the error handling; set to None to fetch all days with the regular timeout
SIMULATE_TIMEOUT_DAY = 3

# maps an ISO formatted date string to the list of article metadata dicts collected for that day;
# continues with the data of a previous run if a cache file exists
archive_rows = load_cache(init_with=defaultdict(list))

duration = END_DATE - START_DATE   # timedelta

# teasers that carry one of these "data-conditional-flag" values are skipped
skip_flags = ('gallery', 'video', 'audio', 'paid')

# loop through the days in the specified timespan (END_DATE is included)
for day in range(duration.days + 1):
    fetch_date = START_DATE + timedelta(days=day)
    fetch_date_str = fetch_date.date().isoformat()
    archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year)

    print(f'day {day+1}: {fetch_date_str} from {archive_url}')

    # check if data already exists
    if fetch_date_str in archive_rows:
        print('> already fetched this date – skipping')
        continue

    # fetch HTML from archive URL via the retrying session; the timeout is given in seconds
    timeout = 0.001 if day == SIMULATE_TIMEOUT_DAY else 15
    try:
        resp = httpsess.get(archive_url, timeout=timeout)
    except IOError as exc:
        # this day stays absent from `archive_rows` and is hence tried again on the next run
        print(f'> got IO error: {exc}')
        continue

    if not resp.ok:
        # report the failure instead of silently dropping this day
        print(f'> request failed with HTTP status {resp.status_code} – skipping')
        continue

    # parse page
    soup = BeautifulSoup(resp.content, 'html.parser')
    container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'})

    if not container:
        # without this guard, `container[0]` raises an IndexError for pages without teaser list
        print('> no article teaser list found on this page – skipping')
        continue

    for hcont in container[0].select('article'):   # iterate through article teasers
        # skip gallery, video, audio, paid content or ads
        is_flagged = any(hcont.find_all('span', attrs={'data-conditional-flag': k}) for k in skip_flags)
        if is_flagged or 'ANZEIGE' in hcont.text:
            continue

        # get the URL to the full article
        title_elem = hcont.select_one('h2 a')
        if title_elem is None:
            continue

        # a missing "href" yields '' (the default passed to `get`), never None, hence the test
        # for an empty value
        url = title_elem.attrs.get('href', '')
        if not url:
            continue

        # get headline
        headline = title_elem.attrs.get('title', '')
        if not headline:
            continue

        headline = headline.replace('\xa0', ' ')   # non-breaking space -> regular space

        # add all fetched metadata for this article at this date
        archive_rows[fetch_date_str].append({
            'archive_headline': headline,
            'url': url,
            'archive_retrieved': datetime.today().isoformat(timespec='seconds'),
            'pub_date': fetch_date_str,
        })

    # store the intermediate results after each successfully parsed day
    store_cache(archive_rows)

#%%

print(f'storing fetched data to {OUTPUT_JSON}...')

with open(OUTPUT_JSON, 'w') as f:
    json.dump(archive_rows, f, indent=2)

print('done.')
"""
Sample scripts for blog post "Robust data collection via web scraping and web APIs"
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/).
Script 4. Improvement: rotate cache file, capture OS signals to avoid corrupted cache file.
December 2020, Markus Konrad <markus.konrad@wzb.eu>
"""
from datetime import datetime, timedelta
from collections import defaultdict
import json
import os
import pickle
import signal
import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry
from bs4 import BeautifulSoup
#%%
# URL template of the daily archive page; the placeholders are filled with day, month and year
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html'

# first day of the timespan for which the archive is retrieved
START_DATE = datetime(year=2020, month=11, day=1)

# last day of the timespan for which the archive is retrieved
END_DATE = datetime(year=2020, month=11, day=7)

# file that holds the pickled intermediate results
CACHEFILE = 'cache.pickle'

# name of the JSON result file; it carries the ISO formatted start and end date
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json'
#%% helper functions
def load_cache(init_with):
    """
    Return the dataset stored in the cache or a fallback value.

    The file `CACHEFILE` is tried first. If it does not exist or cannot be unpickled (e.g. because
    it is truncated), the rotated backup `CACHEFILE + '~'` written by `store_cache` is tried next.
    If neither file provides data, `init_with` is returned as is.

    NOTE: unpickling executes code embedded in the file, so the cache files must only ever come
    from this script itself.
    """
    for fname in (CACHEFILE, CACHEFILE + '~'):
        if not os.path.exists(fname):
            continue

        print('loading existing data from %s' % fname)

        try:
            with open(fname, 'rb') as f:
                return pickle.load(f)
        except (EOFError, pickle.UnpicklingError) as exc:
            # incomplete or damaged file: report it and continue with the next candidate
            print(f'> could not read {fname}: {exc}')

    print('initializing with empty dataset')
    return init_with
def store_cache(data, rotate_files=True):
    """
    Pickle `data` to the file `CACHEFILE`.

    If `rotate_files` is True and a cache file already exists, that file is kept as backup
    `CACHEFILE + '~'` before the new cache file is written.
    """
    if rotate_files and os.path.exists(CACHEFILE):
        # `os.replace` overwrites the backup of the previous rotation on every platform, whereas
        # `os.rename` raises a FileExistsError on Windows if the target already exists
        os.replace(CACHEFILE, CACHEFILE + '~')

    with open(CACHEFILE, 'wb') as f:
        pickle.dump(data, f)
# Module-wide flag which `handle_abort` switches to True once the OS asks the script to stop
# (e.g. when Ctrl-C is pressed). The script is thereby not interrupted while data is stored to
# disk, which would end up in a corrupted file.
abort_script = False


def handle_abort(signum, frame):
    """
    OS signal handler that requests the script to abort.

    Prints the number of the received signal `signum` and sets the global `abort_script` to True.
    The argument `frame` is not used.
    """
    global abort_script

    message = 'received signal %d – aborting script...' % signum
    print(message)

    abort_script = True
# register `handle_abort` for each of the OS signals that kill this script; signal names that the
# `signal` module does not provide are left out
for signame in ('SIGINT', 'SIGHUP', 'SIGTERM'):
    sig = getattr(signal, signame, None)

    if sig is None:
        continue

    signal.signal(sig, handle_abort)
#%%
# HTTP session that retries failed requests up to 3 times; `backoff_factor` makes urllib3 wait
# between the retries (see the urllib3 `Retry` documentation for the exact delays)
retryadapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1))
httpsess = requests.Session()
httpsess.mount('https://', retryadapter)

# maps an ISO formatted date string to the list of article metadata dicts collected for that day;
# continues with the data of a previous run if a cache file exists
archive_rows = load_cache(init_with=defaultdict(list))

duration = END_DATE - START_DATE   # timedelta

# teasers that carry one of these "data-conditional-flag" values are skipped
skip_flags = ('gallery', 'video', 'audio', 'paid')

# loop through the days in the specified timespan (END_DATE is included)
for day in range(duration.days + 1):
    if abort_script:    # if interrupted by OS, break loop
        break

    fetch_date = START_DATE + timedelta(days=day)
    fetch_date_str = fetch_date.date().isoformat()
    archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year)

    print(f'day {day+1}: {fetch_date_str} from {archive_url}')

    # check if data already exists
    if fetch_date_str in archive_rows:
        print('> already fetched this date – skipping')
        continue

    # fetch HTML from archive URL via the retrying session; the timeout is given in seconds
    try:
        resp = httpsess.get(archive_url, timeout=15)
    except IOError as exc:
        # this day stays absent from `archive_rows` and is hence tried again on the next run
        print(f'> got IO error: {exc}')
        continue

    if not resp.ok:
        # report the failure instead of silently dropping this day
        print(f'> request failed with HTTP status {resp.status_code} – skipping')
        continue

    # parse page
    soup = BeautifulSoup(resp.content, 'html.parser')
    container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'})

    if not container:
        # without this guard, `container[0]` raises an IndexError for pages without teaser list
        print('> no article teaser list found on this page – skipping')
        continue

    for hcont in container[0].select('article'):   # iterate through article teasers
        # skip gallery, video, audio, paid content or ads
        is_flagged = any(hcont.find_all('span', attrs={'data-conditional-flag': k}) for k in skip_flags)
        if is_flagged or 'ANZEIGE' in hcont.text:
            continue

        # get the URL to the full article
        title_elem = hcont.select_one('h2 a')
        if title_elem is None:
            continue

        # a missing "href" yields '' (the default passed to `get`), never None, hence the test
        # for an empty value
        url = title_elem.attrs.get('href', '')
        if not url:
            continue

        # get headline
        headline = title_elem.attrs.get('title', '')
        if not headline:
            continue

        headline = headline.replace('\xa0', ' ')   # non-breaking space -> regular space

        # add all fetched metadata for this article at this date
        archive_rows[fetch_date_str].append({
            'archive_headline': headline,
            'url': url,
            'archive_retrieved': datetime.today().isoformat(timespec='seconds'),
            'pub_date': fetch_date_str,
        })

    # store the intermediate results after each successfully parsed day
    store_cache(archive_rows)

#%%

if abort_script:
    print('aborted.')
    # `exit()` is a helper added by the `site` module and may be unavailable; raising SystemExit
    # ends the script with the same exit status
    raise SystemExit(1)

print(f'storing fetched data to {OUTPUT_JSON}...')

with open(OUTPUT_JSON, 'w') as f:
    json.dump(archive_rows, f, indent=2)

print('done.')
"""
Sample scripts for blog post "Robust data collection via web scraping and web APIs"
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/).
Script 5. Improvement: only store on every ith iteration, compress cache file.
December 2020, Markus Konrad <markus.konrad@wzb.eu>
"""
from datetime import datetime, timedelta
from collections import defaultdict
import json
import os
import pickle
import signal
from zipfile import ZipFile, ZIP_DEFLATED
import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry
from bs4 import BeautifulSoup
#%%
# URL template of the daily archive page; the placeholders are filled with day, month and year
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html'

# first day of the timespan for which the archive is retrieved
START_DATE = datetime(year=2020, month=11, day=1)

# last day of the timespan for which the archive is retrieved
END_DATE = datetime(year=2020, month=11, day=7)

# file that holds the pickled intermediate results
CACHEFILE = 'cache.pickle'

# only store cache on every second day
CACHERATE = 2

# if True, compress the cache file
ZIPCACHE = True

# name of the JSON result file; it carries the ISO formatted start and end date
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json'
#%% helper functions
def load_cache(init_with):
    """
    Return the dataset stored in the cache or a fallback value.

    Depending on `ZIPCACHE`, the cache is either the ZIP archive `CACHEFILE + '.zip'` (with the
    pickled data stored in it under the name `CACHEFILE`) or the plain pickle file `CACHEFILE`.
    The regular cache file is tried first. If it does not exist or cannot be read (e.g. because it
    is truncated), the rotated backup with the suffix '~' written by `store_cache` is tried next.
    If neither file provides data, `init_with` is returned as is.

    NOTE: unpickling executes code embedded in the file, so the cache files must only ever come
    from this script itself.
    """
    # imported locally as the module only imports `ZipFile` and `ZIP_DEFLATED` from `zipfile`
    from zipfile import BadZipFile

    fname = CACHEFILE + '.zip' if ZIPCACHE else CACHEFILE

    for candidate in (fname, fname + '~'):
        if not os.path.exists(candidate):
            continue

        print('loading existing data from %s' % candidate)

        try:
            if ZIPCACHE:
                with ZipFile(candidate, 'r') as f:
                    return pickle.loads(f.read(CACHEFILE))

            with open(candidate, 'rb') as f:
                return pickle.load(f)
        except (EOFError, pickle.UnpicklingError, BadZipFile) as exc:
            # incomplete or damaged file: report it and continue with the next candidate
            print(f'> could not read {candidate}: {exc}')

    print('initializing with empty dataset')
    return init_with
def store_cache(data, rotate_files=True):
    """
    Pickle `data` to the cache file.

    Depending on `ZIPCACHE`, the cache is either written as compressed ZIP archive
    `CACHEFILE + '.zip'` (with the pickled data stored in it under the name `CACHEFILE`) or as
    plain pickle file `CACHEFILE`. If `rotate_files` is True and a cache file already exists, that
    file is kept as backup with the suffix '~' before the new cache file is written.
    """
    fname = CACHEFILE + '.zip' if ZIPCACHE else CACHEFILE

    if rotate_files and os.path.exists(fname):
        # `os.replace` overwrites the backup of the previous rotation on every platform, whereas
        # `os.rename` raises a FileExistsError on Windows if the target already exists
        os.replace(fname, fname + '~')

    print(f'> storing cache to {fname}')

    if ZIPCACHE:
        with ZipFile(fname, 'w', compression=ZIP_DEFLATED, compresslevel=9) as f:
            f.writestr(CACHEFILE, pickle.dumps(data))
    else:
        with open(fname, 'wb') as f:
            pickle.dump(data, f)
# Module-wide flag which `handle_abort` switches to True once the OS asks the script to stop
# (e.g. when Ctrl-C is pressed). The script is thereby not interrupted while data is stored to
# disk, which would end up in a corrupted file.
abort_script = False


def handle_abort(signum, frame):
    """
    OS signal handler that requests the script to abort.

    Prints the number of the received signal `signum` and sets the global `abort_script` to True.
    The argument `frame` is not used.
    """
    global abort_script

    message = 'received signal %d – aborting script...' % signum
    print(message)

    abort_script = True
# register `handle_abort` for each of the OS signals that kill this script; signal names that the
# `signal` module does not provide are left out
for signame in ('SIGINT', 'SIGHUP', 'SIGTERM'):
    sig = getattr(signal, signame, None)

    if sig is None:
        continue

    signal.signal(sig, handle_abort)
#%%
# HTTP session that retries failed requests up to 3 times; `backoff_factor` makes urllib3 wait
# between the retries (see the urllib3 `Retry` documentation for the exact delays)
retryadapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1))
httpsess = requests.Session()
httpsess.mount('https://', retryadapter)

# maps an ISO formatted date string to the list of article metadata dicts collected for that day;
# continues with the data of a previous run if a cache file exists
archive_rows = load_cache(init_with=defaultdict(list))

duration = END_DATE - START_DATE   # timedelta

# teasers that carry one of these "data-conditional-flag" values are skipped
skip_flags = ('gallery', 'video', 'audio', 'paid')

# True as long as `archive_rows` holds data of days that was not written to the cache yet
unsaved_data = False

# loop through the days in the specified timespan (END_DATE is included)
for day in range(duration.days + 1):
    if abort_script:    # if interrupted by OS, break loop
        break

    fetch_date = START_DATE + timedelta(days=day)
    fetch_date_str = fetch_date.date().isoformat()
    archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year)

    print(f'day {day+1}: {fetch_date_str} from {archive_url}')

    # check if data already exists
    if fetch_date_str in archive_rows:
        print('> already fetched this date – skipping')
        continue

    # fetch HTML from archive URL via the retrying session; the timeout is given in seconds
    try:
        resp = httpsess.get(archive_url, timeout=15)
    except IOError as exc:
        # this day stays absent from `archive_rows` and is hence tried again on the next run
        print(f'> got IO error: {exc}')
        continue

    if not resp.ok:
        # report the failure instead of silently dropping this day
        print(f'> request failed with HTTP status {resp.status_code} – skipping')
        continue

    # parse page
    soup = BeautifulSoup(resp.content, 'html.parser')
    container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'})

    if not container:
        # without this guard, `container[0]` raises an IndexError for pages without teaser list
        print('> no article teaser list found on this page – skipping')
        continue

    for hcont in container[0].select('article'):   # iterate through article teasers
        # skip gallery, video, audio, paid content or ads
        is_flagged = any(hcont.find_all('span', attrs={'data-conditional-flag': k}) for k in skip_flags)
        if is_flagged or 'ANZEIGE' in hcont.text:
            continue

        # get the URL to the full article
        title_elem = hcont.select_one('h2 a')
        if title_elem is None:
            continue

        # a missing "href" yields '' (the default passed to `get`), never None, hence the test
        # for an empty value
        url = title_elem.attrs.get('href', '')
        if not url:
            continue

        # get headline
        headline = title_elem.attrs.get('title', '')
        if not headline:
            continue

        headline = headline.replace('\xa0', ' ')   # non-breaking space -> regular space

        # add all fetched metadata for this article at this date
        archive_rows[fetch_date_str].append({
            'archive_headline': headline,
            'url': url,
            'archive_retrieved': datetime.today().isoformat(timespec='seconds'),
            'pub_date': fetch_date_str,
        })

    unsaved_data = True

    # store to cache according to CACHERATE or when this is the last iteration
    if (day+1) % CACHERATE == 0 or day == duration.days:
        store_cache(archive_rows)
        unsaved_data = False

# days fetched since the last regular cache update would be lost if the script is aborted or if
# the final iteration is skipped, so write them to the cache now
if unsaved_data:
    store_cache(archive_rows)

#%%

if abort_script:
    print('aborted.')
    # `exit()` is a helper added by the `site` module and may be unavailable; raising SystemExit
    # ends the script with the same exit status
    raise SystemExit(1)

print(f'storing fetched data to {OUTPUT_JSON}...')

with open(OUTPUT_JSON, 'w') as f:
    json.dump(archive_rows, f, indent=2)

print('done.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment