# -*- coding: utf-8 -*-
# Modified Lightsocket import script, based on the Lightsocket
# example script by James Thornton (http://jamesthornton.com)

# standard library imports
import os
import commands
import sys
import datetime
import re  # regular expressions
import time
# concurrency support for importing edges:
import multiprocessing
# random access to files:
import linecache
# Lightsocket-specific libraries:
import zmq
import json
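
# Overall flow, as implemented below: create the manual indices, create an
# info node holding the patient data, import and index all nodes while
# recording their server-assigned ids, and finally import the edges with a
# pool of worker processes, each handling its own slice of the edge dump.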
def getLineDict(header, line):
    # map one tab-separated data line onto the header column names
    columns = line.strip("\n").split("\t")
    return dict(zip(header, columns))
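
# example: getLineDict(['alias', 'chr'], "g1\t17\n") returns
# {'alias': 'g1', 'chr': '17'} (the column names here are illustrative)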
class ImportClient(object):
    def __init__(self, datasetLabel, server="tcp://localhost:5555"):
        self.server = server
        self.socket = self._connect()
        self.count = 0
        self.start_time = None
        self.label = datasetLabel
        self.edgeInsertionWorkers = 5
        # the info index will contain patient barcodes and possibly other
        # meta information
        node_indices = {
            'CLIN': self.label + '_n_CLIN', 'GEXP': self.label + '_n_GEXP',
            'METH': self.label + '_n_METH', 'CNVR': self.label + '_n_CNVR',
            'info': self.label + '_n_info'}
        edge_indices = {
            'ASSOCIATION': self.label + '_e_ASSOC', 'DISTANCE': self.label + '_e_DIST'
        }
        self.indices = {'nodes': node_indices, 'edges': edge_indices}
        self.nodeHash = dict()
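        # nodeHash maps node aliases to server-assigned node ids; it is
        # filled in by _importNodes and later only read by the edge import
        # workers (each forked worker gets its own copy of the finished map,
        # assuming fork-based multiprocessing as on POSIX systems)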
    def _connect(self):
        print "Connecting to Lightsocket..."
        context = zmq.Context(8)
        socket = context.socket(zmq.REQ)
        socket.connect(self.server)
        return socket

    def send_request(self, path, params, data):
        # every request is a JSON document of the form
        # {'path': <service path>, 'params': {...}, 'data': {...}};
        # the server is expected to answer with a JSON document as well
        request = dict(path=path, params=params, data=data)
        message = json.dumps(request)
        self.socket.send(message)
        raw = self.socket.recv()
        resp = json.loads(raw)
        return resp
    def start_timer(self):
        self.start_time = time.time()

    def stop_timer(self):
        end_time = time.time()
        run_time = end_time - self.start_time
        print "Requests Per Second: ", self.count / run_time

    def display_progress(self):
        if self.start_time:
            self.count = self.count + 1
            if (self.count % 200000) == 0:
                elapsed = time.time() - self.start_time
                print self.count, elapsed, elapsed / self.count, self.count / elapsed
    def startImport(self, nodeDumpFile, edgeDumpFile, patientDumpFile):
        print "Starting import process"
        self.start_timer()
        self._createIndices()
        self._createInfoNode(patientDumpFile)
        self._importNodes(nodeDumpFile)
        # initialize the edge proxy by sending header & edgeType information:
        edgeFile = open(edgeDumpFile, 'r')
        pathInit = "import/edges/initializeElements"
        line = edgeFile.readline()
        edgeFile.close()
        header = []
        edgeTypes = dict()
        # every header column is a "name__type" pair
        for cols in line.strip("\n").split("\t"):
            ele = cols.split("__")
            header.append(ele[0])
            edgeTypes[ele[0]] = ele[1]
        # the computed chromosomal distance is an extra column
        edgeTypes["distance"] = "int"
        self.send_request(pathInit, params={},
                          data={'edgeTypes': edgeTypes, 'indices': self.indices['edges']})
        # the time-consuming part: importing edges. Divide the edge dump
        # into N line ranges and run the insertions concurrently:
        # get the number of lines
        status, output = commands.getstatusoutput("wc -l %s | awk '{ print $1 }'" % edgeDumpFile)
        noLines = int(output)
        print "Edge dump file contains %i lines" % noLines
        # determine the partitions:
        processes = []
        startline = 0
        endline = 0
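        # with integer division every worker gets noLines / N lines and the
        # last worker also takes the remainder; e.g. 103 lines split over 5
        # workers yields the ranges 0-19, 20-39, 40-59, 60-79 and 80-102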
        for i in range(0, self.edgeInsertionWorkers):
            if i != 0:
                startline = (noLines / self.edgeInsertionWorkers) * i
            if i == self.edgeInsertionWorkers - 1:
                endline = (noLines / self.edgeInsertionWorkers) \
                    * self.edgeInsertionWorkers + (noLines % self.edgeInsertionWorkers) - 1
            else:
                endline = (noLines / self.edgeInsertionWorkers) * (i + 1) - 1
            # every worker calls self._importEdges
            p = multiprocessing.Process(
                target=self._importEdges,
                args=(i, edgeDumpFile, startline, endline, header))
            processes.append(p)
            # start the individual process
            p.start()
        # wait for all processes to finish before continuing here
        for p in processes:
            p.join()
        self.stop_timer()
        print "Finished importing process"
    # creates the info node containing the patient barcodes
    def _createInfoNode(self, patientDumpFile):
        patientFile = open(patientDumpFile, 'r')
        path = "import/vertices/createInfo"
        nodeTypes = dict()
        header = []
        for lineno, line in enumerate(patientFile):
            # the first line is the header; build a dictionary that maps
            # every column name to its type
            if lineno == 0:
                for cols in line.strip("\n").split("\t"):
                    ele = cols.split("__")
                    header.append(ele[0])
                    nodeTypes[ele[0]] = ele[1]
                continue
            lineDict = getLineDict(header, line)
            resp = self.send_request(
                path, params={}, data={'types': nodeTypes, 'values': lineDict})
        patientFile.close()
    def _flushIndices(self):
        flushPath = "import/indices/flush"
        #self.send_request(flushPath, params={}, data={})

    def _createIndices(self):
        print "Creating Indices"
        path = "import/indices/create_manual"
        for key, val in self.indices['nodes'].iteritems():
            self.send_request(path, params={}, data={'type': 'node', 'name': {key: val}})
        for key, val in self.indices['edges'].iteritems():
            self.send_request(path, params={}, data={'type': 'edge', 'name': {key: val}})
        self._flushIndices()
        print "(Empty) indices created"
    def _importNodes(self, nodeDumpFile):
        print "Creating and indexing nodes"
        node_file = open(nodeDumpFile, 'r')
        # lightsocket service paths
        path = "import/vertices/create"
        pathInit = "import/vertices/initializeElements"
        nodeTypes = dict()
        header = []
        for lineno, line in enumerate(node_file):
            # the first line is the header; build a dictionary that maps
            # every column name to its type, e.g. nodeTypes['chr'] = 'varchar'
            if lineno == 0:
                for cols in line.strip("\n").split("\t"):
                    ele = cols.split("__")
                    header.append(ele[0])
                    nodeTypes[ele[0]] = ele[1]
                # send the value types of each column and the expected index names
                self.send_request(pathInit, params={},
                                  data={'nodeTypes': nodeTypes, 'indices': self.indices['nodes']})
                continue
            lineDict = getLineDict(header, line)
            resp = self.send_request(
                path, params={}, data={'values': lineDict})
            # store the created id under the node alias
            self.nodeHash[lineDict.get('alias')] = resp['results']['id']
            self.display_progress()
        node_file.close()
        print "Finished creating and indexing nodes"
    def _importEdges(self, workerNumber, edgeDumpFile, startline, endline, header):
        print "Worker #%i starting to import edges, line numbers %i-%i" % (workerNumber, startline, endline)
        path = "import/edges/create"

        # is one of the regions contained within the other, or are they the
        # same region?
        def regionIsSubset(region1, region2):
            assert len(region1) == 2 and len(region2) == 2
            return ((region1[0] >= region2[0] and region1[1] <= region2[1]) or
                    (region2[0] >= region1[0] and region2[1] <= region1[1]))
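        # e.g. regionIsSubset([150, 180], [100, 200]) is True, while two
        # merely overlapping regions such as [50, 150] and [100, 200] are
        # not subsets of each other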
        for lineno in range(startline, endline + 1):
            if lineno == 0:
                # lineno 0 is the header line; no use for it here
                continue
            # remember: linecache indexing starts from 1, not from 0 like
            # enumerate() over an open file
            line = linecache.getline(edgeDumpFile, lineno + 1)
            # no empty lines accepted; linecache returns an empty string when
            # the line is not found
            assert line
            lineDict = getLineDict(header, line)
            sourceNodeId = self.nodeHash.get(lineDict.get('alias1'))
            targetNodeId = self.nodeHash.get(lineDict.get('alias2'))
            f1chr = re.findall(r'\d+', lineDict.get('f1chr'))
            f2chr = re.findall(r'\d+', lineDict.get('f2chr'))
            distance = None
            # calculate the chromosomal region distance; it is only defined
            # when both features sit on the same autosome (chromosomes 1-22)
            if len(f1chr) != 0 and len(f2chr) != 0:
                try:
                    f1chr = int(f1chr[0])
                    f2chr = int(f2chr[0])
                    if f1chr <= 22 and f2chr <= 22 and f1chr == f2chr:
                        f1start = int(lineDict['f1start'])
                        f1end = int(lineDict['f1end'])
                        f2start = int(lineDict['f2start'])
                        f2end = int(lineDict['f2end'])
                        f1 = [f1start, f1end]
                        f2 = [f2start, f2end]
                        f1.sort()
                        f2.sort()
                        if (f1[0] < f2[1] < f1[1]) or (f1[0] < f2[0] < f1[1]):
                            # the regions overlap
                            distance = 0
                        elif regionIsSubset(f1, f2):
                            # one region contains the other
                            distance = 0
                        else:
                            # distance between the two closest region endpoints
                            distance = min(abs(f2[0] - f1[1]), abs(f1[0] - f2[1]))
                except ValueError:
                    pass
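            # worked example: f1 = [100, 200] and f2 = [500, 600] on the same
            # chromosome neither overlap nor nest, so
            # distance = min(|500 - 200|, |100 - 600|) = 300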
            # distance can legitimately be 0, so compare against None instead
            # of relying on truthiness
            if distance is not None:
                lineDict['distance'] = distance
            self.send_request(path, params={},
                              data={'values': lineDict, 'source': sourceNodeId,
                                    'target': targetNodeId})
            self.display_progress()
        print "Worker #%i finished creating and indexing edges" % workerNumber
if __name__ == "__main__":
    if len(sys.argv) != 5:
        print "Usage: python2.7 client.py datasetLabel nodeDumpFile edgeDumpFile patientDumpFile"
        sys.exit(-1)
    print "Data import started at %s" % (str(datetime.datetime.now()))
    datasetLabel = sys.argv[1]
    nodeDumpFile = sys.argv[2]
    edgeDumpFile = sys.argv[3]
    patientDumpFile = sys.argv[4]
    if not os.access(nodeDumpFile, os.R_OK):
        print "Could not read file %s" % (nodeDumpFile)
        sys.exit(-1)
    if not os.access(edgeDumpFile, os.R_OK):
        print "Could not read file %s" % (edgeDumpFile)
        sys.exit(-1)
    if not os.access(patientDumpFile, os.R_OK):
        print "Could not read file %s" % (patientDumpFile)
        sys.exit(-1)
    client = ImportClient(datasetLabel=datasetLabel)
    client.startImport(nodeDumpFile, edgeDumpFile, patientDumpFile)
    print "Data import ended at %s" % (str(datetime.datetime.now()))
    sys.exit(0)