Skip to content

Instantly share code, notes, and snippets.

@amergin
Created Aug 4, 2012
Embed
What would you like to do?
# -*- coding: utf-8 -*-
# Modified lightsocket import script, based on Lightsocket
# example script by James Thornton (http://jamesthornton.com)
# mundane modification stuff
import os
import commands
import sys
import datetime
import re # regexp
import time
# concurrency support for importing edges:
import multiprocessing
# random access to files:
import linecache
# lightsocket specific libraries
import zmq
import json
def getLineDict( header, line ):
columns = line.strip("\n").split("\t")
return dict(zip( header, columns ) )
class ImportClient(object):
def __init__(self, datasetLabel, server="tcp://localhost:5555"):
self.server = server
self.socket = self._connect()
self.count = 0
self.start_time = None
self.label = datasetLabel
self.edgeInsertionWorkers = 5
# info index will contain patient barcodes and posible other meta
# information
node_indices = {
'CLIN': self.label + '_n_CLIN', 'GEXP': self.label + '_n_GEXP',
'METH': self.label + '_n_METH', 'CNVR': self.label + '_n_CNVR',
'info': self.label + '_n_info'}
edge_indices = {
'ASSOCIATION': self.label + '_e_ASSOC', 'DISTANCE': self.label + '_e_DIST'
}
self.indices = {'nodes' : node_indices, 'edges': edge_indices }
self.nodeHash = dict()
def _connect(self):
print "Connecting to Lightsocket..."
context = zmq.Context(8)
socket = context.socket(zmq.REQ)
socket.connect(self.server)
return socket
def send_request(self, path,params,data):
request = dict(path=path, params=params,data=data)
message = json.dumps(request)
self.socket.send(message)
raw = self.socket.recv()
resp = json.loads(raw)
return resp
def start_timer(self):
self.start_time = time.time()
def stop_timer(self):
end_time = time.time()
run_time = end_time - self.start_time
print "Requests Per Second: ", self.count / run_time
def display_progress(self):
if self.start_time:
self.count = self.count + 1
if (self.count % 200000) == 0:
elapsed = time.time() - self.start_time
print self.count, elapsed, elapsed / \
self.count, self.count / elapsed
def startImport(self, nodeDumpFile, edgeDumpFile, patientDumpfile):
print "Starting import process"
self.start_timer()
self._createIndices()
self._createInfoNode(patientDumpfile)
self._importNodes(nodeDumpFile)
# initialize edgeproxy by sending header & edgeType information:
edgeFile = file(edgeDumpFile, 'r')
pathInit = "import/edges/initializeElements"
line = edgeFile.readline()
edgeFile.close()
header = []
edgeTypes = dict()
for cols in line.strip("\n").split("\t"):
ele = cols.split("__")
header.append(ele[0] )
edgeTypes[ele[0] ] = ele[1]
edgeTypes["distance" ] = "int"
self.send_request(pathInit, params={}, data={ 'edgeTypes':
edgeTypes, 'indices': self.indices['edges'] } )
# the time consuming part: importing edges. divide the edge dump
# reading and insertion into N parts and handle the insertion concurrently:
# get number of lines
status, output = commands.getstatusoutput("wc -l %s | awk '{ print $1 }'" %edgeDumpFile )
noLines = int( output )
print "Edge dump file contains %i lines" %noLines
# determine the partitions:
processes = []
startline = 0
endline = 0
for i in range(0, self.edgeInsertionWorkers ):
if( i is not 0 ):
startline = ( noLines / self.edgeInsertionWorkers ) * i
if( i is self.edgeInsertionWorkers-1 ):
endline = (noLines / self.edgeInsertionWorkers) \
* self.edgeInsertionWorkers + (noLines % self.edgeInsertionWorkers) - 1
else:
endline = ( noLines / self.edgeInsertionWorkers ) * (i+1) - 1
# every worker calls self._importEdges
p = multiprocessing.Process( \
target=self._importEdges, args=(i, edgeDumpFile, startline, endline, header) )
processes.append( p )
# start individual process
p.start()
# wait for all processes to finish before continuing here
for p in processes:
p.join()
self.stop_timer()
print "Finished importing process"
# creates info node containing the barcodes
def _createInfoNode(self, patientDumpfile):
patientFile = open( patientDumpfile, 'r')
path = "import/vertices/createInfo"
nodeTypes = dict()
header = []
for lineno,line in enumerate( patientFile ):
# create a dictionary that contains every column type
if (lineno is 0):
for cols in line.strip("\n").split("\t"):
ele = cols.split("__")
header.append(ele[0])
nodeTypes[ele[0] ] = ele[1]
continue
lineDict = getLineDict( header, line )
resp = self.send_request(
path, params={}, data={ 'types': nodeTypes, 'values': lineDict } )
patientFile.close()
def _flushIndices(self):
flushPath = "import/indices/flush"
#self.send_request(flushPath, params={}, data={})
def _createIndices(self):
print "Creating Indices"
path = "import/indices/create_manual"
for key, val in self.indices['nodes'].iteritems():
self.send_request(path, params={}, data={ 'type': 'node', 'name': { key: val } } )
# self.send_request(path,params={}, data={ 'edges':
# self.indices[edges], 'nodes': self.indices[nodes], 'datalabel':
# self.label } )
for key, val in self.indices['edges'].iteritems():
self.send_request(path, params={}, data={ 'type': 'edge', 'name': { key: val } } )
self._flushIndices()
print "(Empty) indices created"
def _importNodes(self, nodeDumpFile):
print "Creating and indexing nodes"
node_file = open(nodeDumpFile, 'r')
# lightsocket service path
path = "import/vertices/create"
pathInit = "import/vertices/initializeElements"
nodeTypes = dict()
header = []
for lineno,line in enumerate( node_file.readlines() ):
# create a dictionary that contains every column type, eg.
# nodeTypes['chr'] = 'varchar'
if (lineno is 0):
for cols in line.strip("\n").split("\t"):
ele = cols.split("__")
header.append(ele[0] )
nodeTypes[ele[0] ] = ele[1]
# send valuetypes for each column and expected index names
self.send_request(pathInit, params={}, data={ 'nodeTypes':
nodeTypes, 'indices': self.indices['nodes'] } )
continue
lineDict = getLineDict(header, line )
resp = self.send_request(
path, params={}, data={ 'values': lineDict } )
# store the created id under the node alias
self.nodeHash[lineDict.get( 'alias' ) ] = resp['results']['id']
self.display_progress()
print "Finished creating and indexing nodes"
#def _importEdges(self, edgeDumpFile):
def _importEdges(self, workerNumber, edgeDumpfile, startline, endline, header):
print "Worker #%i starting to import edges, line numbers %i-%i" %(workerNumber, startline, endline)
path = "import/edges/create"
# are one of the regions contained within the other or the same region?
def regionIsSubset(region1, region2):
assert(len(region1) == 2 and len(region2) == 2 )
return ( ( region1[0] >= region2[0] and region1[1] <= region2[1] ) or
(region2[0] >= region1[0] and region2[1] <= region1[1] ) )
for lineno in range( startline, endline + 1 ):
if( lineno is 1):
# no use for header
continue
# remember: linecache indexing starts from 1, not from index 0 like in open()
line = linecache.getline( edgeDumpFile, lineno + 1 )
# no empty lines accepted; linecache returns empty when line is not found
assert( line )
lineDict = getLineDict( header, line )
sourceNodeId = self.nodeHash.get(lineDict.get( 'alias1' ) )
targetNodeId = self.nodeHash.get(lineDict.get( 'alias2' ) )
f1chr = re.findall(r'\d+', lineDict.get('f1chr') )
f2chr = re.findall(r'\d+', lineDict.get('f2chr') )
distance = None
# calculate chromosomal region distance
if(len(f1chr) != 0 and len(f2chr) != 0 ):
try:
f1chr = int(f1chr[0] )
f2chr = int(f2chr[0] )
if(f1chr <= 22 and f2chr <= 22 and f1chr == f2chr ):
f2start = int(lineDict['f2start'] )
f2end = int(lineDict['f2end'] )
f1start = int(lineDict['f1start'] )
f1end = int(lineDict['f1end'] )
f1 = [f1start, f1end ]
f2 = [f2start, f2end ]
f1.sort()
f2.sort()
if(( f1[0] < f2[1] < f1[1] ) or ( f1[0] < f2[0] < f1[1] ) ):
# regions overlap
#print "DIST=0: f1,f2"
#print f1,f2
distance = 0
else:
if( regionIsSubset(f1, f2) ):
#print "SUBSET:"
#print f1,f2
distance = 0
else:
distance = sorted( [ abs( f2[
0] - f1[1] ), abs( f1[0] - f2[1] ) ] )[0]
except ValueError:
pass
if distance:
lineDict['distance'] = distance
self.send_request(path, params={}, data={ 'values':
lineDict, 'source': sourceNodeId, 'target': targetNodeId } )
self.display_progress()
print "Worker #%i Finished creating and indexing edges" %workerNumber
if __name__ == "__main__":
if( len( sys.argv ) is not 5 ):
print "Usage is py2.7 client.py datasetLabel nodeDumpFile edgeDumpFile patientDumpfile"
sys.exit(-1)
print "Data import started at %s" %( str(datetime.datetime.now()) )
datasetLabel = sys.argv[1]
nodeDumpFile = sys.argv[2]
edgeDumpFile = sys.argv[3]
patientDumpFile = sys.argv[4]
if not os.access( nodeDumpFile, os.R_OK ):
print "Could not find file %s" %(nodeDumpFile)
sys.exit(-1)
if not os.access( edgeDumpFile, os.R_OK ):
print "Could not find file %s" %(edgeDumpFile)
sys.exit(-1)
if not os.access( patientDumpFile, os.R_OK ):
print "Could not find file %s" %(patientDumpFile)
sys.exit(-1)
client = ImportClient(datasetLabel=datasetLabel)
client.startImport( nodeDumpFile, edgeDumpFile, patientDumpFile )
print "Data import ended at %s" %( str(datetime.datetime.now()) )
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment