Skip to content
Snippets Groups Projects
Commit f3689d5b authored by Christopher Randolph Rhodes's avatar Christopher Randolph Rhodes
Browse files

Superclass for pipeline parameters

parent 6a682001
No related branches found
No related tags found
No related merge requests found
from fastapi import HTTPException
from pydantic import BaseModel, Field, validator
from ..session import AccessorIdError, session
class PipelineInputParams(BaseModel):
accessor_id: str = Field(description='ID(s) of previously loaded accessor(s) to use as pipeline input')
model_id: str = Field(description='ID(s) of previously loaded segmentation model(s)')
@validator('model_id')
def models_are_loaded(cls, model_id):
if model_id not in session.describe_loaded_models().keys():
raise HTTPException(status_code=409, detail=f'Model with ID {model_id} has not been loaded')
return model_id
@validator('accessor_id')
def accessors_are_loaded(cls, accessor_id):
try:
info = session.get_accessor_info(accessor_id)
except AccessorIdError as e:
raise HTTPException(status_code=409, detail=str(e))
if not info['loaded']:
raise HTTPException(status_code=409, detail=f'Accessor with ID {accessor_id} is not loaded')
return accessor_id
class PipelineOutputParams(BaseModel):
output_accessor_id: str
model_id: str
success: bool
timer: dict
\ No newline at end of file
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter
from ..accessors import GenericImageDataAccessor, generate_file_accessor, write_accessor_data_to_file
from ..accessors import GenericImageDataAccessor
from ..models import SemanticSegmentationModel
from .params import PipelineInputParams, PipelineOutputParams
from ..process import smooth
from ..util import PipelineTrace
from ..session import session, AccessorIdError
from ..session import session
from pydantic import BaseModel, Field, validator
from pydantic import Field
router = APIRouter(
prefix='/pipelines',
tags=['pipelines'],
)
class SegmentParams(BaseModel):
accessor_id: str = Field(description='ID of previously loaded image accessor to use as pipeline input')
model_id: str = Field(description='ID of previously loaded segmentation model')
class SegmentParams(PipelineInputParams):
channel: int = Field(None, description='Channel to use for segmentation; use all channels if empty.')
@validator('model_id')
def model_is_loaded(cls, v):
if v not in session.describe_loaded_models().keys():
raise HTTPException(status_code=409, detail=f'Model with ID {v} has not been loaded')
return v
@validator('accessor_id')
def accessor_is_loaded(cls, v):
try:
acc_info = session.get_accessor_info(v)
except AccessorIdError as e:
raise HTTPException(status_code=409, detail=str(e))
if not acc_info['loaded']:
raise HTTPException(status_code=409, detail=f'Accessor with ID {v} is not loaded')
return v
class SegmentRecord(BaseModel):
output_accessor_id: str
model_id: str
success: bool
timer: dict
class SegmentRecord(PipelineOutputParams):
pass
# TODO: handle registration of intermediate accessors
@router.put('/segment')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment