Newer
Older
import logging
import os
from collections import OrderedDict
from pathlib import Path, PureWindowsPath
from time import localtime, strftime
from typing import Union

import pandas as pd
from pydantic import BaseModel

from ..conf import defaults
from .accessors import GenericImageDataAccessor, PatchStack
logger = logging.getLogger(__name__)
class CsvTable(object):
    """
    Append-only CSV table backed by a single file.

    The first append (per instance) writes the file with a header row; later
    appends add rows without a header.
    """

    def __init__(self, fpath: Path):
        """
        :param fpath: path of the CSV file that this table writes to
        """
        self.path = fpath
        # True until the first successful append; tracks this instance only,
        # not whether a file already exists at self.path
        self.empty = True

    def append(self, coords: dict, data: pd.DataFrame) -> bool:
        """
        Append rows to the CSV file, prefixing each row with coordinate columns.
        :param coords: dictionary of column name -> value; these become the leading columns, in dictionary order
        :param data: DataFrame containing the rows to write; it is not modified
        :return: True if successful
        :raises TypeError: if data is not a DataFrame
        """
        if not isinstance(data, pd.DataFrame):
            raise TypeError(f'Expected a pandas DataFrame, got {type(data).__name__}')

        # work on a copy so that the caller's DataFrame can be reused in later calls
        rows = data.copy()
        for position, (column, value) in enumerate(coords.items()):
            rows.insert(position, column, value)

        first_write = self.empty
        rows.to_csv(
            self.path,
            index=False,
            mode='w' if first_write else 'a',
            header=first_write,
        )
        self.empty = False
        return True

Christopher Randolph Rhodes
committed
class _Session(object):
    """
    Singleton class for a server session that persists data between API calls
    """

    log_format = '%(asctime)s - %(levelname)s - %(message)s'

    def __init__(self):
        self.models = {}  # model_id : model object
        self.paths = self.make_paths()
        self.accessors = OrderedDict()

        self.logfile = self.paths['logs'] / 'session.log'
        # force=True replaces any handlers already attached to the root logger
        logging.basicConfig(filename=self.logfile, level=logging.INFO, force=True, format=self.log_format)

        self.log_info('Initialized session')
        self.tables = {}

    def write_to_table(self, name: str, coords: dict, data: pd.DataFrame):
        """
        Write data to a named data table, initializing if it does not yet exist.
        :param name: name of the table to persist through session
        :param coords: dictionary of coordinates to associate with all rows in this method call
        :param data: DataFrame containing data
        :return: True if successful
        :raises CouldNotCreateTable: if the table object could not be created
        :raises CouldNotAppendToTable: if writing the rows failed
        """
        try:
            table = self.tables.get(name)
            if table is None:
                table = CsvTable(self.paths['tables'] / (name + '.csv'))
                self.tables[name] = table
        except Exception as e:
            raise CouldNotCreateTable(f'Unable to create table named {name}') from e

        try:
            table.append(coords, data)
        except Exception as e:
            raise CouldNotAppendToTable(f'Unable to append data to table named {name}') from e
        return True

    def get_paths(self):
        """
        :return: dictionary of session paths
        """
        return self.paths

    def set_data_directory(self, key: str, path: str):
        """
        Point one of the session's existing path entries at a different, existing location
        :param key: name of the path entry to change
        :param path: new location
        :raises InvalidPathError: if key is not a known path entry or path does not exist
        """
        if key not in self.paths:
            raise InvalidPathError(f'No such path {key}')
        if not Path(path).exists():
            raise InvalidPathError(f'Could not find {path}')
        self.paths[key] = Path(path)

    def add_accessor(self, acc: GenericImageDataAccessor, accessor_id: str = None) -> str:
        """
        Add an accessor to session context
        :param acc: accessor to add
        :param accessor_id: unique ID, or autogenerate if None
        :return: ID of accessor
        :raises AccessorIdError: if accessor_id is already registered
        """
        if accessor_id in self.accessors:
            raise AccessorIdError(f'Access with ID {accessor_id} already exists')
        if accessor_id is None:
            idx = len(self.accessors)
            accessor_id = f'auto_{idx:06d}'
            # skip over IDs that a caller registered explicitly in the auto_ namespace
            while accessor_id in self.accessors:
                idx += 1
                accessor_id = f'auto_{idx:06d}'
        self.accessors[accessor_id] = {'loaded': True, 'object': acc, **acc.info}
        return accessor_id

    def del_accessor(self, accessor_id: str) -> str:
        """
        Remove accessor object but retain its info dictionary
        :param accessor_id: accessor's ID
        :return: ID of accessor
        :raises AccessorIdError: if accessor_id is not registered
        """
        if accessor_id not in self.accessors:
            raise AccessorIdError(f'No accessor with ID {accessor_id} is registered')
        v = self.accessors[accessor_id]
        if isinstance(v, dict) and v['loaded'] is False:
            logger.warning(f'Accessor {accessor_id} is already deleted')
        else:
            assert isinstance(v['object'], GenericImageDataAccessor)
            v['loaded'] = False
            v['object'] = None
        return accessor_id

    def del_all_accessors(self) -> list[str]:
        """
        Remove (unload) all accessors but keep their info in dictionary
        :return: list of removed accessor IDs
        """
        res = []
        for k, v in self.accessors.items():
            if v['loaded']:
                v['object'] = None
                v['loaded'] = False
                res.append(k)
        return res

    def list_accessors(self) -> dict:
        """
        List information about all accessors in JSON-readable format
        :return: dictionary keyed by accessor ID, with the accessor objects themselves left out
        """
        if not self.accessors:
            return {}
        return pd.DataFrame(self.accessors).drop('object').to_dict()

    def get_accessor_info(self, acc_id: str) -> dict:
        """
        Get information about a single accessor
        :param acc_id: accessor's ID
        :raises AccessorIdError: if acc_id is not registered
        """
        if acc_id not in self.accessors:
            raise AccessorIdError(f'No accessor with ID {acc_id} is registered')
        return self.list_accessors()[acc_id]

    def get_accessor(self, acc_id: str, pop: bool = True) -> GenericImageDataAccessor:
        """
        Return an accessor object
        :param acc_id: accessor's ID
        :param pop: remove object from session accessor registry if True
        :return: accessor object
        :raises AccessorIdError: if acc_id is not registered
        """
        if acc_id not in self.accessors:
            raise AccessorIdError(f'No accessor with ID {acc_id} is registered')
        acc = self.accessors[acc_id]['object']
        if pop:
            self.del_accessor(acc_id)
        return acc

    def write_accessor(self, acc_id: str, filename: Union[str, None] = None) -> str:
        """
        Write an accessor to file and unload it from the session
        :param acc_id: accessor's ID
        :param filename: force use of a specific filename, raise InvalidPathError if this already exists
        :return: name of file
        :raises InvalidPathError: if the target file already exists
        :raises AccessorIdError: if acc_id is not registered
        :raises WriteAccessorError: if the accessor has already been written to a file
        """
        if filename is None:
            fp = self.paths['outbound_images'] / f'{acc_id}.tif'
        else:
            fp = self.paths['outbound_images'] / filename
            if fp.exists():
                raise InvalidPathError(f'Cannot overwrite file {filename} when writing accessor')

        if acc_id not in self.accessors:
            raise AccessorIdError(f'No accessor with ID {acc_id} is registered')

        # validate before unloading, so that a rejected request does not discard the accessor
        old_fp = self.accessors[acc_id]['filepath']
        if old_fp != '':
            raise WriteAccessorError(
                f'Cannot overwrite accessor that is already written to {old_fp}'
            )

        # keep the object registered until the write has succeeded
        acc = self.get_accessor(acc_id, pop=False)
        if isinstance(acc, PatchStack):
            acc.export_pyxcz(fp)
        else:
            acc.write(fp)
        self.accessors[acc_id]['filepath'] = str(fp)
        self.del_accessor(acc_id)
        return fp.name

    @staticmethod
    def make_paths() -> dict:
        """
        Set paths where images, logs, etc. are located in this session; can set custom session root data directory
        with SVLT_SESSION_ROOT environmental variable
        :return: dictionary of session paths
        :raises CouldNotCreateDirectory: if a session subdirectory cannot be created
        """
        root = os.environ.get('SVLT_SESSION_ROOT', defaults.root)
        root_path = Path(root)

        sid = _Session.create_session_id(root_path)
        paths = {'root': root_path}
        for pk in ['inbound_images', 'outbound_images', 'logs', 'tables']:
            pa = root_path / sid / defaults.subdirectories[pk]
            paths[pk] = pa
            try:
                pa.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                raise CouldNotCreateDirectory(f'Could not create directory: {pa}') from e
        return paths

    @staticmethod
    def create_session_id(look_where: Path) -> str:
        """
        Autogenerate a session ID of the form YYYYMMDD-NNNN, using the first index for which no entry of that name
        exists under look_where.
        :param look_where: directory that is searched for existing session IDs
        :return: unused session ID
        """
        yyyymmdd = strftime('%Y%m%d', localtime())
        idx = 0
        while (look_where / f'{yyyymmdd}-{idx:04d}').exists():
            idx += 1
        return f'{yyyymmdd}-{idx:04d}'

    def get_log_data(self) -> list:
        """
        Parse the session log file into a list of records, most recent first
        :return: list of dictionaries with keys 'datatime', 'level', and 'message'
        """
        # NOTE: 'datatime' (sic) is kept as-is because it is part of the returned data
        k = ['datatime', 'level', 'message']
        log = []
        with open(self.logfile, 'r') as fh:
            for line in fh:
                # limit the split so that ' - ' inside the message text is preserved
                v = line.strip().split(' - ', 2)
                log.append(dict(zip(k, v)))
        log.reverse()
        return log

    def log_info(self, msg):
        logger.info(msg)

    def log_warning(self, msg):
        logger.warning(msg)

    def log_error(self, msg):
        logger.error(msg)

    def load_model(
            self,
            ModelClass: 'Model',  # quoted: Model is not imported in the visible part of this module
            key: Union[str, None] = None,
            params: Union[BaseModel, None] = None,
    ) -> str:
        """
        Load an instance of a given model class and attach to this session's model registry
        :param ModelClass: subclass of Model
        :param key: unique identifier of model, or autogenerate if None
        :param params: optional parameters that are passed to the model's construct
        :return: model_id of loaded model
        :raises CouldNotInstantiateModelError: if key is already in the model registry
        """
        mi = ModelClass(params=params.dict() if params else None)
        assert mi.loaded, f'Error loading instance of {ModelClass.__name__}'

        ii = 0
        if key is None:
            def mid(i):
                # NOTE(review): the body of this helper was missing from the reviewed source; reconstructed as
                # <class name>_<two-digit index> - confirm against the canonical file
                return f'{ModelClass.__name__}_{i:02d}'

            while mid(ii) in self.models:
                ii += 1
            key = mid(ii)
        elif key in self.models:
            raise CouldNotInstantiateModelError(f'Model with key {key} already exists.')

        self.models[key] = {
            'object': mi,
            'params': getattr(mi, 'params', None)
        }
        self.log_info(f'Loaded model {key}')
        return key

    def describe_loaded_models(self) -> dict:
        """
        :return: dictionary of model_id -> class name and params of each loaded model
        """
        return {
            k: {
                'class': self.models[k]['object'].__class__.__name__,
                'params': self.models[k]['params'],
            }
            for k in self.models
        }

    def find_param_in_loaded_models(self, key: str, value: str, is_path=False) -> Union[str, None]:
        """
        Returns model_id of first model where key and value match with .params field, or None
        :param key: name of the parameter to look up in each model's params
        :param value: value to compare against
        :param is_path: uses platform-independent path comparison if True
        """
        models = self.describe_loaded_models()
        for mid, det in models.items():
            # params is None for models that do not expose a params attribute
            found = (det.get('params') or {}).get(key)
            if is_path:
                if found is None:
                    continue
                if PureWindowsPath(found).as_posix() == Path(value).as_posix():
                    return mid
            else:
                if found == value:
                    return mid
        return None

    def restart(self, **kwargs):
        """
        Reinitialize this session in place by calling __init__ again
        """
        self.__init__(**kwargs)
# Create the singleton session instance when this module is imported
# NOTE: _Session.__init__ creates the session directories and points logging at the session log file,
# so both happen as a side effect of importing this module
session = _Session()
class Error(Exception):
    """Base class for the exceptions defined in this module."""


class InferenceRecordError(Error):
    """Error related to inference records; not raised by the code visible in this module."""


class CouldNotInstantiateModelError(Error):
    """Raised by load_model when a model key is already present in the registry."""


class AccessorIdError(Error):
    """Raised when an accessor ID is already registered, or is not registered when it is required."""


class WriteAccessorError(Error):
    """Raised by write_accessor when the accessor has already been written to a file."""


class CouldNotCreateDirectory(Error):
    """Raised by make_paths when a session subdirectory cannot be created."""


class CouldNotCreateTable(Error):
    """Raised by write_to_table when a table object cannot be created."""


class CouldNotAppendToTable(Error):
    """Raised by write_to_table when appending rows to a table fails."""
class InvalidPathError(Error):