# Source code for gepyto.formats.wig

"""
Parser for Wiggle Track Format files.
"""

# This file is part of gepyto.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.

# Module metadata: authorship, copyright notice and license name.
__author__ = "Marc-Andre Legault"
__copyright__ = ("Copyright 2014 Marc-Andre Legault and Louis-Philippe "
                 "Lemieux Perreault. All rights reserved.")
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"


import os

import pandas as pd
import six


class WiggleFile(object):
    """Parser for WIG files.

    This returns a pandas dataframe with all the necessary information. In the
    process, all the inherent compactness of the Wiggle format is lost in
    exchange for an easier to manage representation. This means that more
    efficient parsers should be used for large chunks of data.

    This implementation is based on the specification from:
    http://genome.ucsc.edu/goldenpath/help/wiggle.html

    .. warning::

        ``fixedStep`` is the only implemented mode for now. Future releases
        might improve this parser to be more flexible.

    Blank lines, ``#`` comment lines and ``track`` / ``browser`` lines are
    skipped wherever they appear.

    To access the parsed information, use the
    :py:func:`WiggleFile.as_dataframe` function.

    Usage (given a file on disk):

        >>> import gepyto.formats.wig
        >>> with gepyto.formats.wig.WiggleFile("my_file.wig") as f:
        ...     df = f.as_dataframe()
        ...
        >>> df
          chrom     pos  value
        0  chr3  400601   11.0
        1  chr3  400701   22.0
        2  chr3  400801   33.0

    :param stream: Either a path to a WIG file on disk or an already opened,
                   line-iterable stream (e.g. a file object).

    :raises IOError: If ``stream`` is a string that is not an existing file.
    :raises ValueError: If the input has no declaration line before its data.
    :raises NotImplementedError: If the first declaration is not
                                 ``fixedStep``.

    """

    # Keywords that open a new data section.
    _DECLARATIONS = ("variableStep", "fixedStep")

    # Prefixes of lines that carry no data (comments and display settings).
    _IGNORED_PREFIXES = ("#", "track", "browser")

    def __init__(self, stream):
        self.stream = stream

        opened_here = False
        if isinstance(stream, six.string_types):
            if os.path.isfile(stream):
                self.stream = open(stream, "r")
                opened_here = True
            else:
                raise IOError("Can't find file '{}'.".format(stream))

        try:
            self.data = self._read_data()
        except Exception:
            # Do not leak the handle we opened ourselves when parsing fails.
            # A stream supplied by the caller is left untouched.
            if opened_here:
                self.stream.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, *params):
        self.close()

    def close(self):
        """Close the underlying stream, if it can be closed."""
        # Best effort: streams without a ``close`` method are left alone.
        try:
            self.stream.close()
        except AttributeError:
            pass

    def as_dataframe(self):
        """Return the parsed data as a pandas DataFrame.

        The columns are ``chrom``, ``pos`` and ``value`` when every region
        covers a single base, ``chrom``, ``start``, ``end`` and ``value``
        otherwise.

        """
        return self.data

    def _read_data(self):
        """Parse the whole stream and return the final DataFrame."""
        # Find the first meaningful line, skipping track/browser/comment
        # and blank lines that may precede the first declaration.
        first_line = None
        for line in self.stream:
            if not self._is_ignorable(line):
                first_line = line
                break

        if first_line is None or not self._is_header(first_line):
            raise ValueError(
                "Expected a 'fixedStep' or 'variableStep' declaration line "
                "before any data."
            )

        mode, first_header = self._parse_header(first_line)
        if mode != "fixedStep":
            raise NotImplementedError("fixedStep is the only implemented mode "
                                      "for now.")

        data = self._parse_fixed_step(header=first_header)

        # Use categories for chrom to save space.
        data["chrom"] = data["chrom"].astype("category")

        # When every region is a single base, expose "pos" instead of the
        # redundant "start"/"end" pair.
        if (data["start"] == data["end"]).all():
            data = data.drop("end", axis=1)
            data.columns = ["chrom", "pos", "value"]

        return data

    def _parse_fixed_step(self, header=None):
        """Consume the remaining lines of the stream as fixedStep data.

        :param header: The parsed declaration (as returned by
                       :py:func:`_parse_header`) that applies to the first
                       data lines. It is copied, not modified.

        :returns: A DataFrame with ``chrom``, ``start``, ``end`` and ``value``
                  columns, one row per data line.

        :raises AssertionError: If a declaration of another mode is found.

        """
        if header is not None:
            # Work on a copy so the caller's dict keeps its original "pos".
            header = dict(header)

        rows = []
        for line in self.stream:
            if self._is_ignorable(line):
                continue

            if self._is_header(line):
                mode, header = self._parse_header(line)
                # Raised explicitly (not via ``assert``) so that the check
                # is not stripped when Python runs with -O.
                if mode != "fixedStep":
                    raise AssertionError(
                        "Can't change mode after parsing started."
                    )
                continue

            rows.append((
                header["chrom"],
                header["pos"],
                header["pos"] + header["span"] - 1,
                float(line.rstrip()),
            ))
            header["pos"] += header["step"]

        return pd.DataFrame(
            rows, columns=("chrom", "start", "end", "value")
        )

    @staticmethod
    def _parse_header(line):
        """Parse a declaration line into ``(mode, header)``.

        ``header`` maps every ``key=value`` field of the line to its value.
        ``span`` is always converted to an integer (default 1). For
        ``fixedStep`` declarations, ``start`` and ``step`` are required and
        converted to integers, and ``pos`` is initialized to ``start``.

        :raises KeyError: If a ``fixedStep`` line lacks ``start`` or ``step``.

        """
        fields = line.split()
        mode = fields[0]

        # Split on the first "=" only so values may themselves contain "=".
        header = dict(field.split("=", 1) for field in fields[1:])

        # Only fixedStep declarations carry start/step; requiring them for
        # other modes would hide the real "unsupported mode" error.
        if mode == "fixedStep":
            header["start"] = int(header["start"])
            header["step"] = int(header["step"])
            header["pos"] = header["start"]

        header["span"] = int(header.get("span", 1))

        return mode, header

    @staticmethod
    def _is_header(line):
        """Return True if the line is a step declaration."""
        return line.startswith(("variableStep", "fixedStep"))

    @classmethod
    def _is_ignorable(cls, line):
        """Return True for blank, comment, track and browser lines."""
        stripped = line.strip()
        return not stripped or stripped.startswith(cls._IGNORED_PREFIXES)