# Methods build generic indices for text files containing genomic coordinates.
#
# This file is part of gepyto.
#
# This work is licensed under the Creative Commons Attribution-NonCommercial
# 4.0 International License. To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to Creative
# Commons, PO Box 1866, Mountain View, CA 94042, USA.
__author__ = "Marc-Andre Legault"
__copyright__ = ("Copyright 2014 Marc-Andre Legault and Louis-Philippe "
"Lemieux Perreault. All rights reserved.")
__license__ = "Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)"
try:
import cPickle as pickle
except ImportError:
import pickle
import bisect
import logging
import re
import os
import functools
import numpy as np
MAGIC_NUMBER = 10 ** 9 # This should be larger than any indexed position.
class EndOfFile(Exception):
pass
class ChromosomeNotIndexed(Exception):
pass
def _get_locus(line, chrom_col, pos_col, delimiter):
if not line:
raise EndOfFile()
line = line.rstrip().split(delimiter)
chrom = line[chrom_col]
if chrom.startswith("chr"):
chrom = chrom[3:]
pos = int(line[pos_col])
return chrom, pos
[docs]def build_index(fn, chrom_col, pos_col, delimiter='\t', skip_lines=0,
index_rate=0.2, ignore_startswith=None):
"""Build a index for the given file.
:param fn: The filename
:type fn: str
:param chrom_col: The column representing the chromosome (0 based).
:type chrom_col: int
:param pos_col: The column for the position on the chromosome (0 based).
:type pos_col: int
:param delimiter: The delimiter for the columns (default tab).
:type delimiter: str
:param skip_lines: Number of header lines to skip.
:type skip_lines: int
:param index_rate: The approximate rate of line indexing. As an example,
a file with 1000 lines and the default index_rate of
0.2 will have an index with ~200 entries.
:type index_rate: float
:param ignore_startswith: Ignore lines that start with a given string.
This can be used to skip headers, but will not
be used to parse the rest of the file.
:type ignore_startswith: str
:returns: The index filename.
:rtype: str
"""
assert chrom_col != pos_col
idx_fn = _get_index_fn(fn)
size = os.path.getsize(fn) # Total filesize
start = 0 # Byte position of the meat of the file (data).
with open(fn, "r") as f:
if skip_lines > 0:
for i in range(skip_lines):
# Skip header lines if needed.
f.readline()
current_position = f.tell()
# Skip the lines that start with the user provided string.
if ignore_startswith is not None:
line = f.readline()
while line.startswith(ignore_startswith):
current_position = f.tell()
line = f.readline()
f.seek(current_position)
start = f.tell()
size -= start # Adjust file size to remove header.
# Estimate the line length using first 100 lines
line_length = np.empty((100))
prev = start
# We take 100 sample positions in the file.
for i, jump in enumerate(np.linspace(0, 0.9 * size, 100)):
f.seek(start + int(jump))
f.readline() # Throw away chunk.
line_start = f.tell()
f.readline()
line_length[i] = f.tell() - line_start
line_length = np.mean(line_length)
approx_num_lines = size / line_length
# Go back to start
f.seek(start)
# Prepare the variables to keep track of the position encoding.
encoding = {
"current_key": 0,
"chromosomes": {}
}
index = [] # List of (code, seek) tuples.
def encode_locus(chrom, pos):
chrom = str(chrom)
if chrom not in encoding["chromosomes"]:
encoding["current_key"] += 1
encoding["chromosomes"][chrom] = encoding["current_key"]
return encoding["chromosomes"][chrom] * MAGIC_NUMBER + pos
# Bind some parameters so that function calls look nicer.
get_locus = functools.partial(
_get_locus,
chrom_col=chrom_col,
pos_col=pos_col,
delimiter=delimiter
)
# Add the first line to the index.
chrom1, pos1 = get_locus(f.readline())
index.append((encode_locus(chrom1, pos1), start))
# Compute the seek jump size.
target_num_lines = index_rate * approx_num_lines
seek_jump = size / target_num_lines
# We start indexing here.
current_position = f.tell()
if index_rate == 1:
logging.debug("Full indexing mode.")
tell = f.tell()
line = f.readline()
while line:
chrom, pos = get_locus(line)
code = encode_locus(chrom, pos)
if index[-1][0] > code:
raise Exception("This file is not sorted.")
elif index[-1][0] == code:
pass
else:
index.append((code, tell))
tell = f.tell()
line = f.readline()
else:
logging.debug("Sparse indexing mode.")
while current_position + seek_jump < size + start:
# Jump in the file.
f.seek(current_position + seek_jump)
# Throw away partial line.
f.readline()
# We need to make sure this is a unique line.
try:
cur_chrom, cur_pos = get_locus(f.readline())
tell = f.tell()
next_chrom, next_pos = get_locus(f.readline())
while next_chrom == cur_chrom and next_pos == cur_pos:
tell = f.tell()
next_chrom, next_pos = get_locus(f.readline())
# We found "new" content.
# First we make sure that the file looks sorted. Then, we
# index it.
code = encode_locus(next_chrom, next_pos)
if index[-1][0] > code:
raise Exception("This file is not sorted.")
index.append((code, tell))
current_position = tell
except EndOfFile:
break # Reached the end of the file.
index = np.array(index)
# Create a dict containing the relevant information to be able to find the
# chromosome and position columns.
info = {"chrom_col": chrom_col, "pos_col": pos_col, "delimiter": delimiter,
"chrom_codes": encoding["chromosomes"]}
pickle_string = pickle.dumps(info)
# Write the index pickle (and numpy matrix).
with open(idx_fn, "wb") as f:
f.write(pickle_string)
np.save(f, index)
return idx_fn
[docs]def get_index(fn):
"""Restores the index for a given file or builds it if the index was not
previously created.
:param fn: The filname of the file to index.
:type fn: str
:returns: The numpy array representing the actual index.
:rtype: :py:class:`numpy.ndarray`
"""
# Read the information pickle part.
# We use the numpy format definition to know when to stop:
# https://github.com/numpy/numpy/blob/master/doc/neps/npy-format.rst
indexed_filename = fn
fn = _get_index_fn(indexed_filename)
with open(fn, "rb") as f:
i = 0
chunk = None
while chunk != b"\x93NUMPY":
f.seek(i)
chunk = f.read(6)
if not chunk:
raise Exception("Invalid format for the index.")
i += 1
pickle_length = f.tell() - 6
f.seek(0)
info = pickle.loads(f.read(pickle_length))
index = np.load(f)
# This is a class that will behave like the (info, index) tuple.
# It is only used to make it printing the index object prettier.
class Index(object):
def __init__(self, info, index, filename):
self.filename = filename
self.info = info
self.index = index
def __getitem__(self, key):
if key == 0:
return self.info
elif key == 1:
return self.index
else:
raise IndexError()
def __iter__(self):
return (i for i in (self.info, self.index))
def __repr__(self):
return "<{} object for file '{}'>".format(self.__class__.__name__,
indexed_filename)
return Index(info, index, fn)
[docs]def goto(f, index, chrom, pos):
"""Given a file, a locus and the index, go to the genomic coordinates.
:param f: An open file.
:type f: file
:param index: This is actually a tuple. The first element is an information
dict containing the delimiter, chromosome column and position
column. The second element is a numpy matrix containing the
encoded loci and the "tell" positions.
:param index: tuple
:param chrom: The queried chromosome.
:param pos: The queried position on the chromosome.
:returns: True if the position was found and the cursor moved, False if
the queried chromosome, position wasn't found.
:rtype: bool
"""
# Type checks for the parameters.
chrom = str(chrom)
if chrom.startswith("chr"):
chrom = chrom[3:]
pos = int(pos)
info, index = index
# Encode the locus.
chrom_code = info["chrom_codes"].get(str(chrom))
if chrom_code is None:
raise ChromosomeNotIndexed(
"Chromosome '{}' is not in the index.".format(chrom)
)
code = chrom_code * MAGIC_NUMBER + pos
logging.debug("Looking for code: {}".format(code))
# Find the boundaries for the locus.
boundary = bisect.bisect_left(index[:, 0], code)
# The hit is at the end of the index.
if boundary == index.shape[0]:
logging.debug("Looking from the end of the index.")
return goto_fine(f, chrom, pos, index[-1][1], float("+infinity"), info)
if index[boundary, 0] == code:
# We got a direct hit.
logging.debug("Direct hit on locus.")
f.seek(index[boundary, 1])
return True
# We always index the first position, so nothing should come before unless
# it's a direct hit.
if boundary == 0:
logging.debug("Locus before first index.")
left = index[boundary - 1, 1]
left_code = index[boundary, 0]
try:
right = index[boundary + 1, 1]
right_code = index[boundary + 1, 0]
except IndexError:
right = float("+infinity") # Right boundary not in index, use the EOF.
right_code = float("+infinity")
logging.debug("Found boundaries: {}:{} and {}:{}. Using linear search to "
"find the exact position.".format(left_code, left,
right_code, right))
return goto_fine(f, chrom, pos, left, right, info)
def goto_fine(f, chrom, pos, left, right, info):
# Go to the start of the plausible region.
f.seek(left)
# Bind some parameters to the line parser.
get_locus = functools.partial(
_get_locus,
chrom_col=info["chrom_col"],
pos_col=info["pos_col"],
delimiter=info["delimiter"],
)
# Iterate through the file.
while True:
here = f.tell()
try:
this_chrom, this_pos = get_locus(f.readline())
# We got it!
if this_chrom == chrom and this_pos == pos:
f.seek(here)
return True
except EndOfFile:
logging.debug("Reached end of file without finding the locus.")
return False
if here > right:
logging.debug("Didn't find the locus before the right boundary.")
return False
def _get_index_fn(fn):
"""Generates the index filename from the path to the indexed file.
:param fn: The name of the file to index.
:type fn: str
"""
if not os.path.isfile(fn):
raise Exception("File '{}' does not exist.".format(
fn
))
return os.path.abspath("{}.gtidx".format(fn))