@snakers4
Last active September 14, 2023 20:00
Plain common crawl pre-processing
# parse_cc_index.py (the first file of the gist)
import gc
import gzip
import time
import json
import shutil
import os,sys
import tldextract
import collections
import pandas as pd
from tqdm import tqdm
import urllib.request
from multiprocessing import Pool

storage_folder = 'data/'
file_prefix = 'https://commoncrawl.s3.amazonaws.com/'
def read_every_line(fname,
                    max_lines=-1):
    lines = []
    with open(fname, encoding='utf-8') as f:
        for i, l in enumerate(f):
            lines.append(l)
            if i > max_lines and max_lines > 0:
                break
    return lines
def reporthook(count, block_size, total_size):
    global start_time
    if count == 0:
        start_time = time.time()
        return
    duration = time.time() - start_time
    progress_size = int(count * block_size)
    speed = int(progress_size / (1024 * duration))
    percent = int(count * block_size * 100 / total_size)
    sys.stdout.write("\r...%d%%, %d MB, %d KB/s, %d seconds passed" %
                     (percent, progress_size / (1024 * 1024), speed, duration))
    sys.stdout.flush()


def save(url, filename):
    urllib.request.urlretrieve(url, filename, reporthook)
def process_index_file_line(line):
    # each index line looks like 'url-key timestamp {json}';
    # grab the timestamp and parse the trailing JSON payload
    assert type(line) == str
    try:
        lst = line.replace('\n', '').split()
        ts = lst[1]
        data = json.loads(line.replace('\n', '').split(ts)[-1].strip())
    except:
        return ()
    if data['status'] != '200':
        return ()
    else:
        try:
            language = data['languages']
        except:
            language = 'none'
        try:
            _tldextract = tldextract.extract(data['url'])
            tup = (ts,
                   data['url'],
                   _tldextract.suffix,
                   data['length'],
                   data['offset'],
                   data['filename'],
                   language
                   )
            return tup
        except:
            return ()
def process_index_file(file_name):
    print('Unzipping index file ... ')
    df_name = file_name.replace('.gz', '.feather')
    file_unzipped = file_name.split('.gz')[0]

    with gzip.open(file_name, 'rb') as f_in:
        with open(file_unzipped, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

    lines = read_every_line(file_unzipped,
                            1e8)
    print('{} lines extracted'.format(len(lines)))

    print('Pre-processing index lines ... ')
    out = list_multiprocessing(lines,
                               process_index_file_line,
                               workers=8)
    # filter out blank lines
    out = [_ for _ in out if _ != ()]
    print('Index pre-processed ... ')

    print('Processing index dataframe ... ')
    ts_list = [_[0] for _ in out]
    url_list = [_[1] for _ in out]
    tld = [_[2] for _ in out]
    length_list = [_[3] for _ in out]
    offset_list = [_[4] for _ in out]
    warc_list = [_[5] for _ in out]
    language_list = [_[6] for _ in out]

    cols = ['ts', 'url', 'tld', 'length', 'offset', 'warc', 'language']
    df = pd.DataFrame(data={
        'ts': ts_list,
        'url': url_list,
        'tld': tld,
        'length': length_list,
        'offset': offset_list,
        'warc': warc_list,
        'language': language_list},
        columns=cols)

    # keep only pages marked as Russian and derive the WET file path from the WARC path
    df = df[df.language == 'rus']
    df['wet'] = df.warc.apply(lambda x: x.replace('/warc/', '/wet/').replace('.warc.', '.warc.wet.'))
    df['wet'] = df['wet'].apply(lambda x: file_prefix + x)
    print('Index dataframe is ready ... ')

    os.remove(file_name)
    os.remove(file_unzipped)
    print('Files removed ... ')

    df = df.dropna().drop_duplicates().reset_index(drop=True)
    df.to_feather(df_name)
    print('Df saved ... ')
def list_multiprocessing(param_lst,
                         func,
                         **kwargs):
    workers = kwargs.pop('workers')

    with Pool(workers) as p:
        apply_lst = [([params], func, i, kwargs) for i, params in enumerate(param_lst)]
        result = list(tqdm(p.imap(_apply_lst, apply_lst), total=len(apply_lst)))

    # lists do not need such sorting, but this can be useful later
    result = sorted(result, key=lambda x: x[0])
    return [_[1] for _ in result]


def _apply_lst(args):
    params, func, num, kwargs = args
    return num, func(*params, **kwargs)
cc_indexes = read_every_line('data/cc-index.paths')
# remove the meta-data / technical lines
cc_indexes = cc_indexes[:-2]
# remove line breaks
cc_indexes = [_.replace('\n', '') for _ in cc_indexes]

file_dict = collections.OrderedDict()

# iterate over the index files
# (the i > 75 check just skips shards that were already processed
#  before a crash - see the author's note in the comments below)
for i, cc_index in enumerate(cc_indexes):
    if i > 75:
        cc_index_file = cc_index.split('/')[-1]
        file_dict[os.path.join(storage_folder, cc_index_file)] = file_prefix + cc_index
    else:
        pass

for i, (file_name, url) in enumerate(tqdm(file_dict.items())):
    print('PROCESSING INDEX FILE [{}]/[{}] ...'.format(i, len(file_dict)))
    print('Downloading an index file {} ...'.format(file_name))
    save(url, file_name)
    process_index_file(file_name)
    gc.collect()
    # print(i,(file_name,url))
    print('Downloaded an index file ...')
import gc
import time
import pickle
import pandas as pd


def pckl(obj, path):
    with open(path, 'wb') as handle:
        pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)


def upkl(path):
    with open(path, 'rb') as handle:
        _ = pickle.load(handle)
    return _


dfs = []
# get whole index files
for i in range(0, 10):
    dfs.append(pd.read_feather('index_269_{}.feather'.format(str(i))))
    print('Index {} loaded'.format(i))

df = pd.concat(dfs)
del dfs
gc.collect()
print('Memory released')
time.sleep(10)

# rank wet files by their popularity within Russian websites
wet_urls = list(df.wet.value_counts().index)
url_set = set(df.url.unique())

pckl(wet_urls, 'wet_urls.pickle')
pckl(url_set, 'url_set.pickle')
import gc
import re
import gzip
import time
# just git clone https://github.com/erroneousboat/warc3.git
# you will need a warc subfolder from there
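# (a minimal setup sketch with assumed shell commands, not part of the original gist:
#    git clone https://github.com/erroneousboat/warc3.git
#    cp -r warc3/warc .    # so that `import warc` below picks up the cloned package)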
import warc
import nltk
import pickle
# you will need to download this
nltk.download('punkt')
import shutil
import os,sys
import tldextract
import pandas as pd
from tqdm import tqdm
import urllib.request
from multiprocessing import Pool
def pckl(obj, path):
    with open(path, 'wb') as handle:
        pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)


def upkl(path):
    with open(path, 'rb') as handle:
        _ = pickle.load(handle)
    return _
def reporthook(count, block_size, total_size):
    global start_time
    if count == 0:
        start_time = time.time()
        return
    duration = time.time() - start_time
    progress_size = int(count * block_size)
    speed = int(progress_size / (1024 * duration))
    percent = int(count * block_size * 100 / total_size)
    sys.stdout.write("\r...%d%%, %d MB, %d KB/s, %d seconds passed" %
                     (percent, progress_size / (1024 * 1024), speed, duration))
    sys.stdout.flush()


def save(url, filename):
    urllib.request.urlretrieve(url, filename, reporthook)
def read_every_line(fname,
                    max_lines=-1):
    lines = []
    with open(fname, encoding='utf-8') as f:
        for i, l in enumerate(f):
            lines.append(l)
            if i > max_lines and max_lines > 0:
                break
    return lines
def remove_special_chars(text, char_list):
    for char in char_list:
        text = text.replace(char, '')
    return text.replace(u'\xa0', u' ')


def remove_html_tags(text):
    """Remove html tags from a string"""
    clean = re.compile('<.*?>')
    return re.sub(clean, '', text)


def _remove_non_printed_chars(string):
    reg = re.compile('[^a-zA-Zа-яА-ЯёЁ]')
    return reg.sub('', string)


def process_web_text(text):
    # first remove any remaining HTML
    text = remove_html_tags(text)
    # then split by line
    sentences = text.split('\n')
    # then omit sentences with more than 50% non printable chars
    sentences = [nltk.sent_tokenize(sentence) for sentence in sentences
                 if len(sentence) // 2 < len(_remove_non_printed_chars(sentence)) - 2]
    sentences = [item for sublist in sentences for item in sublist]
    return sentences
def list_multiprocessing(param_lst,
                         func,
                         **kwargs):
    workers = kwargs.pop('workers')

    with Pool(workers) as p:
        apply_lst = [([params], func, i, kwargs) for i, params in enumerate(param_lst)]
        result = list(tqdm(p.imap(_apply_lst, apply_lst), total=len(apply_lst)))

    # lists do not need such sorting, but this can be useful later
    result = sorted(result, key=lambda x: x[0])
    return [_[1] for _ in result]


def _apply_lst(args):
    params, func, num, kwargs = args
    return num, func(*params, **kwargs)
def process_wet_file(wet_url):
    global url_set
    try:
        print('Downloading WET file {} ... '.format(wet_url))
        file_name = wet_url.split('/')[-1]
        file_unzipped = file_name.replace('.warc.wet.gz', '.warc')
        save(wet_url, file_name)
        print('Download complete ... ')

        print('Unzipping WET file ... ')
        df_name = file_name.replace('.warc.wet.gz', '.feather')
        cols = ['url', 'domain', 'tld', 'sentence']
        df = pd.DataFrame(columns=cols)

        # unzip a file
        with gzip.open(file_name, 'rb') as f_in:
            with open(file_unzipped, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        lines = read_every_line(file_unzipped,
                                1e8)
        print('File unzipped ... ')

        print('Processing WET file ... ')
        with warc.open(file_unzipped) as f:
            for i, record in enumerate(f):
                # keep only urls that were selected from the index
                if record.url in url_set:
                    _tldextract = tldextract.extract(record.url)
                    d = _tldextract.domain
                    tld = _tldextract.suffix
                    text = record.payload.read().decode("utf-8")
                    sentences = process_web_text(text)
                    temp_df = pd.DataFrame(data={
                        'url': [record.url] * len(sentences),
                        'domain': [d] * len(sentences),
                        'tld': [tld] * len(sentences),
                        'sentence': sentences},
                        columns=cols)
                    df = df.append(temp_df)
        print('WET file processed ... ')

        os.remove(file_name)
        os.remove(file_unzipped)
        print('Files removed ... ')

        df = df.dropna().drop_duplicates().reset_index(drop=True)
        df.to_feather(df_name)
        print('Df saved ... ')
    except Exception as e:
        print('{} is not processed'.format(wet_url))
        print(e)
# wet files were already ranked by their popularity within Russian websites in the previous script
wet_urls = upkl('wet_urls.pickle')
url_set = upkl('url_set.pickle')

print('Total {} WET files, Total {} URLs'.format(len(wet_urls), len(url_set)))
print('Downloads starting now ...')

list_multiprocessing(wet_urls[:],
                     process_wet_file,
                     workers=4)
import pandas as pd

dfs = []
cols = ['url', 'tld', 'wet']

# collect the per-index feather files produced by the first script
for i in range(1, 100):
    dfs.append(pd.read_feather('../common_crawl/data/cdx-000{}.feather'.format(str(i).zfill(2)))[cols])
for i in range(100, 269):
    dfs.append(pd.read_feather('../common_crawl/data/cdx-00{}.feather'.format(str(i).zfill(3)))[cols])

df = pd.concat(dfs)
df = df.reset_index(drop=True)

# split the combined index into 10 feather chunks without dropping rows
chunk_size = len(df) // 10 + 1
start = 0
end = chunk_size
c = 0

while start < df.shape[0]:
    chunk = df.iloc[start:end].reset_index(drop=True)
    try:
        chunk.to_feather('index_269_{}.feather'.format(str(c)))
    except Exception as e:
        print(e)
        print(chunk)
        print(chunk.info())
    c += 1
    start += chunk_size
    end += chunk_size
elmurod1202 commented Dec 20, 2019

Hey @snakers4!
I was using your code, and the first file (parse_cc_index.py) has a snippet where I didn't get why you have 75 there. What was the purpose of giving that boundary?
Code here:

# iterate over the index files
for i, cc_index in enumerate(cc_indexes):
    if i > 75:
        cc_index_file = cc_index.split('/')[-1]
        file_dict[os.path.join(storage_folder, cc_index_file)] = file_prefix + cc_index
    else:
        pass

@snakers4 (Author) commented

@elmurod1202
I believe I was just re-running something manually after something crashed
It was a long time ago
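
For anyone re-running the script from scratch, a minimal sketch of making that boundary an explicit knob instead of the hard-coded 75; the resume_from name is hypothetical and not part of the original gist:

# index of the last cc-index shard that was already processed before a crash;
# set resume_from = -1 for a clean run over all shards
resume_from = 75
for i, cc_index in enumerate(cc_indexes):
    if i <= resume_from:
        continue
    cc_index_file = cc_index.split('/')[-1]
    file_dict[os.path.join(storage_folder, cc_index_file)] = file_prefix + cc_index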

yemaney commented Sep 14, 2023

What is the format of the strings that are supposed to be in the data/cc-index.paths file?
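
Not from the gist itself, but as far as I can tell from how Common Crawl publishes its index: cc-index.paths is the per-crawl path listing (shipped as cc-index.paths.gz on each crawl's download page; CC-MAIN-2018-17 below is only an example crawl ID), and every line is a path relative to the bucket root, roughly like:

cc-index/collections/CC-MAIN-2018-17/indexes/cdx-00000.gz
cc-index/collections/CC-MAIN-2018-17/indexes/cdx-00001.gz
...
cc-index/collections/CC-MAIN-2018-17/indexes/cluster.idx
cc-index/collections/CC-MAIN-2018-17/metadata.yaml

That would be why the script prepends https://commoncrawl.s3.amazonaws.com/ to each line and drops the last two non-cdx entries with cc_indexes[:-2].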
