Skip to content

Instantly share code, notes, and snippets.

@hpiwowar
Created June 23, 2011 23:25
Show Gist options
  • Save hpiwowar/1043881 to your computer and use it in GitHub Desktop.
For annotating bibtex files using article info from customized spreadsheets
#!/usr/bin/env python
# Initially written by Heather Piwowar, June 2011
# Public domain: have at it!
# For annotating bibtex files, pulling in article info from customized spreadsheets
import csv
import re
import codecs
from pybtex.database.input import bibtex as bibtex_in
from pybtex.database.output import bibtex as bibtex_out
from operator import itemgetter, attrgetter
import pprint
import sys
from sample_bibtex import group_boundary_pairs
from sample_bibtex import get_group
from collections import defaultdict
def read_bib(filename):
    """Parse a BibTeX file and return the resulting pybtex database object."""
    bib_parser = bibtex_in.Parser()
    return bib_parser.parse_file(filename)
def read_spreadsheet(repository):
    """Load <repository>_datasets.csv and return its rows as a list of dicts.

    Each row is a dict keyed by the CSV header line, as produced by
    csv.DictReader.  The file is looked up relative to the current
    working directory.
    """
    # Context manager closes the handle even on error; the original
    # opened the file inline and never closed it.
    with open(repository + "_datasets.csv", "r") as csv_fh:
        return list(csv.DictReader(csv_fh))
def get_spreadsheet_row(spreadsheet_list, old_data_id, repository):
    """Find the spreadsheet row matching old_data_id for the given repository.

    Pangaea rows match by substring against the old dataset DOI column;
    every other repository matches by exact equality on its ID column.
    Returns the first matching row dict, or None when no row matches
    (including when the repository is unknown).
    """
    # repository -> (column name, use substring matching?)
    match_rules = {
        "Pangaea": ("data citation (dataset doi) OLD", True),
        "GEOROC": ("GEOROC-ID", False),
        "ArrayExpress": ("accession", False),
        "GEO": ("accession", False),
        "TreeBase": ("data number (legacy ID)", False),
    }
    rule = match_rules.get(repository)
    if rule is None:
        return None
    column, use_substring = rule
    for candidate in spreadsheet_list:
        try:
            cell = candidate[column]
            matched = (old_data_id in cell) if use_substring else (old_data_id == cell)
            if matched:
                return candidate
        except TypeError:
            # Cell was None (substring test on None raises TypeError);
            # skip the row, matching the original best-effort behaviour.
            pass
    return None
def get_id_from_filename(filename, repository):
    """Derive the repository-specific dataset ID embedded in a filename.

    Pangaea filenames embed the dataset DOI with ':' in place of '/';
    GEOROC/ArrayExpress IDs are the filename minus directory prefix and
    ".txt"; GEO and TreeBase IDs are extracted by regex.

    Returns the filename unchanged when no ID can be extracted: either
    the regex does not match (the original AttributeError fallback) or
    the repository is unknown (the original crashed here with an
    UnboundLocalError).
    """
    try:
        if repository == "Pangaea":
            doi = re.search(r"(?P<doi>10.\d+\S+).txt", filename).group("doi")
            return doi.replace(":", "/")
        if repository == "GEOROC":
            return filename.replace(".txt", "").replace("GEOROC/", "")
        if repository == "ArrayExpress":
            return filename.replace(".txt", "").replace("ArrayExpress/", "")
        if repository == "GEO":
            return re.search(r"GEO/(?P<id>GSE\d+)", filename).group("id")
        if repository == "TreeBase":
            return re.search(r"TreeBase/(?P<id>S\d+)", filename).group("id")
    except AttributeError:
        # re.search returned None, so .group raised AttributeError.
        pass
    return filename
def get_number_citations(row):
    """Return the WoS citation count of a row as an int, or None when the
    cell does not hold a parsable integer."""
    raw_count = row['WoS Cited by how many?']
    try:
        return int(raw_count)
    except ValueError:
        return None
def get_new_data_id(row, repository):
    """Return the current dataset ID recorded in a spreadsheet row.

    Pangaea IDs are the NEW dataset DOI with the resolver prefix
    stripped; every other repository uses its ID column directly.

    Raises KeyError for an unknown repository (the original fell off
    the if/elif chain and crashed with an UnboundLocalError instead).
    """
    if repository == "Pangaea":
        new_doi = row["data citation (dataset doi) NEW"]
        return new_doi.replace("http://doi.pangaea.de/", "")
    id_column = {
        "GEOROC": "GEOROC-ID",
        "ArrayExpress": "accession",
        "GEO": "accession",
        "TreeBase": "data number (legacy ID)",
    }[repository]
    return row[id_column]
def get_data_collection_article(row):
    """Return the reference string of the article describing this dataset."""
    return row["article reference"]
def write_bib_out(bib_data, filename):
    """Write a pybtex database to `filename` as UTF-8 BibTeX.

    Uses a context manager so the stream is closed even if the writer
    raises (the original only closed the stream on the success path).
    """
    writer = bibtex_out.Writer()
    with codecs.open(filename, "w", "utf-8") as stream:
        writer.write_stream(bib_data, stream)
def annotate_bib_with_citation_info(bib_data, spreadsheet_list, repository):
    """Annotate every bibtex entry with citation info from the spreadsheet.

    For each entry, the dataset ID embedded in the entry's "filename"
    field is used to look up the matching spreadsheet row.  Matching
    entries gain the fields number_total_citations_to_dataset, data_id,
    data_collection_article, and repository; data_id and the article are
    also appended to "annote", and the repository to "mendeley-tags".
    Entries with no matching row or no usable citation count are skipped
    with a diagnostic line on stdout.

    Returns bib_data, which is mutated in place.
    """
    for key in bib_data.entries:
        fields = bib_data.entries[key].fields
        filename = fields["filename"]
        old_data_id = get_id_from_filename(filename, repository)
        spreadsheet_row = get_spreadsheet_row(spreadsheet_list, old_data_id, repository)
        if not spreadsheet_row:
            print("no spreadsheet row %s %s" % (filename, old_data_id))
            continue
        number_citations = get_number_citations(spreadsheet_row)
        if not number_citations:
            # NOTE(review): this also skips rows with a count of exactly 0,
            # not only missing counts -- presumably intentional; confirm.
            # Message fixed: the original copy-pasted "no spreadsheet row".
            print("no citation count %s %s" % (filename, old_data_id))
            continue
        fields["number_total_citations_to_dataset"] = str(number_citations)
        if "annote" not in fields:
            fields["annote"] = ""
        fields["data_id"] = get_new_data_id(spreadsheet_row, repository)
        fields["annote"] += "; data_id:" + fields["data_id"]
        fields["data_collection_article"] = get_data_collection_article(spreadsheet_row)
        fields["annote"] += "; data_collection_article:" + fields["data_collection_article"]
        fields["repository"] = repository
        if "mendeley-tags" not in fields:
            fields["mendeley-tags"] = ""
        fields["mendeley-tags"] += "; " + repository
    return bib_data
def run_annotation(repository):
    """End-to-end pass for one repository: read <repo>_raw.bib, annotate it
    from <repo>_datasets.csv, and write <repo>_annotated.bib."""
    raw_bib = read_bib(repository + "_raw.bib")
    rows = read_spreadsheet(repository)
    annotated = annotate_bib_with_citation_info(raw_bib, rows, repository)
    write_bib_out(annotated, repository + "_annotated.bib")
def print_groups(repository):
    """Print, per citation-count group, a tab-separated line of repository,
    group lower bound, number of datasets, and total citations.

    Group boundaries come from sample_bibtex.group_boundary_pairs via
    get_group.  Returns the dict mapping group lower bound -> list of
    citation counts.
    """
    groups_dict = defaultdict(list)
    for row in read_spreadsheet(repository):
        try:
            number_citations = int(get_number_citations(row))
            (low, high) = get_group(number_citations, group_boundary_pairs)
            groups_dict[low].append(number_citations)
        except Exception:
            # Best-effort: rows without a usable count (None -> TypeError)
            # or outside every group are skipped.  Narrowed from a bare
            # except so SystemExit/KeyboardInterrupt still propagate.
            pass
    for low in groups_dict:
        counts = groups_dict[low]
        print("\t".join([repository, str(low), str(len(counts)), str(sum(counts))]))
    return groups_dict
def print_dataset_id_and_number_citations(repository):
    """Print one tab-separated line (repository, dataset ID, citation count)
    per spreadsheet row that has a usable citation count.

    Rows without a parsable count or with missing ID columns are skipped.
    """
    # The original also built an unused defaultdict here; removed.
    for row in read_spreadsheet(repository):
        try:
            number_citations = int(get_number_citations(row))
            data_id = get_new_data_id(row, repository)
            print("\t".join([repository, data_id, str(number_citations)]))
        except Exception:
            # Best-effort skip (None count -> TypeError, missing column ->
            # KeyError).  Narrowed from a bare except.
            pass
# Repository used by run_annotation when that line is enabled.
#DIR = "Pangaea"
#DIR = "GEOROC"
#DIR = "GEO"
DIR = "TreeBase"
#run_annotation(DIR)

# Dump dataset IDs and citation counts for each repository of interest.
for repo in ["TreeBase", "Pangaea", "GEO"]:
    #print_groups(repo)
    print_dataset_id_and_number_citations(repo)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment