Skip to content
Snippets Groups Projects
Commit e170e845 authored by Christopher Randolph Rhodes's avatar Christopher Randolph Rhodes
Browse files

Split out local and remote path roots, and successfully completed a test run on remote

parent 5d36fa57
No related branches found
No related tags found
No related merge requests found
from collections import OrderedDict
import json
from pathlib import Path
import shutil
import pandas as pd
......@@ -21,34 +20,27 @@ class FileBatchRunnerClient(HttpClient):
with open(conf_json, 'r') as fh:
self.conf = json.load(fh)
self.paths = {
'conf': Path(self.conf['paths']['conf']),
'input': Path(self.conf['paths']['input']),
'output': Path(self.conf['paths']['output']),
}
for pa in self.paths.values():
pa.mkdir(parents=True, exist_ok=True)
self.local_paths = {k: Path(v) for k, v in self.conf['paths']['local'].items()}
self.remote_paths = {k: Path(v).as_posix() for k, v in self.conf['paths']['remote'].items()}
shutil.copy(conf_json, self.paths['output'] / conf_json.name)
for pa in self.local_paths.values():
pa.mkdir(parents=True, exist_ok=True)
self.stacks = self.get_stacks(max_count=max_count)
self.tasks = {}
self.write_df()
if not self.stacks['exists'].all():
raise FileNotFoundError(f'Found non-existent files, described in {self.pa_csv}')
raise FileNotFoundError(f'Trying to process non-existent image files')
return super().__init__(**kwargs)
def message(self, message):
    """Report a status string to the user by writing it to stdout."""
    print(message)
def write_df(self):
    """Write the stacks table to ``filelist.csv`` under the conf root.

    Persists ``self.stacks`` (a pandas DataFrame describing the input
    files) via ``DataFrame.to_csv`` and reports the destination path
    through :meth:`message`.

    NOTE(review): the diff residue here interleaved the pre-commit
    variant (a loop that also wrote a copy under the output root) with
    the post-commit variant; only the post-commit single write to
    ``self.conf_root`` is kept. ``conf_root`` is presumably a
    ``pathlib.Path`` set elsewhere on the instance — confirm against
    the rest of the class.
    """
    pa = self.conf_root / 'filelist.csv'
    self.stacks.to_csv(pa)
    self.message(f'Wrote stacks table to {pa}.')
def hit(self, method, endpoint, params=None, body=None, catch=True, **kwargs):
resp = super(FileBatchRunnerClient, self).hit(method, endpoint, params=params, body=body)
......@@ -70,17 +62,18 @@ class FileBatchRunnerClient(HttpClient):
raise(e)
self.message('Verified server is online at: ' + self.uri)
def watch_path(self, key, path, make=True, verify=False):
def watch_path(self, key, remote_path, local_path, make=True, verify=False, catch=False):
if make:
path.mkdir(parents=True, exist_ok=True)
local_path.mkdir(parents=True, exist_ok=True)
touch_uuid = self.hit(
'put',
f'/paths/watch_{key}',
params={'path': path.__str__(), 'touch': verify}
params={'path': remote_path.__str__(), 'touch': verify},
catch=catch
)
if verify:
pa_touch = path / 'svlt.touch'
pa_touch = local_path / 'svlt.touch'
try:
with open(pa_touch, 'r') as fh:
cont = fh.read()
......@@ -88,12 +81,33 @@ class FileBatchRunnerClient(HttpClient):
pa_touch.unlink()
except Exception as e:
raise WatchPathVerificationError(e)
self.message(f'Watching {path} for {key} data')
def setup(self):
self.watch_path('conf', self.paths['conf'], verify=True, make=False)
self.watch_path('output', self.paths['output'], verify=True, make=True)
self.watch_path('input', self.paths['input'], verify=False, make=False)
self.message(f'Watching {remote_path} (remote), {local_path} (local) for {key} data')
def setup(self, catch=True,):
self.watch_path(
'conf',
self.remote_paths['conf'],
self.local_paths['conf'],
verify=False,
make=False,
catch=catch,
)
self.watch_path(
'output',
self.remote_paths['output'],
self.local_paths['output'],
verify=False,
make=True,
catch=catch,
)
self.watch_path(
'input',
self.remote_paths['input'],
self.local_paths['input'],
verify=False,
make=False,
catch=catch,
)
for v in self.conf['setup']:
resp = self.hit(**v, catch=False)
......@@ -102,7 +116,7 @@ class FileBatchRunnerClient(HttpClient):
def get_stacks(self, max_count=None):
paths = []
for inp in self.conf['inputs']:
loc = Path(self.paths['input']) / inp['directory']
where_local = Path(self.local_paths['input']) / inp['directory']
# get explicit filenames
files = inp.get('files', [])
......@@ -111,20 +125,28 @@ class FileBatchRunnerClient(HttpClient):
if pattern := inp.get('pattern'):
if pattern == '':
break
for f in list(loc.iterdir()):
for f in list(where_local.iterdir()):
if pattern.upper() in f.name.upper() and f.name not in files:
files.append(f.name)
is_multiposition = inp.get('multiposition', False)
paths = paths + [{'path': loc / f, 'is_multiposition': is_multiposition} for f in files]
where_remote = Path(self.remote_paths['input']) / inp['directory']
def _get_file_info(filename):
return {
'remote_path': (where_remote / filename).as_posix(),
'local_path': where_local / filename,
'is_multiposition': is_multiposition,
}
paths = paths + [_get_file_info(f) for f in files]
if max_count is not None:
df = pd.DataFrame(paths).head(min(max_count, len(paths)))
else:
df = pd.DataFrame(paths)
if len(df) == 0:
raise EmptyFileListError('No files were found')
df['exists'] = df['path'].apply(lambda x: x.exists())
df['parent'] = df['path'].apply(lambda x: x.parent)
df['filename'] = df['path'].apply(lambda x: x.name)
df['exists'] = df['local_path'].apply(lambda x: x.exists())
df['parent'] = df['local_path'].apply(lambda x: x.parent)
df['filename'] = df['local_path'].apply(lambda x: x.name)
df['accessor_id'] = None
self.message(f'Found {len(df)} input files.')
return df
......@@ -140,8 +162,15 @@ class FileBatchRunnerClient(HttpClient):
return None
accessor_ids = []
for pa_dir, df_gb in self.stacks.groupby('parent'):
self.watch_path('input', self.paths['input'] / pa_dir, verify=False, make=False)
for loc_pa, df_gb in self.stacks.groupby('parent'):
pa_dir = loc_pa.relative_to(self.local_paths['input']).as_posix()
self.watch_path(
'input',
(Path(self.remote_paths['input']) / pa_dir).as_posix(),
self.local_paths['input'] / pa_dir,
verify=False,
make=False
)
df_gb['accessor_id'] = df_gb.apply(_read, axis=1)
df_gb['position'] = df_gb['accessor_id'].apply(lambda x: range(0, len(x)))
df_gb = df_gb.explode(['accessor_id', 'position'])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment