# This file is part of gepyto.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.
__author__ = "Marc-Andre Legault"
__copyright__ = ("Copyright 2014 Marc-Andre Legault and Louis-Philippe "
"Lemieux Perreault. All rights reserved.")
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"
import re
import contextlib
import logging
import sqlite3
import sys
import traceback
from .. import settings
from ..reference import Reference, check_indel_reference
from ..db.ensembl import query_ensembl, get_url_prefix
from . import genes
__all__ = ["SNP", "Indel", "variant_list_to_dataframe"]
class Variant(object):
"""Base class for genome Variants.
"""
def __init__(self, *args, **kwargs):
"""Super constructor for Variant objects.
It will initalize the object by assuming one of the following
scenarios:
1) The position arguments (*args) correspond to the elements of the
_PARAMETERS argument which is overriden by the subclasses when the
parent constructor is called.
2) All of the _PARAMETERS elements should be specified as keyword
arguments (**kwargs).
A sanity check will be done after initalization to make sure that all
of the parameters are defined.
"""
if "_PARAMETERS" in kwargs:
_PARAMETERS = kwargs.pop("_PARAMETERS")
else:
_PARAMETERS = ["chrom", "start", "end"]
if len(args) == len(_PARAMETERS):
for i, p in enumerate(_PARAMETERS):
setattr(self, p, args[i])
else:
keys = list(kwargs.keys())
for k in keys:
if k not in _PARAMETERS:
raise Exception("Unknown parameter {}.".format(k))
setattr(self, k, kwargs.pop(k))
# Make sure everything was set.
for p in _PARAMETERS:
if getattr(self, p, "-1") == "-1":
raise Exception("Missing parameter {}.".format(p))
def __hash__(self):
raise NotImplementedError("Variant hash is only implemented on objects"
" with 'chrom', 'pos', 'ref' and 'alt'"
" attributes.")
def format(self, spec, format_mapping=None):
"""Specific format to print a variant.
When printing variants, you have control over the representation using
special symbols similar to the `datetime` module:
+--------+--------------------+
| Symbol | Description |
+========+====================+
| %c | Chromosome |
+--------+--------------------+
| %p | Position |
+--------+--------------------+
| %r | Reference allele |
+--------+--------------------+
| %a | Alternative allele |
+--------+--------------------+
| %i | RS Id |
+--------+--------------------+
The basic syntax is the to use something like: ::
"{:chr%c:%p_%r/%a}".format(variant) # OR
variant.format("chr%c:%p_%r/%a")
This can be useful to generate strings with a specific structure to
convert between file formats.
.. todo::
We also want to add something like `datetime.strptime()` to parse
variant objects from a coded string.
"""
if format_mapping is None:
format_mapping = {
"%c": "chrom",
"%p": "pos",
"%r": "ref",
"%a": "alt",
"%i": "rs",
}
res = ""
last_pos = 0
found_mapping = False
for match in re.finditer("%[a-zA-Z]", spec):
if match.group() not in format_mapping:
raise KeyError("{}: invalid format".format(match.group()))
res += spec[last_pos:match.start()]
res += "{}".format((getattr(self, format_mapping[match.group()])))
last_pos = match.end()
found_mapping = True
if not found_mapping:
return spec
return res
def __format__(self, format_spec, format_mapping=None):
if format_spec == "":
return repr(self)
else:
return self.format(format_spec, format_mapping)
class ShortVariant(Variant):
"""Parent class for short variants like SNPs and Indels.
These have the following attributes:
- chrom
- pos
- rs
- ref
- alt
"""
def __init__(self, *args, **kwargs):
_PARAMETERS = [
"chrom",
"pos",
"rs",
"ref",
"alt",
]
kwargs["_PARAMETERS"] = _PARAMETERS
super(ShortVariant, self).__init__(*args, **kwargs)
self.pos = int(self.pos)
self.ref = self.ref.upper()
self.alt = self.alt.upper()
try:
assert re.match(settings.CHROM_REGEX, str(self.chrom))
assert self.rs is None or re.match(r"^rs[0-9]+$", self.rs)
assert type(self.pos) is int
assert type(self.ref) is str
assert type(self.alt) is str
except AssertionError as e:
logging.critical(
"Assertion failed constructing the ShortVariant object. \n"
"Parameters were: \n"
"\tchrom: {chrom} ({chrom_type})\n"
"\tpos: {pos} ({pos_type})\n"
"\trs: {rs} ({rs_type})\n"
"\tref: {ref} ({ref_type})\n"
"\talt: {alt} ({alt_type})\n".format(
chrom=self.chrom, chrom_type=type(self.chrom),
pos=self.pos, pos_type=type(self.pos),
rs=self.rs, rs_type=type(self.rs),
ref=self.ref, ref_type=type(self.ref),
alt=self.alt, alt_type=type(self.alt),
)
)
traceback.print_tb(sys.exc_info()[2])
raise e
def __hash__(self):
return hash((self.chrom, self.pos, self.ref, self.alt))
def hash(self):
return self.__hash__()
def __eq__(self, other):
if type(self) is not type(other):
return False
return (
self.chrom == other.chrom and
self.pos == other.pos and
self.ref == other.ref and
self.alt == other.alt
)
def vcf_header(self):
"""Returns a valid VCF header line. """
return "\t".join([
"#CHROM",
"POS",
"ID",
"REF",
"ALT",
"QUAL",
"FILTER",
"INFO",
])
def vcf_line(self):
"""Returns a line describing the current variant as expected by the VCF
format.
"""
return "\t".join(str(i) for i in [
self.chrom,
self.pos,
self.rs if self.rs else ".",
self.ref,
self.alt,
".", # No Quality
"PASS", # PASS for filter
".", # Info
])
def in_gene(self, gene):
"""Method to test if the variant is in the gene.
:param: The gene object to test against.
By duck typing, this could be anything with a ``chrom``,
``start`` and ``end`` (or a ``pos``).
:returns: True is the variant is in the gene (region comparison).
:rtype: bool
"""
ret = (str(self.chrom) == str(gene.chrom) and
gene.start <= self.pos <= gene.end)
return ret
def load_ensembl_annotations(self, build=settings.BUILD):
"""Uses the Ensembl API to get annotations for this SNP.
Available annotations are:
- MAF
- evidence
- most_severe_consequence
This will add all of them to the `_info` field.
"""
if self.rs is None:
raise Exception("Getting annotation for '{}' from Ensembl "
"requires the 'rs' field to be set (and not None)"
".".format(self))
url = get_url_prefix(build)
url += ("variation/homo_sapiens/{snp}"
"?content-type=application/json").format(snp=self.rs)
response = query_ensembl(url)
if not hasattr(self, "_info"):
self._info = {}
self._info["maf"] = response.get("MAF", None)
if self._info["maf"] is not None:
self._info["maf"] = float(self._info["maf"])
self._info["evidence"] = ",".join(response.get("evidence", []))
self._info["most_severe_consequence"] = response.get(
"most_severe_consequence",
None
)
@staticmethod
def from_ensembl_api(rs, build=settings.BUILD):
"""Builds the correct ShortVariant subclass for the specified rs
number.
:param rs: The rs number describing the variant of interest.
:type rs: str
:param build: The genome build (either GRCh37 or GRCh38, default is
defined in the settings.
:type build: str
This method will fetch the data from the Ensembl rest api and try
to guess the right Variant subclass (e.g. SNP or Indel).
"""
url = get_url_prefix(build)
url += "variation/homo_sapiens/{snp}?content-type=application/json"
url = url.format(snp=rs)
response = query_ensembl(url)
if response is None:
return response
for mapping in response["mappings"]:
if mapping["assembly_name"] == build:
# This is a SNP
if mapping["start"] == mapping["end"]:
pos = "chr{}_{}".format(
mapping["location"].split("-")[0],
mapping["allele_string"]
)
# Note that for multi (>2) allelic loci, this will
# return a list of variants.
return SNP.from_str(pos, rs=rs)
# Otherwise an Indel
else:
return Indel._parse_ensembl_indel(rs, mapping)
def __repr__(self):
return "chr{}:{}_{}/{}".format(self.chrom, self.pos, self.ref,
self.alt)
class Indel(ShortVariant):
[docs] """Class representing short insertions/deletions (Indels).
Either initialize with the parameters corresponding to:
``chrom, pos, rs, ref, alt`` or by using the corresponding
named parameters.
The notation we are using is consistent with the VCF format, but it is
different from some API (e.g. Ensembl). We will try to standardize
everything to comply with the VCF format.
The latter represents deletions by using the preceding nucleotide for the
reference.
*e.g.* If the genomic sequence is AAGAA -> AAAA (deletion of the G)
the indel will be represented as follows:
- Start: 2
- Ref: `AG`
- Alt: `A`
- length: 1 (difference between allele lengths)
*e.g.* If the genomic sequence is AAGAA -> AAGCAA (insertion of the C)
the indel will be represented as follows:
- Start: 3
- Ref: `G`
- Alt: `GC`
- length: 1
"""
def __init__(self, *args, **kwargs):
if "skip_allele_check" in kwargs:
skip_allele_check = kwargs.pop("skip_allele_check")
else:
skip_allele_check = False
super(Indel, self).__init__(*args, **kwargs)
if not skip_allele_check:
assert "-" not in self.ref
assert "-" not in self.alt
@property
def length(self):
"""Computes the size of the indel.
This is the difference between the length of the alleles.
"""
return abs(len(self.ref) - len(self.alt))
@classmethod
def _parse_ensembl_indel(cls, rs, mapping):
chrom = str(mapping["seq_region_name"])
pos = mapping["start"]
alleles = str(mapping["allele_string"]).split("/")
ref = alleles[0]
alts = alleles[1:]
indels = []
with Reference() as reference:
for alt in list(alts):
indel = cls(chrom, pos, rs, ref, alt, skip_allele_check=True)
indel = check_indel_reference(indel, reference, True)
indels.append(indel)
return indels
@classmethod
def from_ensembl_api(cls, rs, build=settings.BUILD):
[docs] """Gets the information from the Ensembl REST API.
:param rs: The rs number for the variant of interest.
:param build: The genome build (e.g. GRCh37 or GRCh38).
"""
variants = ShortVariant.from_ensembl_api(rs, build)
return [v for v in variants if v.__class__ is cls]
class SNP(ShortVariant):
[docs] """Class representing a Single Nucleotide Polymorphism (SNP).
Instances can be created in two ways: either by providing ordered fields:
``chrom, pos, rs, ref, alt`` or by using named parameters.
"""
def __init__(self, *args, **kwargs):
super(SNP, self).__init__(*args, **kwargs)
@classmethod
def from_ensembl_api(cls, rs, build=settings.BUILD):
[docs] """Gets the information from the Ensembl REST API.
:param rs: The rs number for the variant of interest.
:param build: The genome build (e.g. GRCh37 or GRCh38).
"""
variants = ShortVariant.from_ensembl_api(rs, build)
return [v for v in variants if v.__class__ is cls]
@classmethod
def from_str(cls, s, rs=None):
[docs] """Parses a variant object from a str of the form chrXX:YYYY_R/A.
:param s: The string to parse the SNP from (Format: chrXX:YYY_R/A).
:param rs: An optional parameter specifying the rs number.
:returns: A list of SNP objects.
If it is a multi-allelic loci, the list will contain one SNP per
alternative allele.
If it is not, this will be a list of length 1...
"""
s = s.lstrip("chr")
chrom, s = s.split(":")
pos, s = s.split("_")
alleles = s.split("/")
ref = alleles[0]
alts = list(alleles[1:])
var_list = []
for alt in alts:
v = cls(chrom, pos, rs, ref, alt)
var_list.append(v)
return var_list
def variant_list_to_dataframe(variants):
[docs] """Converts a regular Python list of Variant objects to a Pandas dataframe.
:param variants: A list of :py:class:`Variant` objects.
:type variants: list
:returns: A Pandas dataframe with genomic information as well as extra
fields defined in the `_info` parameter.
:rtype: DataFrame
This function will add annotations from the `_info` dict if such a
parameter is set for the considered variants. This dict has to be
comparable between all elements of the list or an Exception will be raised.
This functionality can be very useful to flexibly add annotations to
variants and to write them to a CSV file.
"""
import pandas as pd
import numpy as np
# Check if we have extra information.
if hasattr(variants[0], "_info"):
extra_fields = sorted(list(variants[0]._info.keys()))
else:
extra_fields = []
def _as_tuple(v):
li = [v.chrom, v.pos, v.rs, v.ref, v.alt]
if len(extra_fields) != 0:
field_mismatch = ("Variants in list have to be comparable to "
"create a consistent DataFrame.")
assert extra_fields == sorted(list(v._info.keys())), field_mismatch
for k in extra_fields:
li.append(v._info[k])
return tuple(li)
df = pd.DataFrame(
[_as_tuple(v) for v in variants],
columns=["chrom", "pos", "rs", "ref", "alt"] + extra_fields,
)
df = df.fillna(value=np.nan)
return df