Source code for hylite.Output_manager

#!/usr/bin/env python3

#    (c) Copyright 2013-2018 Murray Cox, Wandrille Duchemin, Pierre-Yves Dupont.
#
#
#    This file is part of HyLiTE.
#
#    HyLiTE is a free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License version 3 as published by
#    the Free Software Foundation.
#
#    HyLiTE is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with HyLiTE.  If not, see <http://www.gnu.org/licenses/>


#===============================#
# author: Wandrille Duchemin    #
#         Pierre-Yves Dupont    #
#         Murray Cox            #
# last modified: 9 January 2018 #
#===============================#


# from SNP import SNP
# from Read import Read
# from itertools import combinations
from .Parameters import Parameters

[docs]class Output_manager: ''' This class manages the outputs of HyLiTE ''' def __init__(self): ''' This class manages the outputs of HyLiTE ''' self.referencefile = None #The reference file is now mandatory and is used to find all genes self.all_genes = None #Save all genes to avoid to have to recompute them self.expression_data = None #Save all expression data in a class variable
[docs] def write_snp(self, handle, lsnp, orgnames, header): '''Write the SNPs in the given handle Args: - handle (file object): a handle were the data will be written - lsnp (list): a list of SNPs - orgnames (list): a list of the organisms names - header (bool): True if the header of the file must be written ''' if header: handle.write("GENE\tPOS\tREF\tALT\t"+'\t'.join(orgnames)+'\n') for snp in lsnp: handle.write('\t'.join([snp.gene, str(snp.position), snp.ref, snp.alt] + [",".join([str(pre) for pre in snp.presence[org]]) for org in orgnames] )+"\n") handle.close() return
[docs] def write_snp_summary(self, handle_in, handle_out): '''Summarize the SNPs informations by gene and write them in the handle Args: - handle_in (file): handle of the file containing all the snps - handle_out (file): handle of the file to write the snp summary ''' header = handle_in.readline()#we get the header line #its structure is: GENE\tPOS\tREF\tALT\tCHILD\tP1\tP2... lorgname = header.strip().split('\t')[4:] #list of the organisms name, beginning by the child summary = dict() #key will be gene name, value will be a dictionnary having category as key and count as value lgene = self.get_all_genes() for gene in lgene: summary[gene] = {} lcat= lorgname[:] #list of every category created lcat.append('MASKED') lcat.append('COMMON') for l in handle_in:#for every snp ll = l.strip().split('\t') gene = ll[0] presence = [[int(j) for j in i.split(",")] for i in ll[4:]] #the presence information #presence is a list of list if len([True for i in presence if -1 in i])>0: #at least one -1 cat = 'MASKED' elif len([True for i in presence if 1 in i]) == len(presence): cat = 'COMMON' else: cat = '+'.join([lorgname[i] for i in range(len(lorgname)) if sum(presence[i])>=1]) #the name of the organisms that have a presence of 1 on at least one gene copy. if cat not in lcat: #The category is new lcat.append(cat) if (gene in summary) is False:#new gene summary[gene]=dict() if (cat in summary[gene]) is False:#new category for the gene summary[gene][cat] = 0 summary[gene][cat] += 1#iteration handle_in.close() #we can now write the header line = ["GENE"] line +=lcat handle_out.write('\t'.join(line) + '\n') #header line # lgene = summary.keys() # lgene.sort() for gene in lgene: #for each gene line = [gene] for cat in lcat: #for each category line.append(str(summary[gene].get(cat, 0))) handle_out.write('\t'.join(line) + '\n')#we write the summary line handle_out.close() return
[docs] def simplify_category(self, cat): """ Args: - cat (str): a read category Returns: (str): the read category, simplified """ new_cat = "" N = cat.endswith("+N") ##new SNP if N: cat = cat[:-2] ##trimming the "new" tag if cat.count("+") >0: ##at least once chimeric new_cat = "UNK" elif cat.count("|") > 0:## new_cat = "UNINFORMATIVE" else: new_cat = cat.strip("(").strip(")") ##removing those pesky parenthesis if N:##new SNP new_cat += "+N"##adding the suffix return new_cat
[docs] def write_read(self, handle, lreads, header, lorg): '''Write the reads of a sample in a specified handle Args: - handle (file object): a handle to write in - lreads (list): a list of reads - header (bool): True if the header of the file must be written ''' param = Parameters()##getting the parameters of the run full = param.get_param('FULL_OUPUT') #or else the __repr__ method of the Reads should suffice for now if header: header_line = "GENE\tORGANISM\tSAMPLE\tSTART\tSTOP\tCAT\tNEW\n" handle.write(header_line) for r in lreads: if full: handle.write(r.__repr__() + "\n") else: s = r.__str__() sl = s.split() ##the category is second to last sl[-2] = self.simplify_category(sl[-2]) handle.write("\t".join(sl) + "\n") handle.close() return
[docs] def get_all_genes(self): '''Function finding all genes in the reference file and saving them in a class variable. The name of the genes is found based on the same method as BioPython FastaIterator: the gene name is the first word of the fasta defline. ''' if self.all_genes is None: lgene = [] with open(self.referencefile) as inf: for line in inf: if line.startswith(">"): gene = line.strip()[1:].split(None, 1)[0] #default id parsing for fasta as in FastaIterator in BioPyhton lgene.append(gene) self.all_genes = list(lgene) return lgene else: return list(self.all_genes)
[docs] def write_read_summary(self, handle_in, handle_out, lorg): '''Write a summary of the reads of each genes in a sample in a specified handle Args: - handle_in (file): handle of the file containing all the reads - handle_out (file): handle of the file to write the read summary - lorg (list): list of organism names (the first is the child) ''' handle_in.readline()#we read the header summary = dict() #key will be gene name, value will be a dictionnary having category as key and count as value lcat=list() #list of every category created lgene = self.get_all_genes() for gene in lgene: summary[gene] = {} for l in handle_in: #structure of a line: GENE\tORGANISM\tSAMPLE\tSTART\tSTOP\tCAT\tNEW ll = l.strip().split('\t') gene = ll[0] cat = ll[5] new = ll[6] #Simplify the allele notation if cat != 'UNK' and cat != 'UNINFORMATIVE': for org in lorg: org = org.name if cat == org: break if cat.startswith(org): if cat[len(org)] == "_": if cat[len(org)+1:].isalnum(): cat = org break if new=='True': cat += '+N' if cat not in lcat: lcat.append(cat) if (gene in summary) is False: summary[gene]=dict() if (cat in summary[gene]) is False: summary[gene][cat] = 0 summary[gene][cat] += 1 handle_in.close() ##### ##removing the '+N', dividing the cat correctly new_lcat_simple_parent = [] new_lcat_composite_parent = [] new_lcat_other = [] for c in lcat: if not c.endswith("+N"): if c.count("|") == 0 and c.count("+") == 0 and c != "UNINFORMATIVE" and c!= "UNK": ##not UNINFORMATIVE or UNK or chimeric new_lcat_simple_parent.append(c) elif c.count("|") == 0 and c.count("+") == 0:##uninformative or unk new_lcat_other.append(c) new_lcat_other.append(c + "+N") else:##composite parent new_lcat_composite_parent.append(c) new_lcat_composite_parent.append(c + "+N") new_lcat_composite_parent.sort() new_lcat_other.sort() ##getting the parent in the right order ordered_lcat_simple_parent = [] for org in lorg[1:]:##for each parent: for c in new_lcat_simple_parent: if org.name in c: if c not in ordered_lcat_simple_parent: ordered_lcat_simple_parent.append(c) ordered_lcat_simple_parent.append(c + "+N") new_lcat_simple_parent.remove(c) # break lcat = ordered_lcat_simple_parent + new_lcat_composite_parent + new_lcat_other #we can now print the header line line = ["GENE"] line += lcat handle_out.write('\t'.join(line) + '\n') for gene in lgene: #for each gene line = [gene] for cat in lcat: #for each category line.append(str(summary[gene].get(cat, 0))) handle_out.write('\t'.join(line) + '\n')#we write the summary line handle_out.close() return
[docs] def precompute_expression_data_towrite(self, lorganism, header): '''Precompute the expression data to be written in a file for the genes in each organism and each RNA-seq sample Args: - handle (file object): a handle to write in - lorganism (list): a list of organisms - header (bool): True if the header of the file must be written ''' lsample=list() #list of sample to write # initialization of self.expression_data as a dict key=gene, value=a dictionary of values if not self.expression_data: self.expression_data = {} for gene in self.get_all_genes(): self.expression_data[gene] = {} for org in lorganism: for sample in org.sample_name: if org.sample_type[sample] != "RNAseq": #if the sample is not RNA-seq we are not interested in its expression continue lsample.append(org.name +'%'+sample) for gene in list(org.dexpression[sample].keys()): self.expression_data[gene][lsample[-1]] = org.dexpression[sample][gene] return
[docs] def write_complete_expression_file(self, outdir, name): '''Write the precomputed expression data in a file. All genes present in the reference file will be present in the expression file Args: - outdir (str): the directory to write the file in - name (str): the name of the file ''' samples = set() for g in list(self.expression_data.keys()): for s in list(self.expression_data[g].keys()): samples.add(s) samples = sorted(list(samples)) with open(outdir + name + '.expression.txt', 'w') as out: out.write("GENE\t%s\n" % "\t".join(samples)) for gene in self.get_all_genes(): line = [gene] for sample in samples: line.append(str(self.expression_data[gene].get(sample, 0))) out.write("%s\n" % "\t".join(line))
[docs] def write_run_summary(self, handle_out, lhandle_in_read, handle_in_snp, lorg): """ Writes a summary of the run's result Takes: - handle_out (file object): a handle to write in - lhandle_in_read (list): list of (file object): handle to the read summary file - handle_in_snp (file_object): a handle to the snp summary file - lorg (list): list of organism names (the first is the child) """ dcounts = {'total_read': 0, 'unambiguous_parent': 0, 'UNINFORMATIVE': 0, 'UNKNOWN': 0, }#dictionnary of the different count we are interested in parent_names = [] for o in lorg[1:]: #for each parent dcounts[o.name] = 0 parent_names.append(o.name) ###we will begin with the read_summaries for handle_in_read in lhandle_in_read: header = [i.strip() for i in handle_in_read.readline().split()] catmap = [] ##mapping the different columns to the counts of interest for i in header[1:]: if i.endswith('+N'): ## removing the +N flag i = i[:-2] if i.count("(")>0: ## removing parenthesis i = i.strip("(").strip(")") if i in list(dcounts.keys()): catmap.append(i)##typically the parent elif i.count('|') > 0:##uninformative catmap.append('UNINFORMATIVE') else:##UNK catmap.append('UNKNOWN') ##now we can read the summary for l in handle_in_read:##for each line sl = [i.strip() for i in l.split()] for i, n in enumerate(sl[1:]):##for each category in the line dcounts['total_read'] += int(n)#always update total count dcounts[catmap[i]] += int(n)#updating the desired count if catmap[i] in parent_names:#updating the unambiguous parent count dcounts['unambiguous_parent'] += int(n) handle_in_read.close() ## now the snp summary dcounts['total_snp'] = 0 dcounts['child_snp'] = 0 handle_in_snp.readline()#reading the header for l in handle_in_snp: sl = [i.strip() for i in l.split()] nbs = [int(i) for i in sl[1:]]#the first field is the gene name dcounts['total_snp'] += sum(nbs) dcounts['child_snp'] += nbs[0] #child only snps are in the first field handle_in_snp.close() ##Now, we can write the run summary handle_out.write("Total number of child reads mapping on the reference:\t%d\n"%dcounts['total_read']) handle_out.write("Number of child read unambiguously assigned to a parent:\t%d\n"%dcounts['unambiguous_parent']) for p in parent_names: handle_out.write("Number of child read unambiguously assigned to " + p + ":\t%d\n"%dcounts[p]) handle_out.write("Number of child read with uninformative assignment:\t%d\n"%dcounts['UNINFORMATIVE']) handle_out.write("Number of child read with unknown or ambiguous assignement:\t%d\n"%dcounts['UNKNOWN']) handle_out.write("Total number of SNPs identified:\t%d\n"%dcounts['total_snp']) handle_out.write("Total number of child unique SNPs:\t%d\n"%dcounts['child_snp']) handle_out.close() return
if __name__ == "__main__": print('testing the simplify category function') OM = Output_manager() cats = ['(P1)', '(P1)+N', '(P1)|(P2)', '((P1)|(P2))+N', '(P2+P1)', '(P2+P1)+N', 'UNK'] for c in cats: print(c, "->", OM.simplify_category(c)) """ should get: (P1) -> P1 (P1)+N -> P1+N (P1)|(P2) -> COMMON ((P1)|(P2))+N -> COMMON+N (P2+P1) -> UNK (P2+P1)+N -> UNK+N UNK -> UNK """