Skip to content
Snippets Groups Projects
session.py 10.8 KiB
Newer Older
from collections import OrderedDict
import os

from pathlib import Path, PureWindowsPath
from pydantic import BaseModel
from time import strftime, localtime
from .accessors import GenericImageDataAccessor, PatchStack
from .models import Model
logger = logging.getLogger(__name__)

class CsvTable(object):
    """
    CSV file that accumulates rows over successive calls to append()
    """
    def __init__(self, fpath: Path):
        """
        :param fpath: path of the CSV file to write
        """
        self.path = fpath
        # Nothing written yet by this object: the first append overwrites the file and writes the header row
        self.empty = True

    def append(self, coords: dict, data: pd.DataFrame) -> bool:
        """
        Write rows to the CSV file, prefixing every row with the same coordinate columns
        :param coords: column name -> value; inserted as leading columns in the order given
        :param data: DataFrame containing the rows to write; it is not modified
        :return: True if successful
        """
        assert isinstance(data, pd.DataFrame)
        # DataFrame.insert works in place, so operate on a copy: otherwise the caller's frame gains the
        # coordinate columns and passing the same frame again fails because the columns already exist
        rows = data.copy()
        for c in reversed(coords.keys()):
            rows.insert(0, c, coords[c])
        # First write creates/overwrites the file with a header; later writes append rows only
        rows.to_csv(
            self.path,
            index=False,
            mode='w' if self.empty else 'a',
            header=self.empty,
        )
        self.empty = False
        return True

    Singleton class for a server session that persists data between API calls
    # Record layout handed to logging.basicConfig in __init__; get_log_data() splits log lines on the same ' - '
    log_format = '%(asctime)s - %(levelname)s - %(message)s'

    def __init__(self):
        """
        Set up session state: model registry, session paths, accessor registry, logging, and table registry
        """
        self.models = {}  # model_id : model object
        # make_paths() supplies the path dictionary that other methods index by key, e.g. 'tables'
        self.paths = self.make_paths()
        self.accessors = OrderedDict()  # accessor_id : info dictionary, in order of registration
        # force=True replaces handlers already attached to the root logger, so calling __init__ again reconfigures logging
        logging.basicConfig(filename=self.logfile, level=logging.INFO, force=True, format=self.log_format)
        self.tables = {}  # table name : CsvTable

    def write_to_table(self, name: str, coords: dict, data: pd.DataFrame):
        """
        Append rows to the session table called name; the table is created and registered on first use.
        :param name: name of the table, which persists for the life of the session
        :param coords: coordinate values attached to every row written by this call
        :param data: DataFrame holding the rows to write
        :return: True if successful
        """
        # Step 1: find the table, or create its CSV under the session's 'tables' path
        try:
            if name not in self.tables:
                self.tables[name] = CsvTable(self.paths['tables'] / (name + '.csv'))
            target = self.tables.get(name)
        except Exception:
            raise CouldNotCreateTable(f'Unable to create table named {name}')

        # Step 2: write the rows
        try:
            target.append(coords, data)
        except Exception:
            raise CouldNotAppendToTable(f'Unable to append data to table named {name}')
        return True
    def get_paths(self):
        """
        Return the session's path dictionary
        :return: self.paths itself (the same object, not a copy)
        """
        return self.paths

    def set_data_directory(self, key: str, path: str):
        """
        Point one of the session's named paths at an existing location
        :param key: name of the path to change; must already be a key of self.paths
        :param path: new location; must exist on the filesystem
        :raises InvalidPathError: if key is not a known path name, or path does not exist
        """
        if key not in self.paths:
            raise InvalidPathError(f'No such path {key}')
        new_path = Path(path)
        if not new_path.exists():
            raise InvalidPathError(f'Could not find {path}')
        # Validation alone changes nothing: store the new location, as a Path so that the
        # `self.paths[...] / name` joins used elsewhere in the session keep working
        self.paths[key] = new_path
    def add_accessor(self, acc: GenericImageDataAccessor, accessor_id: str = None) -> str:
        """
        Add an accessor to session context
        :param acc: accessor to add
        :param accessor_id: unique ID, or autogenerate if None
        :return: ID of accessor
        :raises AccessorIdError: if accessor_id is already registered
        """
        if accessor_id in self.accessors:
            raise AccessorIdError(f'Access with ID {accessor_id} already exists')
        if accessor_id is None:
            idx = len(self.accessors)
            accessor_id = f'auto_{idx:06d}'
            # A caller may have registered an ID of this form by hand; never overwrite an existing entry
            while accessor_id in self.accessors:
                idx += 1
                accessor_id = f'auto_{idx:06d}'
        # The entry holds the object itself alongside the accessor's info fields
        self.accessors[accessor_id] = {'loaded': True, 'object': acc, **acc.info}
        # Hand the ID back, which is the only way a caller learns an autogenerated one
        return accessor_id
    def del_accessor(self, accessor_id: str) -> str:
        """
        Remove accessor object but retain its info dictionary
        :param accessor_id: accessor's ID
        :return: ID of accessor
        :raises AccessorIdError: if no accessor with this ID is registered
        """
        if accessor_id not in self.accessors:
            raise AccessorIdError(f'No accessor with ID {accessor_id} is registered')
        v = self.accessors[accessor_id]
        if isinstance(v, dict) and v['loaded'] is False:
            # Deleting twice is tolerated: log it and leave the entry as it is
            logger.warning(f'Accessor {accessor_id} is already deleted')
        else:
            assert isinstance(v['object'], GenericImageDataAccessor)
            # Drop the reference to the object; the rest of the info dictionary stays registered
            v['loaded'] = False
            v['object'] = None
        return accessor_id
    def del_all_accessors(self) -> list[str]:
        """
        Unload every accessor that is currently loaded, leaving the info dictionaries registered
        :return: IDs of the accessors unloaded by this call
        """
        unloaded_ids = []
        for acc_id, entry in self.accessors.items():
            if not entry['loaded']:
                continue  # already unloaded, nothing to do
            entry.update(object=None, loaded=False)
            unloaded_ids.append(acc_id)
        return unloaded_ids


    def list_accessors(self) -> dict:
        """
        Describe every registered accessor in JSON-readable form, leaving out the accessor objects themselves
        """
        if not len(self.accessors):
            return {}
        # One column per accessor ID, one row per info field; drop the row that holds the objects
        info_frame = pd.DataFrame(self.accessors)
        return info_frame.drop('object').to_dict()
    def get_accessor_info(self, acc_id: str) -> dict:
        """
        Look up the listing entry of one accessor
        :param acc_id: accessor's ID
        :return: that accessor's entry from list_accessors()
        """
        if acc_id in self.accessors:
            listing = self.list_accessors()
            return listing[acc_id]
        raise AccessorIdError(f'No accessor with ID {acc_id} is registered')

    def get_accessor(self, acc_id: str, pop: bool = True) -> GenericImageDataAccessor:
        """
        Fetch an accessor object from the session
        :param acc_id: accessor's ID
        :param pop: if True, also unload the object from the session registry via del_accessor
        :return: accessor object
        """
        if acc_id not in self.accessors:
            raise AccessorIdError(f'No accessor with ID {acc_id} is registered')
        entry = self.accessors[acc_id]
        # Take the reference first: del_accessor clears it from the registry entry
        accessor_obj = entry['object']
        if pop:
            self.del_accessor(acc_id)
        return accessor_obj

    def write_accessor(self, acc_id: str, filename: Union[str, None] = None) -> str:
        """
        Write an accessor to file and unload it from the session
        :param acc_id: accessor's ID
        :param filename: force use of a specific filename, raise InvalidPathError if this already exists
        :return: name of file
        :raises InvalidPathError: if filename is given and already exists under the outbound images path
        :raises AccessorIdError: if no accessor with this ID is registered
        :raises WriteAccessorError: if the accessor's info already records a non-empty filepath
        """
        if filename is None:
            fp = self.paths['outbound_images'] / f'{acc_id}.tif'
        else:
            fp = self.paths['outbound_images'] / filename
            if fp.exists():
                raise InvalidPathError(f'Cannot overwrite file {filename} when writing accessor')

        # Fetch without unloading: if the write is rejected or fails below, the accessor must
        # still be available in the session rather than already discarded
        acc = self.get_accessor(acc_id, pop=False)

        old_fp = self.accessors[acc_id]['filepath']
        if old_fp != '':
            raise WriteAccessorError(
                f'Cannot overwrite accessor that is already written to {old_fp}'
            )

        if isinstance(acc, PatchStack):
            acc.export_pyxcz(fp)
        else:
            acc.write(fp)

        # Only now that the data is on disk: unload the object and record where it went
        self.del_accessor(acc_id)
        self.accessors[acc_id]['filepath'] = str(fp)
        return fp.name
        Set paths where images, logs, etc. are located in this session; can set custom session root data directory
        with SVLT_SESSION_ROOT environmental variable
        :return: dictionary of session paths
        """

        # Session root comes from the SVLT_SESSION_ROOT environment variable, falling back to defaults.root
        root = os.environ.get('SVLT_SESSION_ROOT', defaults.root)
        root_path = Path(root)
        for pk in ['inbound_images', 'outbound_images', 'logs', 'tables']:
            # Each directory is <root>/<sid>/<subdirectory configured for this key>
            pa = root_path / sid / defaults.subdirectories[pk]
            paths[pk] = pa
            try:
                # Creates missing parent directories too; an already existing directory is not an error
                pa.mkdir(parents=True, exist_ok=True)
            except Exception:
                raise CouldNotCreateDirectory(f'Could not create directory: {pa}')
        return paths

    @staticmethod
    def create_session_id(look_where: Path) -> str:
        """
        Autogenerate a session ID by incrementing from a list of log files.
        """
        yyyymmdd = strftime('%Y%m%d', localtime())
        idx = 0
        while os.path.exists(look_where / f'{yyyymmdd}-{idx:04d}'):
            idx += 1
        return f'{yyyymmdd}-{idx:04d}'

    def get_log_data(self) -> list:
        """
        Read the session log file and return its records, last line of the file first
        :return: list with one dict per line of the log file; keys are 'datatime', 'level' and 'message',
            and a line with fewer than three ' - ' separated fields yields correspondingly fewer keys
        """
        fields = ['datatime', 'level', 'message']
        with open(self.logfile, 'r') as fh:
            # Split at most twice so that a message which itself contains ' - ' is kept whole
            records = [dict(zip(fields, line.strip().split(' - ', 2))) for line in fh]
        # Reverse once at the end rather than inserting every record at the front of the list
        records.reverse()
        return records

        logger.info(msg)

    def log_warning(self, msg):
        """
        Log msg at WARNING level through this module's logger
        :param msg: message to log
        """
        logger.warning(msg)

    def log_error(self, msg):
        """
        Log msg at ERROR level through this module's logger
        :param msg: message to log
        """
        logger.error(msg)
    def load_model(
            self,
            ModelClass: Model,
            key: Union[str, None] = None,
            params: Union[BaseModel, None] = None,
    ) -> dict:
        """
        Load an instance of a given model class and attach to this session's model registry
        :param ModelClass: subclass of Model
        :param key: unique identifier of model, or autogenerate if None
        :param params: optional parameters that are passed to the model's construct
        """
        # The model receives its parameters as a plain dict, or None when no parameters were given
        mi = ModelClass(params=params.dict() if params else None)
        assert mi.loaded, f'Error loading instance of {ModelClass.__name__}'
        ii = 0
        if key is None:
            # Autogenerated keys look like <mi.name>_NN; take the first index not already in self.models
            def mid(i):
                return f'{mi.name}_{i:02d}'
            while mid(ii) in self.models.keys():
                ii += 1

            key = mid(ii)
        elif key in self.models.keys():
            # An explicitly chosen key must not collide with a model that is already registered
            raise CouldNotInstantiateModelError(f'Model with key {key} already exists.')
        # Summarize every entry of self.models by the class name of its object and its params
        return {
            k: {
                'class': self.models[k]['object'].__class__.__name__,
                'params': self.models[k]['params'],
            }
            for k in self.models.keys()
        }
    def find_param_in_loaded_models(self, key: str, value: str, is_path=False) -> str:
        """
        Returns model_id of first model where key and value match with .params field, or None
        :param key: name of the parameter to look up in each model's params
        :param value: value the parameter has to match
        :param is_path: uses platform-independent path comparison if True
        :return: model_id of the first matching model, or None if no loaded model matches
        """
        models = self.describe_loaded_models()
        for mid, det in models.items():
            # A model may carry no params at all; treat that as having nothing to match
            stored = (det.get('params') or {}).get(key)
            if is_path:
                if stored is None:
                    continue
                # Compare in POSIX form so that backslash and forward-slash spellings of a path agree
                if PureWindowsPath(stored).as_posix() == Path(value).as_posix():
                    return mid
            elif stored == value:
                return mid
        return None
    def restart(self, **kwargs):
        """
        Reinitialize this session object in place by running __init__ again
        :param kwargs: passed through unchanged to __init__
        """
        self.__init__(**kwargs)
class Error(Exception):
    """
    Base class of the exceptions defined in this module
    """
    pass

class InferenceRecordError(Error):
    # NOTE(review): nothing in the visible part of this module raises this; purpose to be confirmed
    pass

class WriteAccessorError(Error):
    """
    Raised by write_accessor for an accessor whose info already records a file path
    """
    pass

class CouldNotCreateDirectory(Error):
    """
    Raised when one of the session directories cannot be created
    """
    pass

class CouldNotCreateTable(Error):
    """
    Raised by write_to_table when the named table cannot be looked up or created
    """
    pass

class CouldNotAppendToTable(Error):
    """
    Raised by write_to_table when appending data to the named table fails
    """
    pass