import json
from pathlib import Path

from ..base.accessors import FileNotFoundError  # project exception; shadows the builtin
from .py3 import HttpClient


class FileBatchRunnerClient(HttpClient):

    def __init__(self, conf_json: str, **kwargs):
        """
        Run the same analysis over many files in a single directory,
        driven by a JSON configuration file.
        """
        self.conf_root = Path(conf_json).parent
        with open(conf_json, 'r') as fh:
            self.conf = json.load(fh)

        # JSON: 'input': {'directory': <abspath>, 'files': [list of filenames]}
        # directory is from the server's perspective

        # server_root = Path(self.conf['input']['directory'])  # TODO: check on client-side, too?
        # local_root = None
        # self.fpaths = [root / fps for fps in self.conf['input']['files']]
        #
        # for fp in self.fpaths:
        #     if not fp.exists():
        #         raise FileNotFoundError(f'Could not find file {fp}')

        self.input_acc_ids = []
        super().__init__(**kwargs)

    def message(self, message):
        print(message)

    def hit(self, method, endpoint, params=None, body=None, **kwargs):
        # thin wrapper around HttpClient.hit that reports non-200 responses
        resp = super().hit(method, endpoint, params=params, body=body)
        if resp.status_code != 200:
            self.message(f'Non-200 response from {endpoint}:\n{resp.text}')
        return resp

    def verify_server(self):
        # test server communication before doing any work
        try:
            self.get('')
        except Exception:
            self.message('Could not find server at: ' + self.uri)
            raise
        self.message('Verified server is online at: ' + self.uri)

    def setup(self):
        # point the server at the directory containing the configuration file
        self.hit(
            'put',
            '/paths/watch_conf',
            params={'path': str(self.conf_root)}
        )
        # TODO: how to handle ilastik root?
        for v in self.conf['setup']:
            self.hit(**v)

    def read_files(self):
        where = Path(self.conf['input']['directory'])
        self.hit(
            'put',
            'paths/watch_input',
            params={'path': str(where)},
        )

        # get explicit filenames
        filelist = self.conf['input'].get('files', [])

        # get files by pattern (case-insensitive substring match)
        if pattern := self.conf['input'].get('pattern'):
            for f in where.iterdir():
                if pattern.upper() in f.name.upper() and f.name not in filelist:
                    filelist.append(f.name)

        # TODO: handle multiple chunks?

        # iterate on files; assume all are in the same directory
        for fn in filelist:
            self.input_acc_ids.append(
                self.put(
                    f'accessors/read_from_file/{fn}',
                    query={'lazy': True}
                ).json()
            )

    def queue_tasks(self):
        # schedule every configured analysis step for every input accessor
        for acc_id in self.input_acc_ids:
            for v in self.conf['analyze']:
                v['body']['schedule'] = True
                v['body']['accessor_id'] = acc_id
                self.hit(**v)

    def run_tasks(self):
        self.put('tasks/run_all', query={'write_all': True})
        # TODO: how to export data?

    def run(self):
        self.verify_server()
        self.read_files()
        self.setup()
        self.queue_tasks()
        self.run_tasks()
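
# ---------------------------------------------------------------------------
# Example usage (illustrative sketch, not part of the client itself).
#
# The shape of the configuration below is inferred from how self.conf is read
# above ('input', 'setup', 'analyze', with 'setup'/'analyze' entries splatted
# into hit(method, endpoint, params, body)); the concrete endpoint names,
# paths, and filenames are assumptions, not a documented schema.
#
#   conf.json:
#   {
#       "input": {
#           "directory": "/data/incoming",    // path as seen by the server
#           "files": ["sample_001.tif"],      // explicit filenames (optional)
#           "pattern": ".tif"                 // case-insensitive substring match (optional)
#       },
#       "setup":   [ {"method": "put", "endpoint": "...", "params": {}} ],
#       "analyze": [ {"method": "put", "endpoint": "...", "body": {}} ]
#   }
#
#   # HttpClient connection settings (e.g. host/port) are passed via **kwargs:
#   client = FileBatchRunnerClient('conf.json')
#   client.run()
# ---------------------------------------------------------------------------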