Commit 954c14ca authored by Martin Larralde
Browse files

Rewrite `pyhmmer.hmmer` to make main thread block on query insertion rather than result retrieval

parent e43dfa18
Pipeline #33527 passed in 4 minutes and 35 seconds
......@@ -99,7 +99,7 @@ class _PipelineThread(typing.Generic[_Q], threading.Thread):
Attributes:
sequence (`pyhmmer.plan7.PipelineSearchTargets`): The target
sequences to search for hits.
query_queue (`collections.deque`): The queue used to pass queries
query_queue (`queue.Queue`): The queue used to pass queries
between threads. It contains the query, its index so that the
results can be returned in the same order, and a `_ResultBuffer`
where to store the result when the query has been processed.
......@@ -129,7 +129,7 @@ class _PipelineThread(typing.Generic[_Q], threading.Thread):
self,
sequences: PipelineSearchTargets,
query_available: threading.Semaphore,
query_queue: typing.Deque[typing.Optional[_Chore[_Q]]],
query_queue: "queue.Queue[typing.Optional[_Chore[_Q]]]",
query_count: multiprocessing.Value, # type: ignore
kill_switch: threading.Event,
callback: typing.Optional[typing.Callable[[_Q, int], None]],
......@@ -142,7 +142,7 @@ class _PipelineThread(typing.Generic[_Q], threading.Thread):
self.sequences = sequences
self.pipeline = pipeline_class(alphabet=alphabet, **options)
self.query_available: threading.Semaphore = query_available
self.query_queue: typing.Deque[typing.Optional[_Chore[_Q]]] = query_queue
self.query_queue = query_queue
self.query_count = query_count
self.callback: typing.Optional[typing.Callable[[_Q, int], None]] = callback or self._none_callback
self.kill_switch = kill_switch
......@@ -154,7 +154,7 @@ class _PipelineThread(typing.Generic[_Q], threading.Thread):
# been killed, even when no queries are available
if not self.query_available.acquire(timeout=1):
continue
chore = self.query_queue.popleft()
chore = self.query_queue.get_nowait()
# check if arguments from the queue are a poison-pill (`None`),
# in which case the thread will stop running
if chore is None:
......@@ -192,7 +192,7 @@ class _SequencePipelineThread(_PipelineThread[DigitalSequence]):
self,
sequences: PipelineSearchTargets,
query_available: threading.Semaphore,
query_queue: typing.Deque[typing.Optional[_Chore[DigitalSequence]]],
query_queue: "queue.Queue[typing.Optional[_Chore[DigitalSequence]]]",
query_count: multiprocessing.Value, # type: ignore
kill_switch: threading.Event,
callback: typing.Optional[typing.Callable[[DigitalSequence, int], None]],
......@@ -223,7 +223,7 @@ class _MSAPipelineThread(_PipelineThread[DigitalMSA]):
self,
sequences: PipelineSearchTargets,
query_available: threading.Semaphore,
query_queue: typing.Deque[typing.Optional[_Chore[DigitalMSA]]],
query_queue: "queue.Queue[typing.Optional[_Chore[DigitalMSA]]]",
query_count: multiprocessing.Value, # type: ignore
kill_switch: threading.Event,
callback: typing.Optional[typing.Callable[[DigitalMSA, int], None]],
......@@ -261,7 +261,7 @@ class _Search(typing.Generic[_Q], abc.ABC):
callback: typing.Optional[typing.Callable[[_Q, int], None]] = None,
pipeline_class: typing.Type[Pipeline] = Pipeline,
alphabet: Alphabet = Alphabet.amino(),
**options # type: typing.Dict[str, object]
**options # type: typing.Dict[str, object]get
) -> None:
self.queries: typing.Iterable[_Q] = queries
self.cpus = cpus
......@@ -278,7 +278,7 @@ class _Search(typing.Generic[_Q], abc.ABC):
def _new_thread(
self,
query_available: threading.Semaphore,
query_queue: typing.Deque[typing.Optional[_Chore[_Q]]],
query_queue: "queue.Queue[typing.Optional[_Chore[_Q]]]",
query_count: "multiprocessing.Value[int]", # type: ignore
kill_switch: threading.Event,
) -> _PipelineThread[_Q]:
......@@ -288,7 +288,7 @@ class _Search(typing.Generic[_Q], abc.ABC):
# create the queues to pass the HMM objects around, as well as atomic
# values that we use to synchronize the threads
query_available = threading.Semaphore(0)
query_queue = collections.deque() # type: ignore
query_queue = queue.Queue() # type: ignore
query_count = multiprocessing.Value(ctypes.c_ulong)
kill_switch = threading.Event()
......@@ -309,13 +309,9 @@ class _Search(typing.Generic[_Q], abc.ABC):
# create the queues to pass the query objects around, as well as
# atomic values that we use to synchronize the threads
results: typing.Deque[_Chore[_Q]] = collections.deque()
query_queue = collections.deque() # type: ignore
query_queue = queue.Queue(maxsize=self.cpus) # type: ignore
query_count = multiprocessing.Value(ctypes.c_ulong)
kill_switch = threading.Event()
# the maximum number of queries to put in the query queue, to
# avoid filling the memory: using more than one query per thread
# permits some redundancy in case a query is very long to process,
query_bound = self.cpus * 2
# create and launch one pipeline thread per CPU
threads = []
......@@ -334,20 +330,18 @@ class _Search(typing.Generic[_Q], abc.ABC):
# get the next query and add it to the query queue
query_count.value += 1
chore = _Chore(query)
query_queue.append(chore)
query_queue.put(chore)
query_available.release()
results.append(chore)
# aggressively wait for the result with a very short
# timeout, and exit the loop if the queue is not full
while len(query_queue) >= query_bound:
if results[0].wait(timeout=0.01):
yield results[0].get()
results.popleft()
break
if results[0].available():
yield results[0].get()
results.popleft()
# now that we exhausted all queries, poison pill the
# threads so they stop on their own gracefully
for _ in threads:
query_queue.append(None)
query_queue.put(None)
query_available.release()
# yield all remaining results, in order
while results:
......@@ -371,7 +365,7 @@ class _ModelSearch(typing.Generic[_M], _Search[_M]):
def _new_thread(
self,
query_available: threading.Semaphore,
query_queue: typing.Deque[typing.Optional[_Chore[_M]]],
query_queue: "queue.Queue[typing.Optional[_Chore[_M]]]",
query_count: "multiprocessing.Value[int]", # type: ignore
kill_switch: threading.Event,
) -> _ModelPipelineThread[_M]:
......@@ -407,7 +401,7 @@ class _SequenceSearch(_Search[DigitalSequence]):
def _new_thread(
self,
query_available: threading.Semaphore,
query_queue: typing.Deque[typing.Optional[_Chore[DigitalSequence]]],
query_queue: "queue.Queue[typing.Optional[_Chore[DigitalSequence]]]",
query_count: "multiprocessing.Value[int]", # type: ignore
kill_switch: threading.Event,
) -> _SequencePipelineThread:
......@@ -444,7 +438,7 @@ class _MSASearch(_Search[DigitalMSA]):
def _new_thread(
self,
query_available: threading.Semaphore,
query_queue: typing.Deque[typing.Optional[_Chore[DigitalMSA]]],
query_queue: "queue.Queue[typing.Optional[_Chore[DigitalMSA]]]",
query_count: "multiprocessing.Value[int]", # type: ignore
kill_switch: threading.Event,
) -> _MSAPipelineThread:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment