......@@ -31,8 +31,9 @@ variables:
artifacts:
expire_in: 1 week
reports:
cobertura: coverage.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml
# --- Stages -------------------------------------------------------------------
......@@ -57,7 +58,9 @@ test:python3.9:
- dist/*.whl
- coverage.xml
reports:
cobertura: coverage.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml
docs:
image: python:3.9
......
......@@ -5,7 +5,48 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [Unreleased]
[Unreleased]: https://git.embl.de/grp-zeller/GECCO/compare/v0.9.2...master
[Unreleased]: https://git.embl.de/grp-zeller/GECCO/compare/v0.9.5...master
## [v0.9.5] - 2022-08-10
[v0.9.5]: https://git.embl.de/grp-zeller/GECCO/compare/v0.9.4...v0.9.5
### Added
- `gecco predict` command to predict BGCs from an annotated genome.
- `Protein.with_seq` function to assign a new sequence to a protein object.
### Fixed
- Issue with antiSMASH sideload JSON file generation in `gecco run` and `gecco predict`.
- Make `gecco.orf` handle STOP codons consistently ([#9](https://github.com/zellerlab/GECCO/issues/9)).
## [v0.9.4] - 2022-05-31
[v0.9.4]: https://git.embl.de/grp-zeller/GECCO/compare/v0.9.3...v0.9.4
### Added
- `classes_` property to `TypeClassifier` to access the `classes_` attribute of the `TypeBinarizer`.
- Alternative ORF finder `CDSFinder` which simply extracts CDS features from input sequences ([#8](https://github.com/zellerlab/GECCO/issues/8)).
- Support for annotating domains with "exclusive" HMMs to annotate genes with *at most* one HMM from the library.
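A minimal sketch of declaring an "exclusive" HMM, with field names taken from the `HMM` constructor used in `gecco/cli/commands/_mixins.py` later in this merge request (the metadata values here are placeholders):

    from gecco.hmmer import HMM

    hmm = HMM(
        id="Pfam", version="35.0", url="?",
        path="Pfam-A.hmm", size=None,
        exclusive=True,  # annotate each gene with at most one hit from this HMM
        relabel_with=r"s/([^\.]*)(\..*)?/\1/",
    )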
### Changed
- `ProductType` is no longer restricted to MIBiG types and supports any string as a base type identifier.
- `PyrodigalFinder` now uses `multiprocessing.pool.ThreadPool` instead of custom thread code thanks to `OrfFinder.find_genes` reentrancy introduced in Pyrodigal `v1.0`.
- `PyrodigalFinder` can now be used in single / non-meta mode from the API.
- Bumped minimum `rich` version to `12.3` to use `None` total in progress bars when the size of an HMM library is unknown.
### Fixed
- Broken MyPy type annotations in the `gecco.model` and `gecco.cli` modules.
## [v0.9.3] - 2022-05-13
[v0.9.3]: https://git.embl.de/grp-zeller/GECCO/compare/v0.9.2...v0.9.3
### Changed
- `--format` flag of `gecco annotate` and `gecco run` CLI commands is now made lowercase before giving value to `Bio.SeqIO`.
### Fixed
- Genes with duplicate IDs being silently ignored in `HMMER.run`.
## [v0.9.2] - 2022-04-11
[v0.9.2]: https://git.embl.de/grp-zeller/GECCO/compare/v0.9.1...v0.9.2
......
# This CITATION.cff file was generated with cffinit.
# Visit https://bit.ly/cffinit to generate yours today!
cff-version: 1.2.0
title: >-
Accurate de novo identification of biosynthetic
gene clusters with GECCO
message: >-
If you use this software, please cite it using the
metadata from this file.
type: software
authors:
- family-names: Carroll
email: laura.carroll@embl.de
given-names: Laura M.
affiliation: >-
Structural and Computational Biology Unit,
EMBL, Heidelberg, Germany
orcid: 'https://orcid.org/0000-0002-3677-0192'
- given-names: Martin
family-names: Larralde
email: martin.larralde@embl.de
affiliation: >-
Structural and Computational Biology Unit,
EMBL, Heidelberg, Germany
orcid: 'https://orcid.org/0000-0002-3947-4444'
- given-names: Jonas Simon
family-names: Fleck
email: jonas.simon.fleck@gmail.com
affiliation: >-
Department of Biosystems Science and
Engineering, ETH Zürich, Basel, Switzerland
orcid: 'https://orcid.org/0000-0002-4686-7254'
- given-names: Ruby
family-names: Ponnudurai
email: ruby.ponnudurai@embl.de
affiliation: >-
Structural and Computational Biology Unit,
EMBL, Heidelberg, Germany
orcid: 'https://orcid.org/0000-0003-0105-6287'
- given-names: Alessio
family-names: Milanese
orcid: 'https://orcid.org/0000-0002-7050-2239'
affiliation: >-
Department of Biology, ETH Zürich, Zürich,
Switzerland
email: alessio.milanese@embl.de
- given-names: Elisa
family-names: Cappio Barazzone
orcid: 'https://orcid.org/0000-0003-1295-4672'
email: elisa.cappio@gmail.com
affiliation: >-
Structural and Computational Biology Unit,
EMBL, Heidelberg, Germany
- given-names: Georg
family-names: Zeller
email: georg.zeller@embl.de
affiliation: >-
Structural and Computational Biology Unit,
EMBL, Heidelberg, Germany
orcid: 'https://orcid.org/0000-0003-1429-7485'
identifiers:
- type: doi
value: 10.1101/2021.05.03.442509
description: bioRxiv preprint
repository-code: 'https://github.com/zellerlab/GECCO'
url: 'https://gecco.embl.de'
keywords:
- biosynthetic gene cluster
- conditional random fields
license: GPL-3.0-only
include README.md
include LICENSE
include CITATION.cff
include static/gecco.png
include gecco/py.typed
......
......@@ -75,15 +75,22 @@ Additional parameters of interest are:
- `--threshold`, controlling the minimum probability for a gene to be
considered part of a BGC region. Using a lower number will increase the
number (and possibly length) of predictions, but reduce accuracy. The
default of *0.3* was selected to optimize precision/recall on a test set
default of *0.8* was selected to optimize precision/recall on a test set
of 364 BGCs from [MIBiG 2.0](https://mibig.secondarymetabolites.org/) (see the sketch after this list).
- `--cds-feature`, which can be supplied a feature name to extract genes
if the input file already contains gene annotations instead of predicting
genes with [Pyrodigal](https://pyrodigal.readthedocs.io). A common value
for records downloaded from GenBank is `--cds-feature CDS`.
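To get a sense of what the threshold changes under the hood, here is a minimal sketch using the internal `ClusterRefiner` API that backs the flag (names taken from `gecco/cli/commands/_mixins.py` in this merge request; the remaining constructor parameters are assumed to keep their defaults, and `genes` stands for the genes of a single contig with CRF probabilities already attached):

    from gecco.refine import ClusterRefiner

    refiner = ClusterRefiner(threshold=0.8)        # same default as --threshold
    clusters = list(refiner.iter_clusters(genes))  # higher thresholds yield fewer clusters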
## 🔎 Results
GECCO will create the following files:
- `{genome}.genes.tsv`: The *genes* file, containing the genes extracted
or predicted from the input file, and per-gene BGC probabilities
predicted by the CRF.
- `{genome}.features.tsv`: The *features* file, containing the identified
proteins and domains in the input sequences, in tabular format.
domains in the input sequences, in tabular format.
- `{genome}.clusters.tsv`: If any were found, a *clusters* file, containing
the coordinates of the predicted clusters along their putative biosynthetic
type, in tabular format.
......@@ -91,9 +98,9 @@ GECCO will create the following files:
containing the cluster sequence annotated with its member proteins and domains.
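The tabular outputs can be loaded back into Python with the `Table` API shown later in this merge request; a minimal sketch (file names assume an input genome called `genome.fna`):

    from gecco.model import ClusterTable, FeatureTable, GeneTable

    with open("genome.genes.tsv") as f:
        genes = list(GeneTable.load(f).to_genes())  # Gene objects with probabilities
    with open("genome.features.tsv") as f:
        features = FeatureTable.load(f)             # one row per annotated domain
    with open("genome.clusters.tsv") as f:
        clusters = ClusterTable.load(f)             # coordinates and predicted types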
To get a more visual way of exploring the predictions, you
can open the GenBank files in genome editing software like [UGENE](http://ugene.net/),
or you can load the results into an antiSMASH report.
Check the [Integrations](https://gecco.embl.de/integrations.html#antismash) page of the
can open the GenBank files in genome editing software like [UGENE](http://ugene.net/).
Alternatively, you can load the results into an antiSMASH report: check the
[Integrations](https://gecco.embl.de/integrations.html#antismash) page of the
documentation for a step-by-step guide.
......
......@@ -10,4 +10,4 @@ See Also:
__author__ = "Martin Larralde"
__license__ = "GPLv3"
__version__ = "0.9.2"
__version__ = "0.9.5"
......@@ -7,13 +7,30 @@ import operator
import subprocess
import typing
from collections.abc import Sized
from typing import Iterable, Iterator, List, NamedTuple, Optional, Type, TextIO, Union
from subprocess import DEVNULL
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
NamedTuple,
Optional,
Type,
TextIO,
Union,
Sequence,
)
from ._meta import classproperty, requires
if typing.TYPE_CHECKING:
import pandas
_SELF = typing.TypeVar("_SELF")
_TABLE = typing.TypeVar("_TABLE", bound="Table")
def _parse_str(value: str) -> str:
......@@ -78,19 +95,20 @@ class Loadable(metaclass=abc.ABCMeta):
@classmethod
def loads(cls: typing.Type[_SELF], s: str) -> _SELF:
return self.load(io.StringIO(s))
return cls.load(io.StringIO(s)) # type: ignore
class Table(Dumpable, Loadable, Sized):
class Table(Dumpable, Loadable, Sequence["Table.Row"]):
"""A metaclass for objects that
"""
Row: typing.Type[typing.NamedTuple]
class Row(typing.NamedTuple):
pass
def __bool__(self) -> bool: # noqa: D105
return len(self) != 0
def __iadd__(self: _SELF, rhs: _SELF) -> _SELF: # noqa: D105
def __iadd__(self: _TABLE, rhs: object) -> _TABLE: # noqa: D105
if not isinstance(rhs, type(self)):
return NotImplemented
for col in self.__annotations__:
......@@ -98,21 +116,21 @@ class Table(Dumpable, Loadable, Sized):
return self
@typing.overload
def __getitem__(self: _SELF, item: slice) -> _SELF: # noqa: D105
def __getitem__(self, item: int) -> "Table.Row": # noqa: D105
pass
@typing.overload
def __getitem__(self, item: int) -> "Row": # noqa: D105
def __getitem__(self: _TABLE, item: slice) -> _TABLE: # noqa: D105
pass
def __getitem__(self: _SELF, item: Union[slice, int]) -> Union[_SELF, "Row"]: # noqa: D105
def __getitem__(self: _TABLE, item: Union[slice, int]) -> Union[_TABLE, "Table.Row"]: # noqa: D105
columns = [getattr(self, col)[item] for col in self.__annotations__]
if isinstance(item, slice):
return type(self)(*columns)
else:
return self.Row(*columns)
def __iter__(self) -> Iterator["Row"]: # noqa: D105
def __iter__(self) -> Iterator["Table.Row"]: # noqa: D105
columns = { c: operator.attrgetter(c) for c in self.__annotations__ }
for i in range(len(self)):
row = { c: getter(self)[i] for c, getter in columns.items() }
......@@ -126,7 +144,7 @@ class Table(Dumpable, Loadable, Sized):
optional_columns.add(name)
return optional_columns
_FORMAT_FIELD: dict = {
_FORMAT_FIELD: Dict[Any, Callable[[Any], str]] = {
str: _format_str,
int: _format_int,
float: _format_float,
......@@ -167,7 +185,7 @@ class Table(Dumpable, Loadable, Sized):
for i in range(len(self)):
writer.writerow([format(col[i]) for col,format in zip(columns, formatters)])
_PARSE_FIELD: dict = {
_PARSE_FIELD: Dict[Any, Callable[[str], Any]] = {
str: _parse_str,
int: _parse_int,
float: _parse_float,
......@@ -177,7 +195,7 @@ class Table(Dumpable, Loadable, Sized):
}
@classmethod
def load(cls: typing.Type[_SELF], fh: TextIO, dialect: str = "excel-tab") -> _SELF:
def load(cls: typing.Type[_TABLE], fh: TextIO, dialect: str = "excel-tab") -> _TABLE:
"""Load a table in CSV format from a file handle in text mode.
"""
table = cls()
......@@ -204,14 +222,14 @@ class Table(Dumpable, Loadable, Sized):
return table
@requires("pandas")
def to_dataframe(self) -> "pandas.DataFrame":
def to_dataframe(self) -> "pandas.DataFrame": # type: ignore
"""Convert the table to a `~pandas.DataFrame`.
Raises:
ImportError: if the `pandas` module could not be imported.
"""
frame = pandas.DataFrame() # type: ignore
frame = pandas.DataFrame()
for column in self.__annotations__:
frame[column] = getattr(self, column)
return frame
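# A short sketch of the Sequence protocol implemented above, using the concrete
# FeatureTable subclass from gecco.model (constructed empty, as in the CLI
# mixins later in this merge request):
from gecco.model import FeatureTable

table = FeatureTable()
table += FeatureTable()               # __iadd__ concatenates columns in place
head = table[0:0]                     # slicing returns a new FeatureTable
assert isinstance(head, FeatureTable)
assert len(table) == 0 and not table  # __bool__ mirrors len()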
......@@ -9,16 +9,17 @@ import locale
import operator
import typing
from multiprocessing.pool import Pool
from typing import Any, Callable, Iterable, Iterator, List, Tuple, Optional, Type
from typing import Any, Callable, Iterable, Iterator, List, Tuple, Union, Optional, Type
if typing.TYPE_CHECKING:
from types import TracebackType
from types import TracebackType, ModuleType
_S = typing.TypeVar("_S")
_T = typing.TypeVar("_T")
_A = typing.TypeVar("_A")
_R = typing.TypeVar("_R")
_F = typing.TypeVar("_F", bound=Callable[..., Any])
class classproperty(property):
"""A class property decorator.
......@@ -51,7 +52,9 @@ class requires:
"""A decorator for functions that require optional dependencies.
"""
def __init__(self, module_name):
module: Union["ModuleType", BaseException]
def __init__(self, module_name: str) -> None:
self.module_name = module_name
try:
......@@ -59,12 +62,12 @@ class requires:
except ImportError as err:
self.module = err
def __call__(self, func):
def __call__(self, func: "_F") -> "_F":
if isinstance(self.module, ImportError):
@functools.wraps(func)
def newfunc(*args, **kwargs):
def newfunc(*args, **kwargs): # type: ignore
msg = f"calling {func.__qualname__} requires module {self.module.name}"
raise RuntimeError(msg) from self.module
else:
......@@ -73,14 +76,14 @@ class requires:
basename = self.module_name.split(".")[-1]
newfunc.__globals__[basename] = self.module
return newfunc
return newfunc # type: ignore
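# A minimal usage sketch of `requires`, mirroring `Table.to_dataframe` earlier
# in this diff (assuming the module lives at `gecco._meta`, as the relative
# imports suggest): the decorated function can use the module as a bare name,
# and calling it raises RuntimeError if the import failed at decoration time.
from gecco._meta import requires

@requires("pandas")
def as_series(values):
    return pandas.Series(values)  # `pandas` was injected into the function globals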
class UniversalContainer(object):
"""A container that contains everything.
"""
def __repr__(self):
def __repr__(self) -> str:
return f"{self.__class__.__name__}()"
def __contains__(self, item: object) -> bool:
......@@ -99,7 +102,7 @@ def sliding_window(length: int, window: int, step: int) -> Iterator[slice]:
@contextlib.contextmanager
def patch_locale(name: str):
def patch_locale(name: str) -> Iterator[None]:
"""Create a context manager to locally change the locale in use.
"""
lc = locale.setlocale(locale.LC_TIME)
......
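# A quick sketch of `patch_locale` above, assuming it sets LC_TIME to the given
# name for the duration of the block (it saves and restores the previous value):
import datetime
from gecco._meta import patch_locale

with patch_locale("C"):
    month = datetime.datetime(2022, 8, 10).strftime("%b")  # "Aug" on any system locale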
......@@ -10,3 +10,5 @@ def main(argv: Optional[List[str]] = None, stream: Optional[TextIO] = None) -> i
with contextlib.ExitStack() as ctx:
return Main(argv, stream).execute(ctx)
return 0
......@@ -7,15 +7,19 @@ import io
import logging
import typing
import warnings
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Type, TextIO
from types import ModuleType, TracebackType
from typing import Any, BinaryIO, Callable, Dict, Iterable, Iterator, List, Optional, Type, TextIO, Union, Tuple
from .._meta import classproperty
if typing.TYPE_CHECKING:
from rich.progress import Progress, TaskID
ShowWarning = typing.Callable[
[str, Type[Warning], str, int, Optional[TextIO], Optional[str]],
[Union[Warning, str], Type[Warning], str, int, Optional[TextIO], Optional[str]],
None
]
_F = typing.TypeVar("_F", bound=Callable[..., Any])
class ProgressReader(io.RawIOBase):
......@@ -23,7 +27,7 @@ class ProgressReader(io.RawIOBase):
"""
@staticmethod
def scale_size(length):
def scale_size(length: float) -> Tuple[float, int, str]:
for scale, unit in enumerate(["B", "kiB", "MiB", "GiB", "TiB"]):
if length > 1024:
length /= 1024
......@@ -31,48 +35,58 @@ class ProgressReader(io.RawIOBase):
break
return length, scale, unit
def __init__(self, handle, progress, task, scale=0):
def __init__(
self,
handle: BinaryIO,
progress: "Progress",
task: "TaskID",
scale: int = 0
):
self.handle = handle
self.progress = progress
self.task = task
self.scale = scale
def __enter__(self):
def __enter__(self) -> "ProgressReader":
self.handle.__enter__()
return self
def __exit__(self, exc_val, exc_ty, tb):
self.handle.__exit__(exc_val, exc_ty, tb)
return False
def __exit__(
self,
exc_ty: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
tb: Optional[TracebackType]
) -> None:
self.handle.__exit__(exc_ty, exc_val, tb)
def _update(self, length):
def _update(self, length: int) -> None:
self.progress.update(self.task, advance=length / (1024 ** self.scale))
def readable(self):
def readable(self) -> bool:
return True
def seekable(self):
def seekable(self) -> bool:
return False
def writable(self):
def writable(self) -> bool:
return False
def readline(self, size=-1):
line = self.handle.readline(size)
def readline(self, size: Optional[int] = -1) -> bytes:
line = self.handle.readline(-1 if size is None else size)
self._update(len(line))
return line
def readlines(self, hint=-1):
lines = self.handle.readlines(hint)
def readlines(self, hint: Optional[int] = -1) -> List[bytes]:
lines = self.handle.readlines(-1 if hint is None else hint)
self._update(sum(map(len, lines)))
return lines
def read(self, size=-1):
block = self.handle.read(size)
def read(self, size: Optional[int] = -1) -> bytes:
block = self.handle.read(-1 if size is None else size)
self._update(len(block))
return block
def close(self):
def close(self) -> None:
self.handle.close()
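# A rough usage sketch of ProgressReader with the rich Progress API ("genome.fna"
# is a placeholder path; the `unit`/`precision` fields used elsewhere in this
# file belong to GECCO's custom progress columns and are omitted here):
import os
from rich.progress import Progress
from gecco.cli._utils import ProgressReader

with Progress() as progress:
    total, scale, unit = ProgressReader.scale_size(os.stat("genome.fna").st_size)
    task = progress.add_task(f"Loading ({unit})", total=total)
    with ProgressReader(open("genome.fna", "rb"), progress, task, scale) as reader:
        data = reader.read()  # each read advances the progress bar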
......@@ -80,17 +94,17 @@ class ProgressReader(io.RawIOBase):
def patch_showwarnings(new_showwarning: "ShowWarning") -> Iterator[None]:
"""Make a context patching `warnings.showwarning` with the given function.
"""
old_showwarning = warnings.showwarning
old_showwarning: ShowWarning = warnings.showwarning
try:
warnings.showwarning = new_showwarning
warnings.showwarning = new_showwarning # type: ignore
yield
finally:
warnings.showwarning = old_showwarning
warnings.showwarning = old_showwarning # type: ignore
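# A minimal sketch of `patch_showwarnings`: temporarily route warnings through
# a custom printer matching the ShowWarning signature above.
import warnings
from gecco.cli._utils import patch_showwarnings

def show(message, category, filename, lineno, file=None, line=None):
    print(f"[warn] {category.__name__}: {message}")

with patch_showwarnings(show):
    warnings.warn("deprecated flag")  # printed through `show`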
@contextlib.contextmanager
def numpy_error_context(
numpy,
numpy: ModuleType,
*,
all: Optional[str] = None,
divide: Optional[str] = None,
......@@ -139,15 +153,17 @@ def guess_sequences_format(path: str) -> Optional[str]:
return "fasta"
elif head.startswith("LOCUS"):
return "genbank"
elif head.startswith("ID "):
return "embl"
else:
return None
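# The detection above only peeks at the first characters of the file, e.g.
# (paths are placeholders):
from gecco.cli._utils import guess_sequences_format

guess_sequences_format("genome.gbk")  # "genbank" if the file starts with LOCUS
guess_sequences_format("reads.fna")   # "fasta" if the file starts with ">"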
def in_context(func):
def in_context(func: "_F") -> "_F":
@functools.wraps(func)
def newfunc(*args, **kwargs):
def newfunc(*args, **kwargs): # type: ignore
with contextlib.ExitStack() as ctx:
return func(*args, ctx, **kwargs)
return newfunc
return newfunc # type: ignore
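# A small sketch of `in_context`: the wrapper appends a fresh ExitStack as the
# last positional argument, so registered callbacks run when the function returns.
from gecco.cli._utils import in_context

@in_context
def run(argv, ctx):
    ctx.callback(print, "cleanup")  # runs when the ExitStack unwinds
    return 0

run([])  # prints "cleanup" after returning 0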
......@@ -28,7 +28,7 @@ class CommandExit(Exception):
"""An error to request immediate exit from a function.
"""
def __init__(self, code):
def __init__(self, code: int) -> None:
self.code = code
class Command(metaclass=abc.ABCMeta):
......@@ -48,14 +48,14 @@ class Command(metaclass=abc.ABCMeta):
other number on error.
"""
return NotImplemented # type: ignore
return NotImplemented
@classmethod
@abc.abstractmethod
def doc(cls, fast: bool = False) -> str:
"""Get the help message for the command.
"""
return NotImplemented # type: ignore
return NotImplemented
# -- Concrete methods ----------------------------------------------------
......@@ -131,7 +131,7 @@ class Command(metaclass=abc.ABCMeta):
_check = (lambda x: True) if check is None else check
if self.args[name] is None:
if optional:
return default
return default # type: ignore
self.error(f"Missing value for argument [purple]{name}[/]")
raise InvalidArgument(self.args[name])
try:
......@@ -145,7 +145,7 @@ class Command(metaclass=abc.ABCMeta):
self.error(f"Invalid value for argument [purple]{name}[/]:", repr(self.args[name]), f"(expected {hint})")
raise InvalidArgument(self.args[name]) from err
else:
return value
return value # type: ignore
def _on_import_error(
self,
......@@ -161,7 +161,7 @@ class Command(metaclass=abc.ABCMeta):
# -- Logging methods -----------------------------------------------------
def error(self, message, *args, level=0):
def error(self, message: str, *args: Any, level: int = 0) -> None:
if self.quiet <= 2 and level <= self.verbose:
if self.verbose <= 1:
self.console.print(
......@@ -177,7 +177,7 @@ class Command(metaclass=abc.ABCMeta):
*args,
)
def info(self, verb, *args, level=1):
def info(self, verb: str, *args: Any, level: int = 1) -> None:
if self.quiet == 0 and level <= self.verbose:
if self.verbose <= 1:
self.console.print(
......@@ -193,7 +193,7 @@ class Command(metaclass=abc.ABCMeta):
*args,
)
def success(self, verb, *args, level=1):
def success(self, verb: str, *args: Any, level: int = 1) -> None:
if self.quiet == 0 and level <= self.verbose:
if self.verbose <= 1:
self.console.print(
......@@ -209,7 +209,7 @@ class Command(metaclass=abc.ABCMeta):
*args,
)
def warn(self, verb, *args, level=0):
def warn(self, verb: str, *args: Any, level: int = 0) -> None:
if self.quiet <= 1 and level <= self.verbose:
if self.verbose <= 1:
self.console.print(
......@@ -225,7 +225,7 @@ class Command(metaclass=abc.ABCMeta):
*args
)
def _logprefix(self):
def _logprefix(self) -> List[str]:
return [
f"[dim cyan]{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}[/]",
f"[dim purple]{self._hostname}[/]",
......
......@@ -7,7 +7,7 @@ import sys
import textwrap
import typing
import warnings
from typing import Mapping, Optional, Type
from typing import Mapping, Optional, Type, List
import docopt
import operator
......@@ -21,29 +21,29 @@ from ._base import Command, CommandExit, InvalidArgument
try:
import importlib.metadata as importlib_metadata
except ImportError:
import importlib_metadata
import importlib_metadata # type: ignore
class Main(Command):
"""The *main* command launched before processing subcommands.
"""
_entry_points_cache = None
_entry_points_cache: Optional[List["importlib_metadata.EntryPoint"]] = None
@classproperty
def _entry_points(cls):
@classmethod
def _entry_points(cls) -> List["importlib_metadata.EntryPoint"]:
if cls._entry_points_cache is None:
cls._entry_points_cache = importlib_metadata.entry_points().get(__parent__, [])
cls._entry_points_cache = list(importlib_metadata.entry_points().get(__parent__, []))
return cls._entry_points_cache
@classmethod
def _get_subcommand_names(cls) -> Mapping[str, Type[Command]]:
return [cmd.name for cmd in cls._entry_points]
def _get_subcommand_names(cls) -> List[str]:
return [cmd.name for cmd in cls._entry_points()]
@classmethod
def _get_subcommands(cls) -> Mapping[str, Type[Command]]:
commands = {}
for cmd in cls._entry_points:
for cmd in cls._entry_points():
try:
commands[cmd.name] = cmd.load()
except Exception:
......@@ -52,15 +52,15 @@ class Main(Command):
@classmethod
def _get_subcommand_by_name(cls, name: str) -> Optional[Type[Command]]:
for cmd in cls._entry_points:
for cmd in cls._entry_points():
if cmd.name == name:
return cmd.load()
return cmd.load() # type: ignore
return None
# --
@classmethod
def doc(cls, fast=False): # noqa: D102
def doc(cls, fast: bool = False) -> str: # noqa: D102
if fast:
commands = (f" {cmd}" for cmd in cls._get_subcommand_names())
else:
......@@ -108,7 +108,7 @@ class Main(Command):
try:
# check arguments and enter context
self._check()
ctx.enter_context(patch_showwarnings(self._showwarnings))
ctx.enter_context(patch_showwarnings(self._showwarnings)) # type: ignore
# Get the subcommand class
subcmd_name = self.args["<cmd>"]
......
import gzip
import io
import itertools
import os
import operator
import typing
from typing import BinaryIO, Container, Iterator, Iterable, Optional, List
from .._utils import ProgressReader, guess_sequences_format
from ._base import Command, CommandExit, InvalidArgument
if typing.TYPE_CHECKING:
from Bio.SeqRecord import SeqRecord
from ...model import Cluster, Gene, FeatureTable, GeneTable
from ...hmmer import HMM
from ...types import TypeClassifier
class SequenceLoaderMixin(Command):
format: Optional[str]
genome: str
def _load_sequences(self) -> Iterator["SeqRecord"]:
from Bio import SeqIO
try:
# guess format or use the one given in CLI
if self.format is not None:
format: Optional[str] = self.format.lower()
self.info("Using", "user-provided sequence format", repr(format), level=2)
else:
self.info("Detecting", "sequence format from file contents", level=2)
format = guess_sequences_format(self.genome)
if format is None:
raise RuntimeError(f"Failed to detect format of {self.genome!r}")
self.success("Detected", "format of input as", repr(format), level=2)
# get filesize and unit
input_size = os.stat(self.genome).st_size
total, scale, unit = ProgressReader.scale_size(input_size)
task = self.progress.add_task("Loading sequences", total=total, unit=unit, precision=".1f")
# load sequences
n = 0
self.info("Loading", "sequences from genomic file", repr(self.genome), level=1)
with ProgressReader(open(self.genome, "rb"), self.progress, task, scale) as f:
for record in SeqIO.parse(io.TextIOWrapper(f), format): # type: ignore
yield record
n += 1
except FileNotFoundError as err:
self.error("Could not find input file:", repr(self.genome))
raise CommandExit(err.errno) from err
except ValueError as err:
self.error("Failed to load sequences:", err)
raise CommandExit(getattr(err, "errno", 1)) from err
else:
self.success("Found", n, "sequences", level=1)
class TableLoaderMixin(Command):
genes: str
features: List[str]
def _load_genes(self) -> Iterator["Gene"]:
from ...model import GeneTable
try:
# get filesize and unit
input_size = os.stat(self.genes).st_size
total, scale, unit = ProgressReader.scale_size(input_size)
task = self.progress.add_task("Loading genes", total=total, unit=unit, precision=".1f")
# load gene table
self.info("Loading", "genes table from file", repr(self.genes))
with ProgressReader(open(self.genes, "rb"), self.progress, task, scale) as genes_file:
yield from GeneTable.load(io.TextIOWrapper(genes_file)).to_genes() # type: ignore
except OSError as err:
self.error("Fail to parse genes coordinates: {}", err)
raise CommandExit(err.errno) from err
def _load_features(self) -> "FeatureTable":
from ...model import FeatureTable
features = FeatureTable()
for filename in self.features:
try:
# get filesize and unit
input_size = os.stat(filename).st_size
total, scale, unit = ProgressReader.scale_size(input_size)
task = self.progress.add_task("Loading features", total=total, unit=unit, precision=".1f")
# load features
self.info("Loading", "features table from file", repr(filename))
with ProgressReader(open(filename, "rb"), self.progress, task, scale) as in_:
features += FeatureTable.load(io.TextIOWrapper(in_)) # type: ignore
except FileNotFoundError as err:
self.error("Could not find feature file:", repr(filename))
raise CommandExit(err.errno) from err
self.success("Loaded", "a total of", len(features), "features", level=1)
return features
def _annotate_genes(self, genes: List["Gene"], features: "FeatureTable") -> List["Gene"]:
from ...model import Domain
# index genes by protein_id
gene_index = { gene.protein.id:gene for gene in genes }
if len(gene_index) < len(genes):
raise ValueError("Duplicate gene names in input genes")
# add domains from the feature table
unit = "row" if len(features) == 1 else "rows"
task = self.progress.add_task("Annotating genes", total=len(features), unit=unit, precision="")
for row in typing.cast(Iterable["FeatureTable.Row"], self.progress.track(features, total=len(features), task_id=task)):
# get gene by ID and check consistency
gene = gene_index[row.protein_id]
if gene.source.id != row.sequence_id:
raise ValueError(f"Mismatched source sequence for {row.protein_id!r}: {gene.source.id!r} != {row.sequence_id!r}")
elif gene.end - gene.start != row.end - row.start:
raise ValueError(f"Mismatched gene length for {row.protein_id!r}: {gene.end - gene.start!r} != {row.end - row.start!r}")
elif gene.start != row.start:
raise ValueError(f"Mismatched gene start for {row.protein_id!r}: {gene.start!r} != {row.start!r}")
elif gene.end != row.end:
raise ValueError(f"Mismatched gene end for {row.protein_id!r}: {gene.end!r} != {row.end!r}")
elif gene.strand.sign != row.strand:
raise ValueError(f"Mismatched gene strand for {row.protein_id!r}: {gene.strand.sign!r} != {row.strand!r}")
# add the row domain to the gene
domain = Domain(
name=row.domain,
start=row.domain_start,
end=row.domain_end,
hmm=row.hmm,
i_evalue=row.i_evalue,
pvalue=row.pvalue,
probability=row.bgc_probability,
)
gene.protein.domains.append(domain)
# return
return list(gene_index.values())
class OutputWriterMixin(Command):
genome: str
output_dir: str
def _make_output_directory(self, outputs: List[str]) -> None:
# Make output directory
self.info("Using", "output folder", repr(self.output_dir), level=1)
try:
os.makedirs(self.output_dir, exist_ok=True)
except OSError as err:
self.error("Could not create output directory: {}", err)
raise CommandExit(err.errno) from err
# Check if output files already exist
for output in outputs:
if os.path.isfile(os.path.join(self.output_dir, output)):
self.warn("Output folder contains files that will be overwritten")
break
def _write_feature_table(self, genes: List["Gene"]) -> None:
from ...model import FeatureTable
base, _ = os.path.splitext(os.path.basename(self.genome))
pred_out = os.path.join(self.output_dir, f"{base}.features.tsv")
self.info("Writing", "feature table to", repr(pred_out), level=1)
with open(pred_out, "w") as f:
FeatureTable.from_genes(genes).dump(f)
def _write_genes_table(self, genes: List["Gene"]) -> None:
from ...model import GeneTable
base, _ = os.path.splitext(os.path.basename(self.genome))
pred_out = os.path.join(self.output_dir, f"{base}.genes.tsv")
self.info("Writing", "gene table to", repr(pred_out), level=1)
with open(pred_out, "w") as f:
GeneTable.from_genes(genes).dump(f)
def _write_cluster_table(self, clusters: List["Cluster"]) -> None:
from ...model import ClusterTable
base, _ = os.path.splitext(os.path.basename(self.genome))
cluster_out = os.path.join(self.output_dir, f"{base}.clusters.tsv")
self.info("Writing", "cluster table to", repr(cluster_out), level=1)
with open(cluster_out, "w") as out:
ClusterTable.from_clusters(clusters).dump(out)
def _write_clusters(self, clusters: List["Cluster"], merge: bool = False) -> None:
from Bio import SeqIO
if merge:
base, _ = os.path.splitext(os.path.basename(self.genome))
gbk_out = os.path.join(self.output_dir, f"{base}.clusters.gbk")
records = (cluster.to_seq_record() for cluster in clusters)
SeqIO.write(records, gbk_out, "genbank")
else:
for cluster in clusters:
gbk_out = os.path.join(self.output_dir, f"{cluster.id}.gbk")
self.info("Writing", f"cluster [bold blue]{cluster.id}[/] to", repr(gbk_out), level=1)
SeqIO.write(cluster.to_seq_record(), gbk_out, "genbank")
class DomainFilterMixin(Command):
e_filter: Optional[float]
p_filter: Optional[float]
def _filter_domains(self, genes: List["Gene"]) -> List["Gene"]:
# Filter i-evalue and p-value if required
if self.e_filter is not None:
self.info("Excluding", "domains with e-value over", self.e_filter, level=1)
key = lambda d: d.i_evalue < self.e_filter
genes = [
gene.with_protein(gene.protein.with_domains(filter(key, gene.protein.domains)))
for gene in genes
]
if self.p_filter is not None:
self.info("Excluding", "domains with p-value over", self.p_filter, level=1)
key = lambda d: d.pvalue < self.p_filter
genes = [
gene.with_protein(gene.protein.with_domains(filter(key, gene.protein.domains)))
for gene in genes
]
if self.p_filter is not None or self.e_filter is not None:
count = sum(1 for gene in genes for domain in gene.protein.domains)
self.info("Using", "remaining", count, "domains", level=1)
return genes
class AnnotatorMixin(DomainFilterMixin):
hmm: Optional[List[str]]
hmm_x: Optional[List[str]]
jobs: int
def _custom_hmms(self) -> Iterable["HMM"]:
from ...hmmer import HMM
for path in typing.cast(List[str], self.hmm):
base = os.path.basename(path)
file: BinaryIO = open(path, "rb")
if base.endswith(".gz"):
base, _ = os.path.splitext(base)
file = gzip.GzipFile(fileobj=file, mode="rb") # type: ignore
base, _ = os.path.splitext(base)
yield HMM(
id=base,
version="?",
url="?",
path=path,
size=None,
exclusive=False,
relabel_with=r"s/([^\.]*)(\..*)?/\1/"
)
for path in typing.cast(List[str], self.hmm_x):
base = os.path.basename(path)
file = open(path, "rb")
if base.endswith(".gz"):
base, _ = os.path.splitext(base)
file = gzip.GzipFile(fileobj=file, mode="rb") # type: ignore
base, _ = os.path.splitext(base)
yield HMM(
id=base,
version="?",
url="?",
path=path,
size=None,
exclusive=True,
relabel_with=r"s/([^\.]*)(\..*)?/\1/"
)
def _annotate_domains(self, genes: List["Gene"], whitelist: Optional[Container[str]] = None) -> List["Gene"]:
from ...hmmer import PyHMMER, embedded_hmms
self.info("Running", "HMMER domain annotation", level=1)
# Run all HMMs over ORFs to annotate with protein domains
hmms = list(self._custom_hmms() if self.hmm else embedded_hmms())
task = self.progress.add_task(description=f"Annotating domains", unit="HMMs", total=len(hmms), precision="")
for hmm in self.progress.track(hmms, task_id=task, total=len(hmms)):
task = self.progress.add_task(description=f" {hmm.id} v{hmm.version}", total=hmm.size, unit="domains", precision="")
callback = lambda h, t: self.progress.update(task, advance=1)
self.info("Starting", f"annotation with [bold blue]{hmm.id} v{hmm.version}[/]", level=2)
genes = PyHMMER(hmm, self.jobs, whitelist).run(genes, progress=callback)
self.success("Finished", f"annotation with [bold blue]{hmm.id} v{hmm.version}[/]", level=2)
self.progress.update(task_id=task, visible=False)
# Count number of annotated domains
count = sum(1 for gene in genes for domain in gene.protein.domains)
self.success("Found", count, "domains across all proteins", level=1)
# Filter i-evalue and p-value if required
genes = self._filter_domains(genes)
# Sort genes
self.info("Sorting", "genes by coordinates", level=2)
genes.sort(key=lambda g: (g.source.id, g.start, g.end))
for gene in genes:
gene.protein.domains.sort(key=operator.attrgetter("start", "end"))
return genes
class PredictorMixin(Command):
model: Optional[str]
no_pad: bool
threshold: float
postproc: str
cds: int
edge_distance: int
def _predict_probabilities(self, genes: List["Gene"]) -> List["Gene"]:
from ...crf import ClusterCRF
if self.model is None:
self.info("Loading", "embedded CRF pre-trained model", level=1)
else:
self.info("Loading", "CRF pre-trained model from", repr(self.model), level=1)
crf = ClusterCRF.trained(self.model)
self.info("Predicting", "cluster probabilitites with the CRF model", level=1)
unit = "genes" if len(genes) > 1 else "gene"
task = self.progress.add_task("Predicting marginals", total=len(genes), unit=unit, precision="")
return list(crf.predict_probabilities(
self.progress.track(genes, task_id=task, total=len(genes)),
pad=not self.no_pad,
))
def _extract_clusters(self, genes: List["Gene"]) -> List["Cluster"]:
from ...refine import ClusterRefiner
self.info("Extracting", "predicted biosynthetic regions", level=1)
refiner = ClusterRefiner(
threshold=self.threshold,
criterion=self.postproc,
n_cds=self.cds,
edge_distance=self.edge_distance
)
total = len({gene.source.id for gene in genes})
unit = "contigs" if total > 1 else "contig"
task = self.progress.add_task("Extracting clusters", total=total, unit=unit, precision="")
clusters: List["Cluster"] = []
gene_groups = itertools.groupby(genes, lambda g: g.source.id) # type: ignore
for _, gene_group in self.progress.track(gene_groups, task_id=task, total=total):
clusters.extend(refiner.iter_clusters(list(gene_group)))
return clusters
def _load_type_classifier(self) -> "TypeClassifier":
from ...types import TypeClassifier
self.info("Loading", "type classifier from internal model", level=2)
return TypeClassifier.trained(self.model)
def _predict_types(self, clusters: List["Cluster"], classifier: "TypeClassifier") -> List["Cluster"]:
from ...model import ProductType
self.info("Predicting", "BGC types", level=1)
unit = "cluster" if len(clusters) == 1 else "clusters"
task = self.progress.add_task("Predicting types", total=len(clusters), unit=unit, precision="")
clusters_new = []
for cluster in self.progress.track(clusters, task_id=task):
clusters_new.extend(classifier.predict_types([cluster]))
if cluster.type:
name = "/".join(f"[bold blue]{name}[/]" for name in cluster.type.names)
prob = "/".join(f"[bold purple]{cluster.type_probabilities[name]:.0%}[/]" for name in cluster.type.names)
self.success(f"Predicted type of [bold blue]{cluster.id}[/] as {name} ({prob} confidence)")
else:
ty = max(cluster.type_probabilities, key=cluster.type_probabilities.get) # type: ignore
prob = f"[bold purple]{cluster.type_probabilities[ty]:.0%}[/]"
name = f"[bold blue]{ty}[/]"
self.warn(f"Couldn't assign type to [bold blue]{cluster.id}[/] (maybe {name}, {prob} confidence)")
return clusters_new
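# Taken together, these mixins back a pipeline roughly like the sketch below
# (API names come from the imports above; `genes` stands for annotated Gene
# objects from one contig, None selects the embedded pre-trained models, and
# the remaining ClusterRefiner parameters are assumed to keep their defaults):
from gecco.crf import ClusterCRF
from gecco.refine import ClusterRefiner
from gecco.types import TypeClassifier

genes = list(ClusterCRF.trained(None).predict_probabilities(genes))  # default padding assumed
clusters = list(ClusterRefiner(threshold=0.8).iter_clusters(genes))
clusters = TypeClassifier.trained(None).predict_types(clusters)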
......@@ -15,9 +15,10 @@ import pickle
import tempfile
import typing
import signal
from typing import Any, Dict, Union, Optional, List, TextIO, Mapping
from typing import Any, BinaryIO, Container, Dict, Iterable, Union, Optional, List, TextIO, Mapping
from ._base import Command, CommandExit, InvalidArgument
from ._mixins import SequenceLoaderMixin, OutputWriterMixin, AnnotatorMixin
from .._utils import (
guess_sequences_format,
in_context,
......@@ -25,18 +26,24 @@ from .._utils import (
ProgressReader,
)
if typing.TYPE_CHECKING:
from Bio.SeqRecord import SeqRecord
from ...hmmer import HMM
from ...model import Gene
from ...orf import ORFFinder
class Annotate(Command): # noqa: D101
class Annotate(SequenceLoaderMixin, OutputWriterMixin, AnnotatorMixin): # noqa: D101
summary = "annotate protein features of one or several contigs."
@classmethod
def doc(cls, fast=False): # noqa: D102
def doc(cls, fast: bool = False) -> str: # noqa: D102
return f"""
gecco annotate - {cls.summary}
Usage:
gecco annotate --genome <file> [--hmm <hmm>]... [options]
gecco annotate --genome <file> [--hmm <hmm>]... [--hmm-x <hmm>]... [options]
Arguments:
-g <file>, --genome <file> a genomic file containing one or more
......@@ -57,7 +64,7 @@ class Annotate(Command): # noqa: D101
-o <out>, --output-dir <out> the directory in which to write the
output files. [default: .]
--force-tsv always write TSV output files even
when they are empty (e.g. because
no genes or no clusters were found).
......@@ -65,6 +72,13 @@ class Annotate(Command): # noqa: D101
-M, --mask Enable unknown region masking to
prevent genes from stretching across
unknown nucleotides.
--cds-feature <cds_feature> Extract genes from annotated records
using a feature rather than calling
genes from scratch.
--locus-tag <locus_tag> The name of the feature qualifier
to use for naming extracted genes
when using the ``--cds-feature``
flag. [default: locus_tag]
Parameters - Domain Annotation:
-e <e>, --e-filter <e> the e-value cutoff for protein domains
......@@ -77,9 +91,11 @@ class Annotate(Command): # noqa: D101
Parameters - Debug:
--hmm <hmm> the path to one or more alternative
HMM file to use (in HMMER format).
--hmm-x <hmm> the path to one or more exclusive
HMM file to use (in HMMER format).
"""
def _check(self) -> typing.Optional[int]:
def _check(self) -> None:
super()._check()
try:
self.e_filter = self._check_flag(
......@@ -97,172 +113,42 @@ class Annotate(Command): # noqa: D101
optional=True,
)
self.jobs = self._check_flag("--jobs", int, lambda x: x >= 0, hint="positive or null integer")
self.format = self._check_flag("--format", optional=True)
self.genome = self._check_flag("--genome")
self.hmm = self._check_flag("--hmm", optional=True)
self.output_dir = self._check_flag("--output-dir")
self.format: Optional[str] = self._check_flag("--format", optional=True)
self.genome: str = self._check_flag("--genome")
self.hmm: Optional[List[str]] = self._check_flag("--hmm", optional=True)
self.hmm_x: Optional[List[str]] = self._check_flag("--hmm-x", optional=True)
self.output_dir: str = self._check_flag("--output-dir")
self.mask = self._check_flag("--mask", bool)
self.force_tsv = self._check_flag("--force-tsv", bool)
self.cds_feature: Optional[str] = self._check_flag("--cds-feature", optional=True)
self.locus_tag: str = self._check_flag("--locus-tag")
except InvalidArgument:
raise CommandExit(1)
def _custom_hmms(self):
from ...hmmer import HMM
for path in self.hmm:
base = os.path.basename(path)
file = open(path, "rb")
if base.endswith(".gz"):
base, _ = os.path.splitext(base)
file = gzip.GzipFile(fileobj=file)
base, _ = os.path.splitext(base)
yield HMM(
id=base,
version="?",
url="?",
path=path,
size=1,
relabel_with=r"s/([^\.]*)(\..*)?/\1/"
)
# ---
_OUTPUT_FILES = ["features.tsv", "genes.tsv"]
def _make_output_directory(self, extensions: List[str]) -> None:
# Make output directory
self.info("Using", "output folder", repr(self.output_dir), level=1)
try:
os.makedirs(self.output_dir, exist_ok=True)
except OSError as err:
self.error("Could not create output directory: {}", err)
raise CommandExit(err.errno) from err
# Check if output files already exist
base, _ = os.path.splitext(os.path.basename(self.genome))
# output_exts = ["features.tsv", "genes.tsv", "clusters.tsv"]
# if self.antismash_sideload:
# output_exts.append("sideload.json")
for ext in extensions:
if os.path.isfile(os.path.join(self.output_dir, f"{base}.{ext}")):
self.warn("Output folder contains files that will be overwritten")
break