Skip to content

Instantly share code, notes, and snippets.

@amergin
Created August 6, 2012 17:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amergin/3276978 to your computer and use it in GitHub Desktop.
Save amergin/3276978 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# Modified lightsocket import script, based on Lightsocket
# example script by James Thornton (http://jamesthornton.com)
# mundane modification stuff
import os
import commands
import sys
import datetime
import re # regexp
import time
# Make sure the Neo4j (and bundled Lucene/Scala) classes are importable from
# Jython by putting every required jar on sys.path.
# The lib directory may be overridden with the NEO4J_LIB environment variable;
# it defaults to the original install location.
NEO4J_LIB = os.environ.get('NEO4J_LIB', '/home/lineo4j/neo4j-community-1.8.M06/lib')
_jarNames = [
    'geronimo-jta_1.1_spec-1.1.1.jar',
    'lucene-core-3.5.0.jar',
    'neo4j-cypher-1.8.M06.jar',
    'neo4j-graph-algo-1.8.M06.jar',
    'neo4j-graph-matching-1.8.M06.jar',
    'neo4j-jmx-1.8.M06.jar',
    'neo4j-kernel-1.8.M06.jar',
    'neo4j-lucene-index-1.8.M06.jar',
    'neo4j-shell-1.8.M06.jar',
    'neo4j-udc-1.8.M06.jar',
    'org.apache.servicemix.bundles.jline-0.9.94_1.jar',
    'scala-library-2.9.1-1.jar',
    'server-api-1.8.M06.jar',
]
# full paths of the jars, in the same order as before
jars = [os.path.join(NEO4J_LIB, jarName) for jarName in _jarNames]
for jar in jars:
    sys.path.append(jar)
# java libraries for jython
from java.lang import Long, Double, Integer, String
from org.neo4j.graphdb import RelationshipType
from org.neo4j.unsafe.batchinsert import BatchInserter
from org.neo4j.unsafe.batchinsert import BatchInserters
from org.neo4j.unsafe.batchinsert import BatchInserterImpl
from org.neo4j.unsafe.batchinsert import BatchInserterIndexProvider
from org.neo4j.unsafe.batchinsert import BatchInserterIndex
from org.neo4j.unsafe.batchinsert import LuceneBatchInserterIndexProvider
from org.neo4j.graphdb import RelationshipType
def getLineDict(header, line, elementTypes):
    """Convert one tab-separated dump row into a property dict for Neo4j.

    header       -- column names, in file order.
    line         -- the raw row text; a trailing "\\n" or "\\r\\n" is ignored.
    elementTypes -- maps a column name to its declared type. 'varchar' and
                    'longtext' become java.lang.String, 'int' becomes
                    java.lang.Integer and 'double' becomes java.lang.Double.

    Returns a dict of column name -> converted value. Columns whose type is
    not one of the above are left out, and so are 'int'/'double' columns whose
    value is the literal string 'NULL'.
    """
    # strip '\r' as well so CRLF dumps do not leave it on the last column
    columns = line.rstrip("\r\n").split("\t")
    returnDict = dict()
    for key, val in zip(header, columns):
        valtype = elementTypes.get(key)
        if valtype == 'varchar' or valtype == 'longtext':
            returnDict[key] = String(val)
        elif valtype == 'int' or valtype == 'double':
            if val == 'NULL':
                # drop this attribute instead of failing to parse 'NULL'
                continue
            if valtype == 'int':
                returnDict[key] = Integer(val)
            else:
                returnDict[key] = Double(val)
    return returnDict
class DefaultRelationshipType(RelationshipType):
    """Neo4j relationship type given to every imported edge.

    Implements the Java RelationshipType interface; its name is always
    'ASSOCIATION'.
    """

    # the single type name reported to Neo4j
    _TYPE_NAME = 'ASSOCIATION'

    def name(self):
        return String(self._TYPE_NAME)
class ImportClient(object):
    """Bulk-load one dataset into a Neo4j store via the batch inserter.

    Every dump file is tab separated and starts with a header line whose
    columns are named "<name>__<type>"; data rows are converted with
    getLineDict().

    * patientDumpfile -- each data row becomes a node added to the
      "<label>_n_info" node index (the file is expected to hold one row).
    * nodeDumpFile -- each row becomes a node, remembered under its 'alias'
      column and added to the node index selected by its 'source' column
      (CLIN, GEXP, METH, CNVR or info).
    * edgeDumpFile -- each row becomes a relationship between the nodes whose
      aliases appear in its 'alias1' / 'alias2' columns.

    startImport() runs the whole pipeline and shuts the inserter down.
    """

    def __init__(self, datasetLabel, directory, nodeDumpFile, edgeDumpFile, patientDumpfile):
        """Open a batch inserter on `directory` and prepare index bookkeeping.

        datasetLabel is used as the prefix of every index name.
        """
        self.label = datasetLabel
        self.dir = directory
        self.start_time = None
        # memory-mapped buffer sizes handed to the batch inserter
        configDict = {
            "neostore.nodestore.db.mapped_memory": "500M",
            "neostore.relationshipstore.db.mapped_memory": "4G",
            "neostore.propertystore.db.mapped_memory": "100M",
            "neostore.propertystore.db.strings.mapped_memory": "500M",
            "neostore.propertystore.db.arrays.mapped_memory": "0M",
            "neostore.propertystore.db.index.keys.mapped_memory": "400M",
            "neostore.propertystore.db.index.mapped_memory": "300M"}
        self.db = BatchInserters.inserter(directory, configDict)
        self.indexProvider = LuceneBatchInserterIndexProvider(self.db)
        # key is the alias of a node, value the id of the node created for it
        self.nodeHash = dict()
        node_indices = {
            'CLIN': self.label + '_n_CLIN', 'GEXP': self.label + '_n_GEXP',
            'METH': self.label + '_n_METH', 'CNVR': self.label + '_n_CNVR',
            'info': self.label + '_n_info'}
        edge_indices = {
            'ASSOCIATION': self.label + '_e_ASSOC', 'DISTANCE': self.label + '_e_DIST'
        }
        self.indexNames = {'nodes': node_indices, 'edges': edge_indices}
        self.indices = {'nodes': dict(), 'edges': dict(), 'info': dict()}
        self.nodeDumpFile = nodeDumpFile
        self.edgeDumpFile = edgeDumpFile
        # use the parameter; the old code read a same-named *global* instead
        self.patientDumpFile = patientDumpfile
        # number of inserted elements between index flushes
        self.flushInterval = 500000
        self.edgeType = DefaultRelationshipType()

    def start_timer(self):
        self.start_time = time.time()

    def stop_timer(self):
        end_time = time.time()
        run_time = end_time - self.start_time
        print("Runtime: " + str(run_time))

    def _finish(self):
        # index provider first, then the inserter it was created from
        self.indexProvider.shutdown()
        self.db.shutdown()

    def _createIndices(self):
        """Create the lucene fulltext node and relationship indices."""
        for key, val in self.indexNames['nodes'].items():
            self.indices['nodes'][String(key)] = self.indexProvider.nodeIndex(val, {'provider': 'lucene', 'type': 'fulltext'})
        for key, val in self.indexNames['edges'].items():
            self.indices['edges'][String(key)] = self.indexProvider.relationshipIndex(val, {'provider': 'lucene', 'type': 'fulltext'})

    @staticmethod
    def _parseHeader(line):
        """Split a "name__type" header line into (column names, {name: type})."""
        header = []
        columnTypes = dict()
        for col in line.rstrip("\r\n").split("\t"):
            ele = col.split("__")
            header.append(ele[0])
            columnTypes[ele[0]] = ele[1]
        return header, columnTypes

    # creates a single node per dataset that contains
    # patient barcodes and later possibly other stuff
    def _createInfoNode(self):
        patient_file = open(self.patientDumpFile, 'r')
        try:
            header = []
            elementTypes = dict()
            for lineno, line in enumerate(patient_file):
                if lineno == 0:
                    header, elementTypes = self._parseHeader(line)
                    continue
                lineDict = getLineDict(header, line, elementTypes)
                nodeId = self.db.createNode(lineDict)
                # should only be header + 1 line in file
                self.indices['nodes'][String('info')].add(nodeId, lineDict)
        finally:
            patient_file.close()

    def _importNodes(self):
        """Create one node per row of the node dump and index it by 'source'."""
        print("Creating and indexing nodes")
        node_file = open(self.nodeDumpFile, 'r')
        try:
            header = []
            nodeTypes = dict()
            addedCount = 0
            for lineno, line in enumerate(node_file):
                if lineno == 0:
                    # column types, e.g. nodeTypes['chr'] = 'varchar'
                    header, nodeTypes = self._parseHeader(line)
                    continue
                lineDict = getLineDict(header, line, nodeTypes)
                nodeId = self.db.createNode(lineDict)
                addedCount += 1
                # store the created id under the node alias for _importEdges
                self.nodeHash[lineDict.get('alias')] = nodeId
                # the 'source' column selects the index (CLIN, GEXP, ...)
                self.indices['nodes'][lineDict.get('source')].add(nodeId, lineDict)
                if addedCount % self.flushInterval == 0:
                    self._flushIndices(indexType='nodes')
                    addedCount = 0
                self.display_progress(lineno)
        finally:
            node_file.close()
        self._flushIndices(indexType='nodes')
        print("Finished creating and indexing nodes")

    @staticmethod
    def _chromosomalDistance(lineDict):
        """Return the distance between the two genomic regions of an edge row.

        Reads 'f1chr'/'f2chr' and 'f1start'/'f1end'/'f2start'/'f2end'.
        Returns 0 when the regions overlap or one contains the other. Returns
        None when no distance applies: a chromosome has no number, the
        chromosomes differ, the chromosome is above 22, or a coordinate is
        missing or not numeric.
        """
        f1chr = re.findall(r'\d+', str(lineDict.get('f1chr')))
        f2chr = re.findall(r'\d+', str(lineDict.get('f2chr')))
        if not f1chr or not f2chr:
            return None
        # compare chromosome numbers as ints: a str compared with the int 22
        # is never <= 22 in Python 2, which made this check always fail
        chr1 = int(f1chr[0])
        chr2 = int(f2chr[0])
        if chr1 != chr2 or chr1 > 22:
            return None

        def coord(key):
            # str() first, so boxed java.lang.Integer/Double/String values and
            # plain Python numbers are all handled the same way
            return int(float(str(lineDict[key])))

        try:
            f1 = sorted([coord('f1start'), coord('f1end')])
            f2 = sorted([coord('f2start'), coord('f2end')])
        except (ValueError, KeyError):
            # missing (e.g. dropped NULL) or unparsable coordinate
            return None
        if f1[0] < f2[1] < f1[1] or f1[0] < f2[0] < f1[1]:
            # regions overlap
            return 0
        if ((f1[0] >= f2[0] and f1[1] <= f2[1]) or
                (f2[0] >= f1[0] and f2[1] <= f1[1])):
            # one region is contained within the other (or they are the same)
            return 0
        return min(abs(f2[0] - f1[1]), abs(f1[0] - f2[1]))

    def _importEdges(self):
        """Create one relationship per row of the edge dump and index it."""
        print("Creating and indexing relationships")
        edge_file = open(self.edgeDumpFile, 'r')
        try:
            header = []
            edgeTypes = dict()
            addedCount = 0
            for lineno, line in enumerate(edge_file):
                if lineno == 0:
                    header, edgeTypes = self._parseHeader(line)
                    continue
                lineDict = getLineDict(header, line, edgeTypes)
                sourceNodeId = self.nodeHash.get(lineDict.get('alias1'))
                targetNodeId = self.nodeHash.get(lineDict.get('alias2'))
                distance = self._chromosomalDistance(lineDict)
                # 0 (overlapping/contained regions) is a real distance as well
                if distance is not None:
                    lineDict['distance'] = Integer(distance)
                edgeId = self.db.createRelationship(sourceNodeId, targetNodeId, self.edgeType, lineDict)
                # count edges; without this every edge triggered a full flush
                addedCount += 1
                self.indices['edges'][String('ASSOCIATION')].add(edgeId, lineDict)
                if lineDict.get('distance') is not None:
                    self.indices['edges'][String('DISTANCE')].add(edgeId, lineDict)
                if addedCount % self.flushInterval == 0:
                    self._flushIndices(indexType='edges')
                    addedCount = 0
                self.display_progress(lineno)
        finally:
            edge_file.close()
        self._flushIndices(indexType='edges')
        print("Finished creating and indexing relationships")

    def _flushIndices(self, indexType='edges'):
        """Flush every index of the given kind ('nodes' or 'edges')."""
        for index in self.indices[indexType].values():
            index.flush()

    def startImport(self):
        """Run the full import: indices, info node, nodes, then edges."""
        print("Starting import process")
        self.start_timer()
        try:
            self._createIndices()
            self._createInfoNode()
            self._importNodes()
            self._importEdges()
        finally:
            # shut index provider and inserter down even if a step fails
            self._finish()
        print("-------------------------------")
        print("Finished importing")
        self.stop_timer()

    def display_progress(self, lineno):
        """Print a progress line every 200000 input lines once timing started."""
        if self.start_time:
            if (lineno % 200000) == 0:
                elapsed = time.time() - self.start_time
                print("%i Elements in %i seconds" % (lineno + 1, int(elapsed)))
if __name__ == "__main__":
    # expects exactly five arguments after the script name
    if len(sys.argv) != 6:
        print("Usage is py2.7 client.py datasetLabel targetDirectory nodeDumpFile edgeDumpFile patientDumpfile")
        sys.exit(-1)
    print("Data import started at %s" % (str(datetime.datetime.now())))
    datasetLabel = sys.argv[1]
    directory = sys.argv[2]
    nodeDumpFile = sys.argv[3]
    edgeDumpFile = sys.argv[4]
    patientDumpFile = sys.argv[5]
    # fail fast, before the store is opened, if any input file is unreadable
    for dumpFile in (nodeDumpFile, edgeDumpFile, patientDumpFile):
        if not os.access(dumpFile, os.R_OK):
            print("Could not find file %s" % (dumpFile))
            sys.exit(-1)
    client = ImportClient(datasetLabel, directory, nodeDumpFile, edgeDumpFile, patientDumpFile)
    client.startImport()
    print("Data import ended at %s" % (str(datetime.datetime.now())))
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment