Commit 024bf2ff authored by Martin Larralde's avatar Martin Larralde
Browse files

Inline `numpy` and naive implementation in relevant functions

parent 91ab2ec4
......@@ -3,7 +3,9 @@
import array
import collections
import functools
import math
import operator
import random
import statistics
import typing
......@@ -11,10 +13,9 @@ import typing
from . import tables, datasets
try:
from numpy import array, prod, sum, take, zeros
import numpy
except ImportError:
from ._npcompat import array, prod, take, zeros
numpy = None
__all__ = ["Peptide", "tables", "datasets"]
__version__ = "0.2.0"
......@@ -243,9 +244,9 @@ class Peptide(typing.Sequence[str]):
self.sequence: str = sequence
# store an encoded version of the sequence as an array of indices
encoder = {aa:i for i,aa in enumerate(self._CODE1)}
self.encoded = zeros(len(sequence), dtype='B')
self.encoded = array.array('B')
for i, aa in enumerate(sequence):
self.encoded[i] = encoder.get(aa, encoder["X"])
self.encoded.append(encoder.get(aa, encoder["X"]))
def __len__(self) -> int:
return len(self.sequence)
......@@ -309,11 +310,19 @@ class Peptide(typing.Sequence[str]):
sigma = statistics.stdev(table.values())
table = {k: (v - mu) / sigma for k, v in table.items()}
# build look up table
lut = array([table.get(aa, 0.0) for aa in self._CODE1])
lut = [table.get(aa, 0.0) for aa in self._CODE1]
# compute using Cruciani formula
v1 = array(lut.take(self.encoded[:-lag]))
v2 = array(lut.take(self.encoded[lag:]))
return (v1*v2).sum() / (v1**2).sum()
if numpy is None:
s1 = s2 = 0.0
for aa1, aa2 in zip(self.encoded[:-lag], self.encoded[lag:]):
s1 += lut[aa1] * lut[aa2]
s2 += lut[aa1] ** 2
else:
v1 = numpy.take(lut, self.encoded[:-lag])
v2 = numpy.take(lut, self.encoded[lag:])
s1 = numpy.sum(v1*v2)
s2 = numpy.sum(v1**2)
return s1 / s2
def auto_covariance(
self, table: typing.Dict[str, float], lag: int = 1, center: bool = True
......@@ -335,11 +344,17 @@ class Peptide(typing.Sequence[str]):
sigma = statistics.stdev(table.values())
table = {k: (v - mu) / sigma for k, v in table.items()}
# build the lookup table
lut = array([table.get(aa, 0.0) for aa in self._CODE1])
lut = [table.get(aa, 0.0) for aa in self._CODE1]
# compute correlation using Cruciani formula
v1 = lut.take(self.encoded[:-lag])
v2 = lut.take(self.encoded[lag:])
return (v1 * v2).sum() / len(self)
if numpy is None:
s = 0.0
for aa1, aa2 in zip(self.encoded[:-lag], self.encoded[lag:]):
s += lut[aa1] * lut[aa2]
else:
v1 = numpy.take(lut, self.encoded[:-lag])
v2 = numpy.take(lut, self.encoded[lag:])
s = numpy.sum(v1*v2)
return s / len(self)
def cross_covariance(
self,
......@@ -370,14 +385,19 @@ class Peptide(typing.Sequence[str]):
table2 = {k: (v - mu2) / sigma2 for k, v in table2.items()}
# build the lookup table
lut1 = array([table1.get(aa, 0.0) for aa in self._CODE1])
lut2 = array([table2.get(aa, 0.0) for aa in self._CODE1])
lut1 = [table1.get(aa, 0.0) for aa in self._CODE1]
lut2 = [table2.get(aa, 0.0) for aa in self._CODE1]
# compute using Cruciani formula
v1 = lut1.take(self.encoded[:-lag])
v2 = lut2.take(self.encoded[lag:])
return (v1*v2).sum() / len(self)
if numpy is None:
s = 0.0
for aa1, aa2 in zip(self.encoded[:-lag], self.encoded[lag:]):
s += lut1[aa1] * lut2[aa2]
else:
v1 = numpy.take(lut1, self.encoded[:-lag])
v2 = numpy.take(lut2, self.encoded[lag:])
s = numpy.sum(v1*v2)
return s / len(self)
# --- Sequence properties -----------------------------------------------
......@@ -488,8 +508,8 @@ class Peptide(typing.Sequence[str]):
:doi:`10.1046/j.1365-2796.2003.01228.x`. :pmid:`12930229`.
"""
lut = array([tables.BOMAN["Boman"].get(aa, 0.0) for aa in self._CODE1])
return -lut.take(self.encoded).sum() / len(self.sequence)
lut = [tables.BOMAN["Boman"].get(aa, 0.0) for aa in self._CODE1]
return -_takesum(lut, self.encoded) / len(self)
def charge(self, pH: float = 7, pKscale: str = "Lehninger") -> float:
"""Compute the theoretical net charge of a peptide sequence.
......@@ -570,24 +590,32 @@ class Peptide(typing.Sequence[str]):
"""
# get chosen the pKa scale
scale = tables.PK.get(pKscale)
if scale is None:
scale_pKa = tables.PK.get(pKscale)
if scale_pKa is None:
raise ValueError(f"Invalid pK scale: {scale!r}")
scale_sign = tables.CHARGE["sign"]
# build a look-up table for the pKa scale and the charge sign
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
sign_lut = array([tables.CHARGE["sign"].get(aa, 0.0) for aa in self._CODE1])
# compute charge of each amino-acid and sum
pK = lut.take(self.encoded)
sign = array(sign_lut.take(self.encoded))
charge = (sign / (1 + 10**(sign * (pH - pK)))).sum()
lut_pKa = [scale_pKa.get(aa, 0.0) for aa in self._CODE1]
lut_sign = [scale_sign.get(aa, 0.0) for aa in self._CODE1]
# compute charge of each amino-acid, and sum
if numpy is None:
charge = 0.0
for aa in self.encoded:
pKa = lut_pKa[aa]
sign = lut_sign[aa]
charge += sign / (1.0 + 10 ** (sign * (pH - pKa)))
else:
pKa = numpy.take(lut_pKa, self.encoded)
sign = numpy.take(lut_sign, self.encoded)
charge = numpy.sum(sign / (1.0 + 10**(sign * (pH - pKa))))
# add charge for C-terminal and N-terminal ends of the peptide
if "nTer" in scale:
charge += 1.0 / (1.0 + 10 ** (1.0 * (pH - scale["nTer"])))
if "cTer" in scale:
charge += -1.0 / (1.0 + 10 ** (-1.0 * (pH - scale["cTer"])))
if "nTer" in scale_pKa:
charge += 1.0 / (1.0 + 10 ** (pH - scale_pKa["nTer"]))
if "cTer" in scale_pKa:
charge += -1.0 / (1.0 + 10 ** (scale_pKa["cTer"] - pH))
# return the net protein charge
return charge
......@@ -630,18 +658,28 @@ class Peptide(typing.Sequence[str]):
"""
scale = tables.HYDROPHOBICITY["Eisenberg"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
angles = [(angle * i) % 360 for i in range(window)]
angsin = array([math.sin(math.radians(a)) for a in angles])
angcos = array([math.cos(math.radians(a)) for a in angles])
if numpy is None:
angsin = [math.sin(math.radians(theta)) for theta in angles]
angcos = [math.cos(math.radians(theta)) for theta in angles]
else:
angsin = numpy.sin(numpy.radians(angles))
angcos = numpy.cos(numpy.radians(angles))
maxnorm = 0.0
for i in range(len(self.sequence) - window + 1):
# compute sin and cos of angles
hvec = array(lut.take(self.encoded[i:i+window]))
sumsin = (hvec * angsin).sum()
sumcos = (hvec * angcos).sum()
if numpy is None:
sumsin = sumcos = 0
for aa, s, c in zip(self.encoded[i:i+window], angsin, angcos):
sumsin += lut[aa]*s
sumcos += lut[aa]*c
else:
hvec = numpy.take(lut, self.encoded[i:i+window])
sumsin = numpy.sum(hvec * angsin)
sumcos = numpy.sum(hvec * angcos)
# compute only the distance component (this way we can avoid
# computing the square root in each iteration)
norm = sumsin**2 + sumcos**2
......@@ -909,8 +947,9 @@ class Peptide(typing.Sequence[str]):
table = tables.HYDROPHOBICITY.get(scale)
if table is None:
raise ValueError(f"Invalid hydrophobicity scale: {scale!r}")
lut = array([table.get(aa, 0.0) for aa in self._CODE1], dtype="d")
return lut.take(self.encoded).sum() / len(self.sequence)
lut = [table.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
return _takesum(lut, self.encoded) / len(self)
def instability_index(self) -> float:
"""Compute the instability index of a protein sequence.
......@@ -1062,8 +1101,12 @@ class Peptide(typing.Sequence[str]):
f"Expected str or dict, found {aa_shift.__class__.__name__}"
)
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
return lut.take(self.encoded).sum() + scale.get("nTer", 0.0) + scale.get("cTer", 0.0)
# build the lookup table
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# sum the mass-shift of each amino acid
shift = _takesum(lut, self.encoded)
# return the shift with C-terminal and N-terminal ends
return shift + scale.get("nTer", 0.0) + scale.get("cTer", 0.0)
def molecular_weight(
self,
......@@ -1110,9 +1153,9 @@ class Peptide(typing.Sequence[str]):
raise ValueError(f"Invalid average weight scale: {average!r}")
# build a look-up table
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# sum the weight of each amino acid
mass = lut.take(self.encoded).sum()
mass = _takesum(lut, self.encoded)
# add weight of water molecules
mass += scale["H2O"]
# add mass shift for labeled proteins
......@@ -1188,9 +1231,9 @@ class Peptide(typing.Sequence[str]):
if scale not in tables.HYDROPHOBICITY:
raise ValueError(f"Invalid hydrophobicity scale: {scale!r}")
profile = zeros(len(self.sequence) - window + 1)
for i in range(len(profile)):
profile[i] = self[i : i + window].hydrophobicity(scale=scale)
profile = array.array('f')
for i in range(len(self.sequence) - window + 1):
profile.append(self[i : i + window].hydrophobicity(scale=scale))
return profile
......@@ -1219,9 +1262,9 @@ class Peptide(typing.Sequence[str]):
maximal hydrophobic moment instead of building a profile.
"""
profile = zeros(len(self.sequence) - window + 1)
for i in range(len(profile)):
profile[i] = (
profile = array.array("f")
for i in range(len(self.sequence) - window + 1):
profile.append(
self[i : i + window].hydrophobic_moment(window=window - 1, angle=angle)
)
......@@ -1484,7 +1527,8 @@ class Peptide(typing.Sequence[str]):
# if Bayes discriminant is requested, add the logarithm of
# the product of all non-null eigenvalues
if distance == "discriminant":
distances[name] += math.log(prod(einvals[1:]))
product = functools.reduce(operator.prod, einvals[1:])
distances[name] += math.log(product)
if not distances:
raise ValueError(
f"Cannot use {frequencies!r} frequencies with "
......@@ -1535,9 +1579,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.BLOSUM)):
# build a look-up table
scale = tables.BLOSUM[f"BLOSUM{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return BLOSUMIndices(*out)
def cruciani_properties(self) -> CrucianiProperties:
......@@ -1575,9 +1619,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.CRUCIANI)):
# build a look-up table
scale = tables.CRUCIANI[f"PP{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return CrucianiProperties(*out)
def fasgai_vectors(self) -> FasgaiVectors:
......@@ -1618,9 +1662,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.FASGAI)):
# build a look-up table
scale = tables.FASGAI[f"F{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return FasgaiVectors(*out)
def kidera_factors(self) -> KideraFactors:
......@@ -1666,9 +1710,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.KIDERA)):
# build a look-up table
scale = tables.KIDERA[f"KF{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return KideraFactors(*out)
def ms_whim_scores(self) -> MSWHIMScores:
......@@ -1713,9 +1757,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.MSWHIM)):
# build a look-up table
scale = tables.MSWHIM[f"MSWHIM{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return MSWHIMScores(*out)
def pcp_descriptors(self) -> PCPDescriptors:
......@@ -1756,9 +1800,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.PCP_DESCRIPTORS)):
# build a look-up table
scale = tables.PCP_DESCRIPTORS[f"E{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return PCPDescriptors(*out)
def physical_descriptors(self) -> PhysicalDescriptors:
......@@ -1804,9 +1848,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.PHYSICAL_DESCRIPTORS)):
# build a look-up table
scale = tables.PHYSICAL_DESCRIPTORS[f"PD{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return PhysicalDescriptors(*out)
def protfp_descriptors(self) -> ProtFPDescriptors:
......@@ -1854,9 +1898,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.PROTFP)):
# build a look-up table
scale = tables.PROTFP[f"ProtFP{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return ProtFPDescriptors(*out)
def sneath_vectors(self) -> SneathVectors:
......@@ -1897,9 +1941,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.SNEATH)):
# build a look-up table
scale = tables.SNEATH[f"SV{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1], "f")
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return SneathVectors(*out)
def st_scales(self) -> STScales:
......@@ -1939,9 +1983,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.ST_SCALES)):
# build a look-up table
scale = tables.ST_SCALES[f"ST{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return STScales(*out)
def t_scales(self) -> TScales:
......@@ -1978,9 +2022,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.T_SCALES)):
# build a look-up table
scale = tables.T_SCALES[f"T{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return TScales(*out)
def vhse_scales(self) -> VHSEScales:
......@@ -2024,9 +2068,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.VHSE)):
# build a look-up table
scale = tables.VHSE[f"VHSE{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return VHSEScales(*out)
def z_scales(self) -> ZScales:
......@@ -2069,9 +2113,9 @@ class Peptide(typing.Sequence[str]):
for i in range(len(tables.Z_SCALES)):
# build a look-up table
scale = tables.Z_SCALES[f"Z{i+1}"]
lut = array([scale.get(aa, 0.0) for aa in self._CODE1])
lut = [scale.get(aa, 0.0) for aa in self._CODE1]
# average the weight of each amino acid
out.append(lut.take(self.encoded).sum() / len(self))
out.append(_takesum(lut, self.encoded) / len(self))
return ZScales(*out)
__DESCRIPTORS = {
......@@ -2088,3 +2132,16 @@ class Peptide(typing.Sequence[str]):
"VHSE": vhse_scales,
"Z": z_scales,
}
def _takesum(table, indices):
"""Return the sum of values from ``table`` indexed by ``indices``.
With NumPy, equivalent to ``numpy.sum(numpy.take(table, indices))``.
"""
if numpy is None:
return sum(map(table.__getitem__, indices))
else:
return numpy.sum(numpy.take(table, indices))
"""Compatibility layer to use `array.array` when `numpy` is not available.
"""
import array as _array
import builtins
import functools
import operator
def array(values, dtype="f"):
_values = _array.array(dtype, values)
return lazyarray(_values, dtype=dtype, length=len(_values))
class lazyarray(object):
def __init__(self, values, dtype, length):
self.__values = values
self.dtype = dtype
self.length = length
def __iter__(self):
return iter(self.__values)
def __len__(self):
return self.length
def __getitem__(self, i):
if isinstance(i, slice):
s = self.__values.__getitem__(i)
return lazyarray(s, self.dtype, len(s))
return self.__values.__getitem__(i)
def __setitem__(self, i, val):
self.__values.__setitem__(i, val)
def __add__(self, other):
if isinstance(other, (int, float)):
return lazyarray((x + other for x in self), self.dtype, self.length)
# return lazyarray((x + other for x in self), self.dtype, self.length)
elif not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(
(x1 + x2 for x1, x2 in zip(self, other)),
self.dtype,
self.length
)
def __radd__(self, other):
if isinstance(other, (int, float)):
return lazyarray((other + x for x in self), self.dtype, self.length)
elif not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(
(x2 + x1 for x1, x2 in zip(self, other)),
self.dtype,
self.length
)
def __sub__(self, other):
if isinstance(other, (int, float)):
return lazyarray((x - other for x in self), self.dtype, self.length)
elif not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(
(x1 - x2 for x1, x2 in zip(self, other)),
self.dtype,
self.length
)
def __rsub__(self, other):
if isinstance(other, (int, float)):
return lazyarray((other - x for x in self), self.dtype, self.length)
elif not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(
(x2 - x1 for x1, x2 in zip(self, other)),
self.dtype,
self.length
)
def __mul__(self, other):
if isinstance(other, (int, float)):
return lazyarray((x*other for x in self), self.dtype, self.length)
elif not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(
(x1 * x2 for x1, x2 in zip(self, other)),
self.dtype,
self.length
)
def __rmul__(self, other):
if isinstance(other, (int, float)):
return lazyarray((other*x for x in self), self.dtype, self.length)
elif not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(
(x2 * x1 for x1, x2 in zip(self, other)),
self.dtype,
self.length
)
def __truediv__(self, other):
if isinstance(other, (int, float)):
return lazyarray((x/other for x in self), self.dtype, self.length)
elif not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(
(x1 / x2 for x1, x2 in zip(self, other)),
self.dtype,
self.length
)
def __rtruediv__(self, other):
if isinstance(other, (int, float)):
return lazyarray((other/x for x in self), self.dtype, self.length)
elif not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(
(x2 / x1 for x1, x2 in zip(self, other)),
self.dtype,
self.length
)
def __pow__(self, other):
if isinstance(other, (int, float)):
return lazyarray((x**other for x in self), self.dtype, self.length)
if not isinstance(other, lazyarray):
return NotImplemented
if len(self) != len(other):
raise ValueError("Cannot pairwise multiply arrays of different lengths")
return lazyarray(