# Archive an IRBIS-style library catalogue (judging by the CGI parameters)
# to a WARC file.
#
# Gist by @Segerberg, created March 11, 2022.
#
# The script walks the catalogue's dictionary (browse) pages, fetches every
# record and page requisite it finds, and lets warcio's capture_http record
# each HTTP response into a timestamped .warc.gz. A small SQLite database
# keeps track of URLs that have already been fetched.

from warcio.capture_http import capture_http
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import os
import sqlite3
import datetime
import sys

# Base query parameters reused for every database's dictionary pages.
result_page_params = {
    'S21CNR': '20',
    'Z21ID': '',
    'C21COM': 'T',
    'S21FMT': 'fullwebr',
    'T21CNR': 100,
    'T21PRF': 'T=',
    'CODE': 'dic_web.mnu'
}


class Dedup:
    """Track already-fetched URLs in a small SQLite table so nothing is requested twice."""

    def __init__(self):
        self.file = "dedup.db"

    def start(self):
        # Create the backing table on first run; harmless if it already exists.
        conn = sqlite3.connect(self.file)
        conn.execute(
            "create table if not exists dedup ("
            " key varchar(300) primary key);"
        )
        conn.commit()
        conn.close()

    def save(self, key):
        conn = sqlite3.connect(self.file)
        conn.execute(
            "insert or replace into dedup (key) values (?)", (key,)
        )
        conn.commit()
        conn.close()

    def lookup(self, key):
        # True if the key has been saved before, False otherwise.
        conn = sqlite3.connect(self.file)
        cursor = conn.execute("select key from dedup where key = ?", (key,))
        result_tuple = cursor.fetchone()
        conn.close()
        return result_tuple is not None
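
# A minimal usage sketch for Dedup (illustrative; the key below is an
# arbitrary example, not a URL from the original crawl):
#
#   dedup = Dedup()
#   dedup.start()                        # creates dedup.db and its table
#   dedup.save("/cgi-bin/some/path")     # remember a fetched path
#   dedup.lookup("/cgi-bin/some/path")   # -> True from now on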


def get_page_requisites(elements, key):
    # Fetch page requisites (scripts, stylesheets, images) so warcio records
    # them; assumes the attribute values are site-relative paths.
    with capture_http(warc_filename):
        for item in elements:
            if dedup.lookup(item[key]):
                print("DUP")
                continue
            dedup.save(item[key])
            try:
                requests.get(f"{parsed_url.scheme}://{parsed_url.hostname}{item[key]}")
            except requests.exceptions.RequestException:
                continue


def get_records(url):
    # Fetch every record linked from a result page. Each link is re-requested
    # with a larger page size (S21CNR=2000) so one response holds the full list.
    with capture_http(warc_filename):
        records_page = requests.get(f"{parsed_url.scheme}://{parsed_url.hostname}{url}")
        record_soup = BeautifulSoup(records_page.content, "html.parser")
        try:
            content = record_soup.find('td', {"class": "main_content"})
            links = content.find_all('a', href=True)
            for link in links:
                if dedup.lookup(link['href']):
                    continue
                dedup.save(link['href'])
                requests.get(f"{parsed_url.scheme}://{parsed_url.hostname}{link['href'].replace('&S21CNR=20', '&S21CNR=2000')}")
                print(link['href'])
        except AttributeError:
            # No td.main_content on this page; nothing to fetch.
            pass


def get_next_dict_page(url, params, next_term=None, parent=None):
    # Walk the catalogue's dictionary (browse index) page by page. The last
    # term on each page seeds T21TRM for the next request; recursion stops
    # when a page repeats the term it was seeded with.
    with capture_http(warc_filename):
        try:
            dictionary_page = requests.get(url, params=params)
            print(dictionary_page.url)
            dictionary_soup = BeautifulSoup(dictionary_page.content, "html.parser")
            title_links = dictionary_soup.find_all('a', href=True)  # find all title links
            imgs = dictionary_soup.find_all('img', src=True)
            get_page_requisites(imgs, 'src')
            for title_link in title_links:
                if "S21STR" in title_link['href']:  # dictionary term links only
                    next_term = title_link.text
                    get_records(title_link['href'])
            print(next_term)
            if next_term and parent != next_term:
                params['T21TRM'] = next_term
                get_next_dict_page(url, params, parent=next_term)
            return next_term
        except requests.exceptions.ConnectionError:
            print("CONNECTION ERROR")


def main(url):
    with capture_http(warc_filename):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, "html.parser")
        # Archive the start page's requisites first.
        script_srcs = soup.find_all('script', src=True)
        get_page_requisites(script_srcs, 'src')
        stylesheets = soup.find_all('link', href=True)
        get_page_requisites(stylesheets, 'href')
        imgs = soup.find_all('img', src=True)
        get_page_requisites(imgs, 'src')
        # Find all links to DBs
        dbs = soup.find_all('a', href=True)
        for db in dbs:
            # Filter out all non-DB links
            if ("C21COM=T" not in db['href'] and 'I21DBN' in db['href']
                    and 'C21COM=S' not in db['href'] and 'javascript' not in db['href']):
                # Extract the DB name and rebuild the query parameters.
                org_param_list = db['href'].split("&")[1:]
                temp_params = {}
                params = {}
                for value in org_param_list:
                    parts = value.split("=")
                    temp_params[parts[0]] = parts[1]
                if "_EX" in temp_params['I21DBN']:
                    params['I21DBN'] = temp_params['I21DBN']
                else:
                    params['I21DBN'] = f"{temp_params['I21DBN']}_EX"
                params['P21DBN'] = temp_params['P21DBN']
                params.update(result_page_params)
                cgi_bin = db['href'].split("&")[0].split('?')[0]  # CGI part of the URL
                # Crawl the dictionary under each of the catalogue's browse prefixes.
                frames = ["T=", "G=", "K=", "A="]
                for prefix in frames:
                    params['T21PRF'] = prefix
                    get_next_dict_page(f"{parsed_url.scheme}://{parsed_url.hostname}{cgi_bin}", params=params)


if __name__ == '__main__':
    url = sys.argv[1]
    parsed_url = urlparse(url)
    dedup = Dedup()
    dedup.start()
    warc_filename = f"{parsed_url.hostname}_{datetime.datetime.now().strftime('%Y%m%d-%H_%M_%S')}.warc.gz"
    main(url)
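
# Example invocation (a sketch; the script filename and catalogue URL are
# hypothetical placeholders, not from the original gist):
#
#   python archive_catalog.py "http://catalog.example.org/cgi-bin/irbis64r/cgiirbis_64.exe"
#
# Output lands in <hostname>_<timestamp>.warc.gz next to the script, with
# crawl state kept in dedup.db.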