Skip to content
Snippets Groups Projects
Commit 94ab89aa authored by Christopher Randolph Rhodes
Browse files

Read files from multiple locations with both pattern and explicit files

parent 662abdc4
No related branches found
No related tags found
No related merge requests found
import json import json
from pathlib import Path from pathlib import Path
from .py3 import HttpClient import pandas as pd
from .py3 import HttpClient, NonSuccessResponseError
class FileBatchRunnerClient(HttpClient): class FileBatchRunnerClient(HttpClient):
def __init__(self, conf_json: str, **kwargs): def __init__(self, conf_json: str, **kwargs):
...@@ -12,7 +15,7 @@ class FileBatchRunnerClient(HttpClient): ...@@ -12,7 +15,7 @@ class FileBatchRunnerClient(HttpClient):
with open(conf_json, 'r') as fh: with open(conf_json, 'r') as fh:
self.conf = json.load(fh) self.conf = json.load(fh)
self.input_acc_ids = [] self.input_files = self.get_files_dataframe()
return super().__init__(**kwargs) return super().__init__(**kwargs)
def message(self, message): def message(self, message):
...@@ -33,7 +36,8 @@ class FileBatchRunnerClient(HttpClient): ...@@ -33,7 +36,8 @@ class FileBatchRunnerClient(HttpClient):
raise(e) raise(e)
self.message('Verified server is online at: ' + self.uri) self.message('Verified server is online at: ' + self.uri)
def watch_path(self, key, path, make=True, verify=False): def watch_path(self, key, relpath, make=True, verify=False):
path = self.conf_root / relpath
if make: if make:
path.mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
...@@ -60,30 +64,48 @@ class FileBatchRunnerClient(HttpClient): ...@@ -60,30 +64,48 @@ class FileBatchRunnerClient(HttpClient):
for v in self.conf['setup']: for v in self.conf['setup']:
resp = self.hit(**v) resp = self.hit(**v)
def get_files_dataframe(self):
    """
    Build a table of input files from every entry in ``self.conf['inputs']``.

    Each entry names a ``directory`` and may give an explicit ``files`` list, a ``pattern``
    (case-insensitive substring matched against the names found in that directory), or both.
    Explicit files come first, followed by pattern matches not already listed.

    :return: DataFrame with one row per file and the columns ``path``, ``exists``, ``parent``
        and ``name``; the columns are present even when no files are found
    """
    columns = ['path', 'exists', 'parent', 'name']
    records = []
    for inp in self.conf['inputs']:
        loc = Path(inp['directory'])

        # get explicit filenames; copy so pattern matches are not appended to the list in self.conf
        names = list(inp.get('files') or [])

        # get files by pattern; an absent or empty pattern selects nothing and later inputs still run
        if pattern := inp.get('pattern'):
            needle = pattern.upper()
            # sorted so the batch order does not depend on filesystem listing order
            for f in sorted(loc.iterdir()):
                if needle in f.name.upper() and f.name not in names:
                    names.append(f.name)

        for fn in names:
            path = loc / fn
            records.append({
                'path': path,
                'exists': path.exists(),
                'parent': path.parent,
                'name': path.name,
            })

    # TODO: raise if any files don't exist
    return pd.DataFrame(records, columns=columns)


def read_files(self):
    """
    Request a lazy read of every file in ``self.input_files`` and record the outcome there.

    Adds two columns to ``self.input_files``: ``accessor_id`` holds whatever ``.json()`` returns
    on the response to ``accessors/read_from_file/<name>``, or None where the request raised
    NonSuccessResponseError; ``success`` is True where ``accessor_id`` is not None.
    """
    def _read(filename):
        try:
            return self.put(f'accessors/read_from_file/{filename}', query={'lazy': True}).json()
        except NonSuccessResponseError:
            return None

    # Read the 'name' column explicitly: on a row Series, `row.name` is the row's index label,
    # so the request would carry 0, 1, 2... instead of the filename.
    acc_ids = [_read(fn) for fn in self.input_files['name']]

    # object dtype keeps failed reads as None; the explicit index also covers a zero-row table
    self.input_files['accessor_id'] = pd.Series(acc_ids, index=self.input_files.index, dtype=object)
    self.input_files['success'] = [acc_id is not None for acc_id in acc_ids]

    # TODO: groupby directory
def queue_tasks(self): def queue_tasks(self):
for acc_id in self.input_acc_ids: def _task(acc_id):
for v in self.conf['analyze']: for v in self.conf['analyze']:
v['body']['schedule'] = True v['body']['schedule'] = True
v['body']['accessor_id'] = acc_id v['body']['accessor_id'] = acc_id
self.hit(**v) self.hit(**v)
self.input_files['accessor_id'].apply(_task)
def run_tasks(self): def run_tasks(self):
self.put('tasks/run_all', query={'write_all': True}) self.put('tasks/run_all', query={'write_all': True})
...@@ -98,5 +120,8 @@ class FileBatchRunnerClient(HttpClient): ...@@ -98,5 +120,8 @@ class FileBatchRunnerClient(HttpClient):
class Error(Exception):
    """Common base class for the exceptions defined below."""


class FileNotFoundError(Error):
    """Presumably signals a missing input file; no raise site is visible here."""
    # NOTE(review): this name shadows the builtin FileNotFoundError wherever it is in scope, so an
    # `except FileNotFoundError` there will not catch the OSError subclass raised by pathlib/open.


class WatchPathVerificationError(Error):
    """Presumably signals a watch path that failed verification; confirm against watch_path."""
    # Derives from Error: a class with no exception base cannot be raised at all.
\ No newline at end of file
...@@ -10,6 +10,7 @@ def parse_args(): ...@@ -10,6 +10,7 @@ def parse_args():
parser.add_argument( parser.add_argument(
'--json', '--json',
help='JSON file to configure batch job', help='JSON file to configure batch job',
required=True,
) )
parser.add_argument( parser.add_argument(
'--host', '--host',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment