import csv | |
import glob | |
import gzip | |
import json | |
import os | |
SNAPSHOT_DIR = 'openalex-snapshot' | |
CSV_DIR = 'csv-files' | |
FILES_PER_ENTITY = int(os.environ.get('OPENALEX_DEMO_FILES_PER_ENTITY', '0')) | |
csv_files = { | |
'institutions': { | |
'institutions': { | |
'name': os.path.join(CSV_DIR, 'institutions.csv.gz'), | |
'columns': [ | |
'id', 'ror', 'display_name', 'country_code', 'type', 'homepage_url', 'image_url', 'image_thumbnail_url', | |
'display_name_acroynyms', 'display_name_alternatives', 'works_count', 'cited_by_count', 'works_api_url', | |
'updated_date' | |
] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'institutions_ids.csv.gz'), | |
'columns': [ | |
'institution_id', 'openalex', 'ror', 'grid', 'wikipedia', 'wikidata', 'mag' | |
] | |
}, | |
'geo': { | |
'name': os.path.join(CSV_DIR, 'institutions_geo.csv.gz'), | |
'columns': [ | |
'institution_id', 'city', 'geonames_city_id', 'region', 'country_code', 'country', 'latitude', | |
'longitude' | |
] | |
}, | |
'associated_institutions': { | |
'name': os.path.join(CSV_DIR, 'institutions_associated_institutions.csv.gz'), | |
'columns': [ | |
'institution_id', 'associated_institution_id', 'relationship' | |
] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'institutions_counts_by_year.csv.gz'), | |
'columns': [ | |
'institution_id', 'year', 'works_count', 'cited_by_count' | |
] | |
} | |
}, | |
'authors': { | |
'authors': { | |
'name': os.path.join(CSV_DIR, 'authors.csv.gz'), | |
'columns': [ | |
'id', 'orcid', 'display_name', 'display_name_alternatives', 'works_count', 'cited_by_count', | |
'last_known_institution', 'works_api_url', 'updated_date' | |
] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'authors_ids.csv.gz'), | |
'columns': [ | |
'author_id', 'openalex', 'orcid', 'scopus', 'twitter', 'wikipedia', 'mag' | |
] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'authors_counts_by_year.csv.gz'), | |
'columns': [ | |
'author_id', 'year', 'works_count', 'cited_by_count' | |
] | |
} | |
}, | |
'concepts': { | |
'concepts': { | |
'name': os.path.join(CSV_DIR, 'concepts.csv.gz'), | |
'columns': [ | |
'id', 'wikidata', 'display_name', 'level', 'description', 'works_count', 'cited_by_count', 'image_url', | |
'image_thumbnail_url', 'works_api_url', 'updated_date' | |
] | |
}, | |
'ancestors': { | |
'name': os.path.join(CSV_DIR, 'concepts_ancestors.csv.gz'), | |
'columns': ['concept_id', 'ancestor_id'] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'concepts_counts_by_year.csv.gz'), | |
'columns': ['concept_id', 'year', 'works_count', 'cited_by_count'] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'concepts_ids.csv.gz'), | |
'columns': ['concept_id', 'openalex', 'wikidata', 'wikipedia', 'umls_aui', 'umls_cui', 'mag'] | |
}, | |
'related_concepts': { | |
'name': os.path.join(CSV_DIR, 'concepts_related_concepts.csv.gz'), | |
'columns': ['concept_id', 'related_concept_id', 'score'] | |
} | |
}, | |
'venues': { | |
'venues': { | |
'name': os.path.join(CSV_DIR, 'venues.csv.gz'), | |
'columns': [ | |
'id', 'issn_l', 'issn', 'display_name', 'publisher', 'works_count', 'cited_by_count', 'is_oa', | |
'is_in_doaj', 'homepage_url', 'works_api_url', 'updated_date' | |
] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'venues_ids.csv.gz'), | |
'columns': ['venue_id', 'openalex', 'issn_l', 'issn', 'mag'] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'venues_counts_by_year.csv.gz'), | |
'columns': ['venue_id', 'year', 'works_count', 'cited_by_count'] | |
}, | |
}, | |
'works': { | |
'works': { | |
'name': os.path.join(CSV_DIR, 'works.csv.gz'), | |
'columns': [ | |
'id', 'doi', 'title', 'display_name', 'publication_year', 'publication_date', 'type', 'cited_by_count', | |
'is_retracted', 'is_paratext', 'cited_by_api_url', 'abstract_inverted_index' | |
] | |
}, | |
'host_venues': { | |
'name': os.path.join(CSV_DIR, 'works_host_venues.csv.gz'), | |
'columns': [ | |
'work_id', 'venue_id', 'url', 'is_oa', 'version', 'license' | |
] | |
}, | |
'alternate_host_venues': { | |
'name': os.path.join(CSV_DIR, 'works_alternate_host_venues.csv.gz'), | |
'columns': [ | |
'work_id', 'venue_id', 'url', 'is_oa', 'version', 'license' | |
] | |
}, | |
'authorships': { | |
'name': os.path.join(CSV_DIR, 'works_authorships.csv.gz'), | |
'columns': [ | |
'work_id', 'author_position', 'author_id', 'institution_id', 'raw_affiliation_string' | |
] | |
}, | |
'biblio': { | |
'name': os.path.join(CSV_DIR, 'works_biblio.csv.gz'), | |
'columns': [ | |
'work_id', 'volume', 'issue', 'first_page', 'last_page' | |
] | |
}, | |
'concepts': { | |
'name': os.path.join(CSV_DIR, 'works_concepts.csv.gz'), | |
'columns': [ | |
'work_id', 'concept_id', 'score' | |
] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'works_ids.csv.gz'), | |
'columns': [ | |
'work_id', 'openalex', 'doi', 'mag', 'pmid', 'pmcid' | |
] | |
}, | |
'mesh': { | |
'name': os.path.join(CSV_DIR, 'works_mesh.csv.gz'), | |
'columns': [ | |
'work_id', 'descriptor_ui', 'descriptor_name', 'qualifier_ui', 'qualifier_name', 'is_major_topic' | |
] | |
}, | |
'open_access': { | |
'name': os.path.join(CSV_DIR, 'works_open_access.csv.gz'), | |
'columns': [ | |
'work_id', 'is_oa', 'oa_status', 'oa_url' | |
] | |
}, | |
'referenced_works': { | |
'name': os.path.join(CSV_DIR, 'works_referenced_works.csv.gz'), | |
'columns': [ | |
'work_id', 'referenced_work_id' | |
] | |
}, | |
'related_works': { | |
'name': os.path.join(CSV_DIR, 'works_related_works.csv.gz'), | |
'columns': [ | |
'work_id', 'related_work_id' | |
] | |
}, | |
}, | |
} | |
def flatten_concepts(): | |
with gzip.open(csv_files['concepts']['concepts']['name'], 'wt', encoding='utf-8') as concepts_csv, \ | |
gzip.open(csv_files['concepts']['ancestors']['name'], 'wt', encoding='utf-8') as ancestors_csv, \ | |
gzip.open(csv_files['concepts']['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv, \ | |
gzip.open(csv_files['concepts']['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(csv_files['concepts']['related_concepts']['name'], 'wt', encoding='utf-8') as related_concepts_csv: | |
concepts_writer = csv.DictWriter( | |
concepts_csv, fieldnames=csv_files['concepts']['concepts']['columns'], extrasaction='ignore' | |
) | |
concepts_writer.writeheader() | |
ancestors_writer = csv.DictWriter(ancestors_csv, fieldnames=csv_files['concepts']['ancestors']['columns']) | |
ancestors_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=csv_files['concepts']['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, fieldnames=csv_files['concepts']['ids']['columns']) | |
ids_writer.writeheader() | |
related_concepts_writer = csv.DictWriter(related_concepts_csv, fieldnames=csv_files['concepts']['related_concepts']['columns']) | |
related_concepts_writer.writeheader() | |
seen_concept_ids = set() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'concepts', '*', '*.gz')): | |
print(jsonl_file_name) | |
with gzip.open(jsonl_file_name, 'r') as concepts_jsonl: | |
for concept_json in concepts_jsonl: | |
if not concept_json.strip(): | |
continue | |
concept = json.loads(concept_json) | |
if not (concept_id := concept.get('id')) or concept_id in seen_concept_ids: | |
continue | |
seen_concept_ids.add(concept_id) | |
concepts_writer.writerow(concept) | |
if concept_ids := concept.get('ids'): | |
concept_ids['concept_id'] = concept_id | |
concept_ids['umls_aui'] = json.dumps(concept_ids.get('umls_aui'), ensure_ascii=False) | |
concept_ids['umls_cui'] = json.dumps(concept_ids.get('umls_cui'), ensure_ascii=False) | |
ids_writer.writerow(concept_ids) | |
if ancestors := concept.get('ancestors'): | |
for ancestor in ancestors: | |
if ancestor_id := ancestor.get('id'): | |
ancestors_writer.writerow({ | |
'concept_id': concept_id, | |
'ancestor_id': ancestor_id | |
}) | |
if counts_by_year := concept.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['concept_id'] = concept_id | |
counts_by_year_writer.writerow(count_by_year) | |
if related_concepts := concept.get('related_concepts'): | |
for related_concept in related_concepts: | |
if related_concept_id := related_concept.get('id'): | |
related_concepts_writer.writerow({ | |
'concept_id': concept_id, | |
'related_concept_id': related_concept_id, | |
'score': related_concept.get('score') | |
}) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_venues(): | |
with gzip.open(csv_files['venues']['venues']['name'], 'wt', encoding='utf-8') as venues_csv, \ | |
gzip.open(csv_files['venues']['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(csv_files['venues']['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv: | |
venues_writer = csv.DictWriter( | |
venues_csv, fieldnames=csv_files['venues']['venues']['columns'], extrasaction='ignore' | |
) | |
venues_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, fieldnames=csv_files['venues']['ids']['columns']) | |
ids_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=csv_files['venues']['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
seen_venue_ids = set() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'venues', '*', '*.gz')): | |
print(jsonl_file_name) | |
with gzip.open(jsonl_file_name, 'r') as venues_jsonl: | |
for venue_json in venues_jsonl: | |
if not venue_json.strip(): | |
continue | |
venue = json.loads(venue_json) | |
if not (venue_id := venue.get('id')) or venue_id in seen_venue_ids: | |
continue | |
seen_venue_ids.add(venue_id) | |
venue['issn'] = json.dumps(venue.get('issn')) | |
venues_writer.writerow(venue) | |
if venue_ids := venue.get('ids'): | |
venue_ids['venue_id'] = venue_id | |
venue_ids['issn'] = json.dumps(venue_ids.get('issn')) | |
ids_writer.writerow(venue_ids) | |
if counts_by_year := venue.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['venue_id'] = venue_id | |
counts_by_year_writer.writerow(count_by_year) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_institutions(): | |
file_spec = csv_files['institutions'] | |
with gzip.open(file_spec['institutions']['name'], 'wt', encoding='utf-8') as institutions_csv, \ | |
gzip.open(file_spec['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(file_spec['geo']['name'], 'wt', encoding='utf-8') as geo_csv, \ | |
gzip.open(file_spec['associated_institutions']['name'], 'wt', encoding='utf-8') as associated_institutions_csv, \ | |
gzip.open(file_spec['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv: | |
institutions_writer = csv.DictWriter( | |
institutions_csv, fieldnames=file_spec['institutions']['columns'], extrasaction='ignore' | |
) | |
institutions_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, fieldnames=file_spec['ids']['columns']) | |
ids_writer.writeheader() | |
geo_writer = csv.DictWriter(geo_csv, fieldnames=file_spec['geo']['columns']) | |
geo_writer.writeheader() | |
associated_institutions_writer = csv.DictWriter( | |
associated_institutions_csv, fieldnames=file_spec['associated_institutions']['columns'] | |
) | |
associated_institutions_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=file_spec['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
seen_institution_ids = set() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'institutions', '*', '*.gz')): | |
print(jsonl_file_name) | |
with gzip.open(jsonl_file_name, 'r') as institutions_jsonl: | |
for institution_json in institutions_jsonl: | |
if not institution_json.strip(): | |
continue | |
institution = json.loads(institution_json) | |
if not (institution_id := institution.get('id')) or institution_id in seen_institution_ids: | |
continue | |
seen_institution_ids.add(institution_id) | |
# institutions | |
institution['display_name_acroynyms'] = json.dumps(institution.get('display_name_acroynyms'), ensure_ascii=False) | |
institution['display_name_alternatives'] = json.dumps(institution.get('display_name_alternatives'), ensure_ascii=False) | |
institutions_writer.writerow(institution) | |
# idss | |
if institution_ids := institution.get('ids'): | |
institution_ids['institution_id'] = institution_id | |
ids_writer.writerow(institution_ids) | |
# geo | |
if institution_geo := institution.get('geo'): | |
institution_geo['institution_id'] = institution_id | |
geo_writer.writerow(institution_geo) | |
# associated_institutions | |
if associated_institutions := institution.get( | |
'associated_institutions', institution.get('associated_insitutions') # typo in api | |
): | |
for associated_institution in associated_institutions: | |
if associated_institution_id := associated_institution.get('id'): | |
associated_institutions_writer.writerow({ | |
'institution_id': institution_id, | |
'associated_institution_id': associated_institution_id, | |
'relationship': associated_institution.get('relationship') | |
}) | |
# counts_by_year | |
if counts_by_year := institution.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['institution_id'] = institution_id | |
counts_by_year_writer.writerow(count_by_year) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_authors(): | |
file_spec = csv_files['authors'] | |
with gzip.open(file_spec['authors']['name'], 'wt', encoding='utf-8') as authors_csv, \ | |
gzip.open(file_spec['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(file_spec['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv: | |
authors_writer = csv.DictWriter( | |
authors_csv, fieldnames=file_spec['authors']['columns'], extrasaction='ignore' | |
) | |
authors_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, fieldnames=file_spec['ids']['columns']) | |
ids_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=file_spec['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'authors', '*', '*.gz')): | |
print(jsonl_file_name) | |
with gzip.open(jsonl_file_name, 'r') as authors_jsonl: | |
for author_json in authors_jsonl: | |
if not author_json.strip(): | |
continue | |
author = json.loads(author_json) | |
if not (author_id := author.get('id')): | |
continue | |
# authors | |
author['display_name_alternatives'] = json.dumps(author.get('display_name_alternatives'), ensure_ascii=False) | |
author['last_known_institution'] = (author.get('last_known_institution') or {}).get('id') | |
authors_writer.writerow(author) | |
# ids | |
if author_ids := author.get('ids'): | |
author_ids['author_id'] = author_id | |
ids_writer.writerow(author_ids) | |
# counts_by_year | |
if counts_by_year := author.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['author_id'] = author_id | |
counts_by_year_writer.writerow(count_by_year) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_works(): | |
file_spec = csv_files['works'] | |
with gzip.open(file_spec['works']['name'], 'wt', encoding='utf-8') as works_csv, \ | |
gzip.open(file_spec['host_venues']['name'], 'wt', encoding='utf-8') as host_venues_csv, \ | |
gzip.open(file_spec['alternate_host_venues']['name'], 'wt', encoding='utf-8') as alternate_host_venues_csv, \ | |
gzip.open(file_spec['authorships']['name'], 'wt', encoding='utf-8') as authorships_csv, \ | |
gzip.open(file_spec['biblio']['name'], 'wt', encoding='utf-8') as biblio_csv, \ | |
gzip.open(file_spec['concepts']['name'], 'wt', encoding='utf-8') as concepts_csv, \ | |
gzip.open(file_spec['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(file_spec['mesh']['name'], 'wt', encoding='utf-8') as mesh_csv, \ | |
gzip.open(file_spec['open_access']['name'], 'wt', encoding='utf-8') as open_access_csv, \ | |
gzip.open(file_spec['referenced_works']['name'], 'wt', encoding='utf-8') as referenced_works_csv, \ | |
gzip.open(file_spec['related_works']['name'], 'wt', encoding='utf-8') as related_works_csv: | |
works_writer = init_dict_writer(works_csv, file_spec['works'], extrasaction='ignore') | |
host_venues_writer = init_dict_writer(host_venues_csv, file_spec['host_venues']) | |
alternate_host_venues_writer = init_dict_writer(alternate_host_venues_csv, file_spec['alternate_host_venues']) | |
authorships_writer = init_dict_writer(authorships_csv, file_spec['authorships']) | |
biblio_writer = init_dict_writer(biblio_csv, file_spec['biblio']) | |
concepts_writer = init_dict_writer(concepts_csv, file_spec['concepts']) | |
ids_writer = init_dict_writer(ids_csv, file_spec['ids'], extrasaction='ignore') | |
mesh_writer = init_dict_writer(mesh_csv, file_spec['mesh']) | |
open_access_writer = init_dict_writer(open_access_csv, file_spec['open_access']) | |
referenced_works_writer = init_dict_writer(referenced_works_csv, file_spec['referenced_works']) | |
related_works_writer = init_dict_writer(related_works_csv, file_spec['related_works']) | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'works', '*', '*.gz')): | |
print(jsonl_file_name) | |
with gzip.open(jsonl_file_name, 'r') as works_jsonl: | |
for work_json in works_jsonl: | |
if not work_json.strip(): | |
continue | |
work = json.loads(work_json) | |
if not (work_id := work.get('id')): | |
continue | |
# works | |
if (abstract := work.get('abstract_inverted_index')) is not None: | |
work['abstract_inverted_index'] = json.dumps(abstract, ensure_ascii=False) | |
works_writer.writerow(work) | |
# host_venues | |
if host_venue := (work.get('host_venue') or {}): | |
if host_venue_id := host_venue.get('id'): | |
host_venues_writer.writerow({ | |
'work_id': work_id, | |
'venue_id': host_venue_id, | |
'url': host_venue.get('url'), | |
'is_oa': host_venue.get('is_oa'), | |
'version': host_venue.get('version'), | |
'license': host_venue.get('license'), | |
}) | |
# alternate_host_venues | |
if alternate_host_venues := work.get('alternate_host_venues'): | |
for alternate_host_venue in alternate_host_venues: | |
if venue_id := alternate_host_venue.get('id'): | |
alternate_host_venues_writer.writerow({ | |
'work_id': work_id, | |
'venue_id': venue_id, | |
'url': alternate_host_venue.get('url'), | |
'is_oa': alternate_host_venue.get('is_oa'), | |
'version': alternate_host_venue.get('version'), | |
'license': alternate_host_venue.get('license'), | |
}) | |
# authorships | |
if authorships := work.get('authorships'): | |
for authorship in authorships: | |
if author_id := authorship.get('author', {}).get('id'): | |
institutions = authorship.get('institutions') | |
institution_ids = [i.get('id') for i in institutions] | |
institution_ids = [i for i in institution_ids if i] | |
institution_ids = institution_ids or [None] | |
for institution_id in institution_ids: | |
authorships_writer.writerow({ | |
'work_id': work_id, | |
'author_position': authorship.get('author_position'), | |
'author_id': author_id, | |
'institution_id': institution_id, | |
'raw_affiliation_string': authorship.get('raw_affiliation_string'), | |
}) | |
# biblio | |
if biblio := work.get('biblio'): | |
biblio['work_id'] = work_id | |
biblio_writer.writerow(biblio) | |
# concepts | |
for concept in work.get('concepts'): | |
if concept_id := concept.get('id'): | |
concepts_writer.writerow({ | |
'work_id': work_id, | |
'concept_id': concept_id, | |
'score': concept.get('score'), | |
}) | |
# ids | |
if ids := work.get('ids'): | |
ids['work_id'] = work_id | |
ids_writer.writerow(ids) | |
# mesh | |
for mesh in work.get('mesh'): | |
mesh['work_id'] = work_id | |
mesh_writer.writerow(mesh) | |
# open_access | |
if open_access := work.get('open_access'): | |
open_access['work_id'] = work_id | |
open_access_writer.writerow(open_access) | |
# referenced_works | |
for referenced_work in work.get('referenced_works'): | |
if referenced_work: | |
referenced_works_writer.writerow({ | |
'work_id': work_id, | |
'referenced_work_id': referenced_work | |
}) | |
# related_works | |
for related_work in work.get('related_works'): | |
if related_work: | |
related_works_writer.writerow({ | |
'work_id': work_id, | |
'related_work_id': related_work | |
}) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def init_dict_writer(csv_file, file_spec, **kwargs): | |
writer = csv.DictWriter( | |
csv_file, fieldnames=file_spec['columns'], **kwargs | |
) | |
writer.writeheader() | |
return writer | |
if __name__ == '__main__': | |
flatten_concepts() | |
flatten_venues() | |
flatten_institutions() | |
flatten_authors() | |
flatten_works() |
Hi @richard-orr, what is the motivation for using the function init_dict_writer() in flatten_works() but csv.DictWriter() in all other entities?
@JeremyBrent > Hi @richard-orr, what is the motivation for using the function init_dict_writer() in flatten_works() but csv.DictWriter() in all other entities?
No particular reason, both versions do the same thing. init_dict_writer
is a very short 2-line function and it wasn't until I got to Works and needed to open 11 writers that I got tired of repeating them. I just never went back and replaced the existing instances of
geo_writer = csv.DictWriter(geo_csv, fieldnames=file_spec['geo']['columns'])
geo_writer.writeheader()
with geo_writer = init_dict_writer(geo_csv, file_spec['geo'])
for consistency. I'm going to make a few changes to address a question about parallelism above and I'll clean up some inconsistencies like this at that time.
@portisto > any suggestion how to parallelise the flattening workload per given dataset (for authors and/or works)?
The easiest way would probably be to use multiprocessing.Pool. Instead of a loop like
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'works', '*', '*.gz')):
# flatten jsonl_file_name
You would move the flattening work into a function and use the Pool to handle the files in parallel:
from multiprocessing import Pool
NUM_THREADS=4
def flatten_file(jsonl_file_name):
# flatten jsonl_file_name
json_files = glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'works', '*', '*.gz')):
with Pool(processes=NUM_THREADS) as p:
p.map(flatten_file, json_files))
The trouble is that flatten_file
can't write to the same set of CSV files in each thread; you would need to open a separate csv file for each input file or use a Lock to ensure that complete lines are written separately and don't overlap.
I didn't parallelize this because I didn't want it to eat up all the CPU on unsuspecting users' machines, but I'd like to come back to it and try the above while keeping the default number of threads at 1.
Hi @richard-orr , I found that there may be encoding problems in your script. After trying your script, Chinese characters turned out to be in unicode like \u9678\u606d\u8559
in generated CSVs, thus causing some parsing problems in my research.
I finally found out how to fix this. Though csvs were opened with utf-8
, the json.dumps()
functions would write in unicode, if you don't specify ensure_ascii=False
. So I suggest adding this into all json.dumps()
.
@portisto > any suggestion how to parallelise the flattening workload per given dataset (for authors and/or works)?
if it helps
based on Richard's scripts, I have a parallized script to flatten works here
https://github.com/almugabo/openalex_qa/tree/main/___db_scripts
it takes about 3 hours with 20 processes on my machine.
Hi @richard-orr, what is the purpose of the following:
"""
FILES_PER_ENTITY = int(os.environ.get('OPENALEX_DEMO_FILES_PER_ENTITY', '0'))
....
files_done += 1
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
break
"""
@almugabo your parallel work function works great! Thanks for providing access to the community. One question about it, you have a seen_work_ids set, but if each input file is going to separate core, and they are truly parallel processes, is this set doing anything? Theoretically this set should never be > len(works_in_one_file). This set would be able to handle duplicates within one input file but not across all of them, is that correct?
Hi @richard-orr , I found that there may be encoding problems in your script. After trying your script, Chinese characters turned out to be in unicode like
\u9678\u606d\u8559
in generated CSVs, thus causing some parsing problems in my research.I finally found out how to fix this. Though csvs were opened with
utf-8
, thejson.dumps()
functions would write in unicode, if you don't specifyensure_ascii=False
. So I suggest adding this into alljson.dumps()
.
Thank you! Most JSON parsers should be able to read the unicode escape sequences (\u9678\u606d\u8559
) just as well as UTF-8 encoded characters (陸恭蕙
) but the latter is much more readable. I followed your suggestion for text fields.
Hi @richard-orr, what is the purpose of the following: """ FILES_PER_ENTITY = int(os.environ.get('OPENALEX_DEMO_FILES_PER_ENTITY', '0')) .... files_done += 1 if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: break """
I added those lines to help with debugging the script. Since it takes hours to flatten the complete dataset, I wanted to be able to do a partial run like
$ OPENALEX_DEMO_FILES_PER_ENTITY=5 python flatten-openalex-jsonl.py
and have the script stop early but cleanly, after a certain number of input files. Unless you set the OPENALEX_DEMO_FILES_PER_ENTITY environment variable they won't have any effect.
Hi Jeremy, sorry for late reply. Correct. The seen_work_ids does nothing in the code. I had initially planned to use it to verify (and delete) duplicates in the results folders but finally didn't need to.
Hi @richard-orr , I found there's a typo in your script, 'acroynyms' should be 'acronyms':
- line 18
'display_name_acronyms', 'display_name_alternatives', 'works_count', 'cited_by_count', 'works_api_url',
- line 356
institution['display_name_acronyms'] = json.dumps(institution.get('display_name_acronyms'), ensure_ascii=False)
I suggest fixing them, or the Writer
would write 'none'
for all display_name_acronyms
.
ps. also found typo in the figure at postgres-schema-diagram.
There is something wrong when i try to flatten the data
Traceback (most recent call last):
File "convert.py", line 594, in <module>
flatten_venues()
File "convert.py", line 298, in flatten_venues
ids_writer.writerow(venue_ids)
File "/usr/lib/python3.8/csv.py", line 154, in writerow
return self.writer.writerow(self._dict_to_list(rowdict))
File "/usr/lib/python3.8/csv.py", line 149, in _dict_to_list
raise ValueError("dict contains fields not in fieldnames: "
ValueError: dict contains fields not in fieldnames: 'wikidata', 'fatcat'
@richard-orr Hi, i found the method which can probably fix it by adding the 'extrasaction='ignore'' to all the csv writers of flatten_venues function
Adapted for MySQL / MariaDB, using tab delimiters and each content type runs in parallel: https://github.com/TEC-IST/openalex-mysql-mariadb-toolkit/blob/main/flatten-works-to-tsv.py
This script has moved here: https://github.com/ourresearch/openalex-documentation-scripts/blob/main/flatten-openalex-jsonl.py
Changes are:
- venues renamed to sources
- add new publishers entity
- supports new locations schema within works (primary_location, locations, best_oa_location)
any suggestion how to parallelise the flattening workload per given dataset (for authors and/or works)?