# Extract author/org/paper nodes from AMiner JSON data to import into neo4j.
#!/usr/bin/env python
# encoding: utf-8
import json
import os
import sys
import traceback
import uuid
from concurrent import futures

# Number of worker processes converting AMiner dump files in parallel.
num_workers = 12
# Destination directory for the generated neo4j bulk-import CSV files.
folder = '/mnt/lun2/docker_image/neo4j-aminer/out'
if not os.path.exists(folder):
    os.makedirs(folder)
def _paper_row(node):
    """Build the ordered CSV column list for one paper record.

    Commas are replaced by spaces in every field because the output is a
    bare comma-separated file with no quoting.
    """
    def clean(key):
        return node.get(key, '').replace(',', ' ')

    return [
        node['id'],
        node.get('title', '').replace('\n', '').replace(',', ' '),
        str(node.get('year', '')),
        clean('publisher'),
        clean('venue'),
        ';'.join(node.get('keywords', [])).replace(',', ' '),
        ';'.join(node.get('fos', [])).replace(',', ' '),
        str(node.get('n_citation', 0)).replace(',', ' '),
        clean('page_start'),
        clean('page_end'),
        clean('doc_type'),
        clean('lang'),
        clean('volume'),
        clean('issue'),
        clean('issn'),
        clean('isbn'),
        clean('doi'),
        clean('pdf'),
        ';'.join(node.get('url', [])).replace(',', ' '),
        'Paper',
    ]


def _write_csv(path, header, rows, strip_quotes=True):
    """Write *header* followed by *rows* (one line each) to *path*.

    Quote characters are stripped by default because the neo4j importer
    is fed unquoted CSV; pass strip_quotes=False to keep rows verbatim.
    """
    body = '\n'.join(rows)
    if strip_quotes:
        body = body.replace("'", '').replace('"', '')
    # Context manager ensures the handle is closed even on write errors
    # (the original opened six files up front and leaked them on failure).
    with open(path, 'w', encoding='utf-8') as out:
        out.write(header)
        out.write(body)


def extract_author_paper(filename):
    """Parse one AMiner JSON-lines dump and emit neo4j-import CSV files.

    For every paper record in *filename* this collects paper/author/org
    nodes plus WRITE (author->paper), WORK_IN (author->org) and
    REFERENCE (paper->paper) relations, then writes each collection as a
    de-duplicated CSV into the module-level ``folder``, prefixed with the
    numeric suffix of the input file name.

    Author and org IDs are deterministic UUIDv5 values derived from the
    (comma-sanitised, 500-char-truncated) name, so the same name maps to
    the same node across files.
    """
    # e.g. ".../aminer_papers_10.txt" -> "<folder>/10_"
    prefix = folder + '/' + filename.split('_')[-1][:-4] + '_'
    orgs = []
    authors = []
    p_a = []        # author -> paper (WRITE)
    a_o = []        # author -> org (WORK_IN)
    cite_rel = []   # paper -> paper (REFERENCE)
    all_paper = []
    name_id = {}    # sanitised author name -> uuid (memoised per file)
    org_id = {}     # sanitised org name -> uuid (memoised per file)
    with open(filename, encoding='utf-8') as fin:
        for i, line in enumerate(fin):
            node = json.loads(line)
            if 'id' not in node:
                continue
            pid = node['id']
            for a in node.get('authors', []):
                if not a.get('name'):
                    continue
                name = a['name'][:500].replace(',', ' ')
                if name not in name_id:
                    # uuid5 requires str on Python 3 (the original passed
                    # utf-8 bytes, a Python-2 leftover).
                    name_id[name] = str(uuid.uuid5(uuid.NAMESPACE_DNS, name))
                name_uuid = name_id[name]
                if a.get('org'):
                    org = a['org'][:500].replace(',', ' ')
                    if org not in org_id:
                        org_id[org] = str(uuid.uuid5(uuid.NAMESPACE_OID, org))
                    org_uuid = org_id[org]
                    orgs.append(','.join([org_uuid, org, 'Org']))
                    a_o.append(','.join([name_uuid, org_uuid, 'WORK_IN']))
                p_a.append(','.join([name_uuid, pid, 'WRITE']))
                authors.append(','.join([name_uuid, name, 'Author']))
            all_paper.append(','.join(_paper_row(node)))
            for ref in node.get('references', []):
                cite_rel.append(','.join([pid, ref, 'REFERENCE']))
            if i % 500000 == 0:
                print("the {} txt: {} lines finished".format(prefix[:-1], i))
    _write_csv(prefix + "paper_author.csv",
               ":START_ID,:END_ID,:TYPE\n", set(p_a))
    _write_csv(prefix + "author.csv",
               "authorId:ID,name,:LABEL\n", set(authors))
    _write_csv(prefix + "org.csv",
               "orgId:ID,name,:LABEL\n", set(orgs))
    _write_csv(prefix + "author_org.csv",
               ":START_ID,:END_ID,:TYPE\n", set(a_o))
    # Citations were never de-duplicated or quote-stripped originally;
    # keep that behaviour.
    _write_csv(prefix + "cite_relation.csv",
               ':START_ID,:END_ID,:TYPE\n', cite_rel, strip_quotes=False)
    _write_csv(prefix + "paper.csv",
               'paperId:ID,title,year,publisher,venue,keywords,fos,n_citation,page_start,page_end,doc_type,lang,volume,issue,issn,isbn,doi,pdf,url,:LABEL\n',
               set(all_paper))
def main():
    """Convert every AMiner dump file under *source* using a process pool.

    Each input file is handed to extract_author_paper in its own worker
    process (CPU-bound JSON parsing parallelises across cores); failures
    in one file are logged with a traceback and do not stop the others.
    """
    source = '/mnt/lun2/aminer/www.aminer.cn/aminer_papers'
    all_file = [os.path.join(source, name) for name in os.listdir(source)]
    with futures.ProcessPoolExecutor(num_workers) as executor:
        f_tasks = {executor.submit(extract_author_paper, fn): fn
                   for fn in all_file}
        for f in f_tasks:
            if f.running():
                print("{} is running".format(f))
        for f in futures.as_completed(f_tasks):
            fn = f_tasks[f]
            # basename instead of the original fragile fn[44:] slice,
            # which silently depended on the exact source path length.
            short = os.path.basename(fn)
            try:
                f.result()
                print("{} has finished.".format(short))
            except Exception as e:
                # Fix: `print traceback.format_exc()` was Python-2 print
                # syntax and a SyntaxError under Python 3.
                print(traceback.format_exc())
                print("{} has an exception: {}".format(short, e))


if __name__ == '__main__':
    main()