Source code for hylite.Snp_Generator

#!/usr/bin/env python3

#    (c) Copyright 2013-2018 Murray Cox, Wandrille Duchemin, Pierre-Yves Dupont.
#
#
#    This file is part of HyLiTE.
#
#    HyLiTE is a free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License version 3 as published by
#    the Free Software Foundation.
#
#    HyLiTE is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with HyLiTE.  If not, see <http://www.gnu.org/licenses/>


#===============================#
# author: Wandrille Duchemin    #
#         Pierre-Yves Dupont    #
#         Murray Cox            #
# last modified: 9 January 2018 #
#===============================#


from .Parameters import Parameters
from .Pileup_Reader import Pileup_Reader
from .Lane import Lane #import Read too
from .Read import Read
from .SNP import SNP
from .Organism import Organism


import sys
from scipy.stats import binom



class Snp_Generator():
    '''
    This class contains the main loops of the HyLiTE algorithm. It performs
    SNP calling, expression counting and read categorization for a single
    line of the pileup file.

    Attributes:
        - pilereader (Pileup_Reader)
        - current_gene (str)
        - current_position (int)
        - lsnp (list): list of SNP
        - lorganism (list): list of Organism
        - dread (dict): dict of list of Read (keys are sample name)
        - current_reference (str): reference base for the current position on the current gene
        - dlane (dict): keys are organism name, value is the list of Lane for said organism
        - ppf_memory (dict): keys are coverage values, values are the associated expected minimum count for non-error bases
    '''

    def __init__(self, pileupfile, lsnp, lorganism, dread):
        '''
        Args:
            - pileupfile (str): name of the .pileup file containing all the organism alignments
            - lsnp (list): list of SNP, presumably empty at this point
            - lorganism (list): list of Organism. THE CHILD IS ALWAYS THE FIRST ELEMENT OF THE LIST
            - dread (dict): dict of lists of Read keyed by sample name, presumably empty at this point
        '''
        self.pilereader = Pileup_Reader(pileupfile)
        self.lsnp = lsnp
        self.lorganism = lorganism
        self.dread = dread
        # one Lane per sample for every organism, keyed by organism name
        self.dlane = {org.name: [Lane(sample_idx) for sample_idx in range(len(org.sample_name))]
                      for org in self.lorganism}
        self.current_gene = None
        self.current_position = None
        # memoised thresholds: coverage -> minimum count to call a non-error base
        self.ppf_memory = {}
[docs] def detect_snp(self, ref, count, ploidy): '''Call the SNPs Args: - ref (str): the reference letter, - count (dict): a dictionary containing the count for each letter - ploidy (int): the ploidy of the concerned organism Returns: - list. a list (of length equal to the ploidy) containing the possible allele (including reference) at this location ''' coverage = sum(count.values()) # H0 = "this base is an error" # we estimate error occurs with probability p, then the number of each error type in our pile follow a binomial law of parameters <size of the pile> and <p> #Then the expected count for a non error character should be given by this formulae: if coverage not in self.ppf_memory: self.ppf_memory[coverage] = max(2, 1+binom.ppf(1.-Parameters().get_param('ALPHA'), coverage, Parameters().get_param('EXPECTED_ERROR_RATE'))) expected_count = self.ppf_memory[coverage] #note that we define a minimum of 2 occurrences to call a non-error character as a safety measure for the low coverage regions accepted=dict() for letter in list(count.keys()): #The quality don't seem to be relevant of the errors position so we just use the raw count if count[letter] >= expected_count: #the letter is an acceptable allele and does not results from error accepted[letter]=count[letter] allele = list() #we will now keep only the <ploidy> best allele for i in range(min(ploidy, len(accepted))): max_key = max(accepted, key=accepted.get) allele.append(max_key) accepted.pop(max_key) #print self.current_position,coverage,expected_count, count, allele if len(allele)==0: #not impossible, just very improbable allele.append(ref)# we could also return empty list, which would mean : bad coverage/data quality return allele
[docs] def create_snp(self, lallele): '''Create the necessary number of SNP for this location (possibly 0). Args: - lallele (list): a list containing a list of the allele for each organism (empty if the coverage is bad). Returns: - list. a list of the index of the new SNPs in self.lsnp ''' new_snp = dict() #will contain the the newly_created SNPs with Alternate base as key for i, allele in enumerate(lallele): #for each organism for j, a in enumerate(allele): #for each allele if a != self.current_reference and new_snp.get(a, False) is False: #not the ref and not yet in new_snp -> the SNP is new new_snp[a]=SNP(self.current_gene, self.current_position, self.current_reference, a) #creation of the new_snp if a != self.current_reference:#not the reference if i !=0:#not the child new_snp[a].has_SNP(self.lorganism[i].name, j)#updating the Snp else:#child new_snp[a].has_SNP(self.lorganism[i].name, -1)#updating the Snp #Adding absence data for i, allele in enumerate(lallele): #for each organism for j, a in enumerate(allele): #for each allele for snp in list(new_snp.keys()): if a != snp: if i != 0:#not the child new_snp[snp].no_SNP(self.lorganism[i].name, j)#absence else:#child new_snp[snp].no_SNP(self.lorganism[i].name, -1)#absence #We now have every SNP at this position #We will add the bad coverage data for i, allele in enumerate(lallele): #for each organism if len(allele)==0: #this means bad coverage for organism i for snp in list(new_snp.values()): for j in range(self.lorganism[i].ploidy): snp.no_coverage(self.lorganism[i].name) ## ## #to trash ## if a == self.current_reference: #this is not a SNP ## continue ## if new_snp.get(a,False) is False: #the SNP is new ## new_snp[a]=SNP(self.current_gene,self.current_position,self.current_reference,a) #creation of the new_snp ## new_snp[a].has_SNP(self.lorganism[i].name)#updating the Snp ## ## #We now have every SNP at this position ## #We will add the bad coverage data ## for i,allele in enumerate(lallele): #for each organism 
## if len(allele)==0: #this means bad coverage for organism i ## for snp in new_snp.values(): ## snp.no_coverage(self.lorganism[i].name) #We now have to had the information about absence of the SNP #and finally add the new snps to lsnp and stock their index new_index = list() for snp in list(new_snp.values()): #snp is now complete new_index.append(len(self.lsnp)) self.lsnp.append(snp) return new_index
[docs] def decompose_pileup_line(self, line): '''Update the different Lanes with the current line information Args: - line (str): a line coming from a pileup file ''' splitted_line = line.strip().split('\t') #structure of the line: #GENE POS REF COV1 PILE1 QUAL1 COV2 PILE2 QUAL2 ... self.current_gene= splitted_line[0] self.current_position= int(splitted_line[1]) self.current_reference = splitted_line[2] #a lane is 3 column long i=1 #count the number of treated lane for org in self.lorganism: #for each organism for j, lane in enumerate(self.dlane[org.name]):#for each sample_index, sample lane.update(int(splitted_line[i*3]), splitted_line[i*3 +1 ], splitted_line[i*3 +2], org.name, org.sample_name[j], self.current_gene, self.current_position, self.current_reference) i+=1 return
    def treat_pileup_line(self, line):
        '''
        Decompose the line; call SNPs; categorize and stock the reads of the
        different samples. This is the per-line driver of the whole algorithm.

        Args:
            - line (str): a line of the .pileup file
        '''
        self.decompose_pileup_line(line)
        #information gathered
        #reads have been created, stacked up for finishing, pile treated
        #####################################
        # SNPs calling
        lallele = list()
        #first begin by creating a dictionary of the count of each possible letter
        for org in self.lorganism: #for each organism
            allele_count = {'A':0,'T':0,'C':0,'G':0}
            for lane in self.dlane[org.name]: #for each lane
                lane_count = lane.get_count()
                for l in list(allele_count.keys()):
                    allele_count[l]+= lane_count[l]
            #We check the coverage of this region
            if org.ploidy == 1:#the organism is an haploid
                min_coverage = Parameters().get_param('MIN_COVERAGE_HAPLOID')
            else:
                min_coverage = Parameters().get_param('MIN_COVERAGE_POLYPLOID')
            covered = sum(allele_count.values())>=min_coverage #boolean set to True if the coverage is good enough
            if covered:
                allele = self.detect_snp(self.current_reference, allele_count, org.ploidy)
            else:
                allele = []  # empty list signals bad coverage downstream
            lallele.append(allele)
        # lallele contains a list of the alleles of each organism at the current position

        #TAG UPDATE
        #we want to update the tags of the parents
        for i in range(1, len(lallele)): #for each parent (the child is the first)
            if len(lallele[i]) ==0 or self.lorganism[i].ploidy==1:#bad coverage or haploid parent, do nothing
                continue
            elif len(lallele[i]) ==1:
                #only one genotype at this position: duplicate it, don't update the tags
                lallele[i] = [lallele[i][0], lallele[i][0]]
            else:
                lallele[i] = self.update_tags(lallele[i], self.lorganism[i].name)
        #the tags are updated, the lists of alleles are completed too

        #creating the snps
        new_index = self.create_snp(lallele)
        #we now have created the new snps and we have their index in new_index

        #we can now add the snps to the open reads of the child only
        for i in new_index: #for each new_snp
            if self.lsnp[i].masked==True:
                #masking is position-wide: at least one organism has bad coverage
                #here, so ALL the SNPs at this position are badly covered -> stop
                break
            for lane in self.dlane[self.lorganism[0].name]: #for each lane of the child
                lane.add_snp(i, self.lsnp[i].alt)
        ############################################
        #all the reads display their snps, we can now update the fingerprint of the parents
        for parent_index in range(1, len(self.lorganism)): #for each parent index. REMEMBER: CHILD ALWAYS COMES FIRST
            for snp_index in new_index: #for each new SNP
                if self.lsnp[snp_index].masked!=True: #The SNP isn't masked
                    #conditions here will more complicated for a polyploid parent ... actually, no :-)
                    self.lorganism[parent_index].add_snp(self.current_gene, self.current_position, snp_index, self.lsnp[snp_index].presence[self.lorganism[parent_index].name], None)
        ##############################
        #We will now take care of the finishing reads
        for org_index in range(len(self.lorganism)): #for each organism
            for sample_index in range(len(self.lorganism[org_index].sample_name)): #for each sample
                finish = self.dlane[self.lorganism[org_index].name][sample_index].finish_reads() #this returns the list of the finished reads
                self.lorganism[org_index].add_expression(self.lorganism[org_index].sample_name[sample_index], self.current_gene, len(finish))
                if org_index == 0: #this means that we are dealing with the child
                    for r in finish: #for each finishing read
                        #generation of all the fingerprints of all the parents over the read's span
                        fprints = dict((o.name, o.get_all_fingerprint(self.current_gene, r.start, r.stop)) for o in self.lorganism[1:])
                        r.categorize(fprints, self.lsnp)
                        # NOTE(review): only child reads are stored in dread;
                        # parent reads are discarded after expression counting — confirm intended.
                        self.dread[self.lorganism[org_index].sample_name[sample_index]].append(r)
        #piles have been read
        #SNPs have been detected and categorized
        #reads have been updated
        #closed reads have been categorized and stocked
        return
[docs] def give_line(self): ''' Yields: - str. a line from the pileup file ''' for l in self.pilereader.give_line(): yield l
[docs] def close_pileup(self): ''' Close the pileup file ''' self.pilereader.close() return
[docs] def restaure(self): ''' Re-open the pileup file ''' filename = self.pilereader.filename self.pilereader = Pileup_Reader(filename) return
    def get_nb_reads(self):
        '''
        Count the currently opened reads in every lane.

        Returns:
            - list. a nested list: one sub-list per organism (in
              ``self.lorganism`` order) containing, for each of its lanes,
              the number of currently opened reads.
        '''
        return [[lane.get_nb_reads() for lane in self.dlane[o.name]] for o in self.lorganism]
    def update_tags(self, allele, org_name):
        '''
        Update the tags of the opened reads of one parent according to their
        content and the list of detected alleles at this position.

        Args:
            - allele (list): list of the detected alleles at this position
            - org_name (str): name of the organism to update

        Returns:
            - list. list of detected alleles, ordered, accepting doublons

        !!!This is not made for more than diploid yet
        '''
        # nucleotide content of each tag: key is a tag, value is a dict
        # mapping nucleotide -> number of times encountered
        dallele = dict()
        tag_to_allele = dict() #key is tag, value is associated allele
        #first look if at least one opened read has a gene copy assigned
        for lane in self.dlane[org_name]: #for each lane
            for i, r in enumerate(lane.opened_read):
                if r.tag != -1 : #tagged read
                    if (r.tag in dallele) is False:
                        dallele[r.tag] = dict()
                    if lane.pile[i] in allele: #represents a valid character at this position
                        dallele[r.tag][lane.pile[i]] = dallele[r.tag].get(lane.pile[i], 0) + 1
        #now we have to assign an allele to each represented tag
        for t in list(dallele.keys()):#for each tag
            if sum(dallele[t].values()) < 2:
                continue #less than two reads support this information, it seems hazardous to base our classification on it...
            for a in list(dallele[t].keys()):#for each allele
                if (dallele[t][a]*1.)/sum(dallele[t].values()) > 0.5: #the allele is the most frequent (remember: this is the DIPLOID case)
                    tag_to_allele[t] = a # we definitely assign the allele a to this tag
            # either keep the majority allele or mark "*" = the "unsure" label,
            # to be used later, when no clear majority has been detected
            tag_to_allele[t] = tag_to_allele.get(t, "*")
        #check if there are still several alleles:
        if len(tag_to_allele) == 2: #works for two
            if tag_to_allele[0] == tag_to_allele[1]: #should not occur unless the data is horrible
                return [tag_to_allele[0], tag_to_allele[1]] #directly return without adding any tag to the reads
        elif len(tag_to_allele) == 2:
            # NOTE(review): this condition duplicates the `if` above and is
            # therefore unreachable — presumably it was meant to be
            # `len(tag_to_allele) == 1` (exactly one tag assigned); confirm
            # against upstream HyLiTE sources. As written, the one-tag case
            # falls through to the `else` branch below instead.
            if list(tag_to_allele.values())[0] != "*": #the assigned allele is not ambiguous
                #basically this means that the second allele ought to be the one borne by the second tag (i.e gene copy)
                # NOTE(review): `allele` is a list, which has no `.find` method —
                # this line would raise AttributeError if it were ever reached
                # (currently it cannot be); likely `allele.index(...)` on the
                # assigned allele value was intended. TODO confirm.
                tag_to_allele[1-list(tag_to_allele.keys())[0]] = allele[1-allele.find(list(tag_to_allele.keys())[0])] #works for 2
        else: #no tags are assigned
            #we arbitrarily assign alleles to tags
            tag_to_allele[0] = allele[0]
            tag_to_allele[1] = allele[1]
        # at this point, we have two assigned labels for the two tags, either the
        # two alleles (best case scenario) OR one good allele and a "*" (the case
        # with two "*" is frankly impossible given that we use alleles that have
        # already been detected with these reads)
        for lane in self.dlane[org_name]: #for each lane
            lane.update_tags([tag_to_allele[0], tag_to_allele[1]])
        return [tag_to_allele[0], tag_to_allele[1]]