-
-
Save daphee/3544a71719ca390ec1256dabf87ec0e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Put transcripts to be processed into the "transcripts" subfolder
Split transcripts will be output into the "out" subfolder
"""
import os
import re

# Encoding used to decode the input transcripts.
FILE_ENCODING = "latin-1"
#FILE_ENCODING = "utf-8"

# When True, blank lines inside a speech are kept in the output files.
INCLUDE_EMPTY_LINES = True

# Lines starting with these words don't require a (PARTY) before the colon.
SPECIAL_WORDS = ["Vizepräsident", "Präsident"]
# These words are filtered out of the speaker name found before the colon.
SPECIAL_NAME_WORDS = ["Dr.", "Bundespräsident"] + SPECIAL_WORDS

# Upper bounds for the text before the colon. matchLine compares the total
# number of whitespace-separated words (party/special word included) against
# NAME_MAX_WORDS and the character count against NAME_MAX_LENGTH.
# Chosen fairly arbitrarily and may need tweaking: without these limits there
# are a lot of false positives, because the criterion "parentheses + colon"
# alone isn't very strong.
NAME_MAX_WORDS = 4
NAME_MAX_LENGTH = 100

# "<name> (<party>)": group 1 is everything before the first "(", group 2 is
# the text inside the parentheses. The party should also contain no spaces
# (this is not enforced by the pattern).
PATTERN_PARTY = r"([^\(]+)\(([^)]+)\)"

MONTHS = "Januar|Februar|März|April|Mai|Juni|Juli|August|September|Oktober|November|Dezember"

# Header line of the form
# "Deutscher Bundestag ... <period>. Wahlperiode ... <session>. Sitzung ... <day>. <month> <year>".
# Groups: legislative period, session, day, month name, year.
PATTERN_FILE_META = r"\n.*Deutscher.*Bundestag.*?([0-9]+)\..*Wahlperiode.*?([0-9]+)\..*Sitzung.*?([0-9]{1,2})\..*(MONTHS).*([0-9]{4}).*?\n".replace("MONTHS", MONTHS)

# A page number: digits on a line of their own, surrounded by blank lines.
# The quantifier sits inside the group so that the whole number is captured
# (with "([0-9])+" only the last digit would end up in group 1).
PATTERN_PAGE_NUM = r"\n\n([0-9]+)\n\n"
def matchLine(line, currentPage):
    """Try to interpret *line* as the first line of a new speaker section.

    A line starts a section when all of the following hold:

    * it is non-empty, does not begin with "(" (a comment from another
      politician) and contains a colon;
    * the text before the first colon is at most NAME_MAX_LENGTH characters
      and at most NAME_MAX_WORDS whitespace-separated words long;
    * that text either starts with one of SPECIAL_WORDS or matches
      PATTERN_PARTY, i.e. contains a "(party)" part.

    Args:
        line: One transcript line, without its trailing newline.
        currentPage: Page the line is on; returned unchanged as the page of
            the section.

    Returns:
        None if the line does not start a section, otherwise the tuple
        ``(sectionName, sectionParty, sectionPage, sectionContent, fullName)``:
        ``sectionName`` is the speaker name with SPECIAL_NAME_WORDS removed,
        ``sectionParty`` the party (or the leading special word),
        ``sectionContent`` everything after the first colon plus a newline,
        and ``fullName`` the unfiltered, whitespace-trimmed speaker name.
    """
    # Cheapest checks first so non-matching lines are discarded early.
    if not line or line[0] == "(" or ":" not in line:
        return None

    # Only the first colon separates the speaker from the speech; any further
    # colons stay part of the content.
    beforeColon, _, rest = line.partition(":")
    if len(beforeColon) > NAME_MAX_LENGTH or len(beforeColon.split()) > NAME_MAX_WORDS:
        return None

    if beforeColon.startswith(tuple(SPECIAL_WORDS)):
        # The first word (e.g. the office) takes the place of the party.
        words = beforeColon.split()
        sectionName, sectionParty = " ".join(words[1:]), words[0]
        # The special word stays part of the full name.
        fullName = " ".join(words)
    else:
        partyMatch = re.search(PATTERN_PARTY, beforeColon)
        if not partyMatch:
            return None
        sectionName, sectionParty = partyMatch.groups()
        # The name group usually ends with the space in front of "(". Trim it,
        # because the caller compares fullName against stripped lines and a
        # trailing space would make that comparison fail every time.
        fullName = sectionName.strip()

    # Remove titles/offices listed in SPECIAL_NAME_WORDS from the name.
    sectionName = " ".join(w for w in sectionName.split() if w not in SPECIAL_NAME_WORDS)
    return (sectionName, sectionParty, currentPage, rest + "\n", fullName)
def _isPageNumberLine(lines, index):
    """Return True if lines[index] holds only digits and sits between two blank lines."""
    # isdecimal() instead of isnumeric(): the latter also accepts characters
    # such as "²" or "½", which int() cannot convert.
    return (
        1 <= index < len(lines) - 1
        and lines[index].strip().isdecimal()
        and lines[index - 1].strip() == ""
        and lines[index + 1].strip() == ""
    )


def _writeSection(sectionPage, subpageNum, sectionName, sectionParty, fileMeta, sectionContent):
    """Write one speaker section to the "out" folder.

    The file name is "<page>.<subpage>_<name>_<party>_<period>_<session>_<date>.txt",
    lower-cased, with "/" and " " replaced by "_". ``fileMeta`` is the tuple
    ``(legPeriod, session, dateStr)``.
    """
    legPeriod, session, dateStr = fileMeta
    # If you don't want the sub-page counter in the name, use just
    # sectionPage as the first element.
    fileName = "_".join([
        sectionPage + "." + str(subpageNum),
        sectionName,
        sectionParty,
        legPeriod,
        session,
        dateStr + ".txt",
    ])
    fileName = fileName.lower().replace("/", "_").replace(" ", "_")
    print("Writing to '{}'".format(fileName))
    # NOTE(review): output uses the platform default encoding, as before,
    # while input is decoded with FILE_ENCODING - confirm this is intended.
    with open(os.path.join("out", fileName), "w") as f:
        f.write(sectionContent)


def _splitTranscript(iFile):
    """Split the transcript "transcripts/<iFile>" into one output file per speaker section."""
    print("Parsing", iFile)
    # Read raw bytes and decode explicitly so line endings are left untouched.
    with open(os.path.join("transcripts", iFile), "rb") as f:
        text = f.read().decode(FILE_ENCODING)

    # Parse metadata, once per file.
    metaMatch = re.search(PATTERN_FILE_META, text)
    if not metaMatch:
        print("Error parsing metadata. Skipping this file")
        return
    legPeriod, session, day, monthStr, year = metaMatch.groups()
    # Convert the month name to its number via its position in MONTHS. Only
    # German dates in one very specific format are handled, so this suffices.
    month = str(MONTHS.split("|").index(monthStr) + 1)
    fileMeta = (legPeriod, session, day + month + year)

    # Remove all Wahlperiode/session header lines.
    text = re.sub(PATTERN_FILE_META, "", text)
    lines = text.split("\n")

    # Find the first section and the first page number. The index of the
    # first section is where the splitting loop below starts.
    firstSectionIndex, currentPage = None, None
    for index, line in enumerate(lines):
        # Keep the FIRST matching line only; later matches must not move the
        # start index. Compare against None because index 0 is a valid hit.
        if firstSectionIndex is None and matchLine(line, currentPage=None):
            firstSectionIndex = index
        if _isPageNumberLine(lines, index):
            pageNum = int(line.strip())
            # Section found before any page number => the section's page is
            # the one before this number.
            if firstSectionIndex is not None:
                pageNum -= 1
            currentPage = str(pageNum)
        if firstSectionIndex is not None and currentPage is not None:
            break

    if firstSectionIndex is None:
        print("No section found. Skipping this file")
        return
    if currentPage is None:
        print("No page number found. Skipping this file")
        return

    i = firstSectionIndex
    print("Putting the first {} lines into '{}'".format(i, "beginning_" + iFile))
    with open(os.path.join("out", "beginning_" + iFile), "w") as f:
        f.write("\n".join(lines[:i]))

    sectionName = sectionParty = sectionPage = sectionContent = fullName = None
    subpageNum = 1
    lastIndex = len(lines) - 1
    while i <= lastIndex:
        line = lines[i]
        match = matchLine(line, currentPage)
        if match:
            # A new section starts: flush the previous one first.
            # NOTE(review): a section whose filtered name is empty is not
            # written (same as before) - confirm this is intended.
            if sectionName:
                _writeSection(sectionPage, subpageNum, sectionName, sectionParty, fileMeta, sectionContent)
                # Sections starting on the same page get increasing sub-page numbers.
                subpageNum = subpageNum + 1 if match[2] == sectionPage else 1
            sectionName, sectionParty, sectionPage, sectionContent, fullName = match
        elif _isPageNumberLine(lines, i):
            currentPage = line.strip()
            # Also skip the blank line that follows the page number.
            i += 1
        elif line.strip() == fullName:
            # Exclude lines containing only the speaker name (doesn't check
            # that the line is preceded by a "Deutscher Bundestag..." line).
            pass
        elif INCLUDE_EMPTY_LINES or line.strip():
            # Content line; the very last line of the file gets no newline.
            sectionContent += line if i == lastIndex else line + "\n"
        i += 1

    # Flush the final section after the loop. Doing this inside the loop on
    # the last index loses the section whenever the page-number skip jumps
    # past the last line, or when the last line itself starts a section.
    if sectionName:
        _writeSection(sectionPage, subpageNum, sectionName, sectionParty, fileMeta, sectionContent)


def main():
    """Split every .txt transcript in "transcripts" into per-speaker files in "out".

    For each input file the text before the first section is written to
    "out/beginning_<file>", and every speaker section to its own file. Files
    without a parsable metadata line, section or page number are skipped
    with a message.
    """
    # Make sure the output folder exists before any file is written.
    os.makedirs("out", exist_ok=True)
    # Collect input files (only .txt files).
    inputFiles = [
        f for f in os.listdir("transcripts")
        if os.path.isfile(os.path.join("transcripts", f)) and f.endswith(".txt")
    ]
    for iFile in inputFiles:
        _splitTranscript(iFile)
# Run the splitter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment