Source code for gepyto.formats.gtf

#
# Parser for GTF/GFF files.
#
# This file is part of gepyto.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.

__author__ = "Marc-Andre Legault"
__copyright__ = ("Copyright 2014 Marc-Andre Legault and Louis-Philippe "
                 "Lemieux Perreault. All rights reserved.")
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"


import re
import gzip
import functools
import datetime
import logging
from collections import namedtuple

from six.moves import urllib
from six import binary_type

from ..structures.sequences import Sequence


class InvalidGTF(Exception):
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


[docs]class GTFFile(object): """Parser for GTF files. This implementation was based on the format specification as described here: http://www.sanger.ac.uk/resources/software/gff/spec.html. You can use this parser on both local files (compressed using gzip, or not) and on remote files (on a HTTP server). For every line, this class will return a named tuple with the following fields: - seqname - source - features - start - end - score - strand - frame - attributes Example usage: >>> import gepyto.formats.gtf >>> url = "http://www.uniprot.org/uniprot/O60503.gff" >>> gtf = gepyto.formats.gtf.GTFFile(url) >>> gtf <gepyto.formats.gtf.GTFFile object at 0x1006dd590> >>> gtf.readline() _Line(seqname=u'O60503', source=u'UniProtKB', features=u'Chain', start=1,\ end=1353, score=None, strand=None, frame=None, attributes={u'Note':\ u'Adenylate cyclase type 9', u'ID': u'PRO_0000195708'}) """ Line = namedtuple("_Line", ["seqname", "source", "features", "start", "end", "score", "strand", "frame", "attributes"]) def __init__(self, fn): self._filename = fn if fn.endswith(".gz"): opener = gzip.open elif fn.startswith("http://"): opener = urllib.request.urlopen else: opener = functools.partial(open, mode="r") self._file = opener(fn) self._line_accumulator = None self._read_headers() def __next__(self): if self._line_accumulator is None: line = next(self._file) if line is None: raise StopIteration() else: line = self._line_accumulator self._line_accumulator = None return GTFFile.parse_line(line) next = __next__ def readline(self): return self.next() def __iter__(self): return self def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.close() def close(self): self._file.close() @staticmethod def parse_line(line): # Decode if bytes. if type(line) is binary_type: line = line.decode("utf-8") assert not line.startswith("#") if line.startswith("chr"): line = line[3:] line = line.strip().split("\t") if len(line) < 8: raise InvalidGTF("Mandatory fields are missing.") seqname, source, feature, start, end, score, strand, frame = line[:8] attributes = None # Check the format and parse the attributes field (optional). if len(line) >= 9: # Merge attribute fields if tabs are in them. line[8] = " ".join(line[8:]) # Extra attributes are available. attributes = line[8].strip().split(";") parsed_attributes = {} for attr in attributes: if attr == "": # If lines finish with a ';' continue attr = attr.strip() # We authorize both whitespace or '=' sign for ID=VALUE. # The equals syntax is used by major institutions. tag = re.match(r"^([A-Za-z][A-Za-z0-9_]*)[\s=]", attr) if tag is None: raise InvalidGTF("Invalid tag in attributes field \"{}\"." "".format(attr)) tag = tag.group(1) value = attr[len(tag):].strip("= \t") value = value.replace('"', '') parsed_attributes[tag] = value attributes = parsed_attributes # Type checks. if seqname.startswith("chr"): seqname = seqname[3:] try: start = int(start) end = int(end) assert start <= end assert start >= 1 score = float(score) if score != "." else None strand = strand if strand != "." else None assert strand in ("+", "-") or strand is None frame = frame if frame != "." else None assert frame in ("0", "1", "2") or frame is None except AssertionError as e: message = ("Some fields of the GTF were invalid. The parsed " "parameters are as follows:\n" "\tsequname: {seqname}\n" "\tsource: {source}\n" "\tfeature: {feature}\n" "\tstart: {start}\n" "\tend: {end}\n" "\tscore: {score}\n" "\tstrand: {strand}\n" "\tframe: {frame}\n") message = message.format(seqname=seqname, source=source, feature=feature, start=start, end=end, score=score, strand=strand, frame=frame) logging.critical(message) raise e return GTFFile.Line(seqname, source, feature, start, end, score, strand, frame, attributes) def _read_headers(self): """Skip generic headers and parse paraseable headers of the GTF file. Recognized headers are `described here <http://www.sanger.ac.uk/resources/software/gff/spec.html#t_2>`_ : They are: - gff-version - source-version - date - Type - DNA - RNA - Protein - sequence-region If available, all of these will be parsed as attributes to the GTFFile object. """ try: line = next(self._file) except StopIteration: raise InvalidGTF("File seems to be empty.") while line.startswith("#"): if not line.startswith("##"): try: line = next(self._file) continue except StopIteration: return line = line.lstrip("#") line = line.split() if line[0] == "gff-version": self.gff_version = int(line[1]) elif line[0] == "source-version": if not hasattr(self, "source_version"): self.source_version = {} self.source_version[line[1]] = line[2] elif line[0] == "date": try: self.date = datetime.datetime.strptime(line[1], "%Y-%m-%d") except ValueError: msg = ("Could not parse date from GTF header. Expected" "ISO 8601 format, got '{}'.".format(line[1])) logging.warning(msg) self.date = line[1] elif line[0] == "Type": if line[1] not in ("DNA", "Protein", "RNA"): logging.warning("Unknown Type in GTF: '{}'. Known " "types are 'DNA', 'Protein' and 'RNA'." "".format(line[1])) if len(line) == 2: self.type = line[1] elif len(line) == 3: if not hasattr(self, "type"): self.type = {} self.type[line[2]] = line[1] elif line[0] in ("DNA", "RNA", "Protein"): seq_type = line[0] name = line[1] line = next(self._file).lstrip("#").strip() seq = "" while line != "end-{}".format(seq_type): seq += line.lstrip("#").strip() try: line = next(self._file).lstrip("#").strip() except StopIteration: raise Exception("'{}' sequence header found, but " "not closed.".format(name)) if not hasattr(self, "sequences"): self.sequences = {} if seq_type in ("DNA", "RNA"): self.sequences[name] = Sequence(name, seq, seq_type) else: self.sequences[name] = Sequence(name, seq, "AA") elif line[0] == "sequence-region": if not hasattr(self, "sequence_regions"): self.sequence_regions = {} self.sequence_regions[line[1]] = ( int(line[2]), int(line[3]) ) try: line = next(self._file) except StopIteration: return self._line_accumulator = line # This is not a header line. # Alias for the GFF format.
GFFFile = GTFFile