From fe43460409efcfc394eb1df8dcd9ee3e975086ae Mon Sep 17 00:00:00 2001
From: Christopher Rhodes <christopher.rhodes@embl.de>
Date: Wed, 25 Oct 2023 14:23:32 +0200
Subject: [PATCH] Merged in batch running utility functions
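
Add utility functions for batch runs: auto-numbered output directories and
filenames, extension- and coordinate-filtered file discovery, and a
loop_workflow driver that applies a workflow function to a list of input
files while recording failures, summary CSVs, and intermediate image products.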

---
 model_server/util.py | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 162 insertions(+)
 create mode 100644 model_server/util.py

diff --git a/model_server/util.py b/model_server/util.py
new file mode 100644
index 00000000..8bc071ab
--- /dev/null
+++ b/model_server/util.py
@@ -0,0 +1,162 @@
+from pathlib import Path
+import re
+from time import localtime, strftime
+
+import pandas as pd
+
+from model_server.accessors import InMemoryDataAccessor, write_accessor_data_to_file
+
+def autonumber_new_directory(where: str, prefix: str) -> str:
+    """
+    Create a new subdirectory with a unique name that includes today's date
+    :param where: path of top-level directory in which to create a subdirectory
+    :param prefix: prefix of new subdirectory's name
+    :return: path to newly created subdirectory
+    """
+    Path(where).mkdir(parents=True, exist_ok=True)
+    yyyymmdd = strftime('%Y%m%d', localtime())
+
+    idx = 0
+    for ff in Path(where).iterdir():
+        ma = re.match(rf'{prefix}-{yyyymmdd}-(\d+)', ff.name)
+        if ma:
+            idx = max(idx, int(ma.groups()[0]) + 1)
+    new_path = (Path(where) / f'{prefix}-{yyyymmdd}-{idx:04d}')
+    new_path.mkdir(parents=True, exist_ok=False)
+    return str(new_path)
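+
+# Usage sketch (hypothetical arguments): on 2023-10-25, two successive calls like
+# autonumber_new_directory('/data/runs', 'batch') would create and return
+# '/data/runs/batch-20231025-0000' and then '/data/runs/batch-20231025-0001'.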
+
+def autonumber_new_file(where: str, prefix: str, ext: str) -> str:
+    """
+    Generate a filename that is unique in the specified directory
+    :param where: path of existing directory in which the new file should go
+    :param prefix: prefix of new file's name
+    :param ext: extension of new file, not including '.'
+    :return: name of the new file, not including its directory path
+    """
+    idx = 0
+    for ff in Path(where).iterdir():
+        ma = re.match(rf'{prefix}-(\d+)\.{ext}', ff.name)
+        if ma:
+            idx = max(idx, int(ma.groups()[0]) + 1)
+    return f'{prefix}-{idx:04d}.{ext}'
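+
+# Usage sketch (hypothetical arguments): if 'out/' already contains 'table-0000.csv'
+# and 'table-0003.csv', autonumber_new_file('out', 'table', 'csv') returns
+# 'table-0004.csv'; note that the target directory must already exist.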
+
+def get_matching_files(where: str, ext: str, coord_filter: dict = {}) -> list:
+    """
+    Return a list of files in the specified directory with the given extension
+    :param where: path of directory in which to search for files
+    :param ext: search only for files with this extension, not including '.'
+    :param coord_filter: (optional) return only filenames whose dash-delimited coordinates fall in the given
+        inclusive range: e.g. {'X': (5, 10)} includes file-X06.ext and file-X10.ext but not file-X02.ext
+    :return: list of paths to files
+    """
+    files = []
+
+    def is_filtered_out(ff):
+        if ff.suffix.upper() != f'.{ext}'.upper():
+            return True
+        coords = {
+            m[0]: int(m[1]) for m in re.findall(r'-([a-zA-Z])(\d+)', ff.name)
+        }
+        for fk in coord_filter:
+            if fk in coords:
+                cmin, cmax = coord_filter[fk]
+                if coords[fk] < cmin or coords[fk] > cmax:
+                    return True
+        return False
+
+    for ff in Path(where).iterdir():
+        if is_filtered_out(ff):
+            continue
+        files.append(str(ff))
+    return files
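+
+# Usage sketch (hypothetical arguments): given files like 'scan-X05-Y02.tif' in
+# '/data/in', get_matching_files('/data/in', 'tif', coord_filter={'X': (5, 10)})
+# keeps 'scan-X05-Y02.tif' and 'scan-X10-Y02.tif' but drops 'scan-X12-Y02.tif'.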
+
+
+def loop_workflow(
+        files: list,
+        output_folder_path: str,
+        workflow_func: callable,
+        params: dict,
+        export_batch_csvs: bool = True,
+        write_intermediate_products: bool = True,
+        catch_and_continue: bool = True,
+):
+    """
+    Iteratively call the specified workflow function on each of a list of input files
+    :param files: list of filepaths
+    :param output_folder_path: path to top-level directory to which all results will be written
+    :param workflow_func: function accepting keyword arguments input_file_path and output_folder_path,
+        returning a dict with 'dataframe', 'timer_results', and 'interm' entries
+    :param params: dictionary of keyword arguments that get passed to workflow_func
+    :param export_batch_csvs: if True, write any tabular data returned by workflow_func to CSV files
+    :param write_intermediate_products: if True, write any intermediate image products to TIF files
+    :param catch_and_continue: if True, catch exceptions raised by workflow_func and keep iterating
+    """
+    failures = []
+    for ii, ff in enumerate(files):
+        export_kwargs = {
+            'input_file_path': ff,
+            'output_folder_path': output_folder_path,
+            **params,
+        }
+
+        # call the workflow function, recording failure information if it raises
+        try:
+            result = workflow_func(**export_kwargs)
+        except Exception as e:
+            if catch_and_continue:
+                failures.append({
+                    'input_file': ff,
+                    'error_message': str(e),
+                })
+                print(f'Caught failure on {ff}:\n{e}')
+                continue
+            else:
+                raise
+
+        # record dataframes associated with workflow results
+        if export_batch_csvs:
+            batch_csv = {
+                'workflow_data': result['dataframe'],
+                'timer_results': pd.DataFrame(result['timer_results'], index=[0]),
+                'workflow_parameters': pd.json_normalize(export_kwargs),
+            }
+            for k, df in batch_csv.items():
+                df['input_file'] = ff
+                if ii == 0:
+                    csv_args = {'mode': 'w', 'header': True}
+                else:  # append to existing file
+                    csv_args = {'mode': 'a', 'header': False}
+                csv_path = Path(output_folder_path) / f'{k}.csv'
+                df.to_csv(csv_path, index=False, **csv_args)
+
+        # export intermediate data if flagged
+        if write_intermediate_products:
+            for k, img in result['interm'].items():
+                path = Path(output_folder_path) / k / (Path(ff).stem + '.tif')
+                path.parent.mkdir(parents=True, exist_ok=True)
+                write_accessor_data_to_file(
+                    path,
+                    InMemoryDataAccessor(img)
+                )
+
+    if len(failures) > 0:
+        pd.DataFrame(failures).to_csv(Path(output_folder_path) / 'failures.csv', index=False)
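+
+# Usage sketch (hypothetical workflow function and paths): assuming a function
+# segment_file(input_file_path, output_folder_path, channel) that returns a dict
+# with 'dataframe', 'timer_results', and 'interm' entries as described above:
+#   where = autonumber_new_directory('/data/out', 'batch')
+#   files = get_matching_files('/data/in', 'tif', coord_filter={'X': (5, 10)})
+#   loop_workflow(files, where, segment_file, {'channel': 0})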
-- 
GitLab