From ea87f510603d7ce7013457e521df4e778e3bbede Mon Sep 17 00:00:00 2001 From: Nicolas Francillonne <nicolas.francillonne@inra.fr> Date: Fri, 9 Apr 2021 11:19:03 +0200 Subject: [PATCH] Reprise des script de Mathieu --- scripts/awk.sh | 3 + scripts/bestHits.py | 197 +++++++++++++++++++--------------- scripts/cliques.py | 91 ++++++++++++++++ scripts/extractFLF.py | 113 +++++++++++++++++++ scripts/extractMatch.sh | 3 + scripts/fastaLength.py | 87 +++++++++++++++ scripts/getChromSize.py | 11 ++ scripts/getnewCliques.py | 67 ++++++++++++ scripts/reformat.py | 28 +++++ scripts/run_Snakemake_Chr4.sh | 7 ++ scripts/stats.py | 89 +++++++++++++++ scripts/test_bestHits.py | 61 +++++++++++ scripts/test_cliques.py | 48 +++++++++ 13 files changed, 721 insertions(+), 84 deletions(-) create mode 100755 scripts/awk.sh create mode 100755 scripts/cliques.py create mode 100755 scripts/extractFLF.py create mode 100755 scripts/extractMatch.sh create mode 100755 scripts/fastaLength.py create mode 100755 scripts/getChromSize.py create mode 100755 scripts/getnewCliques.py create mode 100755 scripts/reformat.py create mode 100755 scripts/run_Snakemake_Chr4.sh create mode 100755 scripts/stats.py create mode 100755 scripts/test_bestHits.py create mode 100755 scripts/test_cliques.py diff --git a/scripts/awk.sh b/scripts/awk.sh new file mode 100755 index 0000000..8852b08 --- /dev/null +++ b/scripts/awk.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +awk '$3 =="match"' $1 \ No newline at end of file diff --git a/scripts/bestHits.py b/scripts/bestHits.py index d3fc423..3b19911 100755 --- a/scripts/bestHits.py +++ b/scripts/bestHits.py @@ -1,86 +1,115 @@ #!/usr/bin/env python - -import sys - -## - -test_file = sys.argv[1] - - -hits = {} -''' -#with open(snakefile.input[0], 'r') as align: -with open(test_file, 'r') as align: - for line in align.readlines(): - #print(line.split()[2]) - (qname, qstart, qstop, sname, sstart, sstop, evalue, blast_score, idty) = line.split() - len_match = abs(int(qstop)-int(qstart)) - - if qname not in hits: - hits[qname] = {} - hits[qname][blast_score] = {} - - hits[qname][blast_score] = {'qstart': qstart, - 'qstop': qstop, - 'match': sname, - 'sstart': sstart, - 'evalue': evalue, - 'blast_score': blast_score, - 'len_match': len_match, - 'identity': idty - } - - - -candidates = {} - -for copy in hits: - best_hit = max([int(i) for i in hits[copy].keys()]) - candidates[copy] = hits[copy][str(best_hit)] -''' -#print(candidates) - -def getBestHit(): - with open(test_file, 'r') as align: - for line in align: - (qname, qstart, qstop, sname, sstart, sstop, evalue, blast_score, idty) = line.split() - if qname not in hits: - hits[qname] = {} - if blast_score not in hits[qname]: - hits[qname][blast_score] = {} - - for i in hits: - hits[i] = (sname, best_score(i)) - - return hits - -def best_score(qname): - return max([int(i) for i in hits[qname].keys()]) - - -''' - for i in hits: - if hits[i][blast_score] != hits[qname][best_hit]: - hits[qname][blast_score].pop() - - - #print(f"{best_hit}/////{blast_score}") - - - - - - - else: - best_hit = max([int(i) for i in hits[copy].keys()]) - hits[qname][best_hit] = {'qstart': qstart, - 'qstop': qstop, - 'match': sname} -''' - - -print(getBestHit()) - - -#with open(snakefile.ouput[0],'a') as output: +import re +import argparse +import logging +import os + + +class BestBijectiveHits(object): + + def __init__(self, tabFileAccession1="", tabFileAccession2="", outputFile=""): + self.tabFileAccession1 = tabFileAccession1 + self.tabFileAccession2 = tabFileAccession2 + self.outputFile = outputFile + + def setAttributesFromCmdLine(self): + parser = argparse.ArgumentParser(usage="BestBijectivesHits.py [options]", + description="Script anlyzing matcher results and giving best bijective pairs", + epilog="z") + + parser.add_argument('-fi', '--first_input', help= 'first input file', required= True, metavar='file') + parser.add_argument('-si', '--second_input', help='second input file', required=True, metavar='file') + parser.add_argument('-o', '--output', help='output file', required=True) + parser.add_argument('-cf', '--coverageFlank', help='', required=True, type=float) + parser.add_argument('-cm', '--coverageMatch', help='', required=True, type=float) + parser.add_argument('-e', '--extension', type = int) + #parser.add_argument('-v', '--verbosity', help='verbosity', required=True, metavar='file') + args=parser.parse_args() + self._setAttributesFromArguments(args) + + def _setAttributesFromArguments(self, args): + self.tabFile1 = args.first_input + self.tabFile2 = args.second_input + self.output = args.output + self.cov_f = args.coverageFlank + self.cov_m = args.coverageMatch + self.lenExt = args.extension + #logging.getLogger().setLevel(logging.INFO) + + logging.basicConfig(format='%(levelname)s - %(asctime)s - %(message)s', level=logging.INFO) + logging.info("BBH is running") + + def getBestHit(self, input_file): + + if not os.path.exists(input_file): + logging.warning("No input file") + + with open(input_file, 'r') as align: + next(align) + hits = {} + for line in align: + qname = line.split()[0] + sname = line.split()[6] + mcov = line.split()[5] + start = int(line.split()[1]) + stop = int(line.split()[2]) + blast_score = line.split()[12] + length = (int(re.search("(\d+)-(\d+)", qname).group(2))-int(re.search("(\d+)-(\d+)", qname).group(1))) + + if qname not in hits: + hits[qname] = (sname, blast_score, mcov, start, stop, length) + + elif int(blast_score) > int(hits[qname][1]): + hits[qname] = (sname, blast_score, mcov, start, stop, length) + + # elif int(blast_score) == int(hits[qname][1]): + # #print(mcov) + # a = 0 + return hits + + def getBijective(self, first_hits, second_hits, cov_f, cov_m): + hit_pairs = [] + cov_f = float(cov_f) + cov_m = float(cov_m) + for i in first_hits: + if self.checkFlankCov(first_hits, i, cov_f, cov_m, self.lenExt): + hit = first_hits[i][0] + if self.checkFlankCov(second_hits, hit, cov_f, cov_m, self.lenExt) and second_hits[hit][0] == i: + hit_pairs.append((i, hit)) + + return hit_pairs + + def checkFlankCov(self, hit_dict, copy, cov_flank_param, cov_match_param, lenExt): + pos_start = hit_dict[copy][3] + pos_stop = hit_dict[copy][4] + cov_match = hit_dict[copy][2] + length_ext_copy = hit_dict[copy][5] + + left_flank_cov = (lenExt - pos_start)/lenExt + right_flank_cov = 1-((length_ext_copy - pos_stop)/lenExt) + + if float(cov_match) >= cov_match_param: + if left_flank_cov >= cov_flank_param and right_flank_cov >= cov_flank_param: ##float(cov_match) >= 0.0 and + return True + + def writeOutput(self, hit_pairs, output): + + with open(output, 'w') as output_file: + for i in hit_pairs: + output_file.write(i[0] + '\t' + i[1] + '\n') + + def run(self): + print(self.lenExt, type(self.lenExt)) + first_hits = self.getBestHit(self.tabFile1) + second_hits = self.getBestHit(self.tabFile2) + hit_pair = self.getBijective(first_hits, second_hits, self.cov_f, self.cov_m) + + if hit_pair: + self.writeOutput(hit_pair, self.output) + + +if __name__ == '__main__': + iBBH = BestBijectiveHits() + iBBH.setAttributesFromCmdLine() + iBBH.run() diff --git a/scripts/cliques.py b/scripts/cliques.py new file mode 100755 index 0000000..bc705e2 --- /dev/null +++ b/scripts/cliques.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python + +import logging +import argparse +import re +import networkx as nx + + +class Cliques(object): + + def __init__(self, filename="", filename2="", bestHit="", ac = []): + self.filename = filename + self.filename2 = filename2 + self.bestHit = bestHit + self.ac = ac + + def setAttributesFromCmdLine(self): + parser = argparse.ArgumentParser(usage="Cliques.py [options]", + description="", + epilog="") + + parser.add_argument('-i','--input', help= 'first input file', required= True, metavar='file') + parser.add_argument('-c', '--copy_output', help='coreET output file', required=True, metavar='file') + parser.add_argument('-s', '--stats_output', help='dispensableET output file', required=True) + parser.add_argument('-a', '--list_of_accessions', nargs="*", help='', required=True, type=str) + #parser.add_argument('-v', '--verbosity', help='verbosity', required=True, metavar='file') + args=parser.parse_args() + self.setAttributesFromArgs(args) + + def setAttributesFromArgs(self, args): + self.filename = args.stats_output + self.filename2 = args.copy_output + self.bestHit = args.input + self.ac = args.list_of_accessions + + logging.basicConfig(format='%(levelname)s - %(asctime)s - %(message)s', level=logging.INFO) + logging.info("Cliques is running") + + def getCliques(self, input_file): + graph = nx.read_edgelist(input_file) + + cliques = nx.clique.find_cliques(graph) + + return cliques + + def isCore(self, clique, nb_accessions): + nb_accessions = len(nb_accessions) + if len(clique) == nb_accessions: + return 'core' + else: + return 'dispensable' + + def getCliquesAccessions(self, clique): + accessions = [] + for i in clique: + accessions.append(i.split("/")[0]) + + return accessions + + def run(self): + with open(self.filename, 'w') as stats_cliques: + stats_cliques.write(f"id\tSize\tcore/dispensable\taccessions\tclq\n") + with open(self.filename2, 'w') as tab_cliques: + tab_cliques.write( + f"id\taccession\tchromosome\tstart\tstop\tcopy_name\tconsensus\tsize\n") + clik = self.getCliques(self.bestHit) + id = 0 + for clq in clik: + id +=1 + size =len(clq) + print(self.ac, type(self.ac)) + status = self.isCore(clq, self.ac) + miss = self.getCliquesAccessions(clq) + stats_cliques.write(f"{id}\t{size}\t{status}\t{miss}\t{clq}\n") + for i in clq: + accession = i.split("/")[0] + copy_name = i.split("/")[2] + chromosome = i.split("/")[1] + consensus = i.split("/")[3] + pos = re.search("(\d+)-(\d+)", i.split("/")[4]) + start = pos.group(1) + stop = pos.group(2) + + tab_cliques.write( + f"{id}\t{accession}\t{chromosome}\t{start}\t{stop}\t{copy_name}\t{consensus}\t{size}\n") + + +if __name__ == '__main__': + iC = Cliques() + iC.setAttributesFromCmdLine() + iC.run() diff --git a/scripts/extractFLF.py b/scripts/extractFLF.py new file mode 100755 index 0000000..b55b998 --- /dev/null +++ b/scripts/extractFLF.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python + +import sys +import re + +gff_file = snakemake.input[0] +flf_file = snakemake.output[0] +flc_file = snakemake.output[1] +other_file = snakemake.output[2] + +#gff_file = sys.argv[1] +#flf_file = sys.argv[2] +#flc_file = sys.argv[3] +#other_file = sys.argv[4] + +with open(gff_file, 'r') as gff: + with open(flf_file, 'w') as flf: + with open(flc_file, 'w') as flc: + with open(other_file, 'w') as other: + match_counter = 0 + consecutive_match = 0 + i = 0 + lines = 0 + flc_nb = 0 + all = 0 + ct2 = 0 + match = 0 + min_max_cons = [] + for line in gff: + if line.split()[2] == 'match': + start = line.split()[3] + stop = line.split()[4] + next_line = next(gff) + match_counter += 1 + + target_length = float(re.search("Length=(\d+)", line).group(1)) + len_match = float(stop) - float(start) + if 0.95 <= len_match / target_length <= 1.05: + match += 1 + + if next_line.split()[4] == stop and next_line.split()[3] == start: + tl = re.search("Target=.+?;(.+?)=", line) + len_match = float(stop) - float(start) + if tl.group(1) == "TargetLength": + target_length = float(re.search("Length=(\d+)", line).group(1)) + if 0.95 <= len_match / target_length <= 1.05: + min_max_cons.append(len_match) + flf.write(line) + lines += 1 + else: + other.write(line) + all += 1 + + + else: + len_match_part = 0 + ct = 0 + ct2 += 1 + + while next_line.split()[2] != 'match': + match_part_start = float(next_line.split()[3]) + match_part_stop = float(next_line.split()[4]) + len_match_part += match_part_stop - match_part_start + try: + next_line = next(gff) + except: + next_line = '/ / match' + ct += 1 + + tl = re.search("Target=.+?;(.+?)=", line) + if tl.group(1) == "TargetLength": + target_length = float(re.search("Length=(\d+)", line).group(1)) + if 0.95 <= len_match_part / target_length <= 1.05: + flc.write(line) + flc_nb += 1 + else: + other.write(line) + all += 1 + + +def getLenMatchPart(): + len_match_part = 0 + ct = 0 + ct2 += 1 + + while next_line.split()[2] != 'match': + match_part_start = float(next_line.split()[3]) + match_part_stop = float(next_line.split()[4]) + len_match_part += match_part_stop - match_part_start + try: + next_line = next(gff) + except: + next_line = '/ / match' + ct += 1 + + return len_match_part + + +def isFLC(file, copies_ct): + if tl.group(1) == "TargetLength": + target_length = float(re.search("Length=(\d+)", line).group(1)) + if 0.95 <= len_match_part / target_length <= 1.05: + file.write(line) + copies_ct += 1 + else: + other.write(line) + all += 1 + + +print(match_counter, lines, flc_nb) +print(min(min_max_cons)) +print(max(min_max_cons)) +print(all, ct2, match) diff --git a/scripts/extractMatch.sh b/scripts/extractMatch.sh new file mode 100755 index 0000000..8852b08 --- /dev/null +++ b/scripts/extractMatch.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +awk '$3 =="match"' $1 \ No newline at end of file diff --git a/scripts/fastaLength.py b/scripts/fastaLength.py new file mode 100755 index 0000000..6ae8126 --- /dev/null +++ b/scripts/fastaLength.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python + +import sys, os + +class Bioseq: + + def __init__(self): + self.header="" + self.sequence="" + + def read(self,file): + if file: + line=file.readline() + if line == "": + self.header = None + self.sequence = None + return + if line[0]=='>': + self.header=line.lstrip('>').rstrip() + else: + print("error, line is",line.rstrip()) + return + seq="" + while file: + prev_pos=file.tell() + line=file.readline() + if line=="": + break + if line[0]=='>': + file.seek(prev_pos) + break + seq+=line.rstrip() + self.sequence=seq + return + + def load(self,filename): + file=open(filename) + self.read(file) + + def save(self,filename): + file=open(filename,"w") + self.write(file) + + def write(self,file): + file.write('>'+self.header+'\n') + i=0 + while i<len(self.sequence): + file.write(self.sequence[i:i+60]+'\n') + i=i+60 + + def subseq(self,s,e=0): + if e==0 : + e=len(self.sequence) + if s>e : + print("error: start must be < or = to end") + return + if s<=0 : + print("error: start must be > 0") + return + sub=Bioseq() + sub.header=self.header+" fragment "+str(s)+".."+str(e) + sub.sequence=self.sequence[(s-1):e] + return sub + + def length(self): + return len(self.sequence) + +#------------------------------------------------------------------------------------------------------------ +# Get total size in nucleotide of a fasta file and the count per sequence in a file *.len +def fastaLength(fasta_filename): + file_db = open(fasta_filename) + file_len= open(snakemake.output[0],"w") + seq = Bioseq() + total_length = 0 + while 1: + seq.read(file_db) + if seq.sequence == None: + break + l = seq.length() + total_length += l + file_len.write('{}\t{}\n'.format(seq.header, l)) + file_len.close() + file_db.close() + + return total_length + +fastaLength(snakemake.input[0]) \ No newline at end of file diff --git a/scripts/getChromSize.py b/scripts/getChromSize.py new file mode 100755 index 0000000..8bdbb41 --- /dev/null +++ b/scripts/getChromSize.py @@ -0,0 +1,11 @@ +from f import Bioseq +import sys + +sys + + + + + +with open (snakemake.output[0], 'w') as genome_file: + genome_file.write(f"{genome}\t{size}") diff --git a/scripts/getnewCliques.py b/scripts/getnewCliques.py new file mode 100755 index 0000000..9f012e5 --- /dev/null +++ b/scripts/getnewCliques.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python + +import sys + +#TODO: Utiliser les champs ms pour determiner les nouvelles cliques ---> essayer de trouver une meilleure solution + +clq_copy_A = sys.argv[1] +clq_copy_B = sys.argv[2] +clique_diff = sys.argv[3] + + +with open(clq_copy_A, 'r') as statsA: + ms_dict = {} + next(statsA) + for line in statsA: + ms = line.split()[5] + if ms not in ms_dict: + ms_dict[ms] = line.split()[0] + #TODO: check avec un dictionnaire si les valeurs de ms_correspondent et donc si les cliques sont deja existantes + + +with open(clq_copy_B, 'r') as statsB: + with open(clique_diff, 'w') as output: + next(statsB) + id_list =[] + ms_count = {} + for line in statsB: + ms = line.split()[5] + if ms not in ms_dict.keys(): + ms_count[line.split()[0]] = {'ct': 0, 'size': int(line.split()[7])} + output.write(line) + + if line.split()[0] not in id_list: + id_list.append(line.split()[0]) + +with open(clique_diff, 'r')as a: + for line in a: + if line.split()[0] in ms_count: + #print(line) + ms_count[line.split()[0]]['ct'] += 1 + + +list_new = [] +list_expanded = [] + +for i in ms_count: + if ms_count[i]['ct'] == ms_count[i]['size']: + list_new.append(i) + else: + list_expanded.append(i) +print(f"new_cliques_nb = {len(list_new)}\nexpanded_cliques+some new_cliques_nb = {len(list_expanded)}\n") +print(f"new_cliques: {list_new}\n") +print(f"expanded_cliques+some new_cliques\t{list_expanded}") + +# print(len(list_new), len(list_expanded)) +# print(list_new) +# print(list_expanded) + + + +#print(ms_count) + +# with open('cliques_stats_filter5.csv', 'r') as doc: +# with open('cliques_stats_filter5_diff.csv', 'w') as doc2: +# for line in doc: +# if line.split()[0] in id_list: +# doc2.write(line) diff --git a/scripts/reformat.py b/scripts/reformat.py new file mode 100755 index 0000000..8968796 --- /dev/null +++ b/scripts/reformat.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +from Bio import SeqIO +import re + +accession = snakemake.params[0] + + +# Open the gff file and stores in a list the identification data for each copy +with open(snakemake.input[1], 'r') as gff: + header_list = [] + for line in gff: + chrom = line.split()[0] + gff_id = re.search("ID=(.+?)_.+Target=(.+?)\s", line) + copy = (gff_id.group(1)) + consensus = gff_id.group(2) + header_list.append(f"{chrom}/{copy}/{consensus}") + +# Open the sequences file and reformat it in fasta60 format while updating headers +with open(snakemake.input[0], 'r') as file: + with open(snakemake.output[0], "w") as reformated: + header_iterator = 0 + for record in SeqIO.parse(file, "fasta-2line"): + pos = re.search("^.+:(.+)", record.id).group(1) + record.description = f"{accession}/{ header_list[header_iterator]}/{pos}" + record.id = f"{accession}/{ header_list[header_iterator]}/{pos}" + SeqIO.write(record, reformated, "fasta") + header_iterator += 1 diff --git a/scripts/run_Snakemake_Chr4.sh b/scripts/run_Snakemake_Chr4.sh new file mode 100755 index 0000000..7a89914 --- /dev/null +++ b/scripts/run_Snakemake_Chr4.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +export PATH=/home/mbuy/software/miniconda3/envs/Snakemake/bin:/home/mbuy/software/miniconda3/condabin:/usr/local/go/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/home/mbuy/software/TE_finder/bin:/home/mbuy/software/TE_finder/bin +#snakemake -s Snakemake/Snakefile --use-conda +#snakemake -s Snakefile_Chr4 --use-conda --configfile config/configCuratedLibwot4gen.yaml --core=1 +#snakemake -s Snakefile_Chr4 --use-conda --configfile config/config_chr4.yaml --core=1 +snakemake -s Snakefile_Test --use-conda --configfile config/config_Test.yaml --core=1 diff --git a/scripts/stats.py b/scripts/stats.py new file mode 100755 index 0000000..7d29679 --- /dev/null +++ b/scripts/stats.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +import sys, statistics + +cliques_stats = sys.argv[1] +cliques_copy = sys.argv[2] +cliques_heterogen = sys.argv[3] + +with open(cliques_stats, 'r') as clq_info: + next(clq_info) + core = 0 + dispensable = 0 + mean_clq_size = 0 + nb_clq = 0 + KBS_Col = 0 + Ler_Col = 0 + KBS_Ler = 0 + + list_clq = [] + for line in clq_info: + nb_clq += 1 + if line.split()[2] == 'core': + core += 1 + else: + dispensable += 1 + + list_clq.append(line.split()[0]) + + +with open(cliques_copy, 'r') as clq: + next(clq) + nb_copy = 0 + nb_copy_per_accessions = {} + heterogen = {} + list_length = [] + for line in clq: + nb_copy += 1 + list_length.append(int(line.split()[4])-int(line.split()[3])) + # if int(line.split()[4])-int(line.split()[3]) -1000 == : + # print(line) + if line.split()[1] not in nb_copy_per_accessions: + nb_copy_per_accessions[line.split()[1]] = 1 + else: + nb_copy_per_accessions[line.split()[1]] += 1 + + if line.split()[0] in list_clq: + if str(line.split()[0]) in heterogen: + if str(line.split()[6]) not in heterogen[str(line.split()[0])]: + heterogen[str(line.split()[0])][line.split()[6]] = {'copy_length': int(line.split()[4])-int(line.split()[3]), + 'size_clique': line.split()[7]} + else: + heterogen[str(line.split()[0])] = {} + heterogen[str(line.split()[0])][line.split()[6]] ={'copy_length': int(line.split()[4])-int(line.split()[3]), + 'size_clique': line.split()[7]} + + het = 0 + non_het = 0 + with open(cliques_heterogen, 'w') as het_file: + het_file.write(f"Name and size of different consensus in clique separated by /\n" + f"*******************************************\n") + het_file.write(f"id_clq\tconsensus\testimated_consensus_size\tnb_consensus\tsize_clq\n") + for i in heterogen: + + if len(heterogen[i]) >= 2: + nb_cons = (len(heterogen[i])) + cons = '' + size ='' + for j in heterogen[i]: + cons += str(j)+'/' + size += str(heterogen[i][j]['copy_length'])+'/' + size_clq = heterogen[i][j]['size_clique'] + #print(i, heterogen[i]) + + het_file.write(f"{i}\t{cons}\t{size}\t{nb_cons}\t{size_clq}\n") + het += 1 + else: + non_het += 1 + + print(het, non_het) + +print(f"nb cliques = {nb_clq}\n" + f"nb copy = {nb_copy}\n" + f"core = {core}\n" + f"dispensable = {dispensable}\n" + f"%core = {core/nb_clq}\n" + f"{nb_copy_per_accessions}\n" + f"FLF size:\n" + f"min = {min(list_length)}\nmax = {max(list_length)}\nmedian = {statistics.median(list_length)}\n " + f"mean = {statistics.mean(list_length)}" + ) diff --git a/scripts/test_bestHits.py b/scripts/test_bestHits.py new file mode 100755 index 0000000..2886632 --- /dev/null +++ b/scripts/test_bestHits.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 + +import os +import unittest +from bestHits import BestBijectiveHits as bbh + + + +class TestBestHits(unittest.TestCase): + + def test_getBestHit(self): + + with open ('testTabFile.tab','w') as test_file: + test_file.write('query.name\tquery.start\tquery.end\tquery.length.%\tmatch.length.%\tsubject.name\t' + 'subject.start\tsubject.end\tsubject.length.%\tE.value\tScore\tIdentity\tpath\n') + i=1 + score=900 + while i <= 3 : + test_file.write(f"expected{i}\tqstart\tqend\tqlength\tqlength%\t0.95\tname{i}\tsstart\tsend\tslength\t" + f"slength%\tevalue\t{score}\tidty\tpath\n") + test_file.write(f"expected{i}\tqstart\tqend\tqlength\tqlength%\t0.95\tname{i}\tsstart\tsend\tslength\t" + f"slength%\tevalue\t{score-250}\tidty\tpath\n") + i+=1 + score+=100 + + exp={} + exp['expected1'] = ('name1', '900' , '0.95') + exp['expected2'] = ('name2', '1000', '0.95') + exp['expected3'] = ('name3', '1100', '0.95') + + + obs = bbh.getBestHit(self, 'testTabFile.tab') + self.assertEqual(obs, exp) + os.remove('testTabFile.tab') + + def test_getBijective(self): + + test_bij = {} + test_bij['expected1'] = ('expected4', 900, 0.95) + test_bij['expected2'] = ('expected5', 950, 0.95) + test_bij['expected3'] = ('expected6', 1000, 0.95) + + test_bij2 = {} + test_bij2['expected4'] = ('expected1', 900, 0.95) + test_bij2['expected5'] = ('expected3', 800, 0.95) + test_bij2['expected6'] = ('expected2', 1100, 0.95) + + + expected_pair = [('expected1', 'expected4')] + + obs = bbh.getBijective(self, test_bij, test_bij2) + self.assertEqual(obs, expected_pair) + + def test_writeOutput(self): + a=1 + + +if __name__ == '__main__': + unittest.main(argv=['first-arg-is-ignored'], exit=False) + #unittest.main() + diff --git a/scripts/test_cliques.py b/scripts/test_cliques.py new file mode 100755 index 0000000..f510c60 --- /dev/null +++ b/scripts/test_cliques.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +import unittest +import os +from cliques import Cliques + +class Test_Cliques(unittest.TestCase): + + def test_getCoreAndDispensableET(self): + + with open('testBijectiveMatch.txt', 'w') as test_file: + + nb_accession = 4 + + test_file.write(f"Ac1:Copy1\tAc3:Copy2\n" + f"Ac4:Copy3\tAc3:Copy2\n" + f"Ac2:Copy2\tAc4:Copy3\n" + f"Ac1:Copy1\tAc2:Copy2\n" + f"Ac1:Copy1\tAc4:Copy3\n" + f"Ac2:Copy2\tAc3:Copy2\n" + + f"Ac1:Copy2\tAc3:Copy4\n" + f"Ac2:Copy4\tAc3:Copy4\n" + f"Ac1:Copy2\tAc2:Copy4\n" + f"Ac3:Copy4\tAc4:Copy1\n" + f"Ac2:Copy4\tAc4:Copy1\n" + + f"Ac2:Copy3\tAc1:Copy3\n" + f"Ac3:Copy1\tAc4:Copy2\n" + f"Ac1:Copy4\tAc4:Copy4\n") + + expected_core = [['Ac1:Copy1', 'Ac3:Copy2', 'Ac4:Copy3', 'Ac2:Copy2']] + expected_dispensable = [['Ac1:Copy2', 'Ac3:Copy4', 'Ac2:Copy4'], + ['Ac2:Copy4', 'Ac3:Copy4', 'Ac4:Copy1'], + ['Ac2:Copy3', 'Ac1:Copy3'], + ['Ac3:Copy1', 'Ac4:Copy2'], + ['Ac1:Copy4', 'Ac4:Copy4']] + + obs = Cliques.getCoreAndDispensableET(self, 'testBijectiveMatch.txt', nb_accession) + print(f"core ========= {obs[0]}") + print(f"disp ========= {obs[1]}") + self.assertEqual(obs, (expected_core, expected_dispensable)) + os.remove('testBijectiveMatch.txt') + + + +if __name__ == '__main__': + unittest.main(argv=['first-arg-is-ignored'], exit=False) -- GitLab