Skip to content

Instantly share code, notes, and snippets.

@napsternxg
Created February 3, 2023 04:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save napsternxg/f420b18cf40106d7fbe81174b2fd2b3a to your computer and use it in GitHub Desktop.
Parse TweetNERD files
from zipfile import ZipFile
from pathlib import Path
import pandas as pd
import numpy as np
import json
# Input: every crowdsourcing job export (job_*.json.zip) found anywhere
# under the current working directory.
JOB_FILES = list(Path(".").glob("**/job_*.json.zip"))
# Maps a crowdsourcing job id to the TweetNERD output part number used in
# the published file name ("part_<N>") — see get_df_public below.
JOB_ID_TO_OUTPUT_PART = {
1873084: 12,
1876298: 13,
1877851: 14,
1882110: 15,
1887777: 16,
}
def read_data(zip_file):
    """Load the first JSON-lines member of *zip_file* into a DataFrame.

    The "id" column is read as uint64 so large tweet ids are not
    mangled by float conversion.
    """
    with ZipFile(zip_file) as archive:
        members = archive.namelist()
        print(members)
        with archive.open(members[0]) as handle:
            frame = pd.read_json(
                handle, orient="records", lines=True, dtype={"id": np.uint64}
            )
    return frame
def parse_annotation(annotation):
    """Convert one raw span annotation into a flat phrase record.

    Parameters
    ----------
    annotation : dict with "span" (list of token dicts carrying "text",
        "start_index", "end_index", "word_index") and "wikidata_id".

    Returns
    -------
    dict(phrase, start, end, entityId) where phrase rebuilds the surface
    text with the original inter-token gaps restored as spaces.
    """
    tokens = [
        dict(
            text=token["text"],
            start_index=int(token["start_index"]),
            end_index=int(token["end_index"]),
            word_index=int(token["word_index"]),
        )
        for token in annotation["span"]
    ]
    tokens.sort(key=lambda t: t["start_index"])
    start = tokens[0]["start_index"]
    end = tokens[-1]["end_index"]
    parts = [tokens[0]["text"]]
    for i in range(1, len(tokens)):
        # Bug fix: the gap is current token start minus PREVIOUS token end.
        # The original subtracted the previous token's own start from its
        # end (always <= 0), so " " * diff was empty and multi-token
        # phrases were concatenated without spaces.
        gap = tokens[i]["start_index"] - tokens[i - 1]["end_index"]
        parts.append(" " * gap)
        parts.append(tokens[i]["text"])
    return dict(
        phrase="".join(parts),
        start=start,
        end=end,
        entityId=annotation["wikidata_id"],
    )
def parse_annotations(annotations):
    """Decode a JSON-encoded list of annotations and flatten each entry."""
    decoded = json.loads(annotations)
    return [parse_annotation(entry) for entry in decoded]
def parse_judgement(judgement):
    """Flatten a single crowd judgement record into a plain dict.

    Pulls identifiers off the judgement, tweet fields off its unit_data,
    and parses the (possibly missing) annotation payload.
    """
    unit_data = judgement["unit_data"]
    # Missing key means the worker produced no annotations for this unit.
    raw_annotations = judgement["data"].get(
        "annotation_output_this_is_automatically_generated_do_not_edit", "[]"
    )
    return dict(
        jid=judgement["id"],
        job_id=judgement["job_id"],
        unit_id=judgement["unit_id"],
        tweet_id=unit_data["tweet_id"],
        text=unit_data["text"],
        annotations=parse_annotations(raw_annotations),
    )
def parse_result(result):
    """Parse every judgement attached to one result row."""
    return [parse_judgement(judgement) for judgement in result["judgments"]]
# Placeholder values for rows whose annotation fields are absent after the
# explode/normalize steps (judgements that carried no annotations).
MISSING_VALS = {"phrase": "NO_PHRASE", "entityId": "NO_ENTITY", "start": -1, "end": -1}


def get_parsed_results(df):
    """Explode nested parsed_results into one row per annotation.

    Each input row holds a list of judgement dicts in "parsed_results";
    each judgement holds a list in "annotations". Both levels are
    exploded and normalized so the output is one flat row per annotation,
    with MISSING_VALS filled in for annotation-less judgements.
    """
    exploded = (
        df[["id", "job_id", "parsed_results"]]
        .explode("parsed_results")
        .reset_index(drop=True)
    )
    flat = pd.concat(
        [exploded[["id"]], pd.json_normalize(exploded["parsed_results"])],
        axis=1,
    )
    flat = flat.explode("annotations").reset_index(drop=True)
    flat = pd.concat(
        [flat.drop("annotations", axis=1), pd.json_normalize(flat["annotations"])],
        axis=1,
    )
    # fillna leaves start/end as float where NaN appeared; force back to int.
    return flat.fillna(MISSING_VALS).astype({"start": int, "end": int})
def get_grouped_results(parsed_results):
    """Aggregate annotation rows to one row per (unit, tweet, span, entity).

    The count of judgements agreeing on the same span/entity (the "id"
    count) is exposed as "score"; the contributing judgement ids are
    collected into "jid".
    """
    group_keys = ["unit_id", "tweet_id", "start", "end", "entityId"]
    aggregated = parsed_results.groupby(group_keys).agg(
        {
            "jid": list,
            "job_id": "first",
            "text": "first",
            "phrase": "first",
            "id": "count",
        }
    )
    return aggregated.rename(columns={"id": "score"})
# Columns published in the public TSV release, in output order.
OUTPUT_COLS = ["tweet_id", "phrase", "start", "end", "entityId", "score"]


def get_df_public(grouped_results):
    """Project grouped results onto the public columns.

    Returns (public DataFrame, output file stem). The stem is derived by
    looking the job id up in JOB_ID_TO_OUTPUT_PART; all rows in one
    grouped frame share a job id, so the first value suffices.
    """
    job_id = grouped_results["job_id"].iloc[0]
    output_file_name = f"part_{JOB_ID_TO_OUTPUT_PART[job_id]}"
    df_public = grouped_results.reset_index()[OUTPUT_COLS]
    return df_public, output_file_name
def convert_job_file_to_public_file(zip_file):
    """Run the full pipeline for one job zip and write the public TSV.

    Returns (row count, unique tweet count) for the file written.
    """
    df = read_data(zip_file)
    df["parsed_results"] = df["results"].apply(parse_result)
    grouped_results = get_grouped_results(get_parsed_results(df))
    df_public, output_file_name = get_df_public(grouped_results)
    num_rows = df_public.shape[0]
    unique_tweets = df_public.tweet_id.unique().shape[0]
    print(f"{output_file_name} | {num_rows} | {unique_tweets}")
    df_public.to_csv(f"{output_file_name}.public.tsv", sep="\t", index=False)
    return num_rows, unique_tweets
def main():
    """Convert every discovered job file and report overall totals."""
    total_rows = 0
    total_tweets = 0
    for zip_file in JOB_FILES:
        print(f"Processing zip_file={zip_file}")
        num_rows, unique_tweets = convert_job_file_to_public_file(zip_file)
        total_rows += num_rows
        total_tweets += unique_tweets
    print(f"total_rows={total_rows:,}, total_tweets={total_tweets:,}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment