Skip to content
Snippets Groups Projects

Pipeline task management

Merged Christopher Randolph Rhodes requested to merge dev_task_queue into staging
1 file
+ 11
2
Compare changes
  • Side-by-side
  • Inline
@@ -60,6 +60,7 @@ class TaskCollection(object):
}
self._handles[task_id] = func
logger.info(f'Added task {task_id}: {str(func)}')
return str(task_id)
@@ -84,14 +85,17 @@ class TaskCollection(object):
f = self._handles[task_id]
p = task['params']
try:
logger.info(f'Started running task {task_id}')
task['status'] = self.status_codes['in_progress']
result = f(p)
logger.info(f'Finished running task {task_id}')
task['status'] = self.status_codes['finished']
task['result'] = result
return result
except Exception as e:
task['status'] = self.status_codes['failed']
task['error'] = str(e)
logger.error(f'Error running task {task_id}: {str(e)}')
raise RunTaskError(e)
class _Session(object):
@@ -160,6 +164,7 @@ class _Session(object):
idx = len(self.accessors)
accessor_id = f'acc_{idx:06d}'
self.accessors[accessor_id] = {'loaded': True, 'object': acc, **acc.info}
self.log_info(f'Added accessor {accessor_id}')
return accessor_id
def del_accessor(self, accessor_id: str) -> str:
@@ -177,6 +182,7 @@ class _Session(object):
assert isinstance(v['object'], GenericImageDataAccessor)
v['loaded'] = False
v['object'] = None
self.log_info(f'Deleted accessor {accessor_id}')
return accessor_id
def del_all_accessors(self) -> list[str]:
@@ -190,6 +196,7 @@ class _Session(object):
v['object'] = None
v['loaded'] = False
res.append(k)
self.log_info(f'Deleted accessor {k}')
return res
@@ -224,11 +231,12 @@ class _Session(object):
self.del_accessor(acc_id)
return acc
def write_accessor(self, acc_id: str, filename: Union[str, None] = None, pop=True) -> str:
def write_accessor(self, acc_id: str, filename: Union[str, None] = None, pop: bool = True) -> str:
"""
Write an accessor to file and unload it from the session
Write an accessor to file and optionally unload it from the session
:param acc_id: accessor's ID
:param filename: force use of a specific filename, raise InvalidPathError if this already exists
:param pop: unload accessor from the session if True
:return: name of file
"""
if filename is None:
@@ -250,6 +258,7 @@ class _Session(object):
else:
acc.write(fp)
self.accessors[acc_id]['filepath'] = fp.__str__()
self.log_info(f'Wrote accessor {acc_id} to {fp.__str__()}')
return fp.name
def add_roiset(self, roiset: RoiSet, roiset_id: str = None) -> str:
Loading