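# Publishes CHANGELOG.md entries to the matching GitHub Release whenever a version tag is pushed, using the chandler gem.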
name: Changelog
on:
push:
tags:
- v*.*.*
jobs:
chandler:
environment: GitHub Releases
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v1
- name: Set up Ruby 2.7
uses: actions/setup-ruby@v1
with:
ruby-version: 2.7
- name: Install chandler gem
run: gem install chandler
- name: Update releases messages
run: chandler push --github=${{ github.repository }} --changelog="CHANGELOG.md"
env:
CHANDLER_GITHUB_API_TOKEN: ${{ secrets.CHANDLER_GITHUB_API_TOKEN }}
name: Test
on:
- push
- pull_request
jobs:
test_linux:
name: Test (Linux)
runs-on: ubuntu-latest
env:
OS: Linux
strategy:
matrix:
include:
- python-version: 3.6
python-release: v3.6
python-impl: CPython
- python-version: 3.7
python-release: v3.7
python-impl: CPython
- python-version: 3.8
python-release: v3.8
python-impl: CPython
- python-version: 3.9
python-release: v3.9
python-impl: CPython
steps:
- name: Checkout code
uses: actions/checkout@v2
with:
submodules: true
- name: Cache Python requirements
uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ matrix.python-version }}
restore-keys: |
${{ runner.os }}-pip-
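# the restore-keys prefix lets the job reuse the most recent cache for this OS when the exact per-version key misses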
- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Update CI dependencies
run: python -m pip install -U pip coverage wheel
- name: List project dependencies
run: python setup.py list_requirements
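# list_requirements appears to be a project-specific setup.py command that writes requirements.txt, consumed by the next step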
- name: Install project dependencies
run: python -m pip install -r requirements.txt
- name: Build required data files
run: python setup.py egg_info build_data --inplace
- name: Test with coverage
run: python -m coverage run -p -m unittest discover -vv
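# "coverage run -p" writes one data file per process; the combine step below merges them before upload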
- name: Combine coverage reports
run: python -m coverage combine
- name: Upload statistics to Codecov
uses: codecov/codecov-action@v1
with:
flags: ${{ matrix.python-impl }},${{ matrix.python-release }},${{ env.OS }}
name: test-python-${{ matrix.python-version }}
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
test_osx:
name: Test (OSX)
runs-on: macos-latest
env:
OS: OSX
strategy:
matrix:
include:
# - python-version: 3.6
# python-release: v3.6
# python-impl: CPython
- python-version: 3.7
python-release: v3.7
python-impl: CPython
- python-version: 3.8
python-release: v3.8
python-impl: CPython
- python-version: 3.9
python-release: v3.9
python-impl: CPython
steps:
- name: Checkout code
uses: actions/checkout@v2
with:
submodules: true
- name: Cache Python requirements
uses: actions/cache@v2
with:
path: ~/Library/Caches/pip
key: ${{ runner.os }}-pip-${{ matrix.python-version }}
restore-keys: |
${{ runner.os }}-pip-
- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Update CI dependencies
run: python -m pip install -U pip coverage wheel
- name: List project dependencies
run: python setup.py list_requirements
- name: Install project dependencies
run: python -m pip install -r requirements.txt
- name: Build required data files
run: python setup.py egg_info build_data --inplace
- name: Test with coverage
run: python -m coverage run -p -m unittest discover -vv
- name: Combine coverage reports
run: python -m coverage combine
- name: Upload statistics to Codecov
uses: codecov/codecov-action@v1
with:
flags: ${{ matrix.python-impl }},${{ matrix.python-release }},${{ env.OS }}
name: test-python-${{ matrix.python-version }}
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
@@ -34,6 +34,7 @@ share/python-wheels/
.installed.cfg
*.egg
MANIFEST
requirements.txt
# PyInstaller
# Usually these files are written by a python script from a template
......
@@ -5,7 +5,19 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [Unreleased]
[Unreleased]: https://git.embl.de/grp-zeller/GECCO/compare/v0.6.0...master
[Unreleased]: https://git.embl.de/grp-zeller/GECCO/compare/v0.6.1...master
## [v0.6.1] - 2021-03-15
[v0.6.1]: https://git.embl.de/grp-zeller/GECCO/compare/v0.6.0...v0.6.1
### Fixed
- Progress bar not being disabled by `-q` flag in CLI.
- Fallback to using HMM name if accession is not available in `PyHMMER`.
- Group genes by source contig and process them separately in `PyHMMER` to avoid bogus E-values (E-values scale with the size of the target database, so each contig has to be searched as its own database).
### Added
- `psutil` dependency to get the number of physical CPU cores on the host machine.
- Support for using an arbitrary mapping of positives to negatives in `gecco embed`.
### Removed
- Unused and outdated `HMMER` and `DomainRow` classes from `gecco.hmmer`.
## [v0.6.0] - 2021-02-28
[v0.6.0]: https://git.embl.de/grp-zeller/GECCO/compare/v0.5.5...v0.6.0
@@ -15,7 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Updated internal InterPro catalog.
### Fixed
- Features not being grouped together in `gecco cv` and `gecco train`
when provided with a feature table where rows were not sorted by
protein IDs.
## [v0.5.5] - 2021-02-28
......
@@ -44,7 +44,7 @@ directly with ``pip``:
.. code:: console
$ pip install https://github.com/zellerlab/GECCO
$ pip install https://github.com/zellerlab/GECCO/archive/master.zip
Keep in mind this will install the development version of the library, so not
......
@@ -9,52 +9,6 @@ from subprocess import DEVNULL
from ._meta import classproperty
class MissingBinaryError(RuntimeError):
"""A runtime error when a required program could not be found.
"""
def __init__(self, name: str, params: Iterable[str]) -> None:
self.params = list(params)
self.name = name
super().__init__(name, self.params)
def __str__(self) -> str:
return f"could not locate binary: {self.name}"
class BinaryRunner(metaclass=abc.ABCMeta):
"""A base class that wraps a binary program.
"""
BINARY: typing.ClassVar[str] = NotImplemented
HELP: typing.ClassVar[str] = "-h"
@classmethod
def has_binary(
cls: Type["BinaryRunner"], args: Optional[Iterable[str]] = None
) -> bool:
try:
cls.check_binary(args)
return True
except MissingBinaryError:
return False
@classmethod
def check_binary(
cls: Type["BinaryRunner"], args: Optional[Iterable[str]] = None
) -> None:
try:
_args = [cls.HELP] if args is None else list(args)
subprocess.run([cls.BINARY, *_args], stdout=DEVNULL, stderr=DEVNULL)
except OSError as err:
if err.errno == errno.ENOENT:
raise MissingBinaryError(cls.BINARY, _args) from err
raise
def __init__(self) -> None:
self.check_binary()
class Dumpable(metaclass=abc.ABCMeta):
"""A metaclass for objects that can be dumped to a text file.
"""
......
@@ -104,8 +104,9 @@ class Command(metaclass=abc.ABCMeta):
"[progress.percentage]{task.percentage:>3.0f}%",
rich.progress.TimeElapsedColumn(),
rich.progress.TimeRemainingColumn(),
console=rich.console.Console(file=self.stream),
console=rich.console.Console(file=self.stream, soft_wrap=True),
disable=self.quiet > 0,
transient=True,
)
self.console = self.progress.console
......
@@ -136,6 +136,7 @@ class Main(Command):
)
subcmd.verbose = self.verbose
subcmd.quiet = self.quiet
subcmd.progress.disable = self.quiet > 0
# run the subcommand
return subcmd.execute(ctx)
except CommandExit as sysexit:
......
@@ -16,6 +16,7 @@ import signal
from typing import Any, Dict, Union, Optional, List, TextIO, Mapping
import numpy
import pyhmmer
import rich.emoji
import rich.progress
from Bio import SeqIO
@@ -86,11 +87,14 @@ class Annotate(Command): # noqa: D101
if base.endswith(".gz"):
base, _ = os.path.splitext(base)
base, _ = os.path.splitext(base)
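# count the profiles in the HMM file so progress bars have a meaningful total (used later as total=hmm.size)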
with pyhmmer.plan7.HMMFile(path) as hmm_file:
size = sum(1 for _ in hmm_file)
yield HMM(
id=base,
version="?",
url="?",
path=path,
size=size,
relabel_with=r"s/([^\.]*)(\..*)?/\1/"
)
@@ -138,8 +142,8 @@ class Annotate(Command): # noqa: D101
hmms = list(self._custom_hmms() if self.hmm else embedded_hmms())
task = self.progress.add_task(description=f"HMM annotation", unit="HMMs", total=len(hmms))
for hmm in self.progress.track(hmms, task_id=task):
task = self.progress.add_task(description=f"{hmm.id} v{hmm.version}", total=1, unit="domains")
callback = lambda n, total: self.progress.update(task, advance=1, total=total)
task = self.progress.add_task(description=f"{hmm.id} v{hmm.version}", total=hmm.size, unit="domains")
callback = lambda h, t: self.progress.update(task, advance=1)
self.info("Starting", f"annotation with [bold blue]{hmm.id} v{hmm.version}[/]", level=2)
features = PyHMMER(hmm, self.jobs).run(genes, progress=callback)
self.success("Finished", f"annotation with [bold blue]{hmm.id} v{hmm.version}[/]", level=2)
@@ -201,5 +205,8 @@ class Annotate(Command): # noqa: D101
except KeyboardInterrupt:
self.error("Interrupted")
return -signal.SIGINT
except Exception as err:
self.progress.stop()
raise
else:
return 0
@@ -232,5 +232,8 @@ class Cv(Command): # noqa: D101
except KeyboardInterrupt:
self.error("Interrupted")
return -signal.SIGINT
except Exception as err:
self.progress.stop()
raise
else:
return 0
@@ -19,7 +19,6 @@ import pandas
from ._base import Command, InvalidArgument, CommandExit
from .._utils import numpy_error_context, in_context, patch_showwarnings
from ...hmmer import HMMER
class Embed(Command): # noqa: D101
@@ -41,6 +40,10 @@ class Embed(Command): # noqa: D101
containing non-BGC training instances.
Parameters:
-M <list>, --mapping <list> an arbitrary mapping of which BGC
should go into which contig. Ignores
``--min-size`` and ``--skip`` when
provided.
-o <out>, --output <out> the prefix used for the output files
(which will be ``<prefix>.features.tsv``
and ``<prefix>.clusters.tsv``).
@@ -64,9 +67,12 @@ class Embed(Command): # noqa: D101
self.min_size = self._check_flag("--min-size", int, lambda x: x >= 0, "positive or null integer")
self.e_filter = self._check_flag("--e-filter", float, lambda x: 0 <= x <= 1, hint="real number between 0 and 1")
self.jobs = self._check_flag("--jobs", int, lambda x: x >= 0, hint="positive or null integer")
self.mapping = self.args["--mapping"]
self.bgc = self.args["--bgc"]
self.no_bgc = self.args["--no-bgc"]
self.output = self.args["--output"]
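# an explicit mapping determines the embedding layout itself, so --min-size is disabled here (--skip is likewise bypassed in _make_embeddings)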
if self.mapping is not None:
self.min_size = 0
except InvalidArgument:
raise CommandExit(1)
@@ -91,7 +97,7 @@ class Embed(Command): # noqa: D101
return [
s
for _, s in no_bgc_df.groupby("sequence_id", sort=True)
if s.shape[0] > self.min_size
if len(s.protein_id.unique()) > self.min_size
]
def _read_bgc(self):
@@ -108,6 +114,12 @@ class Embed(Command): # noqa: D101
bgc_df.sort_values(by=["sequence_id", "start", "domain_start"], inplace=True)
return [s for _, s in bgc_df.groupby("sequence_id", sort=True)]
def _read_mapping(self):
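# the mapping file is a tab-separated table with (at least) "bgc_id" and
# "sequence_id" columns; a hypothetical example:
#
#     bgc_id        sequence_id
#     BGC0000001    contig_00042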
if self.mapping is not None:
mapping = pandas.read_table(self.mapping)
return { t.bgc_id:t.sequence_id for t in mapping.itertuples() }
return None
def _check_count(self, no_bgc_list, bgc_list):
no_bgc_count, bgc_count = len(no_bgc_list) - self.skip, len(bgc_list)
if no_bgc_count < bgc_count:
@@ -147,14 +159,19 @@ class Embed(Command): # noqa: D101
self.success("Finished", "embedding", repr(bgc_id), "into", repr(sequence_id), level=2)
return embed
def _make_embeddings(self, no_bgc_list, bgc_list):
def _make_embeddings(self, no_bgc_list, bgc_list, mapping):
self.info("Embedding", len(bgc_list), "BGCs into", len(no_bgc_list), "contigs")
_jobs = os.cpu_count() if not self.jobs else self.jobs
unit = "BGC" if len(bgc_list) == 1 else "BGCs"
task = self.progress.add_task("Embedding", unit=unit, total=len(bgc_list))
it = zip(itertools.islice(no_bgc_list, self.skip, None), bgc_list)
if mapping is None:
it = zip(itertools.islice(no_bgc_list, self.skip, None), bgc_list)
else:
no_bgc_index = {x.sequence_id.values[0]:x for x in no_bgc_list}
it = [(no_bgc_index[mapping[ bgc.sequence_id.values[0] ]], bgc) for bgc in bgc_list]
embeddings = pandas.concat([
self._embed(*args)
for args in self.progress.track(it, task_id=task, total=len(bgc_list))
@@ -217,9 +234,11 @@ class Embed(Command): # noqa: D101
# load inputs
no_bgc_list = self._read_no_bgc()
bgc_list = self._read_bgc()
self._check_count(no_bgc_list, bgc_list)
mapping = self._read_mapping()
if mapping is None:
self._check_count(no_bgc_list, bgc_list)
# make embeddings
embeddings = self._make_embeddings(no_bgc_list, bgc_list)
embeddings = self._make_embeddings(no_bgc_list, bgc_list, mapping)
# write outputs
self._write_features(embeddings)
self._write_clusters(embeddings)
@@ -228,5 +247,8 @@ class Embed(Command): # noqa: D101
except KeyboardInterrupt:
self.error("Interrupted")
return -signal.SIGINT
except Exception as err:
self.progress.stop()
raise
else:
return 0
@@ -58,7 +58,8 @@ class Help(Command): # noqa: D101
# Render the help message
doc = Main.doc() if subcmd_cls is None else subcmd_cls.doc()
text = rich.text.Text(textwrap.dedent(doc).lstrip())
rich.print(text, file=self._stream)
console = rich.console.Console(file=self._stream, soft_wrap=True)
console.print(text)
except CommandExit as cexit:
return cexit.code
else:
......
@@ -231,5 +231,8 @@ class Run(Annotate): # noqa: D101
except KeyboardInterrupt:
self.error("Interrupted")
return -signal.SIGINT
except Exception as err:
self.progress.stop()
raise
else:
return 0
@@ -27,7 +27,6 @@ from .._utils import in_context, patch_showwarnings
from ...model import FeatureTable, ClusterTable, Cluster
from ...crf import ClusterCRF
from ...refine import ClusterRefiner
from ...hmmer import HMMER
class Train(Command): # noqa: D101
@@ -284,5 +283,8 @@ class Train(Command): # noqa: D101
except KeyboardInterrupt:
self.error("Interrupted")
return -signal.SIGINT
except Exception as err:
self.progress.stop()
raise
else:
return 0
[hmm]
id = Pfam
version = 33.1
size = 2873
url = ftp://ftp.ebi.ac.uk/pub/databases/Pfam/releases/Pfam33.1/Pfam-A.hmm.gz
relabel_with = s/(PF\d+)\.\d+/\1/
md5 = 7795fabbbbc68b9c93aa14dbec371894
......
@@ -3,4 +3,5 @@ id = Tigrfam
version = 15.0
url = http://ftp.ncbi.nlm.nih.gov/hmm/TIGRFAMs/release_15.0/TIGRFAMs_15.0_HMM.LIB.gz
md5 = 050d50759f60d102466c319fe6da69e6
size = 2382
@@ -7,6 +7,7 @@ import contextlib
import csv
import errno
import glob
import itertools
import os
import re
import subprocess
@@ -18,11 +19,12 @@ import pkg_resources
import pyhmmer
from Bio import SeqIO
from ._worker import hmmsearch
from .._meta import requires
from .._base import BinaryRunner
from ..model import Gene, Domain
from ..interpro import InterPro
if typing.TYPE_CHECKING:
from Bio.SeqRecord import SeqRecord
@@ -32,55 +34,6 @@ if typing.TYPE_CHECKING:
__all__ = ["DomainRow", "HMM", "HMMER", "PyHMMER", "embedded_hmms"]
class DomainRow(typing.NamedTuple):
"""A single row in a domain table created by ``hmmsearch``.
See Also:
The description of each field in page 48 of the `HMMER manual
<http://eddylab.org/software/hmmer3/3.1b2/Userguide.pdf>`_.
"""
target_name: str
target_accession: Optional[str]
target_length: int
query_name: str
query_accession: Optional[str]
query_length: int
evalue: float
score: float
bias: float
domain_number: int
domain_total: int
c_evalue: float
i_evalue: float
domain_score: float
domain_bias: float
hmm_from: int
hmm_to: int
ali_from: int
ali_to: int
env_from: int
env_to: int
post: float
description: Optional[str]
@classmethod
def from_line(cls: Type["_T"], row: str) -> "_T":
"""Extract a `DomainRow` from a single domain table line.
"""
line = list(filter(None, row.split(" ")))
values = {}
for i, (field, ty) in enumerate(cls.__annotations__.items()):
if isinstance(ty, type):
values[field] = ty(line[i])
elif field == "target_accession" or field == "query_accession":
values[field] = None if line[i] == "-" else line[i]
elif field == "description":
values[field] = " ".join(line[i:]) if line[i:] else None
return cls(**values)
class HMM(typing.NamedTuple):
"""A Hidden Markov Model library to use with `~gecco.hmmer.HMMER`.
"""
@@ -89,6 +42,7 @@ class HMM(typing.NamedTuple):
version: str
url: str
path: str
size: int
relabel_with: Optional[str] = None
md5: Optional[str] = None
@@ -134,68 +88,8 @@ class DomainAnnotator(metaclass=abc.ABCMeta):
return NotImplemented
class HMMER(DomainAnnotator):
"""A wrapper for HMMER that uses the ``hmmsearch`` binary.
"""
BINARY = "hmmsearch"
def __init__(self, hmm: HMM, cpus: Optional[int] = None) -> None:
DomainAnnotator.__init__(self, hmm, cpus)
BinaryRunner.__init__(self)
def run(self, genes: Iterable[Gene]) -> List[Gene]:
# collect genes and build an index of genes by protein id
gene_index = collections.OrderedDict([(gene.id, gene) for gene in genes])
# create a temporary file to write the input and output to
seqs_tmp = tempfile.NamedTemporaryFile(prefix="hmmer", suffix=".faa")
doms_tmp = tempfile.NamedTemporaryFile(prefix="hmmer", suffix=".dom", mode="rt")
# write protein sequences
protein_records = (g.protein.to_seq_record() for g in gene_index.values())
SeqIO.write(protein_records, seqs_tmp.name, "fasta")
# Prepare the command line arguments
cmd = ["hmmsearch", "--noali", "--domtblout", doms_tmp.name]
if self.cpus is not None:
cmd.extend(["--cpu", str(self.cpus)])
cmd.extend([self.hmm.path, seqs_tmp.name])
# Run HMMER
subprocess.run(cmd, stdout=subprocess.DEVNULL).check_returncode()
# Load InterPro metadata for the annotation
interpro = InterPro.load()
# Read the domain table
lines = filter(lambda line: not line.startswith("#"), doms_tmp)
rows = map(DomainRow.from_line, lines)
# update protein domains
for row in rows:
# extract domain from the domain table row
accession = self.hmm.relabel(row.query_accession or row.query_name)
entry = interpro.by_accession.get(accession)
# add additional qualifiers with available metadata
qualifiers: Dict[str, List[str]] = {
"inference": ["protein motif"],
"note": ["e-value: {}".format(row.i_evalue)],
"db_xref": ["{}:{}".format(self.hmm.id.upper(), accession)],
"function": [] if entry is None else [entry.name]
}
if entry is not None and entry.integrated is not None:
qualifiers["db_xref"].append("InterPro:{}".format(entry.integrated))
# add the domain to the protein domains of the right gene
domain = Domain(accession, row.env_from, row.env_to, self.hmm.id, row.i_evalue, None, qualifiers)
gene_index[row.target_name].protein.domains.append(domain)
# return the updated list of genes that was given in argument
return list(gene_index.values())
class PyHMMER(DomainAnnotator):
"""A domain annotator that uses `pyhmmer.hmmer.hmmsearch`.
"""A domain annotator that uses `pyhmmer`.
"""
def run(
@@ -206,47 +100,54 @@ class PyHMMER(DomainAnnotator):
# collect genes and build an index of genes by protein id
gene_index = collections.OrderedDict([(gene.id, gene) for gene in genes])
# convert to Easel sequences
# group genes by contig
by_source = collections.defaultdict(list)
for gene in gene_index.values():
by_source[gene.source.id].append(gene)
# convert to Easel sequences, grouped by contig
esl_abc = pyhmmer.easel.Alphabet.amino()
esl_sqs = [
pyhmmer.easel.TextSequence(
name=gene.protein.id.encode(),
sequence=str(gene.protein.seq)
).digitize(esl_abc)
for gene in gene_index.values()
[
pyhmmer.easel.TextSequence(
name=gene.protein.id.encode(),
sequence=str(gene.protein.seq)
).digitize(esl_abc)
for gene in contig_genes
]
for contig_genes in by_source.values()
]
]
# Run HMMER
with pyhmmer.plan7.HMMFile(self.hmm.path) as hmm_file:
cpus = 0 if self.cpus is None else self.cpus
hmms_hits = pyhmmer.hmmsearch(hmm_file, esl_sqs, cpus=cpus, callback=progress)
hmms_hits = hmmsearch(hmm_file, esl_sqs, cpus=cpus, callback=progress)
# Load InterPro metadata for the annotation
interpro = InterPro.load()
# Transcribe HMMER hits to GECCO model
for hits in hmms_hits:
for hit in hits:
target_name = hit.name.decode('utf-8')
for domain in hit.domains:
raw_acc = domain.alignment.hmm_accession
accession = self.hmm.relabel(raw_acc.decode('utf-8'))
entry = interpro.by_accession.get(accession)
# extract qualifiers
qualifiers: Dict[str, List[str]] = {
"inference": ["protein motif"],
"note": ["e-value: {}".format(domain.i_evalue)],
"db_xref": ["{}:{}".format(self.hmm.id.upper(), accession)],
"function": [] if entry is None else [entry.name]
}
if entry is not None and entry.integrated is not None:
qualifiers["db_xref"].append("InterPro:{}".format(entry.integrated))
# add the domain to the protein domains of the right gene
assert domain.env_from < domain.env_to
domain = Domain(accession, domain.env_from, domain.env_to, self.hmm.id, domain.i_evalue, None, qualifiers)
gene_index[target_name].protein.domains.append(domain)
for hit in itertools.chain.from_iterable(hmms_hits):
target_name = hit.name.decode('utf-8')
for domain in hit.domains:
raw_acc = domain.alignment.hmm_accession or domain.alignment.hmm_name
accession = self.hmm.relabel(raw_acc.decode('utf-8'))
entry = interpro.by_accession.get(accession)
# extract qualifiers
qualifiers: Dict[str, List[str]] = {
"inference": ["protein motif"],
"note": ["e-value: {}".format(domain.i_evalue)],
"db_xref": ["{}:{}".format(self.hmm.id.upper(), accession)],
"function": [] if entry is None else [entry.name]
}
if entry is not None and entry.integrated is not None:
qualifiers["db_xref"].append("InterPro:{}".format(entry.integrated))
# add the domain to the protein domains of the right gene
assert domain.env_from < domain.env_to
domain = Domain(accession, domain.env_from, domain.env_to, self.hmm.id, domain.i_evalue, None, qualifiers)
gene_index[target_name].protein.domains.append(domain)
# return the updated list of genes that was given in argument
return list(gene_index.values())
@@ -258,4 +159,8 @@ def embedded_hmms() -> Iterator[HMM]:
for ini in glob.glob(pkg_resources.resource_filename(__name__, "*.ini")):
cfg = configparser.ConfigParser()
cfg.read(ini)
yield HMM(path=ini.replace(".ini", ".h3m"), **dict(cfg.items("hmm")))
args = dict(cfg.items("hmm"))
args["size"] = int(args["size"])
yield HMM(path=ini.replace(".ini", ".h3m"), **args)
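# e.g. the Pfam ini shown above becomes HMM(id="Pfam", version="33.1", size=2873, ...)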
import ctypes
import multiprocessing
import queue
import threading
import typing
import psutil
from pyhmmer.easel import Alphabet, DigitalSequence
from pyhmmer.plan7 import Pipeline, TopHits, HMM
class _HMMPipelineThread(threading.Thread):
@staticmethod
def _none_callback(hmm: HMM, total: int) -> None:
pass
def __init__(
self,
contigs: typing.Iterable[typing.Collection[DigitalSequence]],
query_queue: "queue.Queue[typing.Optional[typing.Tuple[int, HMM]]]",
query_count: multiprocessing.Value, # type: ignore
hits_queue: "queue.PriorityQueue[typing.Tuple[int, typing.List[TopHits]]]",
kill_switch: threading.Event,
callback: typing.Optional[typing.Callable[[HMM, int], None]],
options: typing.Dict[str, typing.Any],
) -> None:
super().__init__()
self.options = options
self.pipeline = Pipeline(alphabet=Alphabet.amino(), **options)
self.contigs = contigs
self.query_queue = query_queue
self.query_count = query_count
self.hits_queue = hits_queue
self.callback = callback or self._none_callback
self.kill_switch = kill_switch
self.error: typing.Optional[BaseException] = None
def run(self) -> None:
while not self.kill_switch.is_set():
args = self.query_queue.get()
if args is None:
self.query_queue.task_done()
return
else:
index, query = args
try:
self.process(index, query)
self.query_queue.task_done()
except BaseException as exc:
self.error = exc
self.kill()
return
def kill(self) -> None:
self.kill_switch.set()
def process(self, index, query):
for hits in self.search(query):
self.hits_queue.put(hits)
self.callback(query, self.query_count.value) # type: ignore
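# searching each contig as its own target database keeps E-values relative
# to that contig's sequence count, which is what fixes the "bogus E-values"
# mentioned in the changelog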
def search(self, query: HMM) -> typing.Iterator[TopHits]:
for contig in self.contigs:
yield self.pipeline.search_hmm(query, contig)
self.pipeline.clear()
class _HMMSearch(object):
def __init__(
self,
queries: typing.Iterable[HMM],
contigs: typing.Collection[typing.Collection[DigitalSequence]],
cpus: int = 0,
callback: typing.Optional[typing.Callable[[HMM, int], None]] = None,
**options,
) -> None:
self.queries = queries
self.contigs = contigs
self.cpus = cpus
self.callback = callback
self.options = options
def _new_thread(
self,
query_queue: "queue.Queue[typing.Optional[typing.Tuple[int, HMM]]]",
query_count: multiprocessing.Value, # type: ignore
hits_queue: "queue.PriorityQueue[typing.Tuple[int, TopHits]]",
kill_switch: threading.Event,
) -> _HMMPipelineThread:
return _HMMPipelineThread(
self.contigs,
query_queue,
query_count,
hits_queue,
kill_switch,
self.callback,
self.options,
)
def _single_threaded(self) -> typing.Iterator[TopHits]:
# create the queues to pass the HMM objects around, as well as atomic
# values that we use to synchronize the threads
query_queue = queue.Queue() # type: ignore
query_count = multiprocessing.Value(ctypes.c_ulong)
hits_queue = queue.Queue() # type: ignore
kill_switch = threading.Event()
# create the thread (to recycle code)
thread = self._new_thread(query_queue, query_count, hits_queue, kill_switch)
# process each HMM independently and yield the result
# immediately
for index, query in enumerate(self.queries):
query_count.value += 1
thread.process(index, query)
while not hits_queue.empty():
yield hits_queue.get_nowait()
def _multi_threaded(self) -> typing.Iterator[TopHits]:
# create the queues to pass the HMM objects around, as well as atomic
# values that we use to synchronize the threads
hits_queue = queue.Queue() # type: ignore
query_queue = queue.Queue() # type: ignore
query_count = multiprocessing.Value(ctypes.c_ulong)
kill_switch = threading.Event()
# create and launch one pipeline thread per CPU
threads = []
for _ in range(self.cpus):
thread = self._new_thread(query_queue, query_count, hits_queue, kill_switch)
thread.start()
threads.append(thread)
# queue the HMMs passed as arguments
for index, query in enumerate(self.queries):
query_count.value += 1
query_queue.put((index, query))
# poison-pill the queue so that threads terminate when they
# have consumed all the HMMs
for _ in threads:
query_queue.put(None)
# wait for all threads to be completed, and yield
# results as soon as available otherwise
while any(thread.is_alive() for thread in threads):
while not hits_queue.empty():
yield hits_queue.get_nowait()
alive = next((thread for thread in threads if thread.is_alive()), None)
if alive is not None:
alive.join(timeout=0.1)
# make sure threads didn't exit with an error, raise it otherwise
for thread in threads:
thread.join()
if thread.error is not None:
raise thread.error
# yield remaining results
while not hits_queue.empty():
yield hits_queue.get_nowait()
def run(self) -> typing.Iterator[TopHits]:
if self.cpus == 1:
return self._single_threaded()
else:
return self._multi_threaded()
def hmmsearch(
queries: typing.Iterable[HMM],
contigs: typing.Collection[typing.Collection[DigitalSequence]],
cpus: int = 0,
callback: typing.Optional[typing.Callable[[HMM, int], None]] = None,
**options, # type: typing.Any
) -> typing.Iterator[TopHits]:
"""Search HMM profiles against a sequence database.
This function is a patched version of `pyhmmer.hmmer.hmmsearch` that
enables processing several databases in parallel with the same query.
"""
# count the number of CPUs to use
_cpus = cpus if cpus > 0 else psutil.cpu_count(logical=False) or multiprocessing.cpu_count()
runner = _HMMSearch(queries, contigs, _cpus, callback, **options)
return runner.run()
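A minimal usage sketch for the patched `hmmsearch` (the HMM path and sequence are hypothetical, and the import path is assumed; this mirrors the call made in `PyHMMER.run` above):
import pyhmmer
from gecco.hmmer._worker import hmmsearch  # assumed module path
# one inner list of DigitalSequence objects per contig, as PyHMMER.run builds them
alphabet = pyhmmer.easel.Alphabet.amino()
contigs = [
[pyhmmer.easel.TextSequence(name=b"gene_1", sequence="MKKLVLT").digitize(alphabet)],
]
with pyhmmer.plan7.HMMFile("Pfam.h3m") as hmm_file:  # an HMMFile iterates over its profiles
for hits in hmmsearch(hmm_file, contigs, cpus=1):
for hit in hits:
print(hit.name.decode(), hit.evalue)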
@@ -17,7 +17,6 @@ import pyrodigal
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from ._base import BinaryRunner
from .model import Gene, Protein, Strand
if typing.TYPE_CHECKING:
......