Source code for prestools.bioinf

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Created by Roberto Preste
import random
import numpy as np
from scipy import stats
from math import log, sqrt
from itertools import combinations
from typing import Union, Dict

_NT_LIST = ["A", "C", "G", "T"]

_AA_LIST = ["A", "C", "D", "E", "F", "G", "H", "I", "K", "L", "M", "N", "P",
            "Q", "R", "S", "T", "V", "W", "Y"]

_TRANSITIONS = ["AG", "GA", "CT", "TC"]

_TRANSVERSIONS = ["AC", "CA", "AT", "TA", "GC", "CG", "GT", "TG"]

_NT_DICT = {"A": "Adenine", "C": "Cytosine", "G": "Guanine", "T": "Thymine",
            "U": "Uracil", "R": "A/G", "Y": "C/T", "S": "G/C", "W": "A/T",
            "K": "G/T", "M": "A/C", "B": "C/G/T", "D": "A/G/T", "H": "A/C/T",
            "V": "A/C/G", "N": "A/C/G/T", "./-": "gap"}

_AA_DICT = {"A": ("Ala", "Alanine"), "B": ("Asx", "Aspartic Acid/Asparagine"),
            "C": ("Cys", "Cysteine"), "D": ("Asp", "Aspartic Acid"),
            "E": ("Glu", "Glutamic Acid"), "F": ("Phe", "Phenylalanine"),
            "G": ("Gly", "Glycine"), "H": ("His", "Histidine"),
            "I": ("Ile", "Isoleucine"), "K": ("Lys", "Lysine"),
            "L": ("Leu", "Leucine"), "M": ("Met", "Methionine"),
            "N": ("Asn", "Asparagine"), "P": ("Pro", "Proline"),
            "Q": ("Gln", "Glutamine"), "R": ("Arg", "Arginine"),
            "S": ("Ser", "Serine"), "T": ("Thr", "Threonine"),
            "V": ("Val", "Valine"), "W": ("Trp", "Tryptophan"),
            "X": ("Xaa", "Any"), "Y": ("Tyr", "Tyrosine"),
            "Z": ("Glx", "Glutamine/Glutamic Acid"), "*": ("***", "Stop")}

_COMPLEM_DICT = {"A": "T", "C": "G", "G": "C", "T": "A", "U": "A", "R": "Y",
                 "Y": "R", "S": "W", "W": "S", "K": "M", "M": "K", "B": "A",
                 "D": "C", "H": "G", "V": "T"}

[docs]def hamming_distance(seq_1: str, seq_2: str, ignore_case: bool = False) -> int: """Calculate the Hamming distance between two sequences. Args: seq_1: first sequence to compare seq_2: second sequence to compare ignore_case: ignore case when comparing sequences (default: False) Returns: distance: Hamming distance """ if len(seq_1) != len(seq_2): raise ValueError("Cannot calculate Hamming distance of " "sequences with different lengths.") if ignore_case: seq_1 = seq_1.casefold() seq_2 = seq_2.casefold() distance = sum([1 for i in range(len(seq_1)) if seq_1[i] != seq_2[i] and seq_1[i] != "-" and seq_2[i] != "-"]) return distance
[docs]def aa_one_to_three(sequence: str) -> str: """Convert one-letter amino acid code to three-letter code. Args: sequence: sequence of amino acids in one-letter code Returns: new_seq: sequence converted to three-letter code """ new_seq = "".join([_AA_DICT[aa.upper()][0] for aa in sequence]) return new_seq
[docs]def aa_three_to_one(sequence: str) -> str: """Convert three-letter amino acid code to one-letter code. Args: sequence: sequence of amino acids in three-letter code Returns: new_seq: sequence converted to one-letter code """ # TODO: this is very ugly, will have to refactor it new_seq = "" for n in range(0, len(sequence), 3): aa = sequence[n: n + 3].capitalize() for cod in _AA_DICT: if _AA_DICT[cod][0] == aa: new_seq += cod break return new_seq
[docs]def reverse_complement(sequence: str, conversion: str = "reverse_complement") -> str: """Convert a nucleotide sequence into its reverse complement. Convert a nucleotide sequence into its reverse, complement or reverse complement. Args: sequence: nucleotide sequence to be converted conversion: type of conversion to perform ('r'|'reverse', 'c'|'complement', 'rc'|'reverse_complement') (default: 'rc'|'reverse_complement') Returns: converted sequence """ if conversion not in ["r", "c", "rc", "reverse", "complement", "reverse_complement"]: raise ValueError("Invalid conversion option.") if conversion in ["r", "reverse"]: return sequence[::-1] elif conversion in ["c", "complement"]: return "".join([_COMPLEM_DICT[nt.upper()] for nt in sequence]) return "".join([_COMPLEM_DICT[nt.upper()] for nt in sequence])[::-1]
[docs]def shuffle_sequence(sequence: str) -> str: """Shuffle the given sequence. Randomly shuffle a sequence, maintaining the same composition. Args: sequence: input sequence to shuffle Returns: tmp_seq: shuffled sequence """ tmp_seq: str = "" while len(sequence) > 0: max_num = len(sequence) rand_num = random.randrange(max_num) tmp_char = sequence[rand_num] tmp_seq += tmp_char tmp_str_1 = sequence[:rand_num] tmp_str_2 = sequence[rand_num + 1:] sequence = tmp_str_1 + tmp_str_2 return tmp_seq
[docs]def random_sequence(length: Union[int, str], alphabet: str = "nt") -> str: """Create a random sequence of the given length. Create a random sequence of the given length using the specified alphabet (nucleotides or amino acids). Args: length: desired length of the random sequence alphabet: character alphabet to use ('nt', 'aa') (default: 'nt') Returns: sequence: new random sequence """ if alphabet not in ["nt", "aa"]: raise ValueError("Invalid alphabet option.") sequence = "" elems = _NT_LIST if alphabet == "nt" else _AA_LIST for i in range(int(length)): temp_char = random.choice(elems) sequence += temp_char return sequence
[docs]def mutate_sequence(sequence: str, mutations: int = 1, alphabet: str = "nt") -> str: """Mutate a sequence introducing a given number of mutations. Introduce a specific number of mutations into the given sequence. Args: sequence: input sequence to mutate mutations: number of mutations to introduce (default: 1) alphabet: character alphabet to use ('nt', 'aa') (default: 'nt') Returns: sequence: mutated sequence """ if alphabet not in ["nt", "aa"]: raise ValueError("Invalid alphabet option.") elems = _NT_LIST if alphabet == "nt" else _AA_LIST for n in range(mutations): max_num = len(sequence) rand_num = random.randrange(max_num) curr_char = sequence[rand_num] mut_char = random.choice(elems) while mut_char == curr_char: mut_char = random.choice(elems) sequence = sequence[0: rand_num] + mut_char + sequence[rand_num + 1:] return sequence
[docs]def nt_frequency(sequence: str) -> Dict[str, float]: """Calculate nucleotide frequencies. Return a dictionary with nucleotide frequencies from the given sequence. Args: sequence: input nucleotide sequence Returns: freqs: dictionary of nucleotide frequencies """ sequence = sequence.upper() length = len(sequence) freqs = {nt: sequence.count(nt)/length for nt in _NT_LIST} return freqs
[docs]def p_distance(seq_1: str, seq_2: str) -> float: """Calculate the pairwise distance between two sequences. Return the uncorrected distance between seq_1 and seq_2. Args: seq_1: first sequence to compare seq_2: second sequence to compare Returns: distance: pairwise distance """ distance = hamming_distance(seq_1, seq_2, ignore_case=True) / len(seq_1) return distance
[docs]def jukes_cantor_distance(seq_1: str, seq_2: str) -> float: """Calculate the Jukes-Cantor distance between two sequences. Return the Jukes-Cantor distance between seq_1 and seq_2, calculated as distance = -b log(1 - p/b) where b = 3/4 and p = p_distance. Args: seq_1: first sequence to compare seq_2: second sequence to compare Returns: distance: Jukes-Cantor distance """ b = 0.75 p = p_distance(seq_1, seq_2) try: distance = -b * log(1 - p/b) except ValueError: raise ValueError("Cannot calculate log of a negative number.") return distance
[docs]def tajima_nei_distance(seq_1: str, seq_2: str) -> float: """Calculate the Tajima-Nei distance between two sequences. Return the Tajima-Nei distance between seq_1 and seq_2, calculated as distance = -b log(1 - p / b) where b = 0.5 * [1 - Sum i from A to T(Gi^2+p^2/h)] h = Sum i from A to G(Sum j from C to T (Xij^2/2*Gi*Gj)) p = p-distance Xij = frequency of pair (i,j) in seq1 and seq2, with gaps removed Gi = frequency of base i over seq1 and seq2 Args: seq_1: first sequence to compare seq_2: second sequence to compare Returns: distance: Tajima-Nei distance """ G = nt_frequency(seq_1 + seq_2) p = p_distance(seq_1, seq_2) h = 0.0 pairs = [el for el in zip(seq_1, seq_2) if "-" not in el] nt_pairs = combinations(_NT_LIST, 2) length = len(pairs) for el in nt_pairs: paircount = pairs.count(el) + pairs.count(el[::-1]) x_ij_sq = (paircount / length) ** 2 gi_gj = G[el[0]] * G[el[1]] h += 0.5 * x_ij_sq / gi_gj b = 0.5 * (1 - sum([G[nt] ** 2 for nt in G]) + p ** 2 / h) try: distance = -b * log(1 - p/b) except ValueError: raise ValueError("Cannot calculate log of a negative number.") return distance
[docs]def kimura_distance(seq_1: str, seq_2: str) -> float: """Calculate the Kimura 2-Parameter distance between two sequences. Return the Kimura 2-Parameter distance between seq_1 and seq_2, calculated as distance = -0.5 log((1 - 2p -q) * sqrt( 1 - 2q )) where p = transition frequency and q = transversion frequency. Args: seq_1: first sequence to compare seq_2: second sequence to compare Returns: distance: Kimura distance """ pairs = [el for el in zip(seq_1, seq_2) if "-" not in el] ts = 0 tv = 0 length = len(pairs) for pair in pairs: if pair[0] + pair[1] in _TRANSITIONS: ts += 1 elif pair[0] + pair[1] in _TRANSVERSIONS: tv += 1 p = ts / length q = tv / length try: distance = -0.5 * log((1 - 2 * p - q) * sqrt(1 - 2 * q)) except ValueError: raise ValueError("Cannot calculate log of a negative number.") return distance
[docs]def tamura_distance(seq_1: str, seq_2: str) -> float: """Calculate the Tamura distance between two sequences. Return the Tamura distance between seq_1 and seq_2, calculated as distance = -C log(1 - P/C - Q) - 0.5(1 - C)log(1 - 2Q) where P = transition frequency Q = transversion frequency C = GC1 + GC2 - 2 * GC1 * GC2 GC1 = GC-content of seq_1 GC2 = GC-content of seq_2 Args: seq_1: first sequence to compare seq_2: second sequence to compare Returns: distance: Tamura distance """ pairs = [el for el in zip(seq_1, seq_2) if "-" not in el] ts = 0 tv = 0 length = len(pairs) for pair in pairs: if pair[0] + pair[1] in _TRANSITIONS: ts += 1 elif pair[0] + pair[1] in _TRANSVERSIONS: tv += 1 p = ts / length q = tv / length fr1 = nt_frequency(seq_1) fr2 = nt_frequency(seq_2) gc1 = fr1["C"] + fr1["G"] gc2 = fr2["C"] + fr2["G"] c = gc1 + gc2 - 2 * gc1 * gc2 try: distance = -c * log(1 - p/c - q) - 0.5 * (1 - c) * log(1 - 2*q) except ValueError: raise ValueError("Cannot calculate log of a negative number.") return distance
[docs]def rpkm(counts: np.ndarray, lengths: np.ndarray) -> np.ndarray: """Calculate reads per kilobase transcript per million reads. RPKM = (10^9 * C) / (N * L) Where: C = Number of reads mapped to a gene N = Total mapped reads in the experiment L = Exon length in base pairs for a gene Args: counts: count data where columns are individual samples and rows are genes, of shape (N_genes, N_samples) lengths: gene lengths in base pairs in the same order as the rows in counts, of shape (N_genes, ) Returns: normed: RPKM normalized counts matrix, of shape (N_genes, N_samples) """ c = counts.astype(float) n = np.sum(c, axis=0) n = n.reshape((1, n.shape[0])) l = lengths.reshape((lengths.shape[0], 1)) normed = 1e9 * c / (n * l) return normed
[docs]def quantile_norm(x: np.ndarray, to_log: bool = False) -> np.ndarray: """Normalize the columns of X to each have the same distribution. Given an expression matrix (microarray data, read counts, etc) of M genes by N samples, quantile normalization ensures all samples have the same spread of data (by construction). The data across each row are averaged to obtain an average column. Each column quantile is replaced with the corresponding quantile of the average column. Args: x: array of input data, of shape (N_genes, N_samples) to_log: log-transform the data before normalising (default: False) Returns: xn: array of normalised data, of shape (N_genes, N_samples) """ if to_log: x = np.log(x + 1) quantiles = np.mean(np.sort(x, axis=0), axis=1) ranks = np.apply_along_axis(stats.rankdata, 0, x) rank_indices = ranks.astype(int) - 1 xn = quantiles[rank_indices] return xn