Skip to content

Instantly share code, notes, and snippets.

@caseydm
Created November 13, 2023 16:21
Show Gist options
  • Save caseydm/3b3791a8720dd660842bbf6000f41503 to your computer and use it in GitHub Desktop.
get openalex authors
import csv
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
def safe_nested_get(dct, keys, default=None):
    """
    Safely get a nested value from a dictionary.

    :param dct: The dictionary to search.
    :param keys: An iterable of keys to traverse, outermost first.
    :param default: The value to return if any key is not found or an
        intermediate value is not a dict.
    :return: The value found or the default value.
    """
    for key in keys:
        # Guard on the type, not just `is not None`: `key in dct` raises
        # TypeError when dct is a non-container (e.g. an int) and does
        # substring matching when dct is a string.
        if isinstance(dct, dict) and key in dct:
            dct = dct[key]
        else:
            return default
    return dct
def fetch_data(url, max_retries=3, backoff_factor=1, timeout=30):
    """
    GET a URL and return the parsed JSON body, retrying with exponential
    backoff on failure.

    :param url: The URL to fetch.
    :param max_retries: Maximum number of attempts before giving up.
    :param backoff_factor: Base delay in seconds; the wait before retry
        k is backoff_factor * 2**k.
    :param timeout: Per-request timeout in seconds; without one,
        requests.get can block indefinitely on a stalled connection.
    :return: The decoded JSON response, or None if all retries fail.
    """
    for attempt in range(max_retries):
        try:
            r = requests.get(url, timeout=timeout)
            if r.status_code == 200:
                return r.json()
            # Non-200 status: log it and fall through to the retry delay.
            print(f"Request failed with status code {r.status_code}. Attempt {attempt + 1} of {max_retries}")
        except requests.RequestException as e:
            # Covers connection errors, timeouts, etc.; log and retry.
            print(f"Request failed due to exception: {e}. Attempt {attempt + 1} of {max_retries}")
        # Exponential backoff, but don't sleep after the final attempt.
        if attempt < max_retries - 1:
            time.sleep(backoff_factor * (2 ** attempt))
    # All retries exhausted.
    print("All retries failed.")
    return None
def process_item(item):
    """
    Check one OpenAlex work for author affiliations; if no authorship has
    a raw affiliation string, print a message and append a row to
    missing_affiliations_out.csv.

    :param item: A work record (dict) with "id", "doi", "authorships",
        and (presumably -- confirm against the API response) an optional
        "primary_location" entry.
    """
    found_affiliation = any(
        authorship["raw_affiliation_string"] for authorship in item["authorships"]
    )
    if found_affiliation:
        return
    source_display_name = safe_nested_get(
        item, ["primary_location", "source", "display_name"], "unknown")
    publisher_name = safe_nested_get(
        item, ["primary_location", "source", "host_organization_name"], "unknown")
    print(f'{item["id"]} with source {source_display_name} ({publisher_name}) has no affiliations')
    # write to csv; newline='' stops the csv module from emitting blank
    # rows on Windows (required by the csv module docs).
    with open('missing_affiliations_out.csv', 'a', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([item["id"], item["doi"], source_display_name, publisher_name])
def main():
    """
    Read work IDs from missing_affiliations.csv, fetch them from the
    OpenAlex API in batches of 50 on a thread pool, and run
    process_item on each returned work.
    """
    batch_size = 50
    count = 0
    with open('missing_affiliations.csv', 'r') as f:
        reader = csv.DictReader(f)
        work_ids = []
        futures = []
        with ThreadPoolExecutor(max_workers=10) as executor:
            for row in reader:
                count += 1
                work_ids.append(f'W{row["work_id"]}')
                if len(work_ids) == batch_size:
                    url = f'https://api.openalex.org/works?filter=openalex_id:{"|".join(work_ids)}&select=id,primary_location,authorships,doi&per-page=50&mailto=team@ourresearch.org'
                    futures.append(executor.submit(fetch_data, url))
                    work_ids = []
                if count % 100 == 0:
                    print(f'processed {count} works')
            # BUG FIX: submit the final partial batch -- previously any
            # trailing group of fewer than 50 IDs was silently dropped.
            if work_ids:
                url = f'https://api.openalex.org/works?filter=openalex_id:{"|".join(work_ids)}&select=id,primary_location,authorships,doi&per-page=50&mailto=team@ourresearch.org'
                futures.append(executor.submit(fetch_data, url))
            for future in as_completed(futures):
                data = future.result()
                if data:
                    for item in data['results']:
                        process_item(item)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment