Source code for tools.parsers.VCFTools

import os, types, gzip

[docs]class VCFEntry(object) : """A single entry in a VCF file""" def __init__(self, vcfFile, line, lineNumber) : #CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003 #20 14370 rs6054257 G A 29 PASS NS=3;DP=14;AF=0.5;DB;H2 GT:GQ:DP:HQ 0|0:48:1:51,51 1|0:48:8:51,51 1/1:43:5:.,. self.vcfFile = vcfFile self.lineNumber = lineNumber self.data = {} tmpL = line.replace('\r', '\n').replace('\n', '') tmpData = str(tmpL).split('\t') for i in range(6) : self.data[vcfFile.dnegel[i]] = tmpData[i] self.data['POS'] = int(self.data['POS']) filters = tmpData[6].split(';') if len(filters) == 1 : self.data['FILTER'] = filters else : for filter_value in tmpData[6].split(';') : filt, value = info_value.split('=') self.data['FILTER'][filt] = value self.data['INFO'] = {} for s in tmpData[7].split(';') : info_value = s.split('=') try : typ = self.vcfFile.meta['INFO'][info_value[0]]['Type'] except KeyError : typ = None if len(info_value) == 1 : if typ == 'Flag' or typ == None : self.data['INFO'][info_value[0]] = True else : raise ValueError('%s is not a flag and has no value' % info_value[0]) else : if typ == 'Integer' : self.data['INFO'][info_value[0]] = int(info_value[1]) elif typ == 'Float' : self.data['INFO'][info_value[0]] = float(info_value[1]) else : self.data['INFO'][info_value[0]] = info_value[1] def __getitem__(self, key) : "with the vcf file format some fields are not present in all elements therefor, this fct never raises an exception but returns None or False if the field is definied as a Flag in Meta" try : return self.data[key] except KeyError: try : return self.data['INFO'][key] except KeyError: try : if self.vcfFile.meta['INFO'][key]['Type'] == 'Flag' : self.data['INFO'][key] = False return self.data['INFO'][key] else : return None except KeyError: return None def __repr__(self) : return "<VCFEntry line: %d>" % self.lineNumber def __str__(self) : return "<VCFEntry line: %d, %s" % (self.lineNumber, str(self.data))
[docs]class VCFFile(object) : """ This is a small parser for VCF files, it should work with any VCF file but has only been tested on dbSNP138 files. Represents a whole VCF file:: #reading f = VCFFile() f.parse('hop.vcf') for line in f : print(line['pos']) """ def __init__(self, filename = None, gziped = False, stream = False) : self.legend = {} self.dnegel = {} self.meta = {} self.lines = None if filename : self.parse(filename, gziped, stream)
[docs] def parse(self, filename, gziped = False, stream = False) : """opens a file""" self.stream = stream if gziped : self.f = gzip.open(filename, 'rt') else : self.f = open(filename) self.filename = filename self.gziped = gziped lineId = 0 inLegend = True while inLegend : ll = self.f.readline() l = ll.replace('\r', '\n').replace('\n', '') if l[:2] == '##' : eqPos = l.find('=') key = l[2:eqPos] values = l[eqPos+1:].strip() if l[eqPos+1] != '<' : self.meta[key] = values else : if key not in self.meta : self.meta[key] = {} svalues = l[eqPos+2:-1].split(',') #remove the < and > that surounf the string idKey = svalues[0].split('=')[1] self.meta[key][idKey] = {} i = 1 for v in svalues[1:] : sv = v.split("=") field = sv[0] value = sv[1] if field.lower() == 'description' : self.meta[key][idKey][field] = ','.join(svalues[i:])[len(field)+2:-1] break self.meta[key][idKey][field] = value i += 1 elif l[:6] == '#CHROM': #we are in legend sl = l.split('\t') for i in range(len(sl)) : self.legend[sl[i]] = i self.dnegel[i] = sl[i] break lineId += 1 if not stream : self.lines = self.f.readlines() self.f.close()
[docs] def close(self) : """closes the file""" self.f.close()
def _developLine(self, lineNumber) : if self.lines[lineNumber].__class__ is not VCFEntry : self.lines[lineNumber] = VCFEntry(self, self.lines[lineNumber], lineNumber) def __iter__(self) : self.currentPos = -1 return self def __next__(self) : self.currentPos += 1 if not self.stream : try : return self[self.currentPos-1] except IndexError: raise StopIteration else : midfile_header = True while midfile_header: line = self.f.readline() if not line : raise StopIteration if not line.startswith('#'): midfile_header = False return VCFEntry(self, line, self.currentPos) def __getitem__(self, line) : """returns the lineth element""" if self.stream : raise KeyError("When the file is opened as a stream it's impossible to ask for specific item") if self.lines[line].__class__ is not VCFEntry : self._developLine(line) return self.lines[line] def __len__(self) : """returns the number of entries""" return len(self.lines) def __repr__(self) : return "<VCFFile: %s>" % (os.path.basename(self.filename)) def __str__(self) : if self.stream : return "<VCFFile: %s, gziped: %s, stream: %s, len: undef>" % (os.path.basename(self.filename), self.gziped, self.stream) else : return "<VCFFile: %s, gziped: %s, stream: %s, len: %d>" % (os.path.basename(self.filename), self.gziped, self.stream, len(self))
if __name__ == '__main__' : from pyGeno.tools.ProgressBar import ProgressBar #v = VCFFile('/u/daoudat/py/pySrc/pyGeno_stuff/V2/packages/dbSNP/human/dbSNP138/00-All.vcf.gz', gziped = True, stream = True) v = VCFFile('/u/daoudat/py/pySrc/pyGeno_stuff/V2/packages/dbSNP/human/dbSNP138/test/00-All-test.vcf.gz', gziped = True, stream = True) startTime = time.time() i = 0 pBar = ProgressBar() for f in v : pBar.update('%s' % i) if i > 1000000 : break i += 1 pBar.close() #v = VCFFile('/u/daoudat/py/pySrc/pyGeno_stuff/V2/packages/dbSNP/human/dbSNP138/00-All.vcf', gziped = False, stream = True) v = VCFFile('/u/daoudat/py/pySrc/pyGeno_stuff/V2/packages/dbSNP/human/dbSNP138/test/00-All-test.vcf', gziped = False, stream = False) startTime = time.time() i = 0 pBar = ProgressBar() for f in v : pBar.update('%s' % i) if i > 1000000 : break i += 1 pBar.close()