# util.py — batch-processing utilities (autonumbered output paths, file matching, workflow loop)
# Authored by Christopher Randolph Rhodes
from pathlib import Path
import re
from time import localtime, strftime
import pandas as pd
from model_server.accessors import InMemoryDataAccessor, write_accessor_data_to_file
def autonumber_new_directory(where: str, prefix: str) -> str:
    """Create and return a new dated, auto-numbered subdirectory of `where`.

    Directories are named '{prefix}-{YYYYMMDD}-{NNNN}'; the 4-digit suffix is
    one greater than the highest suffix already present for today's date.

    :param where: parent directory to scan and create in (must exist)
    :param prefix: leading token of the directory name
    :return: path of the newly created directory, as a string
    :raises FileExistsError: if the computed directory already exists
    """
    yyyymmdd = strftime('%Y%m%d', localtime())
    # raw f-string so \d is a regex digit class, not a deprecated string escape;
    # re.escape guards against regex metacharacters in the caller-supplied prefix
    pattern = re.compile(rf'{re.escape(prefix)}-{yyyymmdd}-(\d+)')
    idx = 0
    for ff in Path(where).iterdir():
        ma = pattern.match(ff.name)
        if ma:
            idx = max(idx, int(ma.group(1)) + 1)
    new_path = Path(where) / f'{prefix}-{yyyymmdd}-{idx:04d}'
    # exist_ok=False: fail loudly rather than silently reuse a directory
    new_path.mkdir(parents=True, exist_ok=False)
    return str(new_path)
def autonumber_new_file(where: str, prefix: str, ext: str) -> str:
    """Return the next auto-numbered filename '{prefix}-{NNNN}.{ext}' in `where`.

    Scans `where` (non-recursively) for files matching the naming scheme and
    returns a name whose 4-digit index is one greater than the highest found.
    The file itself is NOT created.

    :param where: directory to scan (must exist)
    :param prefix: leading token of the filename
    :param ext: file extension, without the leading dot
    :return: bare filename (no directory component)
    """
    # the dot before the extension is escaped: the original unescaped '.'
    # matched ANY character, so e.g. 'img-0003ztif' would bump the counter;
    # raw f-string keeps \d a regex class, re.escape neutralizes metacharacters
    pattern = re.compile(rf'{re.escape(prefix)}-(\d+)\.{re.escape(ext)}')
    idx = 0
    for ff in Path(where).iterdir():
        ma = pattern.match(ff.name)
        if ma:
            idx = max(idx, int(ma.group(1)) + 1)
    return f'{prefix}-{idx:04d}.{ext}'
def get_matching_files(where: str, ext: str, coord_filter: dict = None) -> list:
    """List files in `where` with extension `ext`, optionally filtered by
    coordinates embedded in the filename.

    Filenames may encode coordinates as '-<letter><digits>' tokens, e.g.
    'stack-x3-y12.tif' yields {'x': 3, 'y': 12}. A file is excluded when any
    filtered coordinate it carries falls outside its (min, max) range;
    coordinates absent from the filename are not checked.

    :param where: directory to scan (non-recursive)
    :param ext: extension to match, without the leading dot (case-insensitive)
    :param coord_filter: optional mapping of coordinate letter -> (min, max),
        inclusive on both ends
    :return: list of matching file paths, as strings
    """
    # default to a fresh dict; a mutable default argument is a shared object
    if coord_filter is None:
        coord_filter = {}
    files = []

    def is_filtered_out(ff):
        # case-insensitive extension comparison
        if ff.suffix.upper() != f'.{ext}'.upper():
            return True
        coords = {
            m[0]: int(m[1]) for m in re.findall(r'-([a-zA-Z])(\d+)', ff.name)
        }
        for fk, (cmin, cmax) in coord_filter.items():
            if fk in coords and not cmin <= coords[fk] <= cmax:
                return True
        return False

    for ff in Path(where).iterdir():
        if is_filtered_out(ff):
            continue
        files.append(str(ff))
    return files
def loop_workflow(files, where_output, workflow_func, params,
                  write_intermediate_products=True):
    """Run `workflow_func` over each input file, accumulating CSV reports.

    For each file, calls
    ``workflow_func(input_zstack_path=ff, where_output=where_output, **params)``.
    Each success appends one row (tagged with 'input_file') to three CSVs in
    `where_output`: 'workflow_data.csv', 'timer_results.csv', and
    'workflow_parameters.csv'. Failures are caught, reported to stdout, and
    written to 'failures.csv' at the end; processing continues with the
    remaining files.

    :param files: iterable of input file paths
    :param where_output: directory where CSVs (and intermediates) are written
    :param workflow_func: callable returning a dict with keys 'dataframe'
        (pd.DataFrame), 'timer_results' (mapping of label -> scalar), and
        'interm' (mapping of label -> image data) — assumed from usage below;
        TODO confirm against workflow implementations
    :param params: extra keyword arguments forwarded to workflow_func
    :param write_intermediate_products: if True, write each entry of
        result['interm'] as a TIFF under a subdirectory named for its key
    """
    failures = []
    # Track whether a successful result has been written yet. The original
    # keyed the write-vs-append decision on `ii == 0`, which produced
    # headerless CSVs whenever the FIRST file failed.
    first_write = True
    for ff in files:
        export_kwargs = {
            'input_zstack_path': ff,
            'where_output': where_output,
            **params,
        }

        # record failure information and keep going with the remaining files
        try:
            result = workflow_func(**export_kwargs)
        except Exception as e:
            failures.append({
                'input_file': ff,
                'error_message': str(e),
            })
            print(f'Caught failure on {ff}:\n{e}')
            continue

        # record dataframes associated with workflow results
        batch_csv = {
            'workflow_data': result['dataframe'],
            'timer_results': pd.DataFrame(result['timer_results'], index=[0]),
            'workflow_parameters': pd.json_normalize(export_kwargs),
        }
        if first_write:
            csv_args = {'mode': 'w', 'header': True}
        else:  # append to existing files
            csv_args = {'mode': 'a', 'header': False}
        for k, df in batch_csv.items():
            df['input_file'] = ff
            csv_path = Path(where_output) / f'{k}.csv'
            df.to_csv(csv_path, index=False, **csv_args)
        first_write = False

        # export intermediate data if flagged
        if write_intermediate_products:
            for k, data in result['interm'].items():
                write_accessor_data_to_file(
                    Path(where_output) / k / (Path(ff).stem + '.tif'),
                    InMemoryDataAccessor(data)
                )

    if failures:
        # index=False: avoid a stray unnamed index column in the report
        pd.DataFrame(failures).to_csv(
            Path(where_output) / 'failures.csv', index=False
        )