Created: June 23, 2011 23:25
Save hpiwowar/1043881 to your computer and use it in GitHub Desktop.
For annotating bibtex files using article info from customized spreadsheets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Initially written by Heather Piwowar, June 2011 | |
# Public domain: have at it! | |
# For annotating bibtex files, pulling in article info from customized spreadsheets | |
import csv | |
import re | |
import codecs | |
from pybtex.database.input import bibtex as bibtex_in | |
from pybtex.database.output import bibtex as bibtex_out | |
from operator import itemgetter, attrgetter | |
import pprint | |
import sys | |
from sample_bibtex import group_boundary_pairs | |
from sample_bibtex import get_group | |
from collections import defaultdict | |
def read_bib(filename):
    """Parse a bibtex file with pybtex and return the database object."""
    bib_parser = bibtex_in.Parser()
    parsed_bib = bib_parser.parse_file(filename)
    #print(parsed_bib.entries['1'].fields['email'])
    return parsed_bib
def read_spreadsheet(repository):
    """Read "<repository>_datasets.csv" and return its rows as a list of dicts.

    Each row is the dict produced by csv.DictReader, keyed by the CSV header.

    Fix: the original opened the CSV inline and never closed the handle;
    use a context manager so the file is always released.
    """
    with open(repository + "_datasets.csv", "r") as csv_fh:
        return list(csv.DictReader(csv_fh))
def get_spreadsheet_row(spreadsheet_list, old_data_id, repository):
    """Return the first spreadsheet row matching old_data_id, or None.

    Pangaea rows match by substring (the spreadsheet cell holds a full
    citation containing the DOI); every other repository matches its ID
    column exactly.

    Fix: the duplicated GEO/ArrayExpress/GEOROC/TreeBase branches are
    collapsed into a column-name lookup table; the dead `response = None`
    assignment in the except handler is removed (it only ever re-set the
    initial value).
    """
    # Exact-match ID column per repository; Pangaea is special-cased below.
    id_columns = {
        "GEOROC": "GEOROC-ID",
        "ArrayExpress": "accession",
        "GEO": "accession",
        "TreeBase": "data number (legacy ID)",
    }
    for row in spreadsheet_list:
        try:
            if repository == "Pangaea":
                # Substring match: the OLD-citation cell embeds the DOI.
                if old_data_id in row["data citation (dataset doi) OLD"]:
                    return row
            elif repository in id_columns:
                if old_data_id == row[id_columns[repository]]:
                    return row
        except TypeError:
            # Empty spreadsheet cells parse as None and cannot be compared
            # with `in`; skip such rows, as the original did.
            continue
    return None
def get_id_from_filename(filename, repository):
    """Extract the repository-specific dataset ID embedded in a data filename.

    Falls back to returning the raw filename when the expected pattern is
    not found (re.search returns None -> AttributeError on .group).

    Fixes: the Pangaea regex used unescaped dots ("10.\\d+...txt"), which
    matched any character instead of a literal "."; the identical
    GEOROC/ArrayExpress branches are merged.
    """
    try:
        if repository == "Pangaea":
            # Filename encodes the DOI with ":" standing in for "/".
            doi = re.search(r"(?P<doi>10\.\d+\S+)\.txt", filename).group("doi")
            old_data_id = doi.replace(":", "/")
        elif repository in ("GEOROC", "ArrayExpress"):
            # ID is the bare filename: strip the extension and the dir prefix.
            old_data_id = filename.replace(".txt", "").replace(repository + "/", "")
        elif repository == "GEO":
            old_data_id = re.search(r"GEO/(?P<id>GSE\d+)", filename).group("id")
        elif repository == "TreeBase":
            old_data_id = re.search(r"TreeBase/(?P<id>S\d+)", filename).group("id")
    except AttributeError:
        # Pattern not found in filename: pass the filename through unchanged.
        old_data_id = filename
    return old_data_id
def get_number_citations(row):
    """Return the WoS citation count as an int, or None when not numeric."""
    raw_count = row['WoS Cited by how many?']
    try:
        return int(raw_count)
    except ValueError:
        # Non-numeric cell contents (blank, "n/a", ...) mean "no count".
        return None
def get_new_data_id(row, repository):
    """Return the current (post-migration) dataset ID from a spreadsheet row.

    For Pangaea the NEW-citation cell holds a full URL, which is stripped
    down to the bare DOI; the other repositories read a single ID column.

    Fix: an unknown repository used to crash with an unhelpful
    UnboundLocalError on `data_id`; it now raises KeyError naming the
    repository. Duplicate branches collapsed into a lookup table.
    """
    if repository == "Pangaea":
        # Strip the resolver prefix, leaving the bare DOI.
        return row["data citation (dataset doi) NEW"].replace("http://doi.pangaea.de/", "")
    id_column_by_repository = {
        "GEOROC": "GEOROC-ID",
        "ArrayExpress": "accession",
        "GEO": "accession",
        "TreeBase": "data number (legacy ID)",
    }
    return row[id_column_by_repository[repository]]
def get_data_collection_article(row):
    """Return the 'article reference' cell of a spreadsheet row."""
    return row["article reference"]
def write_bib_out(bib_data, filename):
    """Serialize bib_data to `filename` as UTF-8 bibtex via pybtex.

    Fix: if write_stream raised, the original leaked the open stream;
    `with` guarantees it is closed.  (codecs.open supports the
    context-manager protocol.)
    """
    writer = bibtex_out.Writer()
    # stream = sys.stdout   # handy for debugging
    with codecs.open(filename, "w", "utf-8") as stream:
        writer.write_stream(bib_data, stream)
def annotate_bib_with_citation_info(bib_data, spreadsheet_list, repository): | |
for entry in bib_data.entries: | |
filename = bib_data.entries[entry].fields["filename"] | |
old_data_id = get_id_from_filename(filename, repository) | |
spreadsheet_row = get_spreadsheet_row(spreadsheet_list, old_data_id, repository) | |
if not spreadsheet_row: | |
print "no spreadsheet row", filename, old_data_id | |
continue | |
number_citations = get_number_citations(spreadsheet_row) | |
if not number_citations: | |
print "no spreadsheet row", filename, old_data_id | |
continue | |
bib_data.entries[entry].fields["number_total_citations_to_dataset"] = str(number_citations) | |
if "annote" not in bib_data.entries[entry].fields.keys(): | |
bib_data.entries[entry].fields["annote"] = "" | |
bib_data.entries[entry].fields["data_id"] = get_new_data_id(spreadsheet_row, repository) | |
bib_data.entries[entry].fields["annote"] += "; data_id:" + bib_data.entries[entry].fields["data_id"] | |
bib_data.entries[entry].fields["data_collection_article"] = get_data_collection_article(spreadsheet_row) | |
bib_data.entries[entry].fields["annote"] += "; data_collection_article:" + bib_data.entries[entry].fields["data_collection_article"] | |
bib_data.entries[entry].fields["repository"] = repository | |
if "mendeley-tags" not in bib_data.entries[entry].fields.keys(): | |
bib_data.entries[entry].fields["mendeley-tags"] = "" | |
bib_data.entries[entry].fields["mendeley-tags"] += "; " + repository | |
return(bib_data) | |
def run_annotation(repository):
    """End-to-end pass: read <repo>_raw.bib plus the repository spreadsheet,
    annotate the bib entries, and write <repo>_annotated.bib."""
    raw_bib = read_bib(repository + "_raw.bib")
    rows = read_spreadsheet(repository)
    annotated = annotate_bib_with_citation_info(raw_bib, rows, repository)
    write_bib_out(annotated, repository + "_annotated.bib")
def print_groups(repository): | |
groups_dict = defaultdict(list) | |
spreadsheet_list = read_spreadsheet(repository) | |
for spreadsheet_dict in spreadsheet_list: | |
try: | |
number_citations = int(get_number_citations(spreadsheet_dict)) | |
group = get_group(number_citations, group_boundary_pairs) | |
(low, high) = group | |
groups_dict[low] += [number_citations] | |
except: | |
pass | |
for group in groups_dict: | |
print "\t".join([repository, str(group), str(len(groups_dict[group])), str(sum(groups_dict[group]))]) | |
return(groups_dict) | |
def print_dataset_id_and_number_citations(repository): | |
groups_dict = defaultdict(list) | |
spreadsheet_list = read_spreadsheet(repository) | |
for spreadsheet_dict in spreadsheet_list: | |
try: | |
number_citations = int(get_number_citations(spreadsheet_dict)) | |
data_id = get_new_data_id(spreadsheet_dict, repository) | |
print "\t".join([repository, data_id, str(number_citations)]) | |
except: | |
pass | |
# --- script driver ---
# DIR selects a repository for a one-off annotation run; toggle the
# commented assignments (and run_annotation) to switch modes.
#DIR = "Pangaea"
#DIR = "GEOROC"
#DIR = "GEO"
DIR = "TreeBase"
#run_annotation(DIR)
# Current mode: dump (repository, data_id, citations) TSV per repository.
for repository_name in ["TreeBase", "Pangaea", "GEO"]:
    #print_groups(repository_name)
    print_dataset_id_and_number_citations(repository_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment