Source code for tools.parsers.GTFTools

import gzip
import os

class GTFEntry(object) :
    def __init__(self, gtfFile, lineNumber) :
        """A single entry in a GTF file"""
        
        self.lineNumber = lineNumber
        self.gtfFile = gtfFile
        self.data = gtfFile.lines[lineNumber][:-2].split('\t') #-2 remove ';\n'
        proto_atts = self.data[gtfFile.legend['attributes']].strip().split('; ')
        atts = {}
        for a in proto_atts :
            sa = a.split(' ')
            k = sa[0]
            v = sa[1].replace('"', '')
            if k not in atts:
                atts[k] = v
            else:
                if type(atts[k]) != list:
                    atts[k] = [atts[k], v]
                else:
                    atts[k].append(v)
        self.data[gtfFile.legend['attributes']] = atts
    
    def __getitem__(self, k) :
        try :
            return self.data[self.gtfFile.legend[k]]
        except KeyError :
            try :
                return self.data[self.gtfFile.legend['attributes']][k]
            except KeyError :
                #return None
                raise KeyError(
                        "Line %d does not have an element %s.\nline:%s" % (
                            self.lineNumber, k, self.gtfFile.lines[self.lineNumber])
                        )
    
    def __repr__(self) :
        return "<GTFEntry line: %d>" % self.lineNumber
    
    def __str__(self) :
        return  "<GTFEntry line: %d, %s>" % (self.lineNumber, str(self.data))

[docs]class GTFFile(object) : """This is a simple GTF2.2 (Revised Ensembl GTF) parser, see http://mblab.wustl.edu/GTF22.html for more infos """ def __init__(self, filename, gziped = False) : self.filename = filename self.legend = { 'seqname' : 0, 'source' : 1, 'feature' : 2, 'start' : 3, 'end' : 4, 'score' : 5, 'strand' : 6, 'frame' : 7, 'attributes' : 8} self.gziped = gziped if gziped : f = gzip.open(filename, 'rt') else : f = open(filename) self.lines = [] for l in f : if l[0] != '#' and l != '' : self.lines.append(l) f.close() self.currentPos = -1
[docs] def get(self, line, elmt) : """returns the value of the field 'elmt' of line 'line'""" return self[line][elmt]
[docs] def get_transcripts(self, transcript_ids=None): """returns genes with its transcripts and associated exons and CDSs from a GTF if transcript_ids is used, only these transcripts will be returned """ if transcript_ids is not None: #transcript_ids = [y for x in transcript_ids for y in [x, x.split('.')[0]]] transcript_ids = set(transcript_ids) if transcript_ids is not None else transcript_ids gtf = iter(self) end_reached = False # Iterate through all genes in file # first line try: line = next(gtf) except: raise ('Empty GTF file') # pop-out lines that come before first gene while line['feature'] != 'gene': try: line = next(gtf) except: raise ('Never encountered a gene in the GTF') # Get all genes while line['feature'] == 'gene': gene = line transcripts = [] exons = [] cdss = [] start_codons = [] stop_codons = [] # pop-out all lines that come before first transcript in gene while line['feature'] != 'transcript': try: line = next(gtf) except: raise StopIteration('Last gene %s contains no transcripts' % gene['gene_id']) # Get all transcripts in gene while line['feature'] == 'transcript': tr = line ex = [] cds = [] start_codon = [] stop_codon = [] try: line = next(gtf) except: raise StopIteration('Last transcript %s contains no features' % tr['transcript_id']) # Get all features in transcript while line['feature'] != 'transcript' and line['feature'] != 'gene': if line['feature'] == 'exon': ex.append(line) if line['feature'] == 'CDS': cds.append(line) if line['feature'] == 'start_codon': start_codon.append(line) if line['feature'] == 'stop_codon': stop_codon.append(line) try: line = next(gtf) except StopIteration: end_reached = True break if transcript_ids is None or tr[0]['transcript_id'] in transcript_ids: transcripts.append(tr) exons.append(ex) cdss.append(cds) start_codons.append(start_codon) stop_codons.append(stop_codon) assert len(transcripts) yield (gene, transcripts, exons, cdss, start_codons, stop_codons) assert end_reached # in a normal GTF file, this value should be true assert self.currentPos == len(self)
[docs] def gtf2bed(self, bed_filename, feature='transcripts'): """Transform gtf to bed6/bed12 and saves the output to file""" if feature == 'transcripts': return self.gtf2bed_transcripts(bed_filename) elif feature == 'exons': return self.gtf2bed_exons(bed_filename) elif feature == 'cds': return self.gtf2bed_cds(bed_filename) else: raise ValueError('Accepted feature types are "transcripts", "exons" and "cds".')
[docs] def gtf2bed_transcripts(self, bed_filename): """Retrieves transcript information from gtf in bed12 format""" f_out = open(bed_filename, 'w') for gene, transcripts, exons, cdss, start_codons, stop_codons in self.get_transcripts(): for tr, ex, cds, stt, stp in zip(transcripts, exons, cdss, start_codons, stop_codons): has_exon = bool(len(ex)) assert has_exon has_stop_codon = bool(len(stp)) if has_stop_codon: stop_codon_length = 0 for stp_cod in stp: stop_codon_length += int(stp_cod['end']) - int(stp_cod['start']) + 1 assert stop_codon_length == 3 chromosome = tr['seqname'] tid = tr['transcript_id'] strand = tr['strand'] start = int(tr['start']) - 1 # 0-base end = int(tr['end']) exon_lengths = [] exon_start_pos = [] for ex_line in ex: assert ex_line['transcript_id'] == tid s = int(ex_line['start']) - 1 # 0-base e = int(ex_line['end']) exon_lengths.append(e-s) exon_start_pos.append(s-start) exon_number = ex_line['exon_number'] if len(cds): cds_start_pos = float('inf') cds_end_pos = -1 for cds_line in cds: assert cds_line['transcript_id'] == tid s = int(cds_line['start']) - 1 # 0-base e = int(cds_line['end']) cds_start_pos = min(cds_start_pos, s) cds_end_pos = max(cds_end_pos, e) else: cds_start_pos = start cds_end_pos = end if strand == '+': if has_stop_codon: cds_end_pos += 3 try: assert cds_end_pos <= end except AssertionError: print("NOTE: Transcript %s has incorrect stop_codon annotation" % tid) cds_end_pos -= 3 elif strand == '-': if has_stop_codon: cds_start_pos -= 3 try: assert cds_start_pos >= start except AssertionError: print("NOTE: Transcript %s has incorrect stop_codon annotation" % tid) cds_start_pos += 3 exon_lengths = exon_lengths[::-1] exon_start_pos = exon_start_pos[::-1] else: raise ValueError("Unrecognizable strand value: '%s'." % strand) entry = [chromosome, start, end, tid, '.', strand, cds_start_pos, cds_end_pos, '.', exon_number, ','.join([str(x) for x in exon_lengths]), ','.join([str(x) for x in exon_start_pos])] f_out.write('\t'.join([str(x) for x in entry]) + '\n') f_out.close() return True
[docs] def gtf2bed_exons(self, bed_filename, join_overlaps=True): """Retrieves exon information from gtf in bed6 format""" chromosome_list = [] exon_positions_dict = dict() for gene, transcripts, exons, cdss, start_codons, stop_codons in self.get_transcripts(): gene_exon_positions = [] for tr, ex, cds, stt, stp in zip(transcripts, exons, cdss, start_codons, stop_codons): if len(ex): chromosome = tr['seqname'] gid = tr['gene_id'] strand = tr['strand'] if chromosome not in exon_positions_dict: exon_positions_dict[chromosome] = [] chromosome_list.append(chromosome) exon_positions = [(int(e['start'])-1, int(e['end'])) for e in ex] if strand == '+': pass elif strand == '-': exon_positions = exon_positions[::-1] else: raise ValueError("Unrecognizable strand value: '%s'." % strand) gene_exon_positions.extend(exon_positions) # Join overlapping entries if join_overlaps: gene_exon_positions = sorted(list(set(gene_exon_positions))) gene_exon_positions = self._join_ends(gene_exon_positions) for start, end in gene_exon_positions: exon_positions_dict[chromosome].append((chromosome, start, end, gid, '.', strand)) f_out = open(bed_filename, 'w') for chromosome in chromosome_list: current_positions = list(set(exon_positions_dict[chromosome])) current_positions = sorted(current_positions, key=lambda x: (x[1], x[2], x[3])) for entry in current_positions: f_out.write('\t'.join([str(x) for x in entry]) + '\n') f_out.close() return True
[docs] def gtf2bed_cds(self, bed_filename, join_overlaps=True): """Retrieves CDS information from gtf in bed6 format""" chromosome_list = [] cds_positions_dict = dict() for gene, transcripts, exons, cdss, start_codons, stop_codons in self.get_transcripts(): gene_cds_positions = [] for tr, ex, cds, stt, stp in zip(transcripts, exons, cdss, start_codons, stop_codons): if len(cds): chromosome = tr['seqname'] gid = tr['gene_id'] strand = tr['strand'] has_stop_codon = bool(len(stp)) if has_stop_codon: stop_codon_length = 0 for stp_cod in stp: stop_codon_length += int(stp_cod['end']) - int(stp_cod['start']) + 1 assert stop_codon_length == 3 #try: # has_stop_codon = 'cds_end_NF' not in set(tr['tag']) #except KeyError: # has_stop_codon = True if chromosome not in cds_positions_dict: cds_positions_dict[chromosome] = [] chromosome_list.append(chromosome) cds_positions = [(int(c['start'])-1, int(c['end'])) for c in cds] if strand == '+': if has_stop_codon: cds_positions[-1] = (cds_positions[-1][0], cds_positions[-1][1] + 3) elif strand == '-': if has_stop_codon: cds_positions[-1] = (cds_positions[-1][0] - 3, cds_positions[-1][1]) cds_positions = cds_positions[::-1] else: raise ValueError("Unrecognizable strand value: '%s'." % strand) gene_cds_positions.extend(cds_positions) # Join overlapping entries if join_overlaps: gene_cds_positions = sorted(list(set(gene_cds_positions))) gene_cds_positions = self._join_ends(gene_cds_positions) for start, end in gene_cds_positions: cds_positions_dict[chromosome].append((chromosome, start, end, gid, '.', strand)) f_out = open(bed_filename, 'w') for chromosome in chromosome_list: current_positions = list(set(cds_positions_dict[chromosome])) current_positions = sorted(current_positions, key=lambda x: (x[1], x[2], x[3])) for entry in current_positions: f_out.write('\t'.join([str(x) for x in entry]) + '\n') f_out.close() return True
def _join_ends(self, positions, kept=[]): if not positions: return kept elif len(positions) == 1: start, end = positions.pop(0) return self._join_ends(positions, kept=kept+[(start, end)]) else: start, end = positions.pop(0) start_next, end_next = positions.pop(0) while end >= start_next and positions: start = min(start, start_next) end = max(end, end_next) start_next, end_next = positions.pop(0) return self._join_ends(positions, kept=kept+[(start, end)]) def __iter__(self) : self.currentPos = -1 return self def __next__(self) : self.currentPos += 1 try : return GTFEntry(self, self.currentPos) except IndexError: raise StopIteration def __getitem__(self, i) : """returns the ith entry""" if self.lines[i].__class__ is not GTFEntry : self.lines[i] = GTFEntry(self, i) return self.lines[i] def __repr__(self) : return "<GTFFile: %s>" % (os.path.basename(self.filename)) def __str__(self) : return "<GTFFile: %s, gziped: %s, len: %d, currentPosition: %d>" % ( os.path.basename(self.filename), self.gziped, len(self), self.currentPos) return "<GTFFile: %s, gziped: %s, len: %d>" % ( os.path.basename(self.filename), self.gziped, len(self)) def __len__(self) : return len(self.lines)