Sample scripts for blog post "Robust data collection via web scraping and web APIs".
""" | |
Sample scripts for blog post "Robust data collection via web scraping and web APIs" | |
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/). | |
Script 1. Starting point – baseline (unreliable) web scraping script. | |
December 2020, Markus Konrad <markus.konrad@wzb.eu> | |
""" | |
from datetime import datetime, timedelta | |
from collections import defaultdict | |
import json | |
import requests | |
from bs4 import BeautifulSoup | |
#%% | |
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html' | |
# start day for archive retrieval | |
START_DATE = datetime(2020, 11, 1) | |
# last day for archive retrieval | |
END_DATE = datetime(2020, 11, 7) | |
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json' | |
#%% | |
archive_rows = defaultdict(list) | |
duration = END_DATE - START_DATE # timedelta | |
# loop through the days in the specified timespan | |
for day in range(duration.days + 1): | |
fetch_date = START_DATE + timedelta(days=day) | |
fetch_date_str = fetch_date.date().isoformat() | |
archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year) | |
print(f'day {day+1}: {fetch_date_str} from {archive_url}') | |
# fetch HTML from archive URL | |
resp = requests.get(archive_url) | |
if resp.ok: | |
# parse page | |
soup = BeautifulSoup(resp.content, 'html.parser') | |
container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'}) | |
headlines_container = container[0].select('article') | |
for hcont in headlines_container: # iterate through article teasers | |
# skip gallery, video, audio, paid content or ads | |
if any(len(hcont.find_all('span', attrs={'data-conditional-flag': k})) != 0 | |
for k in ('gallery', 'video', 'audio', 'paid')) or 'ANZEIGE' in hcont.text: | |
continue | |
# get the URL to the full article | |
title_elem = hcont.select_one('h2 a') | |
if title_elem is None: | |
continue | |
url = title_elem.attrs.get('href', '') | |
if url is None: | |
continue | |
# get headline | |
headline = title_elem.attrs.get('title', '') | |
if not headline: | |
continue | |
headline = headline.replace('\xa0', ' ') | |
# add all fetched metadata for this article at this date | |
archive_rows[fetch_date_str].append({ | |
'archive_headline': headline, | |
'url': url, | |
'archive_retrieved': datetime.today().isoformat(timespec='seconds'), | |
'pub_date': fetch_date_str, | |
}) | |
#%% | |
print(f'storing fetched data to {OUTPUT_JSON}...') | |
with open(OUTPUT_JSON, 'w') as f: | |
json.dump(archive_rows, f, indent=2) | |
print('done.') |
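The baseline script above is fragile in two ways that the following scripts address: requests.get() is called without a timeout, so a stalled connection can block the script indefinitely, and any unhandled network exception aborts the run and discards everything scraped so far. A minimal sketch (reusing one of the archive URLs; not part of the original script) of how a timeout plus exception handling turn such failures into recoverable errors:

import requests

try:
    # a timeout turns a stalled connection into an exception instead of an endless wait
    resp = requests.get('https://www.spiegel.de/nachrichtenarchiv/artikel-01.11.2020.html', timeout=15)
    resp.raise_for_status()   # also raise on HTTP error status codes
except requests.RequestException as exc:
    print(f'request failed: {exc}')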
""" | |
Sample scripts for blog post "Robust data collection via web scraping and web APIs" | |
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/). | |
Script 2. Improvement: storing intermediate results to a cache file after every web scraping iteration. | |
December 2020, Markus Konrad <markus.konrad@wzb.eu> | |
""" | |
from datetime import datetime, timedelta | |
from collections import defaultdict | |
import json | |
import os | |
import pickle | |
import requests | |
from bs4 import BeautifulSoup | |
#%% | |
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html' | |
# start day for archive retrieval | |
START_DATE = datetime(2020, 11, 1) | |
# last day for archive retrieval | |
END_DATE = datetime(2020, 11, 7) | |
CACHEFILE = 'cache.pickle' | |
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json' | |
#%% helper functions | |
def load_cache(init_with): | |
if os.path.exists(CACHEFILE): | |
print('loading existing data from %s' % CACHEFILE) | |
with open(CACHEFILE, 'rb') as f: | |
return pickle.load(f) | |
else: | |
print('initializing with empty dataset') | |
return init_with | |
def store_cache(data): | |
with open(CACHEFILE, 'wb') as f: | |
pickle.dump(data, f) | |
#%% | |
archive_rows = load_cache(init_with=defaultdict(list)) | |
duration = END_DATE - START_DATE # timedelta | |
# loop through the days in the specified timespan | |
for day in range(duration.days + 1): | |
fetch_date = START_DATE + timedelta(days=day) | |
fetch_date_str = fetch_date.date().isoformat() | |
archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year) | |
print(f'day {day+1}: {fetch_date_str} from {archive_url}') | |
# check if data already exists | |
if fetch_date_str in archive_rows.keys(): | |
print('> already fetched this date – skipping') | |
continue | |
# fetch HTML from archive URL | |
resp = requests.get(archive_url) | |
if resp.ok: | |
# parse page | |
soup = BeautifulSoup(resp.content, 'html.parser') | |
container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'}) | |
headlines_container = container[0].select('article') | |
for hcont in headlines_container: # iterate through article teasers | |
# skip gallery, video, audio, paid content or ads | |
if any(len(hcont.find_all('span', attrs={'data-conditional-flag': k})) != 0 | |
for k in ('gallery', 'video', 'audio', 'paid')) or 'ANZEIGE' in hcont.text: | |
continue | |
# get the URL to the full article | |
title_elem = hcont.select_one('h2 a') | |
if title_elem is None: | |
continue | |
url = title_elem.attrs.get('href', '') | |
if url is None: | |
continue | |
# get headline | |
headline = title_elem.attrs.get('title', '') | |
if not headline: | |
continue | |
headline = headline.replace('\xa0', ' ') | |
# add all fetched metadata for this article at this date | |
archive_rows[fetch_date_str].append({ | |
'archive_headline': headline, | |
'url': url, | |
'archive_retrieved': datetime.today().isoformat(timespec='seconds'), | |
'pub_date': fetch_date_str, | |
}) | |
store_cache(archive_rows) | |
#%% | |
print(f'storing fetched data to {OUTPUT_JSON}...') | |
with open(OUTPUT_JSON, 'w') as f: | |
json.dump(archive_rows, f, indent=2) | |
print('done.') |
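Because every iteration is written to cache.pickle, an interrupted run can simply be restarted: dates that are already in the cache are skipped. A small sketch for inspecting that cache outside the script (it assumes cache.pickle exists in the working directory; delete the file to force a full re-run):

import pickle

with open('cache.pickle', 'rb') as f:
    cached = pickle.load(f)   # defaultdict mapping ISO date string -> list of article dicts

print(f'{len(cached)} day(s) in cache')
for pub_date, rows in cached.items():
    print(pub_date, '->', len(rows), 'articles')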
""" | |
Sample scripts for blog post "Robust data collection via web scraping and web APIs" | |
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/). | |
Script 3. Improvement: set timeout, handle exceptions, define retry strategy. | |
December 2020, Markus Konrad <markus.konrad@wzb.eu> | |
""" | |
from datetime import datetime, timedelta | |
from collections import defaultdict | |
import json | |
import os | |
import pickle | |
import requests | |
from requests.adapters import HTTPAdapter | |
from urllib3 import Retry | |
from bs4 import BeautifulSoup | |
#%% | |
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html' | |
# start day for archive retrieval | |
START_DATE = datetime(2020, 11, 1) | |
# last day for archive retrieval | |
END_DATE = datetime(2020, 11, 7) | |
CACHEFILE = 'cache.pickle' | |
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json' | |
#%% helper functions | |
def load_cache(init_with): | |
if os.path.exists(CACHEFILE): | |
print('loading existing data from %s' % CACHEFILE) | |
with open(CACHEFILE, 'rb') as f: | |
return pickle.load(f) | |
else: | |
print('initializing with empty dataset') | |
return init_with | |
def store_cache(data): | |
with open(CACHEFILE, 'wb') as f: | |
pickle.dump(data, f) | |
#%% | |
#retryadapter = HTTPAdapter(max_retries=3) # see https://2.python-requests.org/en/master/api/#requests.adapters.HTTPAdapter | |
retryadapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)) | |
httpsess = requests.Session() | |
httpsess.mount('https://', retryadapter) | |
archive_rows = load_cache(init_with=defaultdict(list)) | |
duration = END_DATE - START_DATE # timedelta | |
# loop through the days in the specified timespan | |
for day in range(duration.days + 1): | |
fetch_date = START_DATE + timedelta(days=day) | |
fetch_date_str = fetch_date.date().isoformat() | |
archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year) | |
print(f'day {day+1}: {fetch_date_str} from {archive_url}') | |
# check if data already exists | |
if fetch_date_str in archive_rows.keys(): | |
print('> already fetched this date – skipping') | |
continue | |
# fetch HTML from archive URL | |
try: | |
# NOW USING httpsess.get(...) INSTEAD OF requests.get(...) | |
#resp = httpsess.get(archive_url, timeout=15) | |
resp = httpsess.get(archive_url, timeout=0.001 if day == 3 else 15) # to try out timeout error on day 3 | |
except IOError as exc: | |
print(f'> got IO error: {exc}') | |
continue | |
if resp.ok: | |
# parse page | |
soup = BeautifulSoup(resp.content, 'html.parser') | |
container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'}) | |
headlines_container = container[0].select('article') | |
for hcont in headlines_container: # iterate through article teasers | |
# skip gallery, video, audio, paid content or ads | |
if any(len(hcont.find_all('span', attrs={'data-conditional-flag': k})) != 0 | |
for k in ('gallery', 'video', 'audio', 'paid')) or 'ANZEIGE' in hcont.text: | |
continue | |
# get the URL to the full article | |
title_elem = hcont.select_one('h2 a') | |
if title_elem is None: | |
continue | |
url = title_elem.attrs.get('href', '') | |
if url is None: | |
continue | |
# get headline | |
headline = title_elem.attrs.get('title', '') | |
if not headline: | |
continue | |
headline = headline.replace('\xa0', ' ') | |
# add all fetched metadata for this article at this date | |
archive_rows[fetch_date_str].append({ | |
'archive_headline': headline, | |
'url': url, | |
'archive_retrieved': datetime.today().isoformat(timespec='seconds'), | |
'pub_date': fetch_date_str, | |
}) | |
store_cache(archive_rows) | |
#%% | |
print(f'storing fetched data to {OUTPUT_JSON}...') | |
with open(OUTPUT_JSON, 'w') as f: | |
json.dump(archive_rows, f, indent=2) | |
print('done.') |
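The HTTPAdapter mounted on the session retries failed requests with exponential backoff (Retry(total=3, backoff_factor=1)), and the try/except around httpsess.get() catches whatever still fails after the retries, since requests' exceptions derive from IOError. A sketch of a slightly extended retry configuration; status_forcelist is a standard urllib3 Retry option that is not used in the scripts above and additionally retries when the server responds with a temporary error status:

import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry

retry_strategy = Retry(total=3,
                       backoff_factor=1,                            # wait longer after each failed attempt
                       status_forcelist=(429, 500, 502, 503, 504))  # also retry on these HTTP status codes
sess = requests.Session()
sess.mount('https://', HTTPAdapter(max_retries=retry_strategy))
resp = sess.get('https://www.spiegel.de/nachrichtenarchiv/artikel-01.11.2020.html', timeout=15)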
""" | |
Sample scripts for blog post "Robust data collection via web scraping and web APIs" | |
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/). | |
Script 4. Improvement: rotate cache file, capture OS signals to avoid corrupted cache file. | |
December 2020, Markus Konrad <markus.konrad@wzb.eu> | |
""" | |
from datetime import datetime, timedelta | |
from collections import defaultdict | |
import json | |
import os | |
import pickle | |
import signal | |
import requests | |
from requests.adapters import HTTPAdapter | |
from urllib3 import Retry | |
from bs4 import BeautifulSoup | |
#%% | |
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html' | |
# start day for archive retrieval | |
START_DATE = datetime(2020, 11, 1) | |
# last day for archive retrieval | |
END_DATE = datetime(2020, 11, 7) | |
CACHEFILE = 'cache.pickle' | |
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json' | |
#%% helper functions | |
def load_cache(init_with): | |
if os.path.exists(CACHEFILE): | |
print('loading existing data from %s' % CACHEFILE) | |
with open(CACHEFILE, 'rb') as f: | |
return pickle.load(f) | |
else: | |
print('initializing with empty dataset') | |
return init_with | |
def store_cache(data, rotate_files=True): | |
if rotate_files and os.path.exists(CACHEFILE): | |
os.rename(CACHEFILE, CACHEFILE + '~') | |
with open(CACHEFILE, 'wb') as f: | |
pickle.dump(data, f) | |
# global variable that is set to True if the script is aborted by OS (e.g. by pressing Ctrl-C) | |
# this makes sure that the script is not interrupted while data is stored to disk which would end up | |
# in a corrupted file | |
abort_script = False | |
def handle_abort(signum, frame): | |
"""Handler for OS signals to abort script. Sets global `abort_script` to True.""" | |
global abort_script | |
print('received signal %d – aborting script...' % signum) | |
abort_script = True | |
# setup handler for OS signals that kill this script | |
for signame in ('SIGINT', 'SIGHUP', 'SIGTERM'): | |
sig = getattr(signal, signame, None) | |
if sig is not None: | |
signal.signal(sig, handle_abort) | |
#%% | |
#retryadapter = HTTPAdapter(max_retries=3) # see https://2.python-requests.org/en/master/api/#requests.adapters.HTTPAdapter | |
retryadapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)) | |
httpsess = requests.Session() | |
httpsess.mount('https://', retryadapter) | |
archive_rows = load_cache(init_with=defaultdict(list)) | |
duration = END_DATE - START_DATE # timedelta | |
# loop through the days in the specified timespan | |
for day in range(duration.days + 1): | |
if abort_script: # if interrupted by OS, break loop | |
break | |
fetch_date = START_DATE + timedelta(days=day) | |
fetch_date_str = fetch_date.date().isoformat() | |
archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year) | |
print(f'day {day+1}: {fetch_date_str} from {archive_url}') | |
# check if data already exists | |
if fetch_date_str in archive_rows.keys(): | |
print('> already fetched this date – skipping') | |
continue | |
# fetch HTML from archive URL | |
try: | |
# NOW USING httpsess.get(...) INSTEAD OF requests.get(...) | |
resp = httpsess.get(archive_url, timeout=15) | |
#resp = httpsess.get(archive_url, timeout=0.001 if day == 3 else 15) # to try out timeout error on day 3 | |
except IOError as exc: | |
print(f'> got IO error: {exc}') | |
continue | |
if resp.ok: | |
# parse page | |
soup = BeautifulSoup(resp.content, 'html.parser') | |
container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'}) | |
headlines_container = container[0].select('article') | |
for hcont in headlines_container: # iterate through article teasers | |
# skip gallery, video, audio, paid content or ads | |
if any(len(hcont.find_all('span', attrs={'data-conditional-flag': k})) != 0 | |
for k in ('gallery', 'video', 'audio', 'paid')) or 'ANZEIGE' in hcont.text: | |
continue | |
# get the URL to the full article | |
title_elem = hcont.select_one('h2 a') | |
if title_elem is None: | |
continue | |
url = title_elem.attrs.get('href', '') | |
if url is None: | |
continue | |
# get headline | |
headline = title_elem.attrs.get('title', '') | |
if not headline: | |
continue | |
headline = headline.replace('\xa0', ' ') | |
# add all fetched metadata for this article at this date | |
archive_rows[fetch_date_str].append({ | |
'archive_headline': headline, | |
'url': url, | |
'archive_retrieved': datetime.today().isoformat(timespec='seconds'), | |
'pub_date': fetch_date_str, | |
}) | |
store_cache(archive_rows) | |
#%% | |
if abort_script: | |
print('aborted.') | |
exit(1) | |
print(f'storing fetched data to {OUTPUT_JSON}...') | |
with open(OUTPUT_JSON, 'w') as f: | |
json.dump(archive_rows, f, indent=2) | |
print('done.') |
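The signal handler above only sets the abort_script flag, and the loop checks that flag at the top of each iteration, so a Ctrl-C (SIGINT) or a SIGTERM never interrupts store_cache() in the middle of a write. As an extra safety net, store_cache() keeps the previous cache version as cache.pickle~ before overwriting. A small recovery sketch for the unlikely case that cache.pickle still ends up corrupted (a suggested manual step, not part of the original scripts):

import os

# fall back to the last rotated cache file written by store_cache()
if os.path.exists('cache.pickle~'):
    os.replace('cache.pickle~', 'cache.pickle')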
""" | |
Sample scripts for blog post "Robust data collection via web scraping and web APIs" | |
(https://datascience.blog.wzb.eu/2020/12/01/robust-data-collection-via-web-scraping-and-web-apis/). | |
Script 5. Improvement: only store on every ith iteration, compress cache file. | |
December 2020, Markus Konrad <markus.konrad@wzb.eu> | |
""" | |
from datetime import datetime, timedelta | |
from collections import defaultdict | |
import json | |
import os | |
import pickle | |
import signal | |
from zipfile import ZipFile, ZIP_DEFLATED | |
import requests | |
from requests.adapters import HTTPAdapter | |
from urllib3 import Retry | |
from bs4 import BeautifulSoup | |
#%% | |
ARCHIVE_URL_FORMAT = 'https://www.spiegel.de/nachrichtenarchiv/artikel-{:02d}.{:02d}.{}.html' | |
# start day for archive retrieval | |
START_DATE = datetime(2020, 11, 1) | |
# last day for archive retrieval | |
END_DATE = datetime(2020, 11, 7) | |
CACHEFILE = 'cache.pickle' | |
CACHERATE = 2 # only store cache on every second day | |
ZIPCACHE = True # if True, compress the cache file | |
OUTPUT_JSON = f'spon_{START_DATE.date().isoformat()}_{END_DATE.date().isoformat()}.json' | |
#%% helper functions | |
def load_cache(init_with): | |
fname = CACHEFILE + '.zip' if ZIPCACHE else CACHEFILE | |
if os.path.exists(fname): | |
print('loading existing data from %s' % fname) | |
if ZIPCACHE: | |
with ZipFile(fname, 'r') as f: | |
return pickle.loads(f.read(CACHEFILE)) | |
else: | |
with open(fname, 'rb') as f: | |
return pickle.load(f) | |
else: | |
print('initializing with empty dataset') | |
return init_with | |
def store_cache(data, rotate_files=True): | |
fname = CACHEFILE + '.zip' if ZIPCACHE else CACHEFILE | |
if rotate_files and os.path.exists(fname): | |
os.rename(fname, fname + '~') | |
print(f'> storing cache to {fname}') | |
if ZIPCACHE: | |
with ZipFile(fname, 'w', compression=ZIP_DEFLATED, compresslevel=9) as f: | |
f.writestr(CACHEFILE, pickle.dumps(data)) | |
else: | |
with open(fname, 'wb') as f: | |
pickle.dump(data, f) | |
# global variable that is set to True if the script is aborted by OS (e.g. by pressing Ctrl-C) | |
# this makes sure that the script is not interrupted while data is stored to disk which would end up | |
# in a corrupted file | |
abort_script = False | |
def handle_abort(signum, frame): | |
"""Handler for OS signals to abort script. Sets global `abort_script` to True.""" | |
global abort_script | |
print('received signal %d – aborting script...' % signum) | |
abort_script = True | |
# setup handler for OS signals that kill this script | |
for signame in ('SIGINT', 'SIGHUP', 'SIGTERM'): | |
sig = getattr(signal, signame, None) | |
if sig is not None: | |
signal.signal(sig, handle_abort) | |
#%% | |
#retryadapter = HTTPAdapter(max_retries=3) # see https://2.python-requests.org/en/master/api/#requests.adapters.HTTPAdapter | |
retryadapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)) | |
httpsess = requests.Session() | |
httpsess.mount('https://', retryadapter) | |
archive_rows = load_cache(init_with=defaultdict(list)) | |
duration = END_DATE - START_DATE # timedelta | |
# loop through the days in the specified timespan | |
for day in range(duration.days + 1): | |
if abort_script: # if interrupted by OS, break loop | |
break | |
fetch_date = START_DATE + timedelta(days=day) | |
fetch_date_str = fetch_date.date().isoformat() | |
archive_url = ARCHIVE_URL_FORMAT.format(fetch_date.day, fetch_date.month, fetch_date.year) | |
print(f'day {day+1}: {fetch_date_str} from {archive_url}') | |
# check if data already exists | |
if fetch_date_str in archive_rows.keys(): | |
print('> already fetched this date – skipping') | |
continue | |
# fetch HTML from archive URL | |
try: | |
# NOW USING httpsess.get(...) INSTEAD OF requests.get(...) | |
resp = httpsess.get(archive_url, timeout=15) | |
#resp = httpsess.get(archive_url, timeout=0.001 if day == 3 else 15) # to try out timeout error on day 3 | |
except IOError as exc: | |
print(f'> got IO error: {exc}') | |
continue | |
if resp.ok: | |
# parse page | |
soup = BeautifulSoup(resp.content, 'html.parser') | |
container = soup.find_all('section', attrs={'data-area': 'article-teaser-list'}) | |
headlines_container = container[0].select('article') | |
for hcont in headlines_container: # iterate through article teasers | |
# skip gallery, video, audio, paid content or ads | |
if any(len(hcont.find_all('span', attrs={'data-conditional-flag': k})) != 0 | |
for k in ('gallery', 'video', 'audio', 'paid')) or 'ANZEIGE' in hcont.text: | |
continue | |
# get the URL to the full article | |
title_elem = hcont.select_one('h2 a') | |
if title_elem is None: | |
continue | |
url = title_elem.attrs.get('href', '') | |
if url is None: | |
continue | |
# get headline | |
headline = title_elem.attrs.get('title', '') | |
if not headline: | |
continue | |
headline = headline.replace('\xa0', ' ') | |
# add all fetched metadata for this article at this date | |
archive_rows[fetch_date_str].append({ | |
'archive_headline': headline, | |
'url': url, | |
'archive_retrieved': datetime.today().isoformat(timespec='seconds'), | |
'pub_date': fetch_date_str, | |
}) | |
# store to cache according to CACHERATE or when this is the last iteration | |
if (day+1) % CACHERATE == 0 or day == duration.days: | |
store_cache(archive_rows) | |
#%% | |
if abort_script: | |
print('aborted.') | |
exit(1) | |
print(f'storing fetched data to {OUTPUT_JSON}...') | |
with open(OUTPUT_JSON, 'w') as f: | |
json.dump(archive_rows, f, indent=2) | |
print('done.') |
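CACHERATE trades robustness for speed: with a cache write only every second day, at most one already-scraped day is lost on a crash, while the (compressed) cache file is written half as often. Once the run finishes, the results live in the JSON file. A sketch for loading them into a flat table; pandas is an assumption here and is not used by the scripts above:

import json
import pandas as pd

with open('spon_2020-11-01_2020-11-07.json') as f:
    data = json.load(f)   # maps publication date -> list of article dicts

articles = pd.DataFrame([row for rows in data.values() for row in rows])
print(articles.head())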