Commits (18)
......@@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
[Unreleased]: https://git.embl.de/grp-zeller/GECCO/compare/v0.4.4...master
## [v0.4.5] - 2020-11-23
[v0.4.5]: https://git.embl.de/grp-zeller/GECCO/compare/v0.4.4...v0.4.5
### Added
- Additional `fold` column to cross-validation table output.
### Changed
- Use sequence ID instead of protein ID to extract type from cluster in `gecco cv`.
- Install HMM data in pre-pressed format to make `hmmsearch` runs faster on short sequences.
- `gecco.orf` was rewritten to extract genes from input sequences in parallel.
## [v0.4.4] - 2020-09-30
[v0.4.4]: https://git.embl.de/grp-zeller/GECCO/compare/v0.4.3...v0.4.4
### Added
......
......@@ -2,6 +2,7 @@ include README.md
include LICENSE
include static/gecco.png
include gecco/_version.txt
include gecco/py.typed
recursive-include gecco/crf *.pkl *.pkl.md5
recursive-include gecco/hmmer *.ini
......
......@@ -7,6 +7,5 @@ set -e
python setup.py bdist_wheel
WHEEL=$(python setup.py --name)-$(python setup.py --version)-py2.py3-none-any.whl
pip install -U "dist/$WHEEL[train]"
python -m pip install --find-links=dist gecco[train]
python -m coverage run -p -m unittest discover -vv
"""Implementation of the ``gecco cv`` subcommand.
"""
import copy
import functools
import itertools
import os
......@@ -14,7 +15,7 @@ import tqdm
import sklearn.model_selection
from ._base import Command
from ...model import ClusterTable, FeatureTable
from ...model import ClusterTable, FeatureTable, ProductType
from ...crf import ClusterCRF
from ...crf.cv import LeaveOneGroupOut
......@@ -112,8 +113,15 @@ class Cv(Command): # noqa: D101
self.logger.info("Performing cross-validation")
for i, (train_indices, test_indices) in enumerate(tqdm.tqdm(splits)):
# extract train data
train_data = [gene for i in train_indices for gene in seqs[i]]
test_data = [gene for i in test_indices for gene in seqs[i]]
# extract test data and erase existing probabilities
test_data = [copy.deepcopy(gene) for i in test_indices for gene in seqs[i]]
for gene in test_data:
gene.protein.domains = [d.with_probability(None) for d in gene.protein.domains]
# fit and predict the CRF for the current fold
crf = ClusterCRF(
self.args["--feature-type"],
algorithm="lbfgs",
......@@ -125,7 +133,8 @@ class Cv(Command): # noqa: D101
new_genes = crf.predict_probabilities(test_data, jobs=self.args["--jobs"])
with open(self.args["--output"], "a" if i else "w") as out:
FeatureTable.from_genes(new_genes).dump(out, header=i==0)
frame = FeatureTable.from_genes(new_genes).to_dataframe()
frame.assign(fold=i).to_csv(out, header=i==0, sep="\t", index=False)
return 0
......@@ -136,8 +145,7 @@ class Cv(Command): # noqa: D101
self.logger.info("Converting data to genes")
gene_count = len(set(table.protein_id))
genes = list(tqdm.tqdm(table.to_genes(), total=gene_count))
del table
genes = list(tqdm.tqdm(table.to_genes(), total=gene_count, leave=False))
self.logger.info("Sorting genes by location")
genes.sort(key=operator.attrgetter("source.id", "start", "end"))
......@@ -158,11 +166,17 @@ class Cv(Command): # noqa: D101
self.logger.info("Loading the clusters table")
with open(self.args["--clusters"]) as in_:
table = ClusterTable.load(in_)
index = { protein: row.type for row in table for protein in row.proteins }
index = { row.sequence_id: row.type for row in table }
if len(index) != len(table):
raise ValueError("Training data contains several clusters per sequence")
groups = []
for cluster in seqs:
ty = next(index[seq.id] for seq in cluster if seq.id in index)
ty = next((index[g.source.id] for g in cluster if g.source.id in index), None)
if ty is None:
seq_id = next(gene.source.id for gene in cluster)
self.logger.warning("Could not find type of cluster in {!r}", seq_id)
ty = ProductType.Unknown
groups.append(ty.unpack())
return list(LeaveOneGroupOut().split(seqs, groups=groups))
......
......@@ -111,11 +111,9 @@ class Run(Command): # noqa: D101
sequences = SeqIO.parse(genome, guess_sequences_format(genome))
self.logger.debug("Extracting genes from input sequences")
orf_finder = PyrodigalFinder(metagenome=True)
genes = list(
itertools.chain.from_iterable(map(orf_finder.find_genes, sequences))
)
orf_finder = PyrodigalFinder(metagenome=True, cpus=self.args["--jobs"])
genes = list(orf_finder.find_genes(sequences))
if genes:
self.logger.info("Found {} potential genes", len(genes))
else:
......
*.hmm
*.hmm.h3*
*.hmm.gz
*.xml.gz
......@@ -117,23 +117,23 @@ class HMMER(BinaryRunner):
self.hmm = hmm
self.cpus = cpus
def run(self, genes: Sequence[Gene]) -> Sequence[Gene]:
def run(self, genes: Iterable[Gene]) -> List[Gene]:
"""Run HMMER on proteins of ``genes`` and update them with domains.
Arguments:
genes (sequence of `~gecco.model.Gene`): A sequence of genes to
annotate with ``self.hmm``.
genes (iterable of `~gecco.model.Gene`): An iterable that yield
genes to annotate with ``self.hmm``.
"""
# collect genes and build an index of genes by protein id
gene_index = {gene.id: gene for gene in genes}
gene_index = collections.OrderedDict([(gene.id, gene) for gene in genes])
# create a temporary file to write the input and output to
seqs_tmp = tempfile.NamedTemporaryFile(prefix="hmmer", suffix=".faa")
doms_tmp = tempfile.NamedTemporaryFile(prefix="hmmer", suffix=".dom", mode="rt")
# write protein sequences
protein_records = (g.protein.to_seq_record() for g in genes)
protein_records = (g.protein.to_seq_record() for g in gene_index.values())
SeqIO.write(protein_records, seqs_tmp.name, "fasta")
# Prepare the command line arguments
......@@ -156,22 +156,22 @@ class HMMER(BinaryRunner):
for row in rows:
# extract domain from the domain table row
accession = self.hmm.relabel(row.query_accession or row.query_name)
entry = interpro.by_accession[accession]
entry = interpro.by_accession.get(accession)
# add additional qualifiers with available metadata
qualifiers: Dict[str, List[str]] = {
"inference": ["protein motif"],
"note": ["e-value: {}".format(row.i_evalue)],
"db_xref": ["{}:{}".format(self.hmm.id.upper(), accession)],
"function": [entry.name]
"function": [] if entry is None else [entry.name]
}
if entry.integrated is not None:
if entry is not None and entry.integrated is not None:
qualifiers["db_xref"].append("InterPro:{}".format(entry.integrated))
# add the domain to the protein domains of the right gene
domain = Domain(accession, row.env_from, row.env_to, self.hmm.id, row.i_evalue, None, qualifiers)
gene_index[row.target_name].protein.domains.append(domain)
# return the updated list of genes that was given in argument
return genes
return list(gene_index.values())
def embedded_hmms() -> Iterator[HMM]:
......@@ -180,4 +180,4 @@ def embedded_hmms() -> Iterator[HMM]:
for ini in glob.glob(pkg_resources.resource_filename(__name__, "*.ini")):
cfg = configparser.ConfigParser()
cfg.read(ini)
yield HMM(path=ini.replace(".ini", ".hmm.gz"), **dict(cfg.items("hmm")))
yield HMM(path=ini.replace(".ini", ".hmm"), **dict(cfg.items("hmm")))
......@@ -21,6 +21,7 @@ from Bio.SeqRecord import SeqRecord
from . import __version__
from ._base import Dumpable
from ._meta import requires
# fmt: off
......@@ -447,6 +448,19 @@ class FeatureTable(Dumpable, Sized):
gene.protein.domains.append(domain)
yield gene
@requires("pandas")
def to_dataframe(self) -> "pandas.DataFrame":
"""Convert the feature table to a `~pandas.DataFrame`.
Raises:
ImportError: if the `pandas` module could not be imported.
"""
frame = pandas.DataFrame() # type: ignore
for column in self.__annotations__:
frame[column] = getattr(self, column)
return frame
def __bool__(self) -> bool: # noqa: D105
return len(self) != 0
......
......@@ -3,11 +3,13 @@
import abc
import io
import multiprocessing
import os
import subprocess
import queue
import threading
import tempfile
import typing
from typing import Iterable, Iterator, List, Optional
from typing import Callable, Iterable, Iterator, List, Optional
import Bio.Alphabet
import Bio.SeqIO
......@@ -27,7 +29,7 @@ class ORFFinder(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def find_genes(self, dna: SeqRecord) -> Iterable[Gene]: # type: ignore
def find_genes(self, records: Iterable[SeqRecord]) -> Iterable[Gene]: # type: ignore
"""Find all genes from a DNA sequence.
"""
return NotImplemented # type: ignore
......@@ -48,34 +50,121 @@ class PyrodigalFinder(ORFFinder):
"""
def __init__(self, metagenome: bool = True) -> None:
class _Worker(threading.Thread):
@staticmethod
def _none_callback(record, total):
pass
def __init__(
self,
metagenome: bool,
record_count: multiprocessing.Value,
record_queue: "queue.Queue[typing.Optional[SeqRecord]]",
genes_queue: "queue.Queue[Gene]",
callback: Optional[Callable[[SeqRecord, int], None]],
) -> None:
super().__init__()
self.pyrodigal = pyrodigal.Pyrodigal(meta=metagenome)
self.record_queue = record_queue
self.record_count = record_count
self.genes_queue = genes_queue
self.callback = callback or self._none_callback
def run(self) -> None:
while True:
record = self.record_queue.get()
if record is None:
self.record_queue.task_done()
return
orfs = self.pyrodigal.find_genes(str(record.seq))
for j, orf in enumerate(orfs):
# wrap the protein into a Protein object
prot_seq = Seq(orf.translate(), Bio.Alphabet.generic_protein)
protein = Protein(id=f"{record.id}_{j+1}", seq=prot_seq)
# wrap the gene into a Gene
self.genes_queue.put(Gene(
source=record,
start=min(orf.begin, orf.end),
end=max(orf.begin, orf.end),
strand=Strand.Coding if orf.strand == 1 else Strand.Reverse,
protein=protein,
qualifiers={
"inference": ["ab initio prediction:Prodigal:2.6"],
"transl_table": str(orf.translation_table),
}
))
self.record_queue.task_done()
self.callback(record, self.record_count.value)
def __init__(self, metagenome: bool = True, cpus: int = 0) -> None:
"""Create a new `PyrodigalFinder` instance.
Arguments:
metagenome (bool): Whether or not to run PRODIGAL in metagenome
mode, defaults to `True`.
cpus (int): The number of threads to use to run Pyrodigal in
parallel. Pass ``0`` to use the number of CPUs on the machine.
"""
super().__init__()
self.metagenome = metagenome
self.pyrodigal = pyrodigal.Pyrodigal(meta=metagenome)
def find_genes(self, dna: SeqRecord) -> Iterator[Gene]: # noqa: D102
# find all ORFs in the given DNA sequence
orfs = self.pyrodigal.find_genes(str(dna.seq))
for j, orf in enumerate(orfs):
# wrap the protein into a Protein object
prot_seq = Seq(orf.translate(), Bio.Alphabet.generic_protein)
protein = Protein(id=f"{dna.id}_{j+1}", seq=prot_seq)
# wrap the gene into a Gene
yield Gene(
source=dna,
start=min(orf.begin, orf.end),
end=max(orf.begin, orf.end),
strand=Strand.Coding if orf.strand == 1 else Strand.Reverse,
protein=protein,
qualifiers={
"inference": ["ab initio prediction:Prodigal:2.6"],
"transl_table": str(orf.translation_table),
}
self.cpus = cpus
def find_genes(
self,
records: Iterable[SeqRecord],
progress: Optional[Callable[[SeqRecord, int], None]] = None,
) -> Iterator[Gene]:
"""Find all genes contained in a sequence of DNA records.
Arguments:
records (iterable of `~Bio.SeqRecord.SeqRecord`): An iterable of
DNA records in which to find genes.
progress (callable, optional): A progress callback of signature
``progress(record, total)`` that will be called everytime a
record has been processed successfully, with ``record`` being
the `~Bio.SeqRecord.SeqRecord` instance, and ``total`` being
the total number of records to process.
"""
# count the number of CPUs to use
_cpus = self.cpus if self.cpus > 0 else multiprocessing.cpu_count()
# create the queue to pass the objects around
record_count = multiprocessing.Value('i')
record_queue = typing.cast("queue.Queue[typing.Optional[SeqRecord]]", queue.Queue())
genes_queue = typing.cast("queue.Queue[SeqRecord]", queue.Queue())
# create and launch one thread per CPU
threads = []
for _ in range(_cpus):
thread = self._Worker(
self.metagenome,
record_count,
record_queue,
genes_queue,
callback=progress,
)
thread.start()
threads.append(thread)
# queue the sequences passed as arguments
for record in records:
record_count.value += 1
record_queue.put(record)
# poison-pill the queue so that threads terminate when they
# have consumed all the sequences
for _ in threads:
record_queue.put(None)
# wait for all sequences to be processed
record_queue.join()
# return an iterable over the output
sentinel = object()
genes_queue.put(sentinel)
return iter(genes_queue.get, sentinel)
......@@ -21,6 +21,7 @@ classifiers =
Programming Language :: Python :: 3.8
Topic :: Scientific/Engineering :: Bio-Informatics
Topic :: Scientific/Engineering :: Medical Science Apps.
Typing :: Typed
[options]
zip_safe = false
......@@ -54,7 +55,7 @@ train =
include = gecco
[options.package_data]
* = _version.txt
gecco = _version.txt, py.typed
[options.entry_points]
gecco.cli.commands =
......@@ -67,7 +68,7 @@ console-scripts =
gecco = gecco.cli:main
[bdist_wheel]
universal = true
universal = false
[coverage:report]
include = gecco/*
......
......@@ -19,7 +19,7 @@ from functools import partial
from xml.etree import ElementTree as etree
import setuptools
from setuptools.command.build_py import build_py as _build_py
from setuptools.command.build_ext import build_ext as _build_ext
from setuptools.command.sdist import sdist as _sdist
from tqdm import tqdm
......@@ -110,25 +110,37 @@ class update_model(setuptools.Command):
return entries
class build_py(_build_py):
"""A hacked `build_py` command to download data before wheel creation.
class build_ext(_build_ext):
"""A hacked `build_ext` command to download data before wheel creation.
Using ``build_ext`` switches `setuptools` to build in non-universal mode,
and to generate platform-specific wheels. We do not have platform-specific
code in GECCO, but we have platform-specific data: binary HMM pressed with
``hmmpressed`` are CPU-architecture-specific, so we can only install them
on a x86-64 machine if they were pressed on a x86-64 machine.
"""
def run(self):
_build_py.run(self)
self.download_hmms()
def download_hmms(self):
for in_ in glob.glob(os.path.join("gecco", "hmmer", "*.ini")):
cfg = configparser.ConfigParser()
cfg.read(in_)
out = os.path.join(self.build_lib, in_.replace(".ini", ".hmm.gz"))
try:
self.make_file([in_], out, self.download_hmm, [out, dict(cfg.items("hmm"))])
except:
if os.path.exists(out):
os.remove(out)
raise
for ext in self.extensions:
in_ = ext.sources[0]
pressed = os.path.join(self.build_lib, in_).replace(".ini", ".hmm.h3i")
self.make_file([in_], pressed, self.download_and_press, (in_,))
def download_and_press(self, in_):
cfg = configparser.ConfigParser()
cfg.read(in_)
out = os.path.join(self.build_lib, in_.replace(".ini", ".hmm"))
try:
self.download_hmm(out, dict(cfg.items("hmm")))
except:
if os.path.exists(out):
os.remove(out)
raise
self.spawn(["hmmpress", out])
os.remove(out)
def download_hmm(self, output, options):
base = "https://github.com/althonos/GECCO/releases/download/v{version}/{id}.hmm.gz"
......@@ -148,10 +160,14 @@ class build_py(_build_py):
)
with tqdm.wrapattr(response, "read", **format) as src:
with open(output, "wb") as dst:
shutil.copyfileobj(src, dst)
shutil.copyfileobj(gzip.open(src), dst)
if __name__ == "__main__":
setuptools.setup(
cmdclass={"build_py": build_py, "sdist": sdist, "update_model": update_model,},
cmdclass={"build_ext": build_ext, "sdist": sdist, "update_model": update_model,},
ext_modules=[
setuptools.Extension("Pfam", [os.path.join("gecco", "hmmer", "Pfam.ini")]),
setuptools.Extension("Tigrfam", [os.path.join("gecco", "hmmer", "Tigrfam.ini")]),
]
)
......@@ -3,6 +3,7 @@
import os
import unittest
from unittest import mock
import Bio.SeqIO
from gecco.model import Strand
......@@ -24,8 +25,8 @@ class TestPyrodigalFinder(unittest.TestCase):
def test_order(self):
"""Test proteins are emitted in the right order from the source file.
"""
finder = PyrodigalFinder()
for expected, actual in zip(self.proteins, finder.find_genes(self.genome)):
finder = PyrodigalFinder(cpus=1)
for expected, actual in zip(self.proteins, finder.find_genes([self.genome])):
self.assertEqual(expected.id, actual.protein.id)
def test_sequence_letters(self):
......@@ -33,15 +34,22 @@ class TestPyrodigalFinder(unittest.TestCase):
"""
by_id = { expected.id: expected for expected in self.proteins }
finder = PyrodigalFinder()
for gene in finder.find_genes(self.genome):
for gene in finder.find_genes([self.genome]):
self.assertEqual( gene.protein.seq, by_id[gene.id].seq )
def test_sequence_coordinates(self):
"""Test emitted genes have the protein sequence matching their coordinates.
"""
finder = PyrodigalFinder()
for gene in finder.find_genes(self.genome):
for gene in finder.find_genes([self.genome]):
subseq = self.genome.seq[gene.start-1:gene.end]
if gene.strand is Strand.Reverse:
subseq = subseq.reverse_complement()
self.assertEqual(subseq.translate(), gene.protein.seq)
def test_progress_callback(self):
progress = mock.MagicMock()
finder = PyrodigalFinder(cpus=1)
genes = list(finder.find_genes([self.genome], progress=progress))
progress.assert_called_with(self.genome, 1)