Skip to content
Snippets Groups Projects
Commit 3776345f authored by Christopher Randolph Rhodes's avatar Christopher Randolph Rhodes
Browse files

Renamed task-management class since it is technically not a queue

parent 69283c41
No related branches found
No related tags found
No related merge requests found
......@@ -52,7 +52,7 @@ def show_session_status():
'paths': session.get_paths(),
'rois': session.list_rois(),
'accessors': session.list_accessors(),
'tasks': session.queue.list_tasks(),
'tasks': session.tasks.list_tasks(),
}
......@@ -214,24 +214,20 @@ class TaskInfo(BaseModel):
error: Union[str, None]
result: Union[Dict, None]
# TODO: cover task API with tests, using dummy task resource endpoint
# TODO: test to cover reporting of exception in running tasks
@app.put('/tasks/{task_id}/run')
def run_task(task_id: str) -> PipelineRecord:
try:
return session.queue.run_task(task_id)
return session.tasks.run_task(task_id)
except RunTaskError as e:
raise HTTPException(409, str(e))
@app.get('/tasks/{task_id}')
def get_task(task_id: str) -> TaskInfo:
return session.queue.get_task_info(task_id)
return session.tasks.get_task_info(task_id)
@app.get('/tasks')
def list_tasks() -> Dict[str, TaskInfo]:
res = session.queue.list_tasks()
res = session.tasks.list_tasks()
return res
# TODO: implement
......
......@@ -53,7 +53,7 @@ def call_pipeline(func, p: PipelineParams) -> Union[PipelineRecord, PipelineQueu
# instead of running right away, schedule pipeline as a task
if p.schedule:
p.schedule = False
task_id = session.queue.add_task(
task_id = session.tasks.add_task(
lambda x: call_pipeline(func, x),
p
)
......
......@@ -33,8 +33,8 @@ class CsvTable(object):
self.empty = False
return True
# TODO: is there a standard library alternative here?
class Queue(object):
class TaskCollection(object):
status_codes = {
'waiting': 'WAITING',
......@@ -44,14 +44,13 @@ class Queue(object):
}
def __init__(self):
self._queue = OrderedDict()
self._tasks = OrderedDict()
self._handles = OrderedDict()
def add_task(self, func: callable, params: dict) -> str:
task_id = str(uuid.uuid4())
name = func.__name__
self._queue[task_id] = {
self._tasks[task_id] = {
'module': func.__module__,
'params': params,
'func_str': str(func),
......@@ -65,13 +64,13 @@ class Queue(object):
return str(task_id)
def get_task_info(self, task_id: str) -> dict:
return self._queue[task_id]
return self._tasks[task_id]
def list_tasks(self) -> OrderedDict:
return self._queue
return self._tasks
def run_task(self, task_id: str):
task = self._queue[task_id]
task = self._tasks[task_id]
f = self._handles[task_id]
p = task['params']
try:
......@@ -96,7 +95,7 @@ class _Session(object):
self.models = {} # model_id : model object
self.paths = self.make_paths()
self.accessors = OrderedDict()
self.queue = Queue()
self.tasks = TaskCollection()
self.rois = OrderedDict()
self.logfile = self.paths['logs'] / f'session.log'
......
......@@ -72,7 +72,7 @@ def create_dummy_task(params: DummyTaskParams) -> PipelineQueueRecord:
d['res'] = d.last.apply(lambda x: 2 * x)
return d
task_id = session.queue.add_task(
task_id = session.tasks.add_task(
lambda x: call_pipeline(_dummy_pipeline, x),
params
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment