#!/usr/bin/env python3
# (c) Copyright 2013-2018 Murray Cox, Wandrille Duchemin, Pierre-Yves Dupont.
#
#
# This file is part of HyLiTE.
#
# HyLiTE is a free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as published by
# the Free Software Foundation.
#
# HyLiTE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with HyLiTE. If not, see <http://www.gnu.org/licenses/>
#===============================#
# author: Wandrille Duchemin #
# Pierre-Yves Dupont #
# Murray Cox #
# last modified: 9 January 2018 #
#===============================#
from .Parameters import Parameters
from .Pileup_Reader import Pileup_Reader
from .Lane import Lane #import Read too
from .Read import Read
from .SNP import SNP
from .Organism import Organism
import sys
from scipy.stats import binom
class Snp_Generator():
    '''
    This class contains the main loops of the HyLiTE algorithm.
    It performs SNP calling, expression counting and read categorization on a
    single line of the pileup file at a time.

    Attributes:
        - pilereader (Pileup_Reader)
        - current_gene (str)
        - current_position (int)
        - current_reference (str): reference base for the current position on the current gene
        - lsnp (list): list of SNP
        - lorganism (list): list of Organism (the child is always the first element)
        - dread (dict): dict of list of Read (keys are sample names)
        - dlane (dict): keys are organism names, values are the list of Lane for said organism
        - ppf_memory (dict): keys are coverage values, values are the associated
          expected minimum count for non-error bases (memoised binom.ppf results)
    '''
    def __init__(self, pileupfile, lsnp, lorganism, dread):
        '''
        Args:
            - pileupfile (str): name of the .pileup file containing all the organism alignments
            - lsnp (list): list of SNP, presumably empty at the moment
            - lorganism (list): list of organisms. CHILD IS ALWAYS THE FIRST ELEMENT OF THE LIST
            - dread (dict): dict of list of Read (keys are sample names), presumably empty at the moment
        '''
        self.pilereader = Pileup_Reader(pileupfile)
        self.lsnp = lsnp
        self.lorganism = lorganism
        self.dread = dread
        # one Lane per sample, grouped by organism name
        self.dlane = {x.name: [Lane(i) for i in range(len(x.sample_name))] for x in self.lorganism}
        self.current_gene = None
        self.current_position = None
        self.ppf_memory = dict()  # memoisation cache for the binomial threshold used in detect_snp
        return

    def detect_snp(self, ref, count, ploidy):
        '''Call the SNPs at the current position for one organism.

        Args:
            - ref (str): the reference letter
            - count (dict): a dictionary containing the count for each letter
            - ploidy (int): the ploidy of the concerned organism

        Returns:
            - list. a list (of length at most the ploidy) containing the possible
              alleles (including reference) at this location
        '''
        coverage = sum(count.values())
        # H0 = "this base is an error".
        # If errors occur with probability p, the number of each error type in the
        # pile follows a binomial law of parameters <size of the pile> and <p>.
        # The minimum expected count for a non-error character is then given by the
        # binomial quantile below; a floor of 2 occurrences is imposed as a safety
        # measure for the low coverage regions. The result is memoised per coverage
        # value, since binom.ppf is comparatively expensive.
        if coverage not in self.ppf_memory:
            self.ppf_memory[coverage] = max(2, 1 + binom.ppf(1. - Parameters().get_param('ALPHA'), coverage, Parameters().get_param('EXPECTED_ERROR_RATE')))
        expected_count = self.ppf_memory[coverage]
        # The quality doesn't seem to be relevant to the error positions, so the raw
        # counts are used: a letter is an acceptable allele (i.e. does not result
        # from error) when its count reaches the expected threshold.
        accepted = {letter: c for letter, c in count.items() if c >= expected_count}
        allele = list()  # keep only the <ploidy> best alleles
        for i in range(min(ploidy, len(accepted))):
            max_key = max(accepted, key=accepted.get)
            allele.append(max_key)
            accepted.pop(max_key)
        if len(allele) == 0:  # not impossible, just very improbable
            allele.append(ref)  # returning an empty list instead would mean: bad coverage/data quality
        return allele

    def create_snp(self, lallele):
        '''Create the necessary number of SNP objects for this location (possibly 0).

        Args:
            - lallele (list): a list containing, for each organism, the list of its
              alleles (empty if the coverage is bad)

        Returns:
            - list. a list of the indexes of the new SNPs in self.lsnp
        '''
        new_snp = dict()  # the newly created SNPs, keyed by alternate base
        # Presence data: create a SNP for each non-reference allele and record which
        # organism (and which parental sample index) carries it.
        for i, allele in enumerate(lallele):  # for each organism
            for j, a in enumerate(allele):  # for each allele
                if a != self.current_reference and a not in new_snp:  # not the ref and not yet seen -> the SNP is new
                    new_snp[a] = SNP(self.current_gene, self.current_position, self.current_reference, a)
                if a != self.current_reference:  # not the reference
                    if i != 0:  # not the child
                        new_snp[a].has_SNP(self.lorganism[i].name, j)
                    else:  # child: sample index -1 (not tracked per sample)
                        new_snp[a].has_SNP(self.lorganism[i].name, -1)
        # Absence data: every allele that differs from a given SNP's alternate base
        # counts as an absence of that SNP.
        for i, allele in enumerate(lallele):  # for each organism
            for j, a in enumerate(allele):  # for each allele
                for snp in list(new_snp.keys()):
                    if a != snp:
                        if i != 0:  # not the child
                            new_snp[snp].no_SNP(self.lorganism[i].name, j)
                        else:  # child
                            new_snp[snp].no_SNP(self.lorganism[i].name, -1)
        # Bad coverage data: an empty allele list means organism i is not covered
        # well enough at this position; flag every new SNP once per ploidy level.
        for i, allele in enumerate(lallele):  # for each organism
            if len(allele) == 0:
                for snp in list(new_snp.values()):
                    for j in range(self.lorganism[i].ploidy):
                        snp.no_coverage(self.lorganism[i].name)
        # Every SNP at this position is now complete: append them to lsnp and
        # record their indexes.
        new_index = list()
        for snp in list(new_snp.values()):
            new_index.append(len(self.lsnp))
            self.lsnp.append(snp)
        return new_index

    def decompose_pileup_line(self, line):
        '''Update the different Lanes with the current line information.

        Args:
            - line (str): a line coming from a pileup file
        '''
        splitted_line = line.strip().split('\t')
        # structure of the line:
        # GENE POS REF COV1 PILE1 QUAL1 COV2 PILE2 QUAL2 ...
        self.current_gene = splitted_line[0]
        self.current_position = int(splitted_line[1])
        self.current_reference = splitted_line[2]
        # a lane is 3 columns long
        i = 1  # counts the number of treated lanes
        for org in self.lorganism:  # for each organism
            for j, lane in enumerate(self.dlane[org.name]):  # for each (sample_index, sample)
                lane.update(int(splitted_line[i*3]), splitted_line[i*3 + 1], splitted_line[i*3 + 2], org.name, org.sample_name[j], self.current_gene, self.current_position, self.current_reference)
                i += 1
        return

    def treat_pileup_line(self, line):
        '''Decompose the line; call SNPs; categorize and stock the reads of the different samples.

        Args:
            - line (str): a line of the .pileup file
        '''
        self.decompose_pileup_line(line)
        # information gathered: reads have been created, stacked up for finishing, pile treated
        #####################################
        # SNP calling
        lallele = list()
        for org in self.lorganism:  # for each organism
            # total count of each possible letter over all the lanes of this organism
            allele_count = {'A': 0, 'T': 0, 'C': 0, 'G': 0}
            for lane in self.dlane[org.name]:  # for each lane
                lane_count = lane.get_count()
                for l in list(allele_count.keys()):
                    allele_count[l] += lane_count[l]
            # check the coverage of this region
            if org.ploidy == 1:  # the organism is a haploid
                min_coverage = Parameters().get_param('MIN_COVERAGE_HAPLOID')
            else:
                min_coverage = Parameters().get_param('MIN_COVERAGE_POLYPLOID')
            covered = sum(allele_count.values()) >= min_coverage  # True if the coverage is good enough
            if covered:
                allele = self.detect_snp(self.current_reference, allele_count, org.ploidy)
            else:
                allele = []
            lallele.append(allele)
        # lallele contains a list of the alleles of each organism at the current position
        # TAG UPDATE: we want to update the tags of the parents
        for i in range(1, len(lallele)):  # for each parent (the child is the first)
            if len(lallele[i]) == 0 or self.lorganism[i].ploidy == 1:  # bad coverage or haploid parent: do nothing
                continue
            elif len(lallele[i]) == 1:  # only one genotype at this position: duplicate it, don't update the tags
                lallele[i] = [lallele[i][0], lallele[i][0]]
            else:
                lallele[i] = self.update_tags(lallele[i], self.lorganism[i].name)
        # the tags are updated, the lists of alleles are completed too
        # creating the snps
        new_index = self.create_snp(lallele)
        ############################################
        # the new snps are created and their indexes are in new_index;
        # add the snps to the open reads of the child only
        for i in new_index:  # for each new snp
            if self.lsnp[i].masked == True:
                # at least one organism has bad coverage for this position:
                # all the SNPs at this position are badly covered, so stop here
                break
            for lane in self.dlane[self.lorganism[0].name]:  # for each lane of the child
                lane.add_snp(i, self.lsnp[i].alt)
        ############################################
        # all the reads display their snps; update the fingerprints of the parents
        for parent_index in range(1, len(self.lorganism)):  # for each parent index. REMEMBER: CHILD ALWAYS COMES FIRST
            for snp_index in new_index:  # for each new SNP
                if self.lsnp[snp_index].masked != True:  # the SNP isn't masked
                    self.lorganism[parent_index].add_snp(self.current_gene, self.current_position, snp_index, self.lsnp[snp_index].presence[self.lorganism[parent_index].name], None)
        ##############################
        # take care of the finishing reads
        for org_index in range(len(self.lorganism)):  # for each organism
            for sample_index in range(len(self.lorganism[org_index].sample_name)):  # for each sample
                finish = self.dlane[self.lorganism[org_index].name][sample_index].finish_reads()  # the list of the finished reads
                self.lorganism[org_index].add_expression(self.lorganism[org_index].sample_name[sample_index], self.current_gene, len(finish))
                if org_index == 0:  # we are dealing with the child
                    for r in finish:  # for each finishing read
                        # generation of all the fingerprints of all the parents over the read span
                        fprints = {o.name: o.get_all_fingerprint(self.current_gene, r.start, r.stop) for o in self.lorganism[1:]}
                        r.categorize(fprints, self.lsnp)
                        self.dread[self.lorganism[org_index].sample_name[sample_index]].append(r)
        # piles have been read; SNPs have been detected and categorized;
        # reads have been updated; closed reads have been categorized and stocked
        return

    def give_line(self):
        '''
        Yields:
            - str. a line from the pileup file
        '''
        for l in self.pilereader.give_line():
            yield l

    def close_pileup(self):
        '''
        Close the pileup file.
        '''
        self.pilereader.close()
        return

    def restaure(self):
        '''
        Re-open the pileup file.
        '''
        filename = self.pilereader.filename
        self.pilereader = Pileup_Reader(filename)
        return

    def get_nb_reads(self):
        '''
        Returns:
            - list. for each organism, the list of the number of opened reads in each of its lanes
        '''
        return [[lane.get_nb_reads() for lane in self.dlane[o.name]] for o in self.lorganism]