Skip to content
Snippets Groups Projects
util.py 3.26 KiB
from pathlib import Path
import re
from time import localtime, strftime

import pandas as pd

from model_server.accessors import InMemoryDataAccessor, write_accessor_data_to_file

def autonumber_new_directory(where: str, prefix: str) -> str:
    yyyymmdd = strftime('%Y%m%d', localtime())

    idx = 0
    for ff in Path(where).iterdir():
        ma = re.match(f'{prefix}-{yyyymmdd}-([\d]+)', ff.name)
        if ma:
            idx = max(idx, int(ma.groups()[0]) + 1)
    new_path = (Path(where) / f'{prefix}-{yyyymmdd}-{idx:04d}')
    new_path.mkdir(parents=True, exist_ok=False)
    return new_path.__str__()

def autonumber_new_file(where: str, prefix: str, ext: str) -> str:
    idx = 0
    for ff in Path(where).iterdir():
        ma = re.match(f'{prefix}-([\d]+).{ext}', ff.name)
        if ma:
            idx = max(idx, int(ma.groups()[0]) + 1)
    return f'{prefix}-{idx:04d}.{ext}'

def get_matching_files(where: str, ext: str, coord_filter: dict={}) -> str:
    files = []

    def is_filtered_out(ff):
        if ff.suffix.upper() != f'.{ext}'.upper():
            return True
        coords = {
            m[0]: int(m[1]) for m in re.findall('-([a-zA-Z])(\d+)', ff.name)
        }
        for fk in coord_filter.keys():
            if fk in coords.keys():
                cmin, cmax = coord_filter[fk]
                if coords[fk] < cmin or coords[fk] > cmax:
                    return True
        return False

    for ff in Path(where).iterdir():
        if is_filtered_out(ff):
            continue
        files.append(ff.__str__())
    return files


def loop_workflow(files, where_output, workflow_func, params,
                  write_intermediate_products=True):
    failures = []
    for ii, ff in enumerate(files):
        export_kwargs = {
            'input_zstack_path': ff,
            'where_output': where_output,
            **params,
        }

        # record failure information
        try:
            result = workflow_func(**export_kwargs)
        except Exception as e:
            failures.append({
                'input_file': ff,
                'error_message': e.__str__(),
            })
            print(f'Caught failure on {ff}:\n{e.__str__()}')
            continue

        # record dataframes associated with workflow results
        batch_csv = {
            'workflow_data': result['dataframe'],
            'timer_results': pd.DataFrame(result['timer_results'], index=[0]),
            'workflow_parameters': pd.json_normalize(export_kwargs),
        }
        for k in batch_csv.keys():
            df = batch_csv[k]
            df['input_file'] = ff
            if ii == 0:
                csv_args = {'mode': 'w', 'header': True}
            else:  # append to existing file
                csv_args = {'mode': 'a', 'header': False}
            csv_path = Path(where_output) / f'{k}.csv'
            df.to_csv(csv_path, index=False, **csv_args)

        # export intermediate data if flagged
        if write_intermediate_products:
            for k in result['interm'].keys():
                write_accessor_data_to_file(
                    Path(where_output) / k / (Path(ff).stem + '.tif'),
                    InMemoryDataAccessor(result['interm'][k])
                )

    if len(failures) > 0:
        pd.DataFrame(failures).to_csv(Path(where_output) / 'failures.csv')