Skip to content
Snippets Groups Projects
Commit 794d976c authored by Christopher Randolph Rhodes's avatar Christopher Randolph Rhodes
Browse files

Tests pass using task queue

parent 2a3f7bab
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,7 @@ from typing import Dict, List, Union
from fastapi import FastAPI, HTTPException
from .accessors import generate_file_accessor
from .models import BinaryThresholdSegmentationModel
from .pipelines.shared import PipelineRecord
from .roiset import IntensityThresholdInstanceMaskSegmentationModel, RoiSetExportParams, SerializeRoiSetError
from .session import session, AccessorIdError, InvalidPathError, RoiSetIdError, WriteAccessorError
......@@ -130,7 +131,7 @@ def delete_accessor(accessor_id: str):
else:
return _session_accessor(session.del_accessor, accessor_id)
# TODO: optional lazy loading, so that batch task can be queued before file data is loaded
@app.put('/accessors/read_from_file/{filename}')
def read_accessor_from_file(filename: str, accessor_id: Union[str, None] = None):
fp = session.paths['inbound_images'] / filename
......@@ -215,9 +216,10 @@ class TaskInfo(BaseModel):
# TODO: cover task API with tests, using dummy task resource endpoint
# TODO: return something smarter than bool
# TODO: test to cover reporting of exception in running tasks
@app.put('/tasks/{task_id}/run')
def run_task(task_id: str) -> bool:
def run_task(task_id: str) -> PipelineRecord:
return session.queue.run_task(task_id)
@app.get('/tasks/{task_id}')
......@@ -228,3 +230,9 @@ def get_task(task_id: str) -> TaskInfo:
def list_tasks() -> Dict[str, TaskInfo]:
res = session.queue.list_tasks()
return res
# TODO: implement
@app.put('/tasks/run/on_files')
def task_file_batch():
# new callable that parameterizes file name to acc_id, then passes this to to /tasks/*/run
pass
\ No newline at end of file
......@@ -47,7 +47,7 @@ class PipelineRecord(BaseModel):
class PipelineQueueRecord(BaseModel):
task_id: str
# TODO: variant of this for queued tasks where accessor ID is separately parameterized
def call_pipeline(func, p: PipelineParams) -> PipelineRecord:
# match accessor IDs to loaded accessor objects
accessors_in = {}
......
......@@ -76,16 +76,14 @@ class Queue(object):
p = task['params']
try:
task['status'] = self.status_codes['in_progress']
task['result'] = f(p)
result = f(p)
task['status'] = self.status_codes['finished']
return True
task['result'] = result
return result
except Exception as e:
task['status'] = self.status_codes['failed']
task['error'] = e
return False
raise e
class _Session(object):
"""
......
......@@ -287,4 +287,4 @@ class TestTaskQueuedRoiSetWorkflowOverApi(TestRoiSetWorkflowOverApi):
self.assertTrue(res_run)
self.assertEqual(self.assertGetSuccess(f'tasks/{task_id}')['status'], 'FINISHED')
return False
\ No newline at end of file
return res_run
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment