Skip to content
Snippets Groups Projects
Commit 2a3f7bab authored by Christopher Randolph Rhodes's avatar Christopher Randolph Rhodes
Browse files

Tasks runs, just need to hook up result with pipeline test

parent ff6e182d
No related branches found
No related tags found
2 merge requests!102Merge staging as release,!72Pipeline task management
This commit is part of merge request !72. Comments created here will be created in the context of that merge request.
......@@ -213,13 +213,18 @@ class TaskInfo(BaseModel):
error: Union[str, None]
result: Union[Dict, None]
# TODO: cover is API tests, with dummy task resource endpoint
# TODO: cover task API with tests, using dummy task resource endpoint
@app.get('/tasks/{task_id')
# TODO: return something smarter than bool
@app.put('/tasks/{task_id}/run')
def run_task(task_id: str) -> bool:
return session.queue.run_task(task_id)
@app.get('/tasks/{task_id}')
def get_task(task_id: str) -> TaskInfo:
return session.queue.get_task_info(task_id)
@app.get('/tasks')
def list_tasks() -> Dict[str, TaskInfo]:
res = session.queue.list_tasks()
return res
\ No newline at end of file
return res
......@@ -45,6 +45,7 @@ class Queue(object):
def __init__(self):
self._queue = OrderedDict()
self._handles = OrderedDict()
def add_task(self, func: callable, params: dict) -> str:
task_id = str(uuid.uuid4())
......@@ -59,6 +60,8 @@ class Queue(object):
'result': None,
}
self._handles[task_id] = func
return str(task_id)
def get_task_info(self, task_id: str) -> dict:
......@@ -69,7 +72,7 @@ class Queue(object):
def run_task(self, task_id: str):
task = self._queue[task_id]
f = task['func']
f = self._handles[task_id]
p = task['params']
try:
task['status'] = self.status_codes['in_progress']
......
......@@ -262,7 +262,7 @@ class TestRoiSetWorkflowOverApi(conf.TestServerBaseClass, BaseTestRoiSetMonoProd
class TestTaskQueuedRoiSetWorkflowOverApi(TestRoiSetWorkflowOverApi):
def _object_map_workflow(self, ob_classifer_id):
res = self.assertPutSuccess(
res_queue = self.assertPutSuccess(
'pipelines/queue/roiset_to_obmap',
body={
'accessor_id': self.test_load_input_accessor(),
......@@ -275,8 +275,16 @@ class TestTaskQueuedRoiSetWorkflowOverApi(TestRoiSetWorkflowOverApi):
}
)
tasks = self.assertGetSuccess('tasks')
# check that task in enqueued
task_id = res_queue['task_id']
task_info = self.assertGetSuccess(f'tasks/{task_id}')
self.assertEqual(task_info['status'], 'WAITING')
# run the task
res_run = self.assertPutSuccess(
f'tasks/{task_id}/run'
)
self.assertTrue(res_run)
self.assertEqual(self.assertGetSuccess(f'tasks/{task_id}')['status'], 'FINISHED')
# check on enqueued task
task_id = res['task_id']
return res
\ No newline at end of file
return False
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment