Source code for hylite.Lane

#!/usr/bin/env python3

#    (c) Copyright 2013-2018 Murray Cox, Wandrille Duchemin, Pierre-Yves Dupont.
#
#
#    This file is part of HyLiTE.
#
#    HyLiTE is a free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License version 3 as published by
#    the Free Software Foundation.
#
#    HyLiTE is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with HyLiTE.  If not, see <http://www.gnu.org/licenses/>


#===============================#
# author: Wandrille Duchemin    #
#         Murray Cox            #
# last modified: 9 January 2018 #
#===============================#


from .Parameters import Parameters
from .Read import Read
import sys

def phred_quality(char):
    return ord(char) - Parameters().get_param('PHRED_STANDARD')

[docs]class Lane: ''' This class is used to represent a lane in a pileup file (a lane is comprised of three column: coverage, pile, quality). Attributes: - index (int): its index in the pileup file - coverage (int): its current coverage - pile (str): its current pile - qual (list): its current quality - opened_read (list): a list of the opened read contained in the lane - pileup_param (dict): key is the id of the parameters, values is the parameter. This is mostly the character .pileup use to represent data ''' def __init__(self, index): '''This class is used to represent a lane in a pileup file (a lane is comprised of three column: coverage, pile, quality). Args: - index (int): the index of the lane in a pileup file ''' self.index = index self.coverage = 0 self.pile=[] self.qual=[] self.opened_read = list() self.finishing_reads = list() #list of the index of finishing reads self.pileup_param=dict() param = Parameters() self.pileup_param['EMPTY_CHAR'] = param.get_param('EMPTY_CHAR') self.pileup_param['BEGIN_CHAR'] = param.get_param('BEGIN_CHAR') self.pileup_param['REF_CHAR'] = param.get_param('REF_CHAR') self.pileup_param['INDEL_CHAR'] = param.get_param('INDEL_CHAR') self.pileup_param['END_CHAR'] = param.get_param('END_CHAR') self.current_position = None return
[docs] def update(self, cov, pile, qual, organism, sample, gene, position, ref): '''Update the Lane with this information Args: - cov (int): a coverage - pile (str): a pile - qual (str): a quality str - organism (str): an organism name - sample (str): a sample name - gene (str): a gene name - position (int): a position - ref (str): the reference for the position ''' self.coverage = cov self.qual=qual#[phred_quality(c) for c in qual] self.pile=list() #re-initialization of the pile self.finishing_reads=list() i=0 # print position,cov,pile while i < len(pile):# BEWARE THE CONFUSION BETWEEN pile: a str and self.pile: a list ... sorry char_dec = 0 #how much do we have to put forward to find the alignment char if pile[i]==self.pileup_param['EMPTY_CHAR']: #which can mean really empty OR deletion pass #we don't do anything if pile[i] == self.pileup_param['BEGIN_CHAR']: # this mean that a read begin: ^<phred char><first char of the read> self.opened_read.append(Read(organism, sample, gene, position)) char_dec +=2 if pile[i+char_dec] in self.pileup_param['REF_CHAR']: self.pile.append(ref.upper()) else: self.pile.append(pile[i+char_dec].upper()) if (i+char_dec+1) < len(pile): if pile[i+char_dec+1] in self.pileup_param['INDEL_CHAR']: # this mean a deletion/insertion: <current char><+/-><size of INDEL><INDEL chars> pos = i+char_dec+1 j=1 while pile[pos+1:pos+1+j+1].isdigit(): j+=1 indel_size = int(pile[pos+1:pos+1+j]) char_dec += (1 + indel_size + j) #INDEL_CHAR + SIZE_CHARS + INDEL if (i+char_dec+1) < len(pile): if pile[i+char_dec+1] == self.pileup_param['END_CHAR']: # this mean that a read end: $<last char of the read> self.finishing_reads.append(len(self.pile)-1) #len(pile)-1 correspond to the index of the actual read self.opened_read[len(self.pile)-1].stop = position char_dec +=1 i+=char_dec i+=1 # print "*"*80 # print "Update", position, self.pile self.current_position = position #by this point reads have been created, stacked up for finishing and pile has been reduced return
[docs] def get_count(self): ''' Returns: - dict. a dictionnary containing the count of each letter (key are letter, value is count) ''' c = {'A':0,'T':0,'C':0,'G':0} # print self.coverage,self.pile for i in range(self.coverage): if self.pile[i] == self.pileup_param['EMPTY_CHAR']: continue #we exclude deletions from the count if not self.pile[i] in c: if self.pile[i].upper() == "N": sys.stderr.write("%s found in the sequence (position %s - column %s). Only ATGC are considered. Ignored\n" % (self.pile[i], self.current_position, i)) continue else: raise IOError("Error reading pileup file. %s (found at position %s, column %s) is not A,T,G,C or N.\n" % (self.pile[i], self.current_position, i)) c[self.pile[i]]+=1 return c
[docs] def add_snp(self, i, alt): '''Add the snp to the reads Args: - i (int): the index of the spn - alt (str): the alt of the snp ''' #print self.coverage, len(self.opened_read), len(self.pile) for j in range(len(self.opened_read)): self.opened_read[j].add_snp(i, int(self.pile[j]==alt)) #the presence is 0 if l!=alt, 1 otherwise #Note that a deletion will count as ref allele in this case... return
[docs] def finish_reads(self): ''' Finish the reads properly and return them Returns: - list. a list of the finished reads ''' finish = list() for i in range(len(self.finishing_reads)): #for each finishing index f = self.finishing_reads[i] - i #because the read are read in order and that they all decrease their index of one when we pop a read finish.append(self.opened_read.pop(f)) return finish
[docs] def get_nb_reads(self): ''' Returns: - the number of opened reads ''' return len(self.opened_read)
[docs] def update_tags(self, allele): ''' Update the tags of its opened reads according to their content and the list of detected alleles at this position Args: - allele (list): list of the detected alleles at this position !!!This is not made for more than diploid yet ''' #we update the tags for t, a in enumerate(allele): if a != "*": #good allele for i in range(len(self.opened_read)): #for all non-tagged read if self.pile[i] == a and self.opened_read[i].tag == -1 : #this read present the good allele self.opened_read[i].tag = t #updating the tag # if the allele is ambiguous ("*"), do nothing return