diff --git a/model_server/util.py b/model_server/util.py
new file mode 100644
index 0000000000000000000000000000000000000000..8bc071ab056570a594658a0be4fa99eeb9c875dd
--- /dev/null
+++ b/model_server/util.py
@@ -0,0 +1,143 @@
+from pathlib import Path
+import re
+from time import localtime, strftime
+
+import pandas as pd
+
+from model_server.accessors import InMemoryDataAccessor, write_accessor_data_to_file
+
+def autonumber_new_directory(where: str, prefix: str) -> str:
+    """
+    Create a new subdirectory with a unique name that includes today's date
+    :param where: path of top-level directory in which to create the subdirectory
+    :param prefix: prefix of the new subdirectory's name
+    :return: path to the newly created subdirectory
+    """
+    Path(where).mkdir(parents=True, exist_ok=True)
+    yyyymmdd = strftime('%Y%m%d', localtime())
+
+    idx = 0
+    for ff in Path(where).iterdir():
+        ma = re.match(rf'{prefix}-{yyyymmdd}-(\d+)', ff.name)
+        if ma:
+            idx = max(idx, int(ma.groups()[0]) + 1)
+    new_path = Path(where) / f'{prefix}-{yyyymmdd}-{idx:04d}'
+    new_path.mkdir(parents=True, exist_ok=False)
+    return str(new_path)
+
+def autonumber_new_file(where: str, prefix: str, ext: str) -> str:
+    """
+    Create a filename that is unique in the specified directory
+    :param where: path of the directory where the new file should go
+    :param prefix: prefix of the new file's name
+    :param ext: extension of the new file, not including '.'
+    :return: name of the new file
+    """
+    idx = 0
+    for ff in Path(where).iterdir():
+        ma = re.match(rf'{prefix}-(\d+)\.{ext}', ff.name)
+        if ma:
+            idx = max(idx, int(ma.groups()[0]) + 1)
+    return f'{prefix}-{idx:04d}.{ext}'
+
+def get_matching_files(where: str, ext: str, coord_filter: dict = {}) -> list:
+    """
+    Return a list of files in the specified directory with the given extension
+    :param where: path of directory in which to search for files
+    :param ext: search only for files with this extension, not including '.'
+    :param coord_filter: (optional) return only filenames whose dash-delimited coordinates fall in this inclusive range:
+        e.g. {'X': (5, 10)} includes file-X06.ext and file-X10.ext but not file-X02.ext
+    :return: list of paths to matching files
+    """
+    files = []
+
+    def is_filtered_out(ff):
+        if ff.suffix.upper() != f'.{ext}'.upper():
+            return True
+        coords = {
+            m[0]: int(m[1]) for m in re.findall(r'-([a-zA-Z])(\d+)', ff.name)
+        }
+        for fk in coord_filter.keys():
+            if fk in coords.keys():
+                cmin, cmax = coord_filter[fk]
+                if coords[fk] < cmin or coords[fk] > cmax:
+                    return True
+        return False
+
+    for ff in Path(where).iterdir():
+        if is_filtered_out(ff):
+            continue
+        files.append(str(ff))
+    return files
+
+
+def loop_workflow(
+        files: list,
+        output_folder_path: str,
+        workflow_func: callable,
+        params: dict,
+        export_batch_csvs: bool = True,
+        write_intermediate_products: bool = True,
+        catch_and_continue: bool = True,
+):
+    """
+    Iteratively call the specified workflow function on each of a list of input files
+    :param files: list of filepaths
+    :param output_folder_path: path to the top-level directory to which all results are written
+    :param workflow_func: function whose first two arguments are an input filename and an output directory
+    :param params: dictionary of keyword arguments that get passed to workflow_func
+    :param export_batch_csvs: if True, write any tabular data returned by workflow_func to CSV files
+    :param write_intermediate_products: if True, write any intermediate image products to TIF files
+    :param catch_and_continue: if True, catch exceptions raised by workflow_func and keep iterating
+    """
+    failures = []
+    for ii, ff in enumerate(files):
+        export_kwargs = {
+            'input_file_path': ff,
+            'output_folder_path': output_folder_path,
+            **params,
+        }
+
+        # run the workflow, recording failure information on error
+        try:
+            result = workflow_func(**export_kwargs)
+        except Exception as e:
+            if catch_and_continue:
+                failures.append({
+                    'input_file': ff,
+                    'error_message': str(e),
+                })
+                print(f'Caught failure on {ff}:\n{e}')
+                continue
+            else:
+                raise
+
+        # record dataframes associated with workflow results
+        if export_batch_csvs:
+            batch_csv = {
+                'workflow_data': result['dataframe'],
+                'timer_results': pd.DataFrame(result['timer_results'], index=[0]),
+                'workflow_parameters': pd.json_normalize(export_kwargs),
+            }
+            for k in batch_csv.keys():
+                df = batch_csv[k]
+                df['input_file'] = ff
+                if ii == 0:
+                    csv_args = {'mode': 'w', 'header': True}
+                else:  # append to the existing file
+                    csv_args = {'mode': 'a', 'header': False}
+                csv_path = Path(output_folder_path) / f'{k}.csv'
+                df.to_csv(csv_path, index=False, **csv_args)
+
+        # export intermediate data if flagged
+        if write_intermediate_products:
+            for k in result['interm'].keys():
+                path = Path(output_folder_path) / k / (Path(ff).stem + '.tif')
+                path.parent.mkdir(parents=True, exist_ok=True)
+                write_accessor_data_to_file(
+                    path,
+                    InMemoryDataAccessor(result['interm'][k])
+                )
+
+    if len(failures) > 0:
+        pd.DataFrame(failures).to_csv(Path(output_folder_path) / 'failures.csv', index=False)
\ No newline at end of file
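A note on the coordinate filter in get_matching_files: coordinates are parsed only from dash-delimited, single-letter-plus-digits groups, and both bounds of the range are inclusive; a file with no parseable coordinate for a filtered key passes the filter. A quick illustration (the filenames and the 'inputs' directory are hypothetical):

    files = get_matching_files('inputs', 'tif', coord_filter={'X': (5, 10)})
    # 'well-X06-Y02.tif' -> X=6 falls inside [5, 10], so it is returned
    # 'well-X11-Y02.tif' -> X=11 falls outside [5, 10], so it is filtered out
    # 'well_X06.tif'     -> no dash before 'X', so no X coordinate is parsed;
    #                       only the extension check applies and the file is kept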
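For reviewers, a minimal sketch of how these utilities compose. The result-dict contract ('dataframe', 'timer_results', 'interm') is inferred from the export branches in loop_workflow; segment_image and its threshold parameter are hypothetical, and InMemoryDataAccessor is assumed to accept a numpy array:

    import numpy as np
    import pandas as pd

    from model_server.util import autonumber_new_directory, get_matching_files, loop_workflow

    def segment_image(input_file_path, output_folder_path, threshold=0.5):
        # hypothetical workflow: run the real analysis here
        return {
            'dataframe': pd.DataFrame({'object_id': [0], 'area': [123]}),  # per-object measurements
            'timer_results': {'inference_sec': 0.8},                       # scalar timings, one CSV row per file
            'interm': {'binary_mask': np.zeros((64, 64), dtype='uint8')},  # intermediate images, written as TIFs
        }

    output_dir = autonumber_new_directory('outputs', 'batch')  # e.g. outputs/batch-20240101-0000
    files = get_matching_files('inputs', 'tif')
    loop_workflow(files, output_dir, segment_image, params={'threshold': 0.5})

Each keyword in params is forwarded to workflow_func alongside input_file_path and output_folder_path, so the workflow's signature must accept all three.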