Skip to content

Instantly share code, notes, and snippets.

Last active November 14, 2023 06:30
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save leogao2/e09b64eae3b987925ccf3b86401624c6 to your computer and use it in GitHub Desktop.
from utils import *
import magic
mime = magic.Magic(mime=True)
import re
import multiprocessing as mp
import chardet
import bs4
import time
sh("mkdir -p tmp tmp2 out done fallback_needed errored")
def any_to_utf8(b):
return b.decode('utf-8')
except UnicodeDecodeError:
# try to figure out encoding if not urf-8
guess = chardet.detect(b)['encoding']
if not guess or guess == 'UTF-8': return
return b.decode(guess)
except (UnicodeDecodeError, LookupError):
# still cant figure out encoding, give up
def convert(tex):
out_name = tex.split('/')[2:] >> join('_')
with open(tex, 'rb') as fh:
b =
cont = any_to_utf8(b)
if cont is None: return
fwrite(tex, cont)
except FileNotFoundError:
# ???
sh(f'timeout 10s pandoc -s {tex} -o out/{out_name}.md --wrap=none')
except ExitCodeError:
import traceback
# fallback:
# move to fallback pile so we can handle it later
if '_extract' in tex.split('/')[:-1] >> join('/'):
loc = tex.split('/')[:-1] >> join('/')
loc = tex
sh(f'mv {loc} fallback_needed/')
# TODO: extract into script
sh(f'timeout 30s latexml {tex} --dest=tmp/{out_name}.xml --quiet --quiet 2>/dev/null && timeout 10s latexmlpost tmp/{out_name}.xml --mathtex --dest=tmp/{out_name}.html --nographicimages --nopictureimages')
soup = fread(f'tmp/{out_name}.html') >> apply(bs4.BeautifulSoup)
for x in soup.findAll("table", {"class": "ltx_eqn_table"}): = 'div'
fwrite(f'tmp/{out_name}.html', str(soup))
sh(f'timeout 10s pandoc -s tmp/{out_name}.html -t markdown_github-raw_html -o out/{out_name}')
h = fread(f'out/{out_name}') >> apply(X.strip()) >> apply(X.split('\n')) >> apply(X[:-1]) >> join('\n')
fwrite(f'out/{out_name}', h)
sh(f'rm tmp/{out_name}.html tmp/{out_name}.xml')
except ExitCodeError:
import traceback
def preextract_tar(dump):
dump_name = dump.split('/')[-1][:-4]
sh(f"(mkdir -p tmp2/{dump_name}; tar xf {dump} -C tmp2/{dump_name} && touch tmp2/done_{dump_name}; echo finished preload of {dump_name}) &")
def copy_tar(dump):
dump_name = dump.split('/')[-1][:-4]
for i in range(120):
if os.path.exists(f'tmp2/done_{dump_name}'):
sh(f'mv tmp2/{dump_name}/* tmp')
return True
print('waiting for tar...')
return False
pool = mp.Pool(12)
files = ls('files')
sh("rm -rf tmp/* tmp2/*")
for i, dump in enumerate(tqdm(files)):
if i + 1 < len(files): preextract_tar(files[i + 1])
sh("rm -rf tmp/*")
if not copy_tar(dump): continue
# extract
sh(f"tar xf {dump} -C tmp")
for doc in lsr('tmp'):
if doc.endswith('.gz'):
sh(f"gunzip {doc}")
type = mime.from_file(doc[:-3])
# print(type)
if type == 'application/x-tar':
sh(f"mkdir -p {doc[:-3]}_extract && tar xf {doc[:-3]} -C {doc[:-3]}_extract")
sh(f"rm {doc[:-3]}")
elif type == 'text/x-tex':
sh(f"mv {doc[:-3]} {doc[:-3]}.tex")
sh(f"rm {doc[:-3]}")
elif doc.endswith('.pdf'):
sh(f"rm {doc}")
# process
def tex_files():
for doc in ls(ls('tmp')[0]):
if os.path.isdir(doc):
for name in ['main', 'Main', 'MAIN', 'paper', 'Paper']: # common main file names
if os.path.exists(doc + '/' + name + '.tex'):
yield doc + '/' + name + '.tex'
if ls(doc) >> filt(X.endswith('.tex')) >> apply(len) == 1:
yield ls(doc) >> filt(X.endswith('.tex')) >> one()
# more than one top-level tex file, keep anything with \title
for titledoc in ls(doc) >> filt(X.endswith('.tex')):
if r'\title' in fread(titledoc):
yield titledoc
elif doc.endswith('.tex'):
yield doc
texfiles = list(tex_files()), texfiles)
sh(f'mv {dump} done')
print(f'marking {dump} as done')
sh(f'mv {dump} errored')
import os
from functools import reduce
import operator
import mailparser
import lm_dataformat as lmd
from tqdm import tqdm
import json
class ExitCodeError(Exception): pass
def sh(x):
if os.system(x): raise ExitCodeError()
def ls(x):
return [x + '/' + fn for fn in os.listdir(x)]
def lsr(x):
if os.path.isdir(x):
return reduce(operator.add, map(lsr, ls(x)), [])
return [x]
def fwrite(fname, content):
with open(fname, 'w') as fh:
def fread(fname):
with open(fname) as fh:
class each:
def __init__(self, f):
self.f = f
def __rrshift__(self, other):
return list(map(self.f, other))
class filt:
def __init__(self, f):
self.f = f
def __rrshift__(self, other):
return list(filter(self.f, other))
class apply:
def __init__(self, f):
self.f = f
def __rrshift__(self, other):
return self.f(other)
class one:
def __rrshift__(self, other):
if isinstance(other, list):
assert len(other) == 1
return other[0]
return next(other)
return None
class join:
def __init__(self, sep):
self.sep = sep
def __rrshift__(self, other):
if other is None: return
return self.sep.join(other)
return None
Y = object()
def id(x):
return x
class Reflective:
def __getattribute__(self, f):
def _fn(*args, **kwargs):
return lambda x: x.__getattribute__(f)(*args, **kwargs)
return _fn
def __getitem__(self, a):
return lambda x: x[a]
def __mul__(self, other):
if other == Y:
def _f(x, y=None):
if y == None:
x, y = x
return x * y
return _f
return lambda x: x * other
def __rmul__(self, other):
if other == Y:
def _f(x, y=None):
if y == None:
x, y = x
return y * x
return _f
return lambda x: other * x
def __add__(self, other):
if other == Y:
def _f(x, y=None):
if y == None:
x, y = x
return x + y
return _f
return lambda x: x + other
def __radd__(self, other):
if other == Y:
def _f(x, y=None):
if y == None:
x, y = x
return y + x
return _f
return lambda x: other + x
# (b -> a -> b) -> b -> [a] -> b
def foldl(f, init, arr):
curr = init
for elem in arr:
curr = f(curr, elem)
return curr
# (a -> b -> b) -> b -> [a] -> b
def foldr(f, init, arr):
curr = init
for elem in arr[::-1]:
curr = f(elem, curr)
return curr
def comp(*fs):
if len(fs) == 1:
return fs[0]
def _f(x):
for f in fs[::-1]:
x = f(x)
return x
return _f
X = Reflective()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment