-
-
Save kam193/c48d3118779f70259dea62cd2e399758 to your computer and use it in GitHub Desktop.
Script used to automatically extract sentences from the Europarl dataset
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import subprocess | |
import tempfile | |
from datetime import datetime | |
from functools import wraps | |
from typing import Callable, List | |
from langdetect import detect, lang_detect_exception | |
class SentenceExtractor:
    """Run a sequence of file-transforming steps, caching each step's output.

    Each step is a callable taking (input_filename, output_filename). Step
    outputs are written under ``tmp_dir/<prefix>/`` with a counter-based
    name; an existing step file is reused, so an interrupted run resumes
    at the first missing step output.
    """

    def __init__(self, steps: List[Callable], tmp_dir: str = "tmp") -> None:
        self._steps = steps
        self.tmp_dir = tmp_dir

    def _prepare(self, prefix=None) -> None:
        # A timestamp prefix keeps separate runs in separate directories
        # when the caller does not supply one.
        if not prefix:
            prefix = datetime.now()
        self._prefix = f"{self.tmp_dir}/{prefix}"
        self._step_counter = 0
        # makedirs (not mkdir) so a missing tmp_dir parent is created too;
        # exist_ok keeps re-runs from failing on an existing directory.
        os.makedirs(self._prefix, exist_ok=True)

    def _get_step_filename(self, func: Callable) -> str:
        # Counter prefix preserves pipeline order in the filenames.
        self._step_counter += 1
        return f"{self._prefix}/{self._step_counter}_{func.__name__}"

    def run(self, filename: str, prefix: str = None) -> str:
        """Apply every step in order to *filename*; return the final file path."""
        self._prepare(prefix)
        current_file = filename
        for step in self._steps:
            next_file = self._get_step_filename(step)
            if not os.path.isfile(next_file):
                print(f"Executing step {step.__name__}")
                step(current_file, next_file)
            current_file = next_file
        print(f"Finished with file {current_file}")
        return current_file
def per_line(func):
    """Lift a line-level transform into an (input_filename, output_filename) step.

    ``func`` receives each stripped input line and may return a string
    (written with a newline), an iterable of strings (each stripped and
    written on its own line, empty items skipped), or a falsy value
    (the line is dropped).
    """
    @wraps(func)
    @open_files
    def _wrapper(input, output):
        for raw in input:
            produced = func(raw.strip())
            if not produced:
                continue
            if isinstance(produced, str):
                output.writelines([produced, "\n"])
            else:
                output.writelines(part.strip() + "\n" for part in produced if part)
    return _wrapper
def open_files(func):
    """Adapt ``func(input_file, output_file)`` to accept filenames instead.

    The input is opened for reading and the output with ``"w+"``; both
    handles are closed when the wrapped call returns.
    """
    @wraps(func)
    def _wrapper(input_filename: str, output_filename: str):
        with open(input_filename, "r") as source:
            with open(output_filename, "w+") as sink:
                return func(source, sink)
    return _wrapper
def without_numbers(input_filename, output_filename):
    """Copy the input file to the output, dropping every line containing a digit."""
    command = ["grep", "-v", "-P", "[0-9]", input_filename]
    with open(output_filename, "w+") as sink:
        subprocess.run(command, stdout=sink)
@per_line
def only_polish(line: str):
    """Keep only lines that langdetect classifies as Polish."""
    try:
        detected = detect(line)
    except lang_detect_exception.LangDetectException:
        # Very short or symbol-only lines make detection fail; drop them.
        return None
    return line if detected == "pl" else None
# Non-greedy match of a parenthesised span, including its contents.
brackets_content_re = re.compile(r"\((.*?)\)")
# Any single leftover parenthesis character (unbalanced cases).
brackets_re = re.compile(r"\(|\)")
@per_line
def remove_brackets_contains(line):
    """Delete parenthesised text, then strip any stray parentheses."""
    without_spans = brackets_content_re.sub("", line)
    return brackets_re.sub("", without_spans)
@open_files
def fix_double_space(input, output):
    """Collapse double spaces into single spaces.

    The gsub pattern must be two spaces; with a single-space pattern the
    substitution is a no-op. (One pass halves runs of spaces; the pipeline
    applies this step twice, which handles longer runs in practice.)
    """
    subprocess.run(["awk", '{gsub(/  /," ");}1'], stdin=input, stdout=output)
@open_files
def without_name_abrevations(input, output):
    """Drop lines containing two consecutive uppercase (Polish) letters."""
    double_upper = "[A-ZĄĘŚĆŻŹŃŁÓ][A-ZĄĘŚĆŻŹŃŁÓ]"
    subprocess.run(["grep", "-v", double_upper], stdin=input, stdout=output)
# Splits at ./!/? followed by a capital letter or dash starting the next sentence.
sentence_separator_re = re.compile(r"(.*?)[\.\!\?] *([A-Z\-].*)")
# Splits clause pairs joined by a colon.
sub_sentence_separator_re = re.compile(r"(.*): *(.*)")
@per_line
def split_sentences(line):
    """Split a line on sentence boundaries, then on colon boundaries.

    Because both patterns contain capture groups, re.split keeps the
    captured fragments in its output, so the sentence text survives.
    """
    pieces = []
    for chunk in sentence_separator_re.split(line):
        pieces.extend(sub_sentence_separator_re.split(chunk))
    return pieces
@open_files
def remove_end_sentence_dot(input, output):
    """Strip a single trailing period from each line."""
    program = '{gsub(/[.]$/, "");}1'
    subprocess.run(["awk", program], stdin=input, stdout=output)
@open_files
def remove_begin_sentence_dash(input, output):
    """Strip leading dash markers (optionally space-padded) from each line."""
    program = '{sub("^([ ?]*[-] *)*", "");}1'
    subprocess.run(["awk", program], stdin=input, stdout=output)
@open_files
def remove_begin_sentence_space(input, output):
    """Strip leading spaces from each line."""
    program = '{sub("^ *", "");}1'
    subprocess.run(["awk", program], stdin=input, stdout=output)
@open_files
def without_abreviations_and_slash(input, output):
    """Drop lines containing '.', '-'..'/' range characters, or a backslash.

    Raw string: "\]" in a plain literal is an invalid escape sequence
    (SyntaxWarning on modern Python); the bytes passed to grep are unchanged.
    """
    subprocess.run(["grep", "-v", r"[.-/\]"], stdin=input, stdout=output)
@open_files
def replace_unicode_chars_to_basic(input, output):
    """Map typographic quote and bullet characters to ASCII equivalents."""
    script = 's/”/"/g; s/•/-/g; s/·/-/g;'
    subprocess.run(["sed", script], stdin=input, stdout=output)
@open_files
def without_names(input, output):
    """Drop lines that appear to contain personal names.

    Filters the Polish equivalents of "Mr/Mrs [title] Name" and any
    Name1-Name2 hyphenated form.
    """
    name_pattern = (
        "([pP]an([iau]|ie|ią|owie)? (wice)?((poseł|pośle|posła|posłowi"
        "|komisarz(u)?|premier|premier(ze|owi)|przewodniczący|minister"
        "|ministrze) )?|((wice)?(poseł|pośle|posła|posłowi|komisarz"
        "|posłan(ka|ki|ce|kom)|przewodniczący) )|-)([A-Z]|de|van|von)"
    )
    subprocess.run(
        ["grep", "-v", "-E", name_pattern], stdin=input, stdout=output
    )
@open_files
def without_common_abreviations(input, output):
    """Drop lines containing common Polish abbreviations (np., art., ...).

    Raw string: "\." in a plain literal is an invalid escape sequence
    (SyntaxWarning on modern Python); the regex passed to grep is unchanged.
    """
    subprocess.run(
        ["grep", "-v", "-E", r"(np|art|tzn|itd|itp|tzw|ust|ds)\."],
        stdin=input,
        stdout=output,
    )
@open_files
def only_allowed_chars(input, output):
    """Keep only lines composed entirely of the allowed character set.

    -x makes grep match whole lines. Raw string: "\+" in a plain literal
    is an invalid escape sequence (SyntaxWarning on modern Python); the
    pattern bytes passed to grep are unchanged.
    """
    subprocess.run(
        ["grep", "-x", r'[A-ZabcdefghijklmnoprstuwyząęśćżźńłóĄĘŚĆŻŹŃŁÓ ",?!-]\+'],
        stdin=input,
        stdout=output,
    )
@open_files
def length_limit(input, output):
    """Keep only lines with at most 14 whitespace-separated words (awk NF)."""
    condition = "NF<=14"
    subprocess.run(["awk", condition], stdin=input, stdout=output)
@open_files
def unique(input, output):
    """Deduplicate lines: pipe through sort, then uniq, via a temp file."""
    with tempfile.TemporaryFile() as sorted_tmp:
        subprocess.run(["sort"], stdin=input, stdout=sorted_tmp)
        # Rewind so uniq reads the sorted output from the beginning.
        sorted_tmp.seek(0)
        subprocess.run(["uniq"], stdin=sorted_tmp, stdout=output)
if __name__ == "__main__":
    # Order matters: splitting runs twice so fragments produced by the
    # first pass get split again; space fixing also repeats after the
    # removal steps reintroduce spacing artifacts.
    pipeline = [
        without_numbers,
        only_polish,
        remove_brackets_contains,
        fix_double_space,
        split_sentences,
        split_sentences,  # intentionally twice
        without_names,
        without_name_abrevations,
        without_common_abreviations,
        replace_unicode_chars_to_basic,
        remove_end_sentence_dot,
        fix_double_space,
        remove_begin_sentence_space,
        remove_begin_sentence_dash,
        only_allowed_chars,
        length_limit,
        unique,
    ]
    extractor = SentenceExtractor(pipeline)
    extractor.run("original/europarl-v7.pl-en.pl", prefix="test")
    # extractor.run("original/test.txt")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment