# kProcessor index validator (@mr-eyes, 2021-03-27)
# Validate a kProcessor v1 index against a pure-Python "golden" index built from the same input.
from itertools import groupby
import os
import kProcessor as kp
import hashlib
class IntegralHasher:
    # 2-bit base encoding: A=0, C=1, T=2, G=3; DNA_REVERSE maps a base code to its complement
    DNA_REVERSE = [2, 3, 0, 1]
    Q = int()

    def merge_QR(self, Q, R):
        return (Q << self.key_remainder_bits) | R

    def BITMASK(self, n):
        # define BITMASK(nbits) ((nbits) == 64 ? 0xffffffffffffffff : (1ULL << (nbits)) - 1ULL)
        if n == 64:
            return 0xffffffffffffffff
        else:
            return (1 << n) - 1

    def __init__(self, kSize):
        self.kSize = kSize
        self.mask = (1 << (2 * kSize)) - 1

    def set_q(self, q):
        self.Q = q
        self.key_remainder_bits = (self.kSize * 2) - q

    def get_anatomy(self, hash_value):
        R = hash_value & self.BITMASK(self.key_remainder_bits)
        Q = hash_value >> self.key_remainder_bits
        return (Q, R)

    def compare_kmers(self, kmer, kmer_rev):
        return kmer <= kmer_rev

    def reverse_complement(self, kmer, ln):
        rc = 0
        base = 0
        for i in range(ln):
            base = kmer & 3
            base = self.DNA_REVERSE[base]
            kmer >>= 2
            rc |= base
            rc <<= 2
        rc >>= 2
        return rc

    def str_to_int(self, kmer):
        _map = {"A": 0, "C": 1, "T": 2, "G": 3}
        strint = 0
        for n in kmer:
            curr = _map[n]
            strint = strint | curr
            strint = strint << 2
        return strint >> 2

    def int_to_bin(self, n):
        ff = "{0:0" + str(self.kSize * 2) + "b}"
        return ff.format(n)

    def int_to_str(self, kmer):
        _map = {0: 'A', 1: 'C', 2: 'T', 3: 'G'}
        kmer_str = ""
        for i in range(self.kSize, 0, -1):
            base = (kmer >> (i * 2 - 2)) & 3
            ch = _map[base]
            kmer_str += ch
        return kmer_str

    def str_to_canonical_int(self, kmer_str):
        kmerI = self.str_to_int(kmer_str)
        kmerIR = self.reverse_complement(kmerI, len(kmer_str))
        if self.compare_kmers(kmerI, kmerIR):
            return kmerI
        else:
            return kmerIR

    def hash(self, kmer):
        key = self.str_to_canonical_int(kmer)
        key = (~key + (key << 21)) & self.mask  # key = (key << 21) - key - 1
        key = key ^ key >> 24
        key = ((key + (key << 3)) + (key << 8)) & self.mask  # key * 265
        key = key ^ key >> 14
        key = ((key + (key << 2)) + (key << 4)) & self.mask  # key * 21
        key = key ^ key >> 28
        key = (key + (key << 31)) & self.mask
        return key

    def Ihash(self, key):
        # Invert key = key + (key << 31)
        tmp = (key - (key << 31))
        key = (key - (tmp << 31)) & self.mask
        # Invert key = key ^ (key >> 28)
        tmp = key ^ key >> 28
        key = key ^ tmp >> 28
        # Invert key *= 21
        key = (key * 14933078535860113213) & self.mask
        # Invert key = key ^ (key >> 14)
        tmp = key ^ key >> 14
        tmp = key ^ tmp >> 14
        tmp = key ^ tmp >> 14
        key = key ^ tmp >> 14
        # Invert key *= 265
        key = (key * 15244667743933553977) & self.mask
        # Invert key = key ^ (key >> 24)
        tmp = key ^ key >> 24
        key = key ^ tmp >> 24
        # Invert key = (~key) + (key << 21)
        tmp = ~key
        tmp = ~(key - (tmp << 21))
        tmp = ~(key - (tmp << 21))
        key = ~(key - (tmp << 21)) & self.mask
        return self.int_to_str(key)
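
# --- Hedged example (not part of the original gist) ---
# Minimal round-trip sanity check for IntegralHasher: hashing a k-mer and then
# inverting the hash should recover the canonical form of that same k-mer.
# The k-mer, k-size, and helper name below are illustrative only.
def _hasher_roundtrip_check():
    hasher = IntegralHasher(5)
    kmer = "ACGTT"
    canonical_kmer = hasher.int_to_str(hasher.str_to_canonical_int(kmer))
    assert hasher.Ihash(hasher.hash(kmer)) == canonical_kmer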
def load_namesFile(names_file_path):
    # Each line of the .names file: <sequence header>\t<group name>
    seq_to_group = dict()
    with open(names_file_path) as namesFileReader:
        for line in namesFileReader:
            line = line.strip().split('\t')
            seq_to_group[line[0]] = line[1]
    return seq_to_group


def load_namesMap(idx_prefix):
    # Each line of the .namesMap file (after the header): <group ID> <group name>
    namesMap_path = f"{idx_prefix}.namesMap"
    groupName_to_groupID = dict()
    with open(namesMap_path) as namesMapFileReader:
        next(namesMapFileReader)  # skip first line: total number of groups
        for line in namesMapFileReader:
            line = line.strip().split(' ')
            groupName_to_groupID[line[1]] = int(line[0])
    return groupName_to_groupID


def map_seq_to_groupID(seq_to_groupName, groupName_to_groupID):
    seq_to_groupID = dict()
    for seq, groupName in seq_to_groupName.items():
        seq_to_groupID[seq] = groupName_to_groupID[groupName]
    return seq_to_groupID


def fasta_iter(fasta_name):
    fh = open(fasta_name)
    faiter = (x[1] for x in groupby(fh, lambda line: line[0] == ">"))
    for header in faiter:
        # drop the ">"
        headerStr = next(header)[1:].strip()
        # join all sequence lines into one
        seq = "".join(s.strip() for s in next(faiter))
        yield (headerStr, seq)


def load_intVector(index_prefix):
    # First line: total number of colors.
    # Each following line: <color> <count> <group ID> <group ID> ... (the parser skips the second field)
    color_to_groups = dict()
    with open(f"{index_prefix}colors.intvectors") as READER:
        number_of_colors = int(next(READER))
        for line in READER:
            line = list(map(int, line.strip().split(' ')))
            color = line[0]
            groups = set(line[2:])
            color_to_groups[color] = groups
    return color_to_groups


def reverse_complement(dna):
    complement = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}
    return ''.join([complement[base] for base in dna[::-1]])


def canonical(kmer):
    rev = reverse_complement(kmer)
    if kmer < rev:
        return kmer
    else:
        return rev


def seq_to_kmers(seq, kSize):
    kmers = list()
    for i in range(len(seq) - kSize + 1):
        kmers.append(seq[i:i+kSize])
    return kmers
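
# --- Hedged example (not part of the original gist) ---
# Toy usage of the string-based helpers above. Note that canonical() picks the
# lexicographically smaller strand (A < C < G < T), while
# IntegralHasher.str_to_canonical_int() compares 2-bit encodings (A < C < T < G),
# so the two can choose different strands for the same k-mer.
def _kmer_helpers_example():
    toy_seq = "ACGTGA"                     # illustrative sequence, not from the dataset
    toy_kmers = seq_to_kmers(toy_seq, 3)   # ['ACG', 'CGT', 'GTG', 'TGA']
    return [canonical(kmer) for kmer in toy_kmers]   # ['ACG', 'ACG', 'CAC', 'TCA']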
if __name__ == '__main__':
    # load kProcessor index
    index_prefix = "idx_genes"
    ckf = kp.colored_kDataFrame.load(index_prefix)
    kf = ckf.getkDataFrame()
    K_SIZE = kf.ksize()
    HASHER = IntegralHasher(K_SIZE)

    data_dir = "/home/mabuelanin/dib-dev/kp_examples/indexing_test/data"
    index_dir = "/home/mabuelanin/dib-dev/kp_examples/indexing_test"
    seq_file = os.path.join(data_dir, "human_pc.part_001.part_004.fa")
    names_file = os.path.join(data_dir, "human_pc.part_001.part_004.fa.names")

    seq_to_groupName = load_namesFile(names_file)
    groupName_to_groupID = load_namesMap(index_prefix)
    seq_to_groupID = map_seq_to_groupID(seq_to_groupName, groupName_to_groupID)

    # Python indexing: build the golden index of canonical k-mer -> set of group IDs
    golden_kmers_index = dict()
    for header, seq in fasta_iter(seq_file):
        kmers = seq_to_kmers(seq, K_SIZE)
        for kmer in kmers:
            canonical_kmer = HASHER.int_to_str(HASHER.str_to_canonical_int(kmer))
            golden_kmers_index[canonical_kmer] = set()

    for header, seq in fasta_iter(seq_file):
        kmers = seq_to_kmers(seq, K_SIZE)
        group_ID = seq_to_groupID[header]
        for kmer in kmers:
            canonical_kmer = HASHER.int_to_str(HASHER.str_to_canonical_int(kmer))
            golden_kmers_index[canonical_kmer].add(group_ID)

    # load kProcessor colors to group IDs
    kp_colors_to_groupIDs = load_intVector(index_prefix)

    kProcessor_kmers_index = dict()
    it = kf.begin()
    while it != kf.end():
        kmer = it.getKmer()
        color = it.getCount()
        sources = sorted(ckf.getKmerSourceFromColor(color))
        kProcessor_kmers_index[kmer] = sources
        it.next()

    sorted_kProcessor_kmers_index = sorted(kProcessor_kmers_index.items())
    sorted_golden_kmers_index = sorted(golden_kmers_index.items())

    with open("kProcessor_index", 'w') as WRITER:
        for kmer, sources in sorted_kProcessor_kmers_index:
            if len(sources) > 0:
                WRITER.write(f"{kmer}: {sorted(list(sources))}\n")

    with open("golden_index", 'w') as WRITER:
        for kmer, sources in sorted_golden_kmers_index:
            if len(sources) > 0:
                WRITER.write(f"{kmer}: {sorted(list(sources))}\n")

    kp_md5_hash = hashlib.md5()
    kp_md5_hash.update(open("kProcessor_index", "rb").read())
    kProcessor_file_digest = kp_md5_hash.hexdigest()

    golden_md5_hash = hashlib.md5()
    golden_md5_hash.update(open("golden_index", "rb").read())
    golden_file_digest = golden_md5_hash.hexdigest()

    if kProcessor_file_digest == golden_file_digest:
        print("[PASS] kProcessor index is correct")
    else:
        print("[FAIL] kProcessor index has mistakes, check the output files to debug")