-
-
Save leogao2/e09b64eae3b987925ccf3b86401624c6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from utils import * | |
import magic | |
mime = magic.Magic(mime=True) | |
import re | |
import multiprocessing as mp | |
import chardet | |
import bs4 | |
import time | |
# Create the working directories up front: tmp/ and tmp2/ are extraction
# scratch space, out/ receives the converted markdown, and done/,
# fallback_needed/ and errored/ are where inputs get moved further below.
sh("mkdir -p tmp tmp2 out done fallback_needed errored")
def any_to_utf8(b):
    """Decode raw bytes to text, preferring UTF-8.

    When the bytes are not valid UTF-8, the encoding guessed by ``chardet``
    is tried instead.

    Returns:
        The decoded string, or None when no usable encoding could be found.
    """
    try:
        text = b.decode('utf-8')
    except UnicodeDecodeError:
        text = None
        # not UTF-8: ask chardet for its best guess
        detected = chardet.detect(b)['encoding']
        # skip when there is no guess, or the guess is the codec that just failed
        if detected and detected != 'UTF-8':
            try:
                text = b.decode(detected)
            except (UnicodeDecodeError, LookupError):
                # the guess was wrong or names an unknown codec; give up
                pass
    return text
def _latexml_fallback(tex, out_name):
    """Convert ``tex`` to ``out/<out_name>_FALLBACK.md`` via LaTeXML and pandoc.

    This path is currently not called by ``convert`` (failed files are only
    moved to ``fallback_needed/``); it is kept so it can later be extracted
    into its own script.

    Raises:
        ExitCodeError: if any of the shell commands fails.
    """
    import shlex

    xml_path = f'tmp/{out_name}.xml'
    html_path = f'tmp/{out_name}.html'
    md_path = f'out/{out_name}_FALLBACK.md'
    q_xml, q_html, q_md = (shlex.quote(p) for p in (xml_path, html_path, md_path))

    sh(f'timeout 30s latexml {shlex.quote(tex)} --dest={q_xml} --quiet --quiet 2>/dev/null && timeout 10s latexmlpost {q_xml} --mathtex --dest={q_html} --nographicimages --nopictureimages')
    soup = fread(html_path) >> apply(bs4.BeautifulSoup)
    # turn equation tables into plain divs before handing the HTML to pandoc
    for x in soup.findAll("table", {"class": "ltx_eqn_table"}):
        x.name = 'div'
    fwrite(html_path, str(soup))
    sh(f'timeout 10s pandoc -s {q_html} -t markdown_github-raw_html -o {q_md}')
    # strip surrounding whitespace, then drop the last line of the output
    h = fread(md_path) >> apply(X.strip()) >> apply(X.split('\n')) >> apply(X[:-1]) >> join('\n')
    fwrite(md_path, h)
    sh(f'rm {q_html} {q_xml}')


def convert(tex):
    """Convert one LaTeX source file to markdown under ``out/`` using pandoc.

    The file is first rewritten in place as decoded text. If pandoc fails or
    exceeds its 10 second timeout, the file (or, when its directory path
    contains ``_extract``, that whole directory) is moved to
    ``fallback_needed/``.

    Args:
        tex: Path of the ``.tex`` file. Everything after the first two path
            components is joined with ``_`` to form the output name.

    Returns:
        None. All results are written to disk; failures are printed.
    """
    # Paths originate from extracted archives, so quote them before they
    # reach the shell.
    import shlex
    import traceback

    print(tex)
    out_name = tex.split('/')[2:] >> join('_')

    try:
        with open(tex, 'rb') as fh:
            raw = fh.read()
        cont = any_to_utf8(raw)
        if cont is None:
            # undecodable source: nothing to convert
            return
        fwrite(tex, cont)
    except FileNotFoundError:
        # NOTE(review): presumably another worker already moved this file's
        # directory to fallback_needed/ -- confirm.
        return

    md_path = f'out/{out_name}.md'
    try:
        sh(f'timeout 10s pandoc -s {shlex.quote(tex)} -o {shlex.quote(md_path)} --wrap=none')
        print(os.path.exists(md_path))
    except ExitCodeError:
        traceback.print_exc()
        # fallback: move to the fallback pile so we can handle it later
        try:
            parent = tex.split('/')[:-1] >> join('/')
            loc = parent if '_extract' in parent else tex
            sh(f'mv {shlex.quote(loc)} fallback_needed/')
        except ExitCodeError:
            traceback.print_exc()
def preextract_tar(dump):
    """Start extracting archive ``dump`` into ``tmp2/<name>`` in the background.

    ``<name>`` is the archive's file name minus its last four characters.
    Once tar succeeds, the marker file ``tmp2/done_<name>`` is touched.
    """
    archive = dump.rsplit('/', 1)[-1]
    dump_name = archive[:-4]
    # the trailing '&' backgrounds the subshell, so this call returns at once
    sh(f"(mkdir -p tmp2/{dump_name}; tar xf {dump} -C tmp2/{dump_name} && touch tmp2/done_{dump_name}; echo finished preload of {dump_name}) &")
def copy_tar(dump):
    """Move the pre-extracted contents of ``dump`` from ``tmp2/`` into ``tmp/``.

    Polls once per second, up to 120 times, for the ``tmp2/done_<name>``
    marker file.

    Returns:
        True once the contents were moved, False if the marker never showed up.
    """
    archive = dump.rsplit('/', 1)[-1]
    dump_name = archive[:-4]
    attempts_left = 120
    while attempts_left > 0:
        if os.path.exists(f'tmp2/done_{dump_name}'):
            sh(f'mv tmp2/{dump_name}/* tmp')
            return True
        print('waiting for tar...')
        time.sleep(1)
        attempts_left -= 1
    return False
import traceback


def tex_files():
    """Yield the main ``.tex`` file(s) of every entry in the first directory under ``tmp/``.

    For a directory entry: a file with a common main-file name wins; otherwise
    a lone top-level ``.tex`` file is used; otherwise every top-level ``.tex``
    file containing ``\\title`` is yielded. A plain ``.tex`` entry is yielded
    as is.
    """
    for doc in ls(ls('tmp')[0]):
        if os.path.isdir(doc):
            for name in ['main', 'Main', 'MAIN', 'paper', 'Paper']:  # common main file names
                candidate = doc + '/' + name + '.tex'
                if os.path.exists(candidate):
                    yield candidate
                    break
            else:
                top_level_tex = ls(doc) >> filt(X.endswith('.tex'))
                if len(top_level_tex) == 1:
                    yield top_level_tex[0]
                    continue
                # more than one top-level tex file, keep anything with \title
                for titledoc in top_level_tex:
                    try:
                        if r'\title' in fread(titledoc):
                            yield titledoc
                    except (OSError, UnicodeError):
                        # unreadable or undecodable candidate: skip it
                        pass
        elif doc.endswith('.tex'):
            yield doc


pool = mp.Pool(12)
files = ls('files')
sh("rm -rf tmp/* tmp2/*")

# Start extracting the first archive now; each iteration below starts the
# next archive's extraction before working on the current one.
if files:
    preextract_tar(files[0])

for i, dump in enumerate(tqdm(files)):
    if i + 1 < len(files):
        preextract_tar(files[i + 1])
    try:
        sh("rm -rf tmp/*")
        if not copy_tar(dump):
            continue
        # extract
        print(dump)
        # NOTE(review): copy_tar() has already moved the pre-extracted tree
        # into tmp/, so this second extraction looks redundant -- confirm
        # before removing.
        sh(f"tar xf {dump} -C tmp")
        for doc in lsr('tmp'):
            if doc.endswith('.gz'):
                sh(f"gunzip {doc}")
                unpacked = doc[:-3]  # path without the '.gz' suffix
                mime_type = mime.from_file(unpacked)
                if mime_type == 'application/x-tar':
                    sh(f"mkdir -p {unpacked}_extract && tar xf {unpacked} -C {unpacked}_extract")
                    sh(f"rm {unpacked}")
                elif mime_type == 'text/x-tex':
                    sh(f"mv {unpacked} {unpacked}.tex")
                else:
                    sh(f"rm {unpacked}")
            elif doc.endswith('.pdf'):
                sh(f"rm {doc}")
        # process
        texfiles = list(tex_files())
        pool.map(convert, texfiles)
        sh(f'mv {dump} done')
        print(f'marking {dump} as done')
    except Exception:
        # Report why the dump failed instead of hiding it, and let
        # KeyboardInterrupt / SystemExit stop the run.
        traceback.print_exc()
        sh(f'mv {dump} errored')

pool.close()
pool.join()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from functools import reduce | |
import operator | |
import mailparser | |
import lm_dataformat as lmd | |
from tqdm import tqdm | |
import json | |
class ExitCodeError(Exception):
    """Raised by ``sh`` when a shell command reports a non-zero status."""


def sh(x):
    """Run the command string ``x`` through ``os.system``.

    Args:
        x: Shell command line.

    Raises:
        ExitCodeError: if ``os.system`` returns a non-zero value. The message
            includes that raw value and the command, so tracebacks show
            which command failed.
    """
    status = os.system(x)
    if status:
        raise ExitCodeError(f'os.system returned {status} for command: {x}')
def ls(x):
    """List directory ``x``, returning each entry as ``x + '/' + entry``."""
    entries = os.listdir(x)
    return ['/'.join((x, entry)) for entry in entries]
def lsr(x):
    """Recursively list every non-directory path below ``x``.

    Args:
        x: Starting path. If it is not a directory, the result is ``[x]``.

    Returns:
        A list of paths built as ``parent + '/' + entry``, in depth-first
        order following ``os.listdir`` order within each directory.
    """
    found = []
    # Explicit stack instead of recursion plus repeated list concatenation:
    # same output order, linear work, no recursion-depth limit.
    pending = [x]
    while pending:
        path = pending.pop()
        if os.path.isdir(path):
            # push children reversed so they are popped in listing order
            pending.extend(path + '/' + entry for entry in reversed(os.listdir(path)))
        else:
            found.append(path)
    return found
def fwrite(fname, content):
    """Write the string ``content`` to ``fname`` as UTF-8, replacing the file.

    The encoding is explicit so the output does not depend on the process
    locale.
    """
    with open(fname, 'w', encoding='utf-8') as fh:
        fh.write(content)
def fread(fname):
    """Return the full contents of ``fname`` decoded as UTF-8.

    The encoding is explicit so the result does not depend on the process
    locale.

    Raises:
        UnicodeDecodeError: if the file is not valid UTF-8.
        OSError: if the file cannot be opened.
    """
    with open(fname, encoding='utf-8') as fh:
        return fh.read()
class each:
    """Pipeline stage: ``iterable >> each(f)`` is the list of ``f(item)`` for every item."""

    def __init__(self, f):
        self.f = f

    def __rrshift__(self, other):
        fn = self.f
        return [fn(item) for item in other]
class filt:
    """Pipeline stage: ``iterable >> filt(f)`` is the list of items for which ``f(item)`` is truthy."""

    def __init__(self, f):
        self.f = f

    def __rrshift__(self, other):
        # like the builtin filter(), a predicate of None keeps truthy items
        keep = bool if self.f is None else self.f
        return [item for item in other if keep(item)]
class apply:
    """Pipeline stage: ``value >> apply(f)`` evaluates to ``f(value)``."""

    def __init__(self, f):
        self.f = f

    def __rrshift__(self, other):
        fn = self.f
        return fn(other)
class one:
    """Pipeline stage: ``value >> one()`` extracts a single element.

    For a list, the result is its only element, or None unless the list has
    exactly one element. For anything else the result is ``next(value)``, or
    None if that raises an ordinary exception (exhausted or not an iterator).
    """

    def __rrshift__(self, other):
        try:
            if isinstance(other, list):
                # explicit length check: an assert would vanish under -O
                return other[0] if len(other) == 1 else None
            return next(other)
        except Exception:
            # best-effort contract, but let KeyboardInterrupt/SystemExit through
            return None
class join:
    """Pipeline stage: ``strings >> join(sep)`` evaluates to ``sep.join(strings)``.

    The result is None when the left-hand side is None or when joining raises
    an ordinary exception (for example, non-string items).
    """

    def __init__(self, sep):
        self.sep = sep

    def __rrshift__(self, other):
        if other is None:
            return None
        try:
            return self.sep.join(other)
        except Exception:
            # best-effort contract, but let KeyboardInterrupt/SystemExit through
            return None
# Sentinel operand: ``X * Y`` (and friends) build a two-argument callable
# instead of binding a fixed right-hand value.
Y = object()


def id(x):
    """Identity function: return ``x`` unchanged.

    Note that this name replaces the builtin ``id`` within this module.
    """
    return x


def _reflective_binop(other, combine):
    """Build the callable for a binary operator applied to a ``Reflective``.

    Args:
        other: The non-``Reflective`` operand, or the sentinel ``Y``.
        combine: ``combine(x, y)`` computes the result, where ``x`` is the
            value later substituted for the placeholder and ``y`` is the
            other operand.

    Returns:
        For ``Y``: a function accepting either ``(x, y)`` or a single
        ``(x, y)`` pair. Otherwise: a one-argument function of ``x``.
    """
    # Identity checks keep operands with custom ``__eq__`` (arrays, for
    # example) from being compared against the sentinel or None.
    if other is Y:
        def _f(x, y=None):
            if y is None:
                x, y = x
            return combine(x, y)
        return _f
    return lambda x: combine(x, other)


class Reflective:
    """Placeholder object that turns expressions into callables.

    ``obj.method(*args)`` yields a function calling ``method(*args)`` on its
    argument, ``obj[key]`` yields a function indexing its argument, and
    ``+`` / ``*`` with another operand yield functions applying that
    operator to the argument.
    """

    def __getattribute__(self, f):
        # Every attribute access is intercepted, so helpers must never be
        # reached through ``self``.
        def _fn(*args, **kwargs):
            return lambda x: getattr(x, f)(*args, **kwargs)
        return _fn

    def __getitem__(self, a):
        return lambda x: x[a]

    def __mul__(self, other):
        return _reflective_binop(other, lambda x, y: x * y)

    def __rmul__(self, other):
        return _reflective_binop(other, lambda x, y: y * x)

    def __add__(self, other):
        return _reflective_binop(other, lambda x, y: x + y)

    def __radd__(self, other):
        return _reflective_binop(other, lambda x, y: y + x)
# (b -> a -> b) -> b -> [a] -> b
def foldl(f, init, arr):
    """Left fold: start from ``init`` and apply ``f(acc, elem)`` to each element of ``arr`` in order."""
    return reduce(f, arr, init)
# (a -> b -> b) -> b -> [a] -> b
def foldr(f, init, arr):
    """Right fold: start from ``init`` and apply ``f(elem, acc)`` to the elements of ``arr`` from last to first."""
    return reduce(lambda acc, elem: f(elem, acc), arr[::-1], init)
def comp(*fs):
    """Compose functions right to left, so ``comp(f, g)(x) == f(g(x))``.

    A single function is returned unchanged; with no functions the result is
    an identity function.
    """
    if len(fs) == 1:
        return fs[0]
    application_order = fs[::-1]

    def _f(x):
        return reduce(lambda value, fn: fn(value), application_order, x)
    return _f
# Shared placeholder instance: ``X.method(args)``, ``X[key]``, ``X + v`` and
# ``X * v`` each evaluate to a one-argument callable (see Reflective).
X = Reflective()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment