#!/usr/bin/env python3
# (c) Copyright 2013-2018 Murray Cox, Wandrille Duchemin, Pierre-Yves Dupont.
#
#
# This file is part of HyLiTE.
#
# HyLiTE is a free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as published by
# the Free Software Foundation.
#
# HyLiTE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with HyLiTE. If not, see <http://www.gnu.org/licenses/>
#===============================#
# author: Wandrille Duchemin #
# Murray Cox #
# last modified: 9 January 2018 #
#===============================#
from .Parameters import Parameters
from .Read import Read
import sys
def phred_quality(char):
    '''Convert a quality character into a numeric quality score.

    Args:
        - char (str): a single quality character

    Returns:
        - the code point of char minus the value of the 'PHRED_STANDARD' parameter
    '''
    code_point = ord(char)
    return code_point - Parameters().get_param('PHRED_STANDARD')
class Lane:
    '''
    This class is used to represent a lane in a pileup file (a lane is comprised of three columns: coverage, pile, quality).

    Attributes:
        - index (int): its index in the pileup file
        - coverage (int): the coverage given to the last update
        - pile (list): one entry per read parsed from the last pile string (upper-cased base, reference or EMPTY_CHAR)
        - qual: the quality value given to the last update, stored unchanged (empty list before the first update)
        - opened_read (list): a list of the opened reads contained in the lane
        - finishing_reads (list): indexes (in opened_read) of the reads that end at the current position
        - pileup_param (dict): key is the id of the parameter, value is the parameter. These are mostly the characters .pileup uses to represent data
        - current_position: the position given to the last update (None before the first update)
    '''

    def __init__(self, index):
        '''Create an empty lane.

        Args:
            - index (int): the index of the lane in a pileup file
        '''
        self.index = index
        self.coverage = 0
        self.pile = []
        self.qual = []
        self.opened_read = []
        self.finishing_reads = []  # list of the index of finishing reads

        # cache the pileup characters once so that update() does not query Parameters for every character
        param = Parameters()
        self.pileup_param = {
            key: param.get_param(key)
            for key in ('EMPTY_CHAR', 'BEGIN_CHAR', 'REF_CHAR', 'INDEL_CHAR', 'END_CHAR')
        }
        self.current_position = None

    def update(self, cov, pile, qual, organism, sample, gene, position, ref):
        '''Update the Lane with this information.

        The pile string is parsed read by read: a new Read is opened for each BEGIN_CHAR,
        one entry is appended to self.pile for each read, INDEL descriptions are skipped and
        the reads followed by END_CHAR are registered in self.finishing_reads.

        Args:
            - cov (int): a coverage
            - pile (str): a pile
            - qual (str): a quality str
            - organism (str): an organism name
            - sample (str): a sample name
            - gene (str): a gene name
            - position (int): a position
            - ref (str): the reference for the position

        Raises:
            - ValueError: if an INDEL_CHAR is not followed by a number
            - IndexError: if the pile ends right after a BEGIN_CHAR, or if a read ends while no matching opened read exists
        '''
        self.coverage = cov
        self.qual = qual  # kept as given, no phred conversion is done here
        self.pile = []  # re-initialization of the pile (self.pile is a list, the pile argument is a str)
        self.finishing_reads = []

        params = self.pileup_param
        pile_len = len(pile)
        i = 0
        while i < pile_len:
            char_dec = 0  # how far we have to move forward from i to reach the alignment char

            # a read begins here: ^<phred char><first char of the read>
            if pile[i] == params['BEGIN_CHAR']:
                self.opened_read.append(Read(organism, sample, gene, position))
                char_dec += 2

            # EMPTY_CHAR (really empty OR deletion) needs no special handling: it is stored as is
            base = pile[i + char_dec]
            if base in params['REF_CHAR']:
                self.pile.append(ref.upper())
            else:
                self.pile.append(base.upper())

            # a deletion/insertion follows: <current char><+/-><size of INDEL><INDEL chars>
            following = i + char_dec + 1
            if following < pile_len and pile[following] in params['INDEL_CHAR']:
                size_start = following + 1
                size_end = size_start + 1
                # The scan is bounded by the end of the pile: without the bound, a pile that
                # ends inside the size digits (truncated line) would loop forever.
                while size_end < pile_len and pile[size_start:size_end + 1].isdigit():
                    size_end += 1
                indel_size = int(pile[size_start:size_end])
                char_dec += 1 + (size_end - size_start) + indel_size  # INDEL_CHAR + SIZE_CHARS + INDEL

            # the read ends here: <last char of the read>$
            following = i + char_dec + 1
            if following < pile_len and pile[following] == params['END_CHAR']:
                read_index = len(self.pile) - 1  # index of the read that was just parsed
                self.finishing_reads.append(read_index)
                self.opened_read[read_index].stop = position
                char_dec += 1

            i += char_dec + 1

        # by this point reads have been created, stacked up for finishing and pile has been reduced
        self.current_position = position

    def get_count(self):
        '''Count the bases of the first self.coverage entries of the pile.

        Deletions (EMPTY_CHAR) are skipped silently, N is skipped with a message on stderr.

        Returns:
            - dict. a dictionary containing the count of each letter (keys are letters, values are counts)

        Raises:
            - IOError: if an entry is not A, T, G, C, N or EMPTY_CHAR
        '''
        c = {'A': 0, 'T': 0, 'C': 0, 'G': 0}
        for i in range(self.coverage):
            letter = self.pile[i]
            if letter == self.pileup_param['EMPTY_CHAR']:
                continue  # we exclude deletions from the count
            if letter not in c:
                if letter.upper() == "N":
                    sys.stderr.write("%s found in the sequence (position %s - column %s). Only ATGC are considered. Ignored\n" % (letter, self.current_position, i))
                    continue
                raise IOError("Error reading pileup file. %s (found at position %s, column %s) is not A,T,G,C or N.\n" % (letter, self.current_position, i))
            c[letter] += 1
        return c

    def add_snp(self, i, alt):
        '''Add the snp to the opened reads.

        Args:
            - i (int): the index of the snp
            - alt (str): the alt of the snp
        '''
        for j, read in enumerate(self.opened_read):
            # the presence is 0 if the letter of the read != alt, 1 otherwise
            # Note that a deletion will count as ref allele in this case...
            read.add_snp(i, int(self.pile[j] == alt))

    def finish_reads(self):
        '''
        Finish the reads properly and return them.

        The reads listed in self.finishing_reads are removed from self.opened_read.

        Returns:
            - list. a list of the finished reads
        '''
        finish = []
        for already_popped, read_index in enumerate(self.finishing_reads):
            # the indexes are in increasing order and every pop shifts the following reads by one
            finish.append(self.opened_read.pop(read_index - already_popped))
        return finish

    def get_nb_reads(self):
        '''
        Returns:
            - the number of opened reads
        '''
        return len(self.opened_read)