flatten OpenAlex JSON Lines files to TSV readable by Spark (forked from richard-orr/flatten-openalex-jsonl.py)
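To try it out, sync the OpenAlex snapshot into ./openalex-snapshot, run the script from that directory, and look for the output under ./tsv-files. For a quick smoke test, `OPENALEX_DEMO_FILES_PER_ENTITY=1 python flatten-openalex-jsonl.py` flattens only the first .gz file per entity type (the environment variable is read by the script below; the file name assumes you saved the script under the forked gist's name).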
# python 3.8+ required (uses assignment expressions, the := "walrus" operator)
import csv
import glob
import gzip
import json
import os

SNAPSHOT_DIR = 'openalex-snapshot'
CSV_DIR = 'tsv-files'

# set OPENALEX_DEMO_FILES_PER_ENTITY to flatten only the first N .gz files per entity type (0 = no limit)
FILES_PER_ENTITY = int(os.environ.get('OPENALEX_DEMO_FILES_PER_ENTITY', '0'))

# make sure the output directory exists before any TSV is opened for writing
os.makedirs(CSV_DIR, exist_ok=True)

csv_files = {
    'institutions': {
        'institutions': {
            'name': os.path.join(CSV_DIR, 'institutions.tsv'),
            'columns': [
                'id', 'ror', 'display_name', 'country_code', 'type', 'homepage_url', 'image_url', 'image_thumbnail_url',
                'display_name_acronyms', 'display_name_alternatives', 'works_count', 'cited_by_count', 'works_api_url',
                'updated_date'
            ]  # id modified
        },
        'ids': {
            'name': os.path.join(CSV_DIR, 'institutions_ids.tsv'),
            'columns': [
                'institution_id', 'openalex', 'ror', 'grid', 'wikipedia', 'wikidata', 'mag'
            ]  # institution_id modified
        },
        'geo': {
            'name': os.path.join(CSV_DIR, 'institutions_geo.tsv'),
            'columns': [
                'institution_id', 'city', 'geonames_city_id', 'region', 'country_code', 'country', 'latitude',
                'longitude'
            ]  # institution_id modified
        },
        'associated_institutions': {
            'name': os.path.join(CSV_DIR, 'institutions_associated_institutions.tsv'),
            'columns': [
                'institution_id', 'associated_institution_id', 'relationship'
            ]  # institution_id, associated_institution_id modified
        },
        'counts_by_year': {
            'name': os.path.join(CSV_DIR, 'institutions_counts_by_year.tsv'),
            'columns': [
                'institution_id', 'year', 'works_count', 'cited_by_count'
            ]  # institution_id modified
        }
    },
    'authors': {
        'authors': {
            'name': os.path.join(CSV_DIR, 'authors.tsv'),
            'columns': [
                'id', 'orcid', 'display_name', 'display_name_alternatives', 'works_count', 'cited_by_count',
                'last_known_institution', 'works_api_url', 'updated_date'
            ]  # id modified
        },
        'ids': {
            'name': os.path.join(CSV_DIR, 'authors_ids.tsv'),
            'columns': [
                'author_id', 'openalex', 'orcid', 'scopus', 'twitter', 'wikipedia', 'mag'
            ]  # author_id modified
        },
        'counts_by_year': {
            'name': os.path.join(CSV_DIR, 'authors_counts_by_year.tsv'),
            'columns': [
                'author_id', 'year', 'works_count', 'cited_by_count'
            ]  # author_id modified
        }
    },
    'concepts': {
        'concepts': {
            'name': os.path.join(CSV_DIR, 'concepts.tsv'),
            'columns': [
                'id', 'wikidata', 'display_name', 'level', 'description', 'works_count', 'cited_by_count', 'image_url',
                'image_thumbnail_url', 'works_api_url', 'updated_date'
            ]  # id modified
        },
        'ancestors': {
            'name': os.path.join(CSV_DIR, 'concepts_ancestors.tsv'),
            'columns': ['concept_id', 'ancestor_id']  # id modified
        },
        'counts_by_year': {
            'name': os.path.join(CSV_DIR, 'concepts_counts_by_year.tsv'),
            'columns': ['concept_id', 'year', 'works_count', 'cited_by_count']
        },
        'ids': {
            'name': os.path.join(CSV_DIR, 'concepts_ids.tsv'),
            'columns': ['concept_id', 'openalex', 'wikidata', 'wikipedia', 'umls_aui', 'umls_cui', 'mag']  # id modified
        },
        'related_concepts': {
            'name': os.path.join(CSV_DIR, 'concepts_related_concepts.tsv'),
            'columns': ['concept_id', 'related_concept_id', 'score']  # id modified
        }
    },
    'venues': {
        'venues': {
            'name': os.path.join(CSV_DIR, 'venues.tsv'),
            'columns': [
                'id', 'issn_l', 'issn', 'display_name', 'publisher', 'works_count', 'cited_by_count', 'is_oa',
                'is_in_doaj', 'homepage_url', 'works_api_url', 'updated_date'
            ]  # id modified
        },
        'ids': {
            'name': os.path.join(CSV_DIR, 'venues_ids.tsv'),
            'columns': ['venue_id', 'openalex', 'issn_l', 'issn', 'mag']  # id modified
        },
        'counts_by_year': {
            'name': os.path.join(CSV_DIR, 'venues_counts_by_year.tsv'),
            'columns': ['venue_id', 'year', 'works_count', 'cited_by_count']  # id modified
        },
    },
    'works': {
        'works': {
            'name': os.path.join(CSV_DIR, 'works.tsv'),
            'columns': [
                'id', 'doi', 'title', 'display_name', 'publication_year', 'publication_date', 'type', 'cited_by_count',
                'is_retracted', 'is_paratext', 'cited_by_api_url', 'abstract_inverted_index'
            ]  # id modified
        },
        'host_venues': {
            'name': os.path.join(CSV_DIR, 'works_host_venues.tsv'),
            'columns': [
                'work_id', 'venue_id', 'url', 'is_oa', 'version', 'license'
            ]  # work_id, venue_id modified
        },
        'alternate_host_venues': {
            'name': os.path.join(CSV_DIR, 'works_alternate_host_venues.tsv'),
            'columns': [
                'work_id', 'venue_id', 'url', 'is_oa', 'version', 'license'
            ]  # work_id, venue_id modified
        },
        'authorships': {
            'name': os.path.join(CSV_DIR, 'works_authorships.tsv'),
            'columns': [
                'work_id', 'author_position', 'author_id', 'institution_id', 'raw_affiliation_string'
            ]  # work_id, author_id, institution_id modified
        },
        'biblio': {
            'name': os.path.join(CSV_DIR, 'works_biblio.tsv'),
            'columns': [
                'work_id', 'volume', 'issue', 'first_page', 'last_page'
            ]  # work_id modified
        },
        'concepts': {
            'name': os.path.join(CSV_DIR, 'works_concepts.tsv'),
            'columns': [
                'work_id', 'concept_id', 'score'
            ]  # work_id, concept_id modified
        },
        'ids': {
            'name': os.path.join(CSV_DIR, 'works_ids.tsv'),
            'columns': [
                'work_id', 'openalex', 'doi', 'mag', 'pmid', 'pmcid'
            ]  # work_id modified
        },
        'mesh': {
            'name': os.path.join(CSV_DIR, 'works_mesh.tsv'),
            'columns': [
                'work_id', 'descriptor_ui', 'descriptor_name', 'qualifier_ui', 'qualifier_name', 'is_major_topic'
            ]  # work_id modified
        },
        'open_access': {
            'name': os.path.join(CSV_DIR, 'works_open_access.tsv'),
            'columns': [
                'work_id', 'is_oa', 'oa_status', 'oa_url'
            ]  # work_id modified
        },
        'referenced_works': {
            'name': os.path.join(CSV_DIR, 'works_referenced_works.tsv'),
            'columns': [
                'work_id', 'referenced_work_id'
            ]  # work_id, referenced_work_id modified
        },
        'related_works': {
            'name': os.path.join(CSV_DIR, 'works_related_works.tsv'),
            'columns': [
                'work_id', 'related_work_id'
            ]  # work_id, related_work_id modified
        },
    },
}


def flatten_concepts():
    with open(csv_files['concepts']['concepts']['name'], 'w', encoding='utf-8') as concepts_csv, \
            open(csv_files['concepts']['ancestors']['name'], 'w', encoding='utf-8') as ancestors_csv, \
            open(csv_files['concepts']['counts_by_year']['name'], 'w', encoding='utf-8') as counts_by_year_csv, \
            open(csv_files['concepts']['ids']['name'], 'w', encoding='utf-8') as ids_csv, \
            open(csv_files['concepts']['related_concepts']['name'], 'w', encoding='utf-8') as related_concepts_csv:

        concepts_writer = csv.DictWriter(
            concepts_csv, fieldnames=csv_files['concepts']['concepts']['columns'], extrasaction='ignore', delimiter='\t'
        )
        concepts_writer.writeheader()

        ancestors_writer = csv.DictWriter(ancestors_csv, fieldnames=csv_files['concepts']['ancestors']['columns'], delimiter='\t')
        ancestors_writer.writeheader()

        counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=csv_files['concepts']['counts_by_year']['columns'], delimiter='\t')
        counts_by_year_writer.writeheader()

        ids_writer = csv.DictWriter(ids_csv, fieldnames=csv_files['concepts']['ids']['columns'], delimiter='\t')
        ids_writer.writeheader()

        related_concepts_writer = csv.DictWriter(related_concepts_csv, fieldnames=csv_files['concepts']['related_concepts']['columns'], delimiter='\t')
        related_concepts_writer.writeheader()

        seen_concept_ids = set()

        files_done = 0
        for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'concepts', '*', '*.gz')):
            print(jsonl_file_name)
            with gzip.open(jsonl_file_name, 'r') as concepts_jsonl:
                for concept_json in concepts_jsonl:
                    if not concept_json.strip():
                        continue

                    concept = json.loads(concept_json)
                    if not (concept_id := concept.get('id')):
                        continue
                    # strip the URL prefix so the TSV holds bare numeric IDs
                    concept_id = concept_id.replace('https://openalex.org/C', '')
                    concept['id'] = concept_id

                    # the snapshot can contain duplicate records; keep the first one seen
                    if concept_id in seen_concept_ids:
                        continue
                    seen_concept_ids.add(concept_id)

                    concepts_writer.writerow(concept)

                    if concept_ids := concept.get('ids'):
                        concept_ids['concept_id'] = concept_id
                        concept_ids['umls_aui'] = json.dumps(concept_ids.get('umls_aui'), ensure_ascii=False)
                        concept_ids['umls_cui'] = json.dumps(concept_ids.get('umls_cui'), ensure_ascii=False)
                        ids_writer.writerow(concept_ids)

                    if ancestors := concept.get('ancestors'):
                        for ancestor in ancestors:
                            if ancestor_id := ancestor.get('id'):
                                ancestors_writer.writerow({
                                    'concept_id': concept_id,
                                    'ancestor_id': ancestor_id.replace('https://openalex.org/C', '')
                                })

                    if counts_by_year := concept.get('counts_by_year'):
                        for count_by_year in counts_by_year:
                            count_by_year['concept_id'] = concept_id
                            counts_by_year_writer.writerow(count_by_year)

                    if related_concepts := concept.get('related_concepts'):
                        for related_concept in related_concepts:
                            if related_concept_id := related_concept.get('id'):
                                related_concepts_writer.writerow({
                                    'concept_id': concept_id,
                                    'related_concept_id': related_concept_id.replace('https://openalex.org/C', ''),
                                    'score': related_concept.get('score')
                                })
            files_done += 1
            if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
                break


def flatten_venues():
    with open(csv_files['venues']['venues']['name'], 'w', encoding='utf-8') as venues_csv, \
            open(csv_files['venues']['ids']['name'], 'w', encoding='utf-8') as ids_csv, \
            open(csv_files['venues']['counts_by_year']['name'], 'w', encoding='utf-8') as counts_by_year_csv:

        venues_writer = csv.DictWriter(
            venues_csv, fieldnames=csv_files['venues']['venues']['columns'], extrasaction='ignore', delimiter='\t'
        )
        venues_writer.writeheader()

        ids_writer = csv.DictWriter(ids_csv, fieldnames=csv_files['venues']['ids']['columns'], delimiter='\t')
        ids_writer.writeheader()

        counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=csv_files['venues']['counts_by_year']['columns'], delimiter='\t')
        counts_by_year_writer.writeheader()

        seen_venue_ids = set()

        files_done = 0
        for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'venues', '*', '*.gz')):
            print(jsonl_file_name)
            with gzip.open(jsonl_file_name, 'r') as venues_jsonl:
                for venue_json in venues_jsonl:
                    if not venue_json.strip():
                        continue

                    venue = json.loads(venue_json)
                    if not (venue_id := venue.get('id')):
                        continue
                    venue_id = venue_id.replace('https://openalex.org/V', '')
                    venue['id'] = venue_id

                    if venue_id in seen_venue_ids:
                        continue
                    seen_venue_ids.add(venue_id)

                    venue['issn'] = json.dumps(venue.get('issn'))
                    venues_writer.writerow(venue)

                    if venue_ids := venue.get('ids'):
                        venue_ids['venue_id'] = venue_id
                        venue_ids['issn'] = json.dumps(venue_ids.get('issn'))
                        ids_writer.writerow(venue_ids)

                    if counts_by_year := venue.get('counts_by_year'):
                        for count_by_year in counts_by_year:
                            count_by_year['venue_id'] = venue_id
                            counts_by_year_writer.writerow(count_by_year)
            files_done += 1
            if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
                break


def flatten_institutions():
    file_spec = csv_files['institutions']

    with open(file_spec['institutions']['name'], 'w', encoding='utf-8') as institutions_csv, \
            open(file_spec['ids']['name'], 'w', encoding='utf-8') as ids_csv, \
            open(file_spec['geo']['name'], 'w', encoding='utf-8') as geo_csv, \
            open(file_spec['associated_institutions']['name'], 'w', encoding='utf-8') as associated_institutions_csv, \
            open(file_spec['counts_by_year']['name'], 'w', encoding='utf-8') as counts_by_year_csv:

        institutions_writer = csv.DictWriter(
            institutions_csv, fieldnames=file_spec['institutions']['columns'], extrasaction='ignore', delimiter='\t'
        )
        institutions_writer.writeheader()

        ids_writer = csv.DictWriter(ids_csv, fieldnames=file_spec['ids']['columns'], delimiter='\t')
        ids_writer.writeheader()

        geo_writer = csv.DictWriter(geo_csv, fieldnames=file_spec['geo']['columns'], delimiter='\t')
        geo_writer.writeheader()

        associated_institutions_writer = csv.DictWriter(
            associated_institutions_csv, fieldnames=file_spec['associated_institutions']['columns'], delimiter='\t'
        )
        associated_institutions_writer.writeheader()

        counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=file_spec['counts_by_year']['columns'], delimiter='\t')
        counts_by_year_writer.writeheader()

        seen_institution_ids = set()

        files_done = 0
        for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'institutions', '*', '*.gz')):
            print(jsonl_file_name)
            with gzip.open(jsonl_file_name, 'r') as institutions_jsonl:
                for institution_json in institutions_jsonl:
                    if not institution_json.strip():
                        continue

                    institution = json.loads(institution_json)
                    if not (institution_id := institution.get('id')):
                        continue
                    institution_id = institution_id.replace('https://openalex.org/I', '')
                    institution['id'] = institution_id

                    if institution_id in seen_institution_ids:
                        continue
                    seen_institution_ids.add(institution_id)

                    # institutions
                    institution['display_name_acronyms'] = json.dumps(institution.get('display_name_acronyms'), ensure_ascii=False)
                    institution['display_name_alternatives'] = json.dumps(institution.get('display_name_alternatives'), ensure_ascii=False)
                    institutions_writer.writerow(institution)

                    # ids
                    if institution_ids := institution.get('ids'):
                        institution_ids['institution_id'] = institution_id
                        ids_writer.writerow(institution_ids)

                    # geo
                    if institution_geo := institution.get('geo'):
                        institution_geo['institution_id'] = institution_id
                        geo_writer.writerow(institution_geo)

                    # associated_institutions
                    if associated_institutions := institution.get(
                        'associated_institutions', institution.get('associated_insitutions')  # typo in API
                    ):
                        for associated_institution in associated_institutions:
                            if associated_institution_id := associated_institution.get('id'):
                                associated_institutions_writer.writerow({
                                    'institution_id': institution_id,
                                    'associated_institution_id': associated_institution_id.replace('https://openalex.org/I', ''),
                                    'relationship': associated_institution.get('relationship')
                                })

                    # counts_by_year
                    if counts_by_year := institution.get('counts_by_year'):
                        for count_by_year in counts_by_year:
                            count_by_year['institution_id'] = institution_id
                            counts_by_year_writer.writerow(count_by_year)
            files_done += 1
            if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
                break


def flatten_authors():
    file_spec = csv_files['authors']

    with open(file_spec['authors']['name'], 'w', encoding='utf-8') as authors_csv, \
            open(file_spec['ids']['name'], 'w', encoding='utf-8') as ids_csv, \
            open(file_spec['counts_by_year']['name'], 'w', encoding='utf-8') as counts_by_year_csv:

        authors_writer = csv.DictWriter(
            authors_csv, fieldnames=file_spec['authors']['columns'], extrasaction='ignore', delimiter='\t'
        )
        authors_writer.writeheader()

        ids_writer = csv.DictWriter(ids_csv, fieldnames=file_spec['ids']['columns'], delimiter='\t')
        ids_writer.writeheader()

        counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=file_spec['counts_by_year']['columns'], delimiter='\t')
        counts_by_year_writer.writeheader()

        files_done = 0
        for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'authors', '*', '*.gz')):
            print(jsonl_file_name)
            with gzip.open(jsonl_file_name, 'r') as authors_jsonl:
                for author_json in authors_jsonl:
                    if not author_json.strip():
                        continue

                    author = json.loads(author_json)
                    if not (author_id := author.get('id')):
                        continue
                    author_id = author_id.replace('https://openalex.org/A', '')
                    author['id'] = author_id

                    # authors
                    author['display_name_alternatives'] = json.dumps(author.get('display_name_alternatives'), ensure_ascii=False)
                    author['last_known_institution'] = (author.get('last_known_institution') or {}).get('id')
                    if author['last_known_institution']:
                        author['last_known_institution'] = author['last_known_institution'].replace('https://openalex.org/I', '')
                    authors_writer.writerow(author)

                    # ids
                    if author_ids := author.get('ids'):
                        author_ids['author_id'] = author_id
                        ids_writer.writerow(author_ids)

                    # counts_by_year
                    if counts_by_year := author.get('counts_by_year'):
                        for count_by_year in counts_by_year:
                            count_by_year['author_id'] = author_id
                            counts_by_year_writer.writerow(count_by_year)
            files_done += 1
            if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
                break


def flatten_works():
    file_spec = csv_files['works']

    with open(file_spec['works']['name'], 'w', encoding='utf-8') as works_csv, \
            open(file_spec['host_venues']['name'], 'w', encoding='utf-8') as host_venues_csv, \
            open(file_spec['alternate_host_venues']['name'], 'w', encoding='utf-8') as alternate_host_venues_csv, \
            open(file_spec['authorships']['name'], 'w', encoding='utf-8') as authorships_csv, \
            open(file_spec['biblio']['name'], 'w', encoding='utf-8') as biblio_csv, \
            open(file_spec['concepts']['name'], 'w', encoding='utf-8') as concepts_csv, \
            open(file_spec['ids']['name'], 'w', encoding='utf-8') as ids_csv, \
            open(file_spec['mesh']['name'], 'w', encoding='utf-8') as mesh_csv, \
            open(file_spec['open_access']['name'], 'w', encoding='utf-8') as open_access_csv, \
            open(file_spec['referenced_works']['name'], 'w', encoding='utf-8') as referenced_works_csv, \
            open(file_spec['related_works']['name'], 'w', encoding='utf-8') as related_works_csv:

        works_writer = init_dict_writer(works_csv, file_spec['works'], extrasaction='ignore')
        host_venues_writer = init_dict_writer(host_venues_csv, file_spec['host_venues'])
        alternate_host_venues_writer = init_dict_writer(alternate_host_venues_csv, file_spec['alternate_host_venues'])
        authorships_writer = init_dict_writer(authorships_csv, file_spec['authorships'])
        biblio_writer = init_dict_writer(biblio_csv, file_spec['biblio'])
        concepts_writer = init_dict_writer(concepts_csv, file_spec['concepts'])
        ids_writer = init_dict_writer(ids_csv, file_spec['ids'], extrasaction='ignore')
        mesh_writer = init_dict_writer(mesh_csv, file_spec['mesh'])
        open_access_writer = init_dict_writer(open_access_csv, file_spec['open_access'])
        referenced_works_writer = init_dict_writer(referenced_works_csv, file_spec['referenced_works'])
        related_works_writer = init_dict_writer(related_works_csv, file_spec['related_works'])

        files_done = 0
        for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'works', '*', '*.gz')):
            print(jsonl_file_name)
            with gzip.open(jsonl_file_name, 'r') as works_jsonl:
                for work_json in works_jsonl:
                    if not work_json.strip():
                        continue

                    work = json.loads(work_json)
                    if not (work_id := work.get('id')):
                        continue
                    work_id = work_id.replace('https://openalex.org/W', '')
                    work['id'] = work_id

                    # works
                    if (abstract := work.get('abstract_inverted_index')) is not None:
                        work['abstract_inverted_index'] = json.dumps(abstract, ensure_ascii=False)
                    works_writer.writerow(work)

                    # host_venues
                    if host_venue := (work.get('host_venue') or {}):
                        if host_venue_id := host_venue.get('id'):
                            host_venues_writer.writerow({
                                'work_id': work_id,
                                'venue_id': host_venue_id.replace('https://openalex.org/V', ''),
                                'url': host_venue.get('url'),
                                'is_oa': host_venue.get('is_oa'),
                                'version': host_venue.get('version'),
                                'license': host_venue.get('license'),
                            })

                    # alternate_host_venues
                    if alternate_host_venues := work.get('alternate_host_venues'):
                        for alternate_host_venue in alternate_host_venues:
                            if venue_id := alternate_host_venue.get('id'):
                                alternate_host_venues_writer.writerow({
                                    'work_id': work_id,
                                    'venue_id': venue_id.replace('https://openalex.org/V', ''),
                                    'url': alternate_host_venue.get('url'),
                                    'is_oa': alternate_host_venue.get('is_oa'),
                                    'version': alternate_host_venue.get('version'),
                                    'license': alternate_host_venue.get('license'),
                                })

                    # authorships: one row per (author, institution) pair
                    if authorships := work.get('authorships'):
                        for authorship in authorships:
                            if author_id := authorship.get('author', {}).get('id'):
                                institutions = authorship.get('institutions') or []  # guard against a missing/null field
                                institution_ids = [i.get('id') for i in institutions]
                                institution_ids = [i for i in institution_ids if i]  # drop institutions without an ID
                                institution_ids = institution_ids or [None]  # keep the authorship row even without an institution

                                for institution_id in institution_ids:
                                    authorships_writer.writerow({
                                        'work_id': work_id,
                                        'author_position': authorship.get('author_position'),
                                        'author_id': author_id.replace('https://openalex.org/A', ''),
                                        'institution_id': institution_id.replace('https://openalex.org/I', '') if institution_id else institution_id,
                                        'raw_affiliation_string': authorship.get('raw_affiliation_string'),
                                    })

                    # biblio
                    if biblio := work.get('biblio'):
                        biblio['work_id'] = work_id
                        biblio_writer.writerow(biblio)

                    # concepts
                    for concept in work.get('concepts') or []:
                        if concept_id := concept.get('id'):
                            concepts_writer.writerow({
                                'work_id': work_id,
                                'concept_id': concept_id.replace('https://openalex.org/C', ''),
                                'score': concept.get('score'),
                            })

                    # ids
                    if ids := work.get('ids'):
                        ids['work_id'] = work_id
                        ids_writer.writerow(ids)

                    # mesh
                    for mesh in work.get('mesh') or []:
                        mesh['work_id'] = work_id
                        mesh_writer.writerow(mesh)

                    # open_access
                    if open_access := work.get('open_access'):
                        open_access['work_id'] = work_id
                        open_access_writer.writerow(open_access)

                    # referenced_works
                    for referenced_work in work.get('referenced_works') or []:
                        if referenced_work:
                            referenced_works_writer.writerow({
                                'work_id': work_id,
                                'referenced_work_id': referenced_work.replace('https://openalex.org/W', ''),
                            })

                    # related_works
                    for related_work in work.get('related_works') or []:
                        if related_work:
                            related_works_writer.writerow({
                                'work_id': work_id,
                                'related_work_id': related_work.replace('https://openalex.org/W', ''),
                            })
            files_done += 1
            if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
                break


def init_dict_writer(csv_file, file_spec, **kwargs):
    writer = csv.DictWriter(
        csv_file, fieldnames=file_spec['columns'], delimiter='\t', **kwargs
    )
    writer.writeheader()
    return writer


if __name__ == '__main__':
    flatten_concepts()
    flatten_venues()
    flatten_institutions()
    flatten_authors()
    flatten_works()
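
Each TSV is written with a header row and tab delimiters, so it loads directly into Spark. Below is a minimal sketch of reading the flattened works table back with PySpark (this assumes pyspark is installed and the script has already run; the app name is arbitrary and the file path comes from the csv_files spec above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('openalex-tsv').getOrCreate()

works = (
    spark.read
    .option('header', True)  # every TSV written above starts with a header row
    .option('sep', '\t')     # tab delimiter, matching the DictWriter configuration
    .csv('tsv-files/works.tsv')
)
works.printSchema()
works.select('id', 'title', 'cited_by_count').show(5)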