Created: February 3, 2023 04:16
-
-
Save napsternxg/f420b18cf40106d7fbe81174b2fd2b3a to your computer and use it in GitHub Desktop.
Parse TweetNERD files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from zipfile import ZipFile | |
from pathlib import Path | |
import pandas as pd | |
import numpy as np | |
import json | |
# All downloaded job export archives found anywhere under the working directory.
JOB_FILES = list(Path(".").glob("**/job_*.json.zip"))

# Maps a crowdsourcing job id to the part number used when naming the
# public output file (see get_df_public).
JOB_ID_TO_OUTPUT_PART = {
    1873084: 12,
    1876298: 13,
    1877851: 14,
    1882110: 15,
    1887777: 16,
}
def read_data(zip_file):
    """Load the first member of a zipped job export into a DataFrame.

    The archive is expected to contain a JSON-lines file; only the first
    member is read. The ``id`` column is forced to uint64 so that large
    numeric ids are not mangled by a float round-trip.
    """
    with ZipFile(zip_file) as archive:
        members = archive.namelist()
        print(members)
        with archive.open(members[0]) as handle:
            frame = pd.read_json(
                handle, orient="records", lines=True, dtype={"id": np.uint64}
            )
        return frame
def parse_annotation(annotation):
    """Convert one raw span annotation into a flat phrase record.

    ``annotation`` is a dict with a ``"span"`` list of token dicts
    (``text``, ``start_index``, ``end_index``, ``word_index``) and a
    ``"wikidata_id"``. Tokens are sorted by start offset and joined back
    into the surface phrase, padding the gap between consecutive tokens
    with spaces. Assumes ``span`` is non-empty (an empty span raises
    IndexError, as in the original).

    Returns a dict with keys ``phrase``, ``start``, ``end``, ``entityId``.
    """
    span = [
        dict(
            text=token["text"],
            start_index=int(token["start_index"]),
            end_index=int(token["end_index"]),
            word_index=int(token["word_index"]),
        )
        for token in annotation["span"]
    ]
    span = list(sorted(span, key=lambda x: int(x["start_index"])))
    start = span[0]["start_index"]
    end = span[-1]["end_index"]
    phrase = [span[0]["text"]]
    for i in range(1, len(span)):
        # BUG FIX: the gap is between the PREVIOUS token's end and the
        # CURRENT token's start. The original computed
        # span[i-1]["start_index"] - span[i-1]["end_index"], which is never
        # positive, so multi-token phrases were concatenated with no
        # separating spaces.
        diff = span[i]["start_index"] - span[i - 1]["end_index"]
        phrase.append(" " * diff)
        phrase.append(span[i]["text"])
    phrase = "".join(phrase)
    wikidata_id = annotation["wikidata_id"]
    return dict(phrase=phrase, start=start, end=end, entityId=wikidata_id)
def parse_annotations(annotations):
    """Decode a JSON-encoded annotation list into flat phrase records.

    ``annotations`` is the raw JSON string stored in a judgement payload;
    each decoded element is passed through ``parse_annotation``.
    """
    return [parse_annotation(raw) for raw in json.loads(annotations)]
def parse_judgement(judgement):
    """Flatten one crowd judgement into a plain dict.

    Pulls identifying fields from the judgement and its unit payload, then
    decodes the annotator output; a judgement with no annotation output is
    treated as an empty annotation list.
    """
    unit_data = judgement["unit_data"]
    raw_annotations = judgement["data"].get(
        "annotation_output_this_is_automatically_generated_do_not_edit", "[]"
    )
    return dict(
        jid=judgement["id"],
        job_id=judgement["job_id"],
        unit_id=judgement["unit_id"],
        tweet_id=unit_data["tweet_id"],
        text=unit_data["text"],
        annotations=parse_annotations(raw_annotations),
    )
def parse_result(result):
    """Parse every judgement attached to one result row into flat dicts."""
    return [parse_judgement(judgement) for judgement in result["judgments"]]
# Placeholder values for judgements that produced no annotation, so the
# exploded frame keeps a uniform schema after fillna.
MISSING_VALS = {"phrase": "NO_PHRASE", "entityId": "NO_ENTITY", "start": -1, "end": -1}


def get_parsed_results(df):
    """Explode the nested ``parsed_results`` column into one row per annotation.

    Two explode/normalize passes: first one row per judgement, then one row
    per annotation within each judgement. Missing annotation fields are
    filled from MISSING_VALS and offsets are cast back to int (they become
    float when NaNs are introduced by the explode).
    """
    per_judgement = (
        df[["id", "job_id", "parsed_results"]]
        .explode("parsed_results")
        .reset_index(drop=True)
    )
    per_judgement = pd.concat(
        [per_judgement[["id"]], pd.json_normalize(per_judgement["parsed_results"])],
        axis=1,
    )
    per_annotation = per_judgement.explode("annotations").reset_index(drop=True)
    per_annotation = pd.concat(
        [
            per_annotation.drop("annotations", axis=1),
            pd.json_normalize(per_annotation["annotations"]),
        ],
        axis=1,
    )
    return per_annotation.fillna(MISSING_VALS).astype({"start": int, "end": int})
def get_grouped_results(parsed_results):
    """Aggregate identical annotations across judgements.

    Rows are grouped by the annotated span (unit, tweet, offsets, entity);
    the contributing judgement ids are collected into a list and the count
    of agreeing judgements becomes the ``score`` column.
    """
    group_keys = ["unit_id", "tweet_id", "start", "end", "entityId"]
    aggregations = {
        "jid": list,
        "job_id": "first",
        "text": "first",
        "phrase": "first",
        "id": "count",
    }
    grouped = parsed_results.groupby(group_keys).agg(aggregations)
    return grouped.rename(columns={"id": "score"})
# Column order for the released public files.
OUTPUT_COLS = ["tweet_id", "phrase", "start", "end", "entityId", "score"]


def get_df_public(grouped_results):
    """Build the public release frame and the output file name for it.

    Every row in a grouped frame comes from a single job, so the job id of
    the first row selects the part number via JOB_ID_TO_OUTPUT_PART.
    Returns ``(df_public, output_file_name)``.
    """
    first_job_id = grouped_results["job_id"].iloc[0]
    output_file_name = f"part_{JOB_ID_TO_OUTPUT_PART[first_job_id]}"
    # reset_index turns the groupby keys back into columns before selection.
    df_public = grouped_results.reset_index()[OUTPUT_COLS]
    return df_public, output_file_name
def convert_job_file_to_public_file(zip_file):
    """End-to-end conversion of one job export into a public TSV file.

    Reads the zipped job dump, flattens and aggregates its judgements,
    writes ``<part_name>.public.tsv`` in the working directory, and returns
    ``(num_rows, unique_tweets)`` for reporting.
    """
    df = read_data(zip_file)
    df["parsed_results"] = df["results"].apply(parse_result)
    grouped_results = get_grouped_results(get_parsed_results(df))
    df_public, output_file_name = get_df_public(grouped_results)
    unique_tweets = df_public.tweet_id.unique().shape[0]
    num_rows = df_public.shape[0]
    print(f"{output_file_name} | {num_rows} | {unique_tweets}")
    df_public.to_csv(f"{output_file_name}.public.tsv", sep="\t", index=False)
    return num_rows, unique_tweets
def main():
    """Convert every discovered job archive and report corpus totals."""
    total_rows = 0
    total_tweets = 0
    for zip_file in JOB_FILES:
        print(f"Processing zip_file={zip_file}")
        num_rows, unique_tweets = convert_job_file_to_public_file(zip_file)
        total_rows += num_rows
        total_tweets += unique_tweets
    print(f"total_rows={total_rows:,}, total_tweets={total_tweets:,}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.