Commit fe434604 authored by Christopher Randolph Rhodes

Merged in batch running utility functions

parent 2368e26d
from pathlib import Path
import re
from time import localtime, strftime
import pandas as pd
from model_server.accessors import InMemoryDataAccessor, write_accessor_data_to_file
def autonumber_new_directory(where: str, prefix: str) -> str:
    """
    Create a new subdirectory with a unique name that includes today's date
    :param where: path of top-level directory in which to create a subdirectory
    :param prefix: prefix of new subdirectory's name
    :return: path to newly created subdirectory
    """
    Path(where).mkdir(parents=True, exist_ok=True)
    yyyymmdd = strftime('%Y%m%d', localtime())
    idx = 0
    # find the next index after the highest-numbered existing match
    for ff in Path(where).iterdir():
        ma = re.match(rf'{prefix}-{yyyymmdd}-(\d+)', ff.name)
        if ma:
            idx = max(idx, int(ma.groups()[0]) + 1)
    new_path = Path(where) / f'{prefix}-{yyyymmdd}-{idx:04d}'
    new_path.mkdir(parents=True, exist_ok=False)
    return str(new_path)
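# Usage sketch (hypothetical paths; the numeric suffix restarts at 0000 each day):
#
#   run_dir = autonumber_new_directory('output/batches', 'batch')
#   # e.g. 'output/batches/batch-20240101-0000'; a second call the same day
#   # creates and returns '.../batch-20240101-0001'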

def autonumber_new_file(where: str, prefix: str, ext: str) -> str:
    """
    Create a filename that is unique in the specified directory
    :param where: path of top-level directory where new file should be
    :param prefix: prefix of new file's name
    :param ext: extension of new file, not including '.'
    :return: full name of new file
    """
    idx = 0
    # find the next index after the highest-numbered existing match
    for ff in Path(where).iterdir():
        ma = re.match(rf'{prefix}-(\d+)\.{ext}', ff.name)
        if ma:
            idx = max(idx, int(ma.groups()[0]) + 1)
    return f'{prefix}-{idx:04d}.{ext}'
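# Usage sketch (hypothetical names); note that unlike autonumber_new_directory,
# this only composes a filename and does not create the file:
#
#   fname = autonumber_new_file('output/batches', 'table', 'csv')
#   # e.g. 'table-0003.csv' if 'table-0002.csv' already exists in the directory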

def get_matching_files(where: str, ext: str, coord_filter: dict = {}) -> list:
    """
    Return a list of files in the specified directory with the given extension
    :param where: path of directory in which to search for files
    :param ext: search only for files with this extension, not including '.'
    :param coord_filter: (optional) return only filenames whose dash-delimited coordinates
        fall in this inclusive range, e.g. {'X': (5, 10)} includes file-X06.ext
        but excludes file-X02.ext and file-X11.ext
    :return: list of paths to files
    """
    files = []

    def is_filtered_out(ff):
        # exclude files whose extension does not match (case-insensitive)
        if ff.suffix.upper() != f'.{ext}'.upper():
            return True
        # parse dash-delimited coordinates, e.g. '-X05' -> {'X': 5}
        coords = {
            m[0]: int(m[1]) for m in re.findall(r'-([a-zA-Z])(\d+)', ff.name)
        }
        # exclude files with a coordinate outside the requested inclusive range
        for fk in coord_filter.keys():
            if fk in coords.keys():
                cmin, cmax = coord_filter[fk]
                if coords[fk] < cmin or coords[fk] > cmax:
                    return True
        return False

    for ff in Path(where).iterdir():
        if is_filtered_out(ff):
            continue
        files.append(str(ff))
    return files
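# Usage sketch (hypothetical filenames): with 'img-X05-Y02.tif' and
# 'img-X12-Y02.tif' on disk, only the first passes the X filter below,
# since 12 falls outside the inclusive range (5, 10):
#
#   hits = get_matching_files('input/images', 'tif', coord_filter={'X': (5, 10)})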

def loop_workflow(
        files: list,
        output_folder_path: str,
        workflow_func: callable,
        params: dict,
        export_batch_csvs: bool = True,
        write_intermediate_products: bool = True,
        catch_and_continue: bool = True,
):
    """
    Iteratively call the specified workflow function on each of a list of input files
    :param files: list of filepaths
    :param output_folder_path: path to top-level directory to which all results will be written
    :param workflow_func: function whose first two arguments are an input filename and an output directory
    :param params: dictionary of keyword arguments that get passed to workflow_func
    :param export_batch_csvs: if True, write any tabular data returned by workflow_func to CSV files
    :param write_intermediate_products: if True, write any intermediate image products to TIF files
    :param catch_and_continue: if True, catch exceptions raised by workflow_func and keep iterating
    """
    failures = []
    for ff in files:
        export_kwargs = {
            'input_file_path': ff,
            'output_folder_path': output_folder_path,
            **params,
        }

        # run the workflow on this file, recording failure information if flagged
        try:
            result = workflow_func(**export_kwargs)
        except Exception as e:
            if catch_and_continue:
                failures.append({
                    'input_file': ff,
                    'error_message': str(e),
                })
                print(f'Caught failure on {ff}:\n{str(e)}')
                continue
            else:
                raise e

        # record dataframes associated with workflow results
        if export_batch_csvs:
            batch_csv = {
                'workflow_data': result['dataframe'],
                'timer_results': pd.DataFrame(result['timer_results'], index=[0]),
                'workflow_parameters': pd.json_normalize(export_kwargs),
            }
            for k, df in batch_csv.items():
                df['input_file'] = ff
                csv_path = Path(output_folder_path) / f'{k}.csv'
                if csv_path.exists():  # append to existing file
                    csv_args = {'mode': 'a', 'header': False}
                else:  # write a new file with a header row
                    csv_args = {'mode': 'w', 'header': True}
                df.to_csv(csv_path, index=False, **csv_args)

        # export intermediate data if flagged
        if write_intermediate_products:
            for k in result['interm'].keys():
                path = Path(output_folder_path) / k / (Path(ff).stem + '.tif')
                path.parent.mkdir(parents=True, exist_ok=True)
                write_accessor_data_to_file(
                    path,
                    InMemoryDataAccessor(result['interm'][k])
                )

    if len(failures) > 0:
        pd.DataFrame(failures).to_csv(Path(output_folder_path) / 'failures.csv')
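
# Minimal end-to-end sketch, guarded so it only runs when this module is executed
# directly. The workflow function and the 'input/images' / 'output/batches' paths
# are hypothetical, not part of this module; the demo function simply returns the
# keys that loop_workflow expects ('dataframe', 'timer_results', 'interm').
if __name__ == '__main__':

    def _demo_workflow(input_file_path, output_folder_path, **kwargs):
        return {
            'dataframe': pd.DataFrame({'object_id': [0], 'area': [42]}),
            'timer_results': {'total_sec': 0.1},
            'interm': {},  # no intermediate images in this sketch
        }

    where_out = autonumber_new_directory('output/batches', 'demo')
    loop_workflow(
        get_matching_files('input/images', 'tif'),  # assumes this directory exists
        where_out,
        _demo_workflow,
        params={},
        write_intermediate_products=False,  # nothing to export in this sketch
    )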