Created
March 27, 2021 10:34
-
-
Save mr-eyes/ccc5043a34914ecfa42f590011a6501f to your computer and use it in GitHub Desktop.
kProcessor index validator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Validate kProcessor 1 index | |
from itertools import groupby | |
import os | |
import kProcessor as kp | |
import hashlib | |
class IntegralHasher:
    """Integer k-mer hasher mirroring kProcessor's C++ hashing scheme.

    K-mers are packed two bits per base with the encoding
    A=0, C=1, T=2, G=3, canonicalized against their reverse
    complement, then scrambled with an invertible 64-bit integer
    mix truncated to 2*kSize bits.
    """

    # Complement lookup indexed by the 2-bit base code:
    # A(0) <-> T(2), C(1) <-> G(3).
    DNA_REVERSE = [2, 3, 0, 1]
    # Quotient bit count; shadowed per-instance by set_q().
    Q = int()

    def merge_QR(self, Q, R):
        # Recombine a (quotient, remainder) pair into the full hash value.
        # Requires set_q() to have been called first.
        return (Q << self.key_remainder_bits) | R

    def BITMASK(self, n):
        # define BITMASK(nbits) ((nbits) == 64 ? 0xffffffffffffffff : (1ULL << (nbits)) - 1ULL)
        # Python port of the C macro above: value with the low n bits set.
        if n == 64:
            return 0xffffffffffffffff
        else:
            return (1 << n) - 1

    def __init__(self, kSize):
        # kSize: k-mer length in bases; hash values live in 2*kSize bits.
        self.kSize = kSize
        self.mask = ((1 << (2 * kSize)) - 1)

    def set_q(self, q):
        # Choose the quotient/remainder split: the top q bits of a hash
        # are the quotient, the remaining 2*kSize - q bits the remainder.
        self.Q = q
        self.key_remainder_bits = (self.kSize * 2) - q

    def get_anatomy(self, hash):
        # Split a hash value into its (quotient, remainder) components.
        R = hash & self.BITMASK(self.key_remainder_bits)
        Q = hash >> self.key_remainder_bits
        return (Q, R)

    def compare_kmers(self, kmer, kmer_rev):
        # True when the forward encoding is the canonical (<=) one.
        return kmer <= kmer_rev

    def reverse_complement(self, kmer, ln):
        """Reverse-complement an integer-encoded k-mer of ln bases."""
        rc = 0
        base = 0
        for i in range(ln):
            base = kmer & 3                   # lowest (rightmost) base
            base = self.DNA_REVERSE[base]     # complement it
            kmer >>= 2
            rc |= base
            rc <<= 2                          # make room for the next base
        rc >>= 2                              # undo the final extra shift
        return rc

    def str_to_int(self, kmer):
        """Pack a k-mer string into an integer, two bits per base."""
        _map = {"A": 0, "C": 1, "T": 2, "G": 3}
        strint = 0
        for n in kmer:
            curr = _map[n]
            strint = strint | curr
            strint = strint << 2              # make room for the next base
        return strint >> 2                    # undo the final extra shift

    def int_to_bin(self, n):
        """Debug helper: binary string of n, zero-padded to 2*kSize bits."""
        ff = "{0:0" + str((self.kSize * 2)) + "b}"
        return ff.format(n)

    def int_to_str(self, kmer):
        """Unpack an integer-encoded k-mer back into its string form."""
        _map = {0: 'A', 1: 'C', 2: 'T', 3: 'G'}
        kmer_str = ""
        for i in range(self.kSize, 0, -1):
            # Extract base i (1-indexed from the right) as a 2-bit code.
            base = (kmer >> (i * 2 - 2)) & 3
            ch = _map[base]
            kmer_str += ch
        return kmer_str

    def str_to_canonical_int(self, str):
        """Integer encoding of the canonical form (min of k-mer / revcomp)."""
        kmerI = self.str_to_int(str)
        kmerIR = self.reverse_complement(kmerI, len(str))
        if self.compare_kmers(kmerI, kmerIR):
            return kmerI
        else:
            return kmerIR

    def hash(self, kmer):
        """Hash a k-mer string: canonical integer encoding run through an
        invertible integer mix, masked to 2*kSize bits at each step."""
        key = self.str_to_canonical_int(kmer)
        key = (~key + (key << 21)) & self.mask  # key = (key << 21) - key - 1
        key = key ^ key >> 24
        key = ((key + (key << 3)) + (key << 8)) & self.mask  # key * 265
        key = key ^ key >> 14
        key = ((key + (key << 2)) + (key << 4)) & self.mask  # key * 21
        key = key ^ key >> 28
        key = (key + (key << 31)) & self.mask
        return key

    def Ihash(self, key):
        """Inverse of hash(): undo each mixing step in reverse order, then
        decode the resulting integer back to a k-mer string.

        NOTE(review): the multiplicative constants below are inverses of
        21 and 265 modulo 2**64; assumed to stay valid after masking to
        2*kSize bits — confirm by round-tripping hash()/Ihash().
        """
        # Invert key = key + (key << 31)
        tmp = (key - (key << 31))
        key = (key - (tmp << 31)) & self.mask
        # Invert key = key ^ (key >> 28)
        tmp = key ^ key >> 28
        key = key ^ tmp >> 28
        # Invert key *= 21
        key = (key * 14933078535860113213) & self.mask
        # Invert key = key ^ (key >> 14)
        tmp = key ^ key >> 14
        tmp = key ^ tmp >> 14
        tmp = key ^ tmp >> 14
        key = key ^ tmp >> 14
        # Invert key *= 265
        key = (key * 15244667743933553977) & self.mask
        # Invert key = key ^ (key >> 24)
        tmp = key ^ key >> 24
        key = key ^ tmp >> 24
        # Invert key = (~key) + (key << 21)
        tmp = ~key
        tmp = ~(key - (tmp << 21))
        tmp = ~(key - (tmp << 21))
        key = ~(key - (tmp << 21)) & self.mask
        return self.int_to_str(key)
def load_namesFile(names_file_path):
    """Parse a tab-separated names file into {sequence_name: group_name}.

    Each line is "<seq_name>\\t<group_name>"; later duplicates win.
    """
    with open(names_file_path) as handle:
        records = (row.strip().split('\t') for row in handle)
        return {fields[0]: fields[1] for fields in records}
def load_namesMap(idx_prefix):
    """Read "<idx_prefix>.namesMap" into {group_name: group_id}.

    The first line (total number of groups) is skipped; every other
    line is "<id> <name>" separated by a single space.
    """
    mapping = {}
    with open(f"{idx_prefix}.namesMap") as handle:
        handle.readline()  # discard the leading group-count line
        for record in handle:
            fields = record.strip().split(' ')
            mapping[fields[1]] = int(fields[0])
    return mapping
def map_seq_to_groupID(seq_to_groupName, groupName_to_groupID):
    """Compose the two mappings into {sequence_name: group_id}.

    Raises KeyError if a group name has no id — same as the original
    direct lookup.
    """
    return {
        seq: groupName_to_groupID[group]
        for seq, group in seq_to_groupName.items()
    }
def fasta_iter(fasta_name):
    """Yield (header, sequence) pairs from a FASTA file.

    header: the record's header line with the leading ">" removed.
    sequence: all of the record's sequence lines joined into one string.

    Fix over the original: the file is opened in a context manager so
    the handle is closed when the generator is exhausted or discarded
    (the original leaked it), and the dunder ``__next__()`` calls are
    replaced with the idiomatic ``next()`` builtin.
    """
    with open(fasta_name) as fh:
        # groupby alternates between runs of header lines (">"-prefixed)
        # and runs of sequence lines; keep only the run iterators.
        faiter = (x[1] for x in groupby(fh, lambda line: line[0] == ">"))
        for header in faiter:
            # Drop the ">" from the first line of the header run.
            headerStr = next(header)[1:].strip()
            # The next run holds the sequence lines; join them into one.
            seq = "".join(s.strip() for s in next(faiter))
            yield (headerStr, seq)
def load_intVector(index_prefix):
    """Read "<index_prefix>colors.intvectors" into {color: set(group_ids)}.

    File layout: the first line is the number of colors; each following
    line is "<color> <count> <id> <id> ...". The count field (index 1)
    is redundant and skipped.
    """
    mapping = dict()
    with open(f"{index_prefix}colors.intvectors") as handle:
        int(next(handle))  # consume (and sanity-parse) the color count
        for record in handle:
            fields = [int(tok) for tok in record.strip().split(' ')]
            mapping[fields[0]] = set(fields[2:])
    return mapping
def reverse_complement(dna):
    """Return the reverse complement of an uppercase A/C/G/T string."""
    pairs = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}
    return ''.join(pairs[base] for base in reversed(dna))
def canonical(kmer):
    """Return the lexicographically smaller of kmer and its reverse
    complement (the complement logic is inlined so the function is
    self-contained)."""
    pairs = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}
    rc = ''.join(pairs[base] for base in reversed(kmer))
    return kmer if kmer < rc else rc
def seq_to_kmers(seq, kSize):
    """Return every length-kSize substring of seq, left to right.

    Sequences shorter than kSize yield an empty list.
    """
    return [seq[start:start + kSize] for start in range(len(seq) - kSize + 1)]
if __name__ == '__main__':
    # ---------- load the kProcessor index under validation ----------
    index_prefix = "idx_genes"
    ckf = kp.colored_kDataFrame.load(index_prefix)
    kf = ckf.getkDataFrame()
    K_SIZE = kf.ksize()
    HASHER = IntegralHasher(K_SIZE)

    data_dir = "/home/mabuelanin/dib-dev/kp_examples/indexing_test/data"
    index_dir = "/home/mabuelanin/dib-dev/kp_examples/indexing_test"
    seq_file = os.path.join(data_dir, "human_pc.part_001.part_004.fa")
    names_file = os.path.join(data_dir, "human_pc.part_001.part_004.fa.names")

    seq_to_groupName = load_namesFile(names_file)
    groupName_to_groupID = load_namesMap(index_prefix)
    seq_to_groupID = map_seq_to_groupID(seq_to_groupName, groupName_to_groupID)

    # ---------- build the "golden" index in pure Python ----------
    # Single pass over the FASTA (the original made two passes: one to
    # create empty sets, one to fill them); setdefault gives the same
    # final mapping in one pass.
    golden_kmers_index = dict()
    for header, seq in fasta_iter(seq_file):
        group_ID = seq_to_groupID[header]
        for kmer in seq_to_kmers(seq, K_SIZE):
            canonical_kmer = HASHER.int_to_str(HASHER.str_to_canonical_int(kmer))
            golden_kmers_index.setdefault(canonical_kmer, set()).add(group_ID)

    # ---------- walk the kProcessor index ----------
    # Loaded as in the original; not referenced afterwards — kept for parity.
    kp_colors_to_groupIDs = load_intVector(index_prefix)
    kProcessor_kmers_index = dict()
    it = kf.begin()
    while it != kf.end():
        kmer = it.getKmer()
        color = it.getCount()  # the count slot carries the color id here
        kProcessor_kmers_index[kmer] = sorted(ckf.getKmerSourceFromColor(color))
        it.next()

    def _dump_index(path, index):
        # Write one "<kmer>: [sorted ids]" line per kmer that has at
        # least one source, in sorted kmer order.
        with open(path, 'w') as writer:
            for kmer, sources in sorted(index.items()):
                if sources:
                    writer.write(f"{kmer}: {sorted(sources)}\n")

    def _md5_of_file(path):
        # Fix over the original: the file handle is closed via a context
        # manager (the original's open(...).read() leaked it).
        with open(path, "rb") as handle:
            return hashlib.md5(handle.read()).hexdigest()

    # ---------- dump both indexes and compare their digests ----------
    _dump_index("kProcessor_index", kProcessor_kmers_index)
    _dump_index("golden_index", golden_kmers_index)

    if _md5_of_file("kProcessor_index") == _md5_of_file("golden_index"):
        print("[PASS] kProcessor index is correct")
    else:
        print("[FAIL] kProcessor index has mistakes, check the output files to debug")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment