# -*- coding: utf-8 -*-
# Modified lightsocket import script, based on Lightsocket
# example script by James Thornton (http://jamesthornton.com)

# mundane modification stuff
import os
import commands
import sys
import datetime
import re  # regexp
import time
# make sure appropriate classes are available: | |
jars = [ '/home/lineo4j/neo4j-community-1.8.M06/lib/geronimo-jta_1.1_spec-1.1.1.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/lucene-core-3.5.0.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/neo4j-cypher-1.8.M06.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/neo4j-graph-algo-1.8.M06.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/neo4j-graph-matching-1.8.M06.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/neo4j-jmx-1.8.M06.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/neo4j-kernel-1.8.M06.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/neo4j-lucene-index-1.8.M06.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/neo4j-shell-1.8.M06.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/neo4j-udc-1.8.M06.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/org.apache.servicemix.bundles.jline-0.9.94_1.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/scala-library-2.9.1-1.jar', | |
'/home/lineo4j/neo4j-community-1.8.M06/lib/server-api-1.8.M06.jar' ] | |
for jar in jars: | |
sys.path.append(jar) | |
# java libraries for jython
from java.lang import Long, Double, Integer, String
from org.neo4j.graphdb import RelationshipType
from org.neo4j.unsafe.batchinsert import BatchInserter
from org.neo4j.unsafe.batchinsert import BatchInserters
from org.neo4j.unsafe.batchinsert import BatchInserterImpl
from org.neo4j.unsafe.batchinsert import BatchInserterIndexProvider
from org.neo4j.unsafe.batchinsert import BatchInserterIndex
from org.neo4j.unsafe.batchinsert import LuceneBatchInserterIndexProvider
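
# Expected input format (inferred from the parsing below, not formally specified):
# each dump file is tab-separated text whose first line is a header, and every
# header column is named "<property>__<type>", e.g. "chr__varchar". getLineDict()
# uses the type suffix to wrap each value in the matching java.lang type so the
# batch inserter stores properly typed properties.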
def getLineDict( header, line, elementTypes ):
    columns = line.strip("\n").split("\t")
    returnDict = dict()
    for key, val in dict(zip(header, columns)).iteritems():
        valtype = elementTypes.get(key)
        if valtype == 'varchar' or valtype == 'longtext':
            returnDict[key] = String(val)
        elif valtype == 'int':
            returnDict[key] = Integer(val)
        elif valtype == 'double':
            if val == 'NULL':
                # drop this attribute from the node if NULL
                continue
            returnDict[key] = Double(val)
    return returnDict
# subclass my own relationship type
class DefaultRelationshipType(RelationshipType):
    def name(self):
        return String('ASSOCIATION')
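
# ImportClient drives the whole import: it opens a Neo4j BatchInserter on the
# target directory, creates one fulltext Lucene index per node source and edge
# kind, loads the patient info node, then bulk-creates nodes and relationships
# from the dump files while indexing them in batches of flushInterval.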
class ImportClient(object):
    def __init__(self, datasetLabel, directory, nodeDumpFile, edgeDumpFile, patientDumpFile):
        self.label = datasetLabel
        self.dir = directory
        self.start_time = None
        configDict = {
            "neostore.nodestore.db.mapped_memory": "500M",
            "neostore.relationshipstore.db.mapped_memory": "4G",
            "neostore.propertystore.db.mapped_memory": "100M",
            "neostore.propertystore.db.strings.mapped_memory": "500M",
            "neostore.propertystore.db.arrays.mapped_memory": "0M",
            "neostore.propertystore.db.index.keys.mapped_memory": "400M",
            "neostore.propertystore.db.index.mapped_memory": "300M" }
        self.db = BatchInserters.inserter( directory, configDict )
        self.indexProvider = LuceneBatchInserterIndexProvider( self.db )
        # key-value pairs where the key is the alias of a node and the value its nodeId
        self.nodeHash = dict()
        node_indices = {
            'CLIN': self.label + '_n_CLIN', 'GEXP': self.label + '_n_GEXP',
            'METH': self.label + '_n_METH', 'CNVR': self.label + '_n_CNVR',
            'info': self.label + '_n_info' }
        edge_indices = {
            'ASSOCIATION': self.label + '_e_ASSOC', 'DISTANCE': self.label + '_e_DIST' }
        self.indexNames = { 'nodes': node_indices, 'edges': edge_indices }
        self.indices = { 'nodes': dict(), 'edges': dict(), 'info': dict() }
        self.nodeDumpFile = nodeDumpFile
        self.edgeDumpFile = edgeDumpFile
        self.patientDumpFile = patientDumpFile
        self.flushInterval = 500000
        self.edgeType = DefaultRelationshipType()
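
    # Note: the neostore mapped_memory settings above are Neo4j batch-insertion
    # tuning knobs; the 4G relationship-store mapping assumes the edge dump is by
    # far the largest input, so scale these to the available RAM if needed.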
    def start_timer(self):
        self.start_time = time.time()

    def stop_timer(self):
        end_time = time.time()
        run_time = end_time - self.start_time
        print "Runtime: " + str( run_time )

    def _finish(self):
        self.indexProvider.shutdown()
        self.db.shutdown()
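
    # One fulltext Lucene index is created per node source (CLIN/GEXP/METH/CNVR/info)
    # and per edge kind (ASSOCIATION/DISTANCE). The java.lang.String keys must match
    # the 'source' values in the node dump, since _importNodes uses that column to
    # pick the index a node is added to.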
    def _createIndices(self):
        for key, val in self.indexNames['nodes'].iteritems():
            self.indices['nodes'][ String(key) ] = self.indexProvider.nodeIndex(
                val, {'provider': 'lucene', 'type': 'fulltext'} )
        for key, val in self.indexNames['edges'].iteritems():
            self.indices['edges'][ String(key) ] = self.indexProvider.relationshipIndex(
                val, {'provider': 'lucene', 'type': 'fulltext'} )
    # creates a single node per dataset that contains
    # patient barcodes and later possibly other stuff
    def _createInfoNode(self):
        patient_file = open( self.patientDumpFile, 'r' )
        elementTypes = dict()
        header = []
        for lineno, line in enumerate( patient_file ):
            if lineno == 0:
                for cols in line.strip("\n").split("\t"):
                    ele = cols.split("__")
                    header.append( ele[0] )
                    elementTypes[ ele[0] ] = ele[1]
                continue
            # there should only be the header plus one data line in the file
            lineDict = getLineDict( header, line, elementTypes )
            nodeId = self.db.createNode( lineDict )
            self.indices['nodes'][ String('info') ].add( nodeId, lineDict )
        patient_file.close()
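
    # Node import: every data line becomes one node. Its 'alias' column is kept in
    # nodeHash so that edges can later be wired up by alias, and its 'source' column
    # selects which per-source index the node is added to.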
    def _importNodes(self):
        print "Creating and indexing nodes"
        node_file = open(self.nodeDumpFile, 'r')
        nodeTypes = dict()
        header = []
        addedCount = 0
        lineDict = dict()
        for lineno, line in enumerate( node_file ):
            # the header line gives every column its type, e.g.
            # nodeTypes['chr'] = 'varchar'
            if lineno == 0:
                for cols in line.strip("\n").split("\t"):
                    ele = cols.split("__")
                    header.append( ele[0] )
                    nodeTypes[ ele[0] ] = ele[1]
                continue
            lineDict = getLineDict(header, line, nodeTypes)
            nodeId = self.db.createNode( lineDict )
            addedCount += 1
            # store the created id under the node alias
            self.nodeHash[ lineDict.get( 'alias' ) ] = nodeId
            # add to the suitable index:
            self.indices['nodes'][ lineDict.get('source') ].add( nodeId, lineDict )
            if addedCount % self.flushInterval == 0:
                self._flushIndices(indexType='nodes')
                addedCount = 0
            self.display_progress(lineno)
        node_file.close()
        self._flushIndices(indexType='nodes')
        print "Finished creating and indexing nodes"
    def _importEdges(self):
        print "Creating and indexing relationships"

        # is one of the regions contained within the other, or are they the same region?
        def regionIsSubset(region1, region2):
            assert len(region1) == 2 and len(region2) == 2
            return ( ( region1[0] >= region2[0] and region1[1] <= region2[1] ) or
                     ( region2[0] >= region1[0] and region2[1] <= region1[1] ) )

        header = []
        edgeTypes = dict()
        addedCount = 0
        edge_file = open( self.edgeDumpFile, 'r' )
        for lineno, line in enumerate( edge_file ):
            if lineno == 0:
                for cols in line.strip("\n").split("\t"):
                    ele = cols.split("__")
                    header.append( ele[0] )
                    edgeTypes[ ele[0] ] = ele[1]
                continue
            lineDict = getLineDict(header, line, edgeTypes)
            sourceNodeId = self.nodeHash.get( lineDict.get( 'alias1' ) )
            targetNodeId = self.nodeHash.get( lineDict.get( 'alias2' ) )
            f1chr = re.findall(r'\d+', str(lineDict.get('f1chr')) )
            f2chr = re.findall(r'\d+', str(lineDict.get('f2chr')) )
            distance = None
            # calculate the chromosomal region distance
            if len(f1chr) != 0 and len(f2chr) != 0:
                try:
                    # coerce the chromosome numbers to ints so the autosome check is numeric
                    f1chr = int( f1chr[0] )
                    f2chr = int( f2chr[0] )
                    if (f1chr == f2chr) and (f1chr <= 22 and f2chr <= 22):
                        f2start = lineDict['f2start']
                        f2end = lineDict['f2end']
                        f1start = lineDict['f1start']
                        f1end = lineDict['f1end']
                        f1 = [ f1start, f1end ]
                        f2 = [ f2start, f2end ]
                        f1.sort()
                        f2.sort()
                        if ( f1[0] < f2[1] < f1[1] ) or ( f1[0] < f2[0] < f1[1] ):
                            # regions overlap
                            distance = 0
                        elif regionIsSubset(f1, f2):
                            # one region contains the other
                            distance = 0
                        else:
                            distance = sorted( [ abs( f2[0] - f1[1] ), abs( f1[0] - f2[1] ) ] )[0]
                except ValueError:
                    pass
            if distance is not None:
                lineDict['distance'] = Integer( distance )
            edgeId = self.db.createRelationship( sourceNodeId, targetNodeId, self.edgeType, lineDict )
            addedCount += 1
            # add to the suitable indices:
            self.indices['edges'][ String('ASSOCIATION') ].add( edgeId, lineDict )
            if 'distance' in lineDict:
                self.indices['edges'][ String('DISTANCE') ].add( edgeId, lineDict )
            if addedCount % self.flushInterval == 0:
                self._flushIndices(indexType='edges')
                addedCount = 0
            self.display_progress(lineno)
        edge_file.close()
        self._flushIndices(indexType='edges')
        print "Finished creating and indexing relationships"
    def _flushIndices(self, indexType='edges'):
        for key, val in self.indices[indexType].iteritems():
            val.flush()
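
    # flush() makes the queued additions visible in the Lucene batch indices; it is
    # called every flushInterval additions and once more after each dump file has
    # been fully read.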
    def startImport(self):
        print "Starting import process"
        self.start_timer()
        self._createIndices()
        self._createInfoNode()
        self._importNodes()
        self._importEdges()
        self._finish()
        print "-------------------------------"
        print "Finished importing"
        self.stop_timer()

    def display_progress(self, lineno):
        if self.start_time:
            if ( lineno % 200000 ) == 0:
                elapsed = time.time() - self.start_time
                print "%i Elements in %i seconds" % (lineno + 1, int(elapsed))
if __name__ == "__main__":
    if len( sys.argv ) != 6:
        print "Usage: py2.7 client.py datasetLabel targetDirectory nodeDumpFile edgeDumpFile patientDumpFile"
        sys.exit(-1)
    print "Data import started at %s" % ( str(datetime.datetime.now()) )
    datasetLabel = sys.argv[1]
    directory = sys.argv[2]
    nodeDumpFile = sys.argv[3]
    edgeDumpFile = sys.argv[4]
    patientDumpFile = sys.argv[5]
    if not os.access( nodeDumpFile, os.R_OK ):
        print "Could not read file %s" % (nodeDumpFile)
        sys.exit(-1)
    if not os.access( edgeDumpFile, os.R_OK ):
        print "Could not read file %s" % (edgeDumpFile)
        sys.exit(-1)
    if not os.access( patientDumpFile, os.R_OK ):
        print "Could not read file %s" % (patientDumpFile)
        sys.exit(-1)
    client = ImportClient(datasetLabel, directory, nodeDumpFile, edgeDumpFile, patientDumpFile)
    client.startImport()
    print "Data import ended at %s" % ( str(datetime.datetime.now()) )
    sys.exit(0)