Skip to content
Snippets Groups Projects

Pipeline task management

Merged Christopher Randolph Rhodes requested to merge dev_task_queue into staging
4 files
+ 30
5
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 31
6
from pydantic import BaseModel, Field
from typing import List, Union
from typing import Dict, List, Union
from fastapi import FastAPI, HTTPException
from .accessors import generate_file_accessor
from .models import BinaryThresholdSegmentationModel
from .pipelines.shared import PipelineRecord
from .roiset import IntensityThresholdInstanceMaskSegmentationModel, RoiSetExportParams, SerializeRoiSetError
from .session import session, AccessorIdError, InvalidPathError, RoiSetIdError, WriteAccessorError
from .session import session, AccessorIdError, InvalidPathError, RoiSetIdError, RunTaskError, WriteAccessorError
app = FastAPI(debug=True)
@@ -51,6 +52,7 @@ def show_session_status():
'paths': session.get_paths(),
'rois': session.list_rois(),
'accessors': session.list_accessors(),
'tasks': session.tasks.list_tasks(),
}
@@ -129,7 +131,6 @@ def delete_accessor(accessor_id: str):
else:
return _session_accessor(session.del_accessor, accessor_id)
@app.put('/accessors/read_from_file/{filename}')
def read_accessor_from_file(filename: str, accessor_id: Union[str, None] = None):
fp = session.paths['inbound_images'] / filename
@@ -140,9 +141,9 @@ def read_accessor_from_file(filename: str, accessor_id: Union[str, None] = None)
@app.put('/accessors/write_to_file/{accessor_id}')
def write_accessor_to_file(accessor_id: str, filename: Union[str, None] = None) -> str:
def write_accessor_to_file(accessor_id: str, filename: Union[str, None] = None, pop: bool = True) -> str:
try:
return session.write_accessor(accessor_id, filename)
return session.write_accessor(accessor_id, filename, pop=pop)
except AccessorIdError as e:
raise HTTPException(404, f'Did not find accessor with ID {accessor_id}')
except WriteAccessorError as e:
@@ -202,4 +203,28 @@ def roiset_get_object_map(
raise HTTPException(
404,
f'Did not find object map from classification model {model_id} in RoiSet {roiset_id}'
)
\ No newline at end of file
)
class TaskInfo(BaseModel):
module: str
params: dict
func_str: str
status: str
error: Union[str, None]
result: Union[Dict, None]
@app.put('/tasks/{task_id}/run')
def run_task(task_id: str) -> PipelineRecord:
try:
return session.tasks.run_task(task_id)
except RunTaskError as e:
raise HTTPException(409, str(e))
@app.get('/tasks/{task_id}')
def get_task(task_id: str) -> TaskInfo:
return session.tasks.get_task_info(task_id)
@app.get('/tasks')
def list_tasks() -> Dict[str, TaskInfo]:
res = session.tasks.list_tasks()
return res
\ No newline at end of file
Loading