Skip to content
Snippets Groups Projects

Pipeline task management

Merged Christopher Randolph Rhodes requested to merge dev_task_queue into staging
4 files
+ 17
11
Compare changes
  • Side-by-side
  • Inline
Files
4
@@ -4,6 +4,7 @@ from time import perf_counter
from typing import List, Union
from fastapi import HTTPException
from numba.scripts.generate_lower_listing import description
from pydantic import BaseModel, Field, root_validator
from ..accessors import GenericImageDataAccessor, InMemoryDataAccessor
@@ -12,6 +13,7 @@ from ..session import session, AccessorIdError
class PipelineParams(BaseModel):
schedule: bool = Field(False, description='Schedule as a task instead of running immediately')
keep_interm: bool = Field(False, description='Keep accessors to intermediate images in session')
api: bool = Field(True, description='Validate parameters against server session and map HTTP errors if True')
@@ -44,7 +46,19 @@ class PipelineRecord(BaseModel):
roiset_id: Union[str, None] = None
def call_pipeline(func, p: PipelineParams) -> PipelineRecord:
class PipelineQueueRecord(BaseModel):
task_id: str
def call_pipeline(func, p: PipelineParams) -> Union[PipelineRecord, PipelineQueueRecord]:
# instead of running right away, schedule pipeline as a task
if p.schedule:
p.schedule = False
task_id = session.tasks.add_task(
lambda x: call_pipeline(func, x),
p
)
return PipelineQueueRecord(task_id=task_id)
# match accessor IDs to loaded accessor objects
accessors_in = {}
Loading