Skip to content

Instantly share code, notes, and snippets.

@teichmaj
Created February 22, 2015 16:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save teichmaj/f913441837a416294f99 to your computer and use it in GitHub Desktop.
Save teichmaj/f913441837a416294f99 to your computer and use it in GitHub Desktop.
Calculating Levi distance with PyCSP in concurrency
#!/usr/bin/python
# -*- coding: utf-8 -*-
# levishtein.py
# This program calculates the Levenshtein distance using CSP processes
# Version: 1
# Date: 03/12/2010
# Author: Jan Teichmann
"""
This program calculates the Levenshtein distance using dynamic programming
and CSP processes. The input file must be in FASTA format.
"""
from pycsp import *
import sys
import numpy # for the distMatrix
@process
def fileReader(jobOut, inputFile):
    """Parse FASTA records from inputFile and emit every ordered pair of them.

    jobOut    -- writable channel end; each message written to it is a tuple
                 ([id1, sequence1], [id2, sequence2]).
    inputFile -- iterable of text lines (e.g. an open file) in FASTA format.

    A line starting with '>' opens a new record whose ID is the rest of that
    line; all following non-header lines are concatenated into its sequence.
    Every record is paired with every record (itself included), so n records
    yield n * n jobs. Once all jobs are written, jobOut is retired.

    Raises ValueError if sequence data appears before the first header line.
    """
    strings = {}  # record ID -> list of sequence fragments, joined below
    stringID = None
    for line in inputFile:
        line = line.strip()  # removes surrounding whitespace and the newline
        if not line:
            continue  # blank lines carry no sequence data
        # Only a leading '>' marks a header; a '>' elsewhere is sequence data.
        if line.startswith('>'):
            stringID = line[1:]
            strings[stringID] = []  # a repeated ID restarts that record
            continue
        if stringID is None:
            raise ValueError(
                'sequence data found before the first FASTA header line')
        strings[stringID].append(line)  # string extension
    # Join once per record instead of growing a string line by line.
    for key in strings:
        strings[key] = ''.join(strings[key])
    for i in strings:
        for j in strings:
            jobOut(([i, strings[i]], [j, strings[j]]))
    retire(jobOut)  # no more strings to process -> retire channel
def _levenshtein(first, second):
    """Return the Levenshtein (edit) distance between two sequences as an int.

    Insertion, deletion and replacement each cost 1; a match costs 0.
    Only the previous row of the dynamic-programming table is kept, so the
    memory needed grows with len(second) instead of len(first) * len(second).
    """
    # Row 0 of the table: turning the empty prefix of `first` into each
    # prefix of `second` takes as many insertions as the prefix is long.
    previous = list(range(len(second) + 1))
    for i, itemA in enumerate(first):
        # Column 0: turning first[:i+1] into the empty string takes i+1 deletions.
        current = [i + 1]
        for j, itemB in enumerate(second):
            current.append(min(
                previous[j + 1] + 1,               # insertion
                current[j] + 1,                    # deletion
                previous[j] + int(itemA != itemB)  # replacement or match
            ))
            # insertion and deletion are penalised with +1
            # if itemA == itemB -> int(False) = 0
        previous = current
    return previous[-1]


@process
def worker(jobIn, resultOut):
    """Compute the Levenshtein distance for each job read from jobIn.

    jobIn     -- readable channel end delivering ([id1, seq1], [id2, seq2]).
    resultOut -- writable channel end receiving (id1, id2, distance), where
                 distance is a plain int.

    Loops until reading or writing a channel raises. A
    ChannelPoisonException is answered by retiring both channel ends.
    NOTE(review): a retire signal from upstream is not caught here --
    presumably the PyCSP process wrapper handles it; confirm.
    """
    try:
        while True:
            (string1, string2) = jobIn()
            distance = _levenshtein(string1[1], string2[1])
            resultOut((string1[0], string2[0], distance))
    except ChannelPoisonException:
        retire(jobIn, resultOut)
@process
def resultCollector(resultsIn):
    """Print every (ID1, ID2, distance) result read from resultsIn.

    resultsIn -- readable channel end delivering (ID1, ID2, distance) tuples.

    Each result is written to stdout as 'ID1 - ID2: distance'. Loops until
    reading the channel raises; a ChannelPoisonException is answered by
    retiring the channel end.
    """
    try:
        while True:
            (ID1, ID2, leviDist) = resultsIn()
            # Parenthesised single-argument form: same output under the
            # Python 2 print statement, and valid as a Python 3 call.
            print('%s - %s: %i' % (ID1, ID2, leviDist))
    except ChannelPoisonException:
        retire(resultsIn)
if __name__ == "__main__":
    # Command line: <number_of_threads> <in_file>
    try:
        inputFile = sys.argv[2]
        networkSize = int(sys.argv[1])
    except (IndexError, ValueError):
        # IndexError: an argument is missing; ValueError: the thread count
        # is not an integer. Both are reported as a usage error.
        raise IOError('program call: python levishtein.py number_of_threads in_file')
    try:
        fileHandle = open(inputFile, 'r')
    except (IOError, OSError):
        raise IOError('can not open/find specified input file: %s' % inputFile)
    # One channel feeds string pairs to the workers, the other carries
    # the computed distances to the collector.
    job = Channel()
    levDist = Channel()
    try:
        # NOTE(review): closing the file in `finally` relies on Parallel()
        # returning only after all processes have finished, as PyCSP
        # documents; confirm for the installed version.
        Parallel(
            fileReader(job.writer(), fileHandle),
            networkSize * worker(job.reader(), levDist.writer()),
            resultCollector(levDist.reader())
        )
    finally:
        fileHandle.close()  # the handle was previously left open
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment