Skip to content
Snippets Groups Projects
Commit 5ba5f929 authored by Christopher Randolph Rhodes's avatar Christopher Randolph Rhodes
Browse files

Implemented batch runner client, can lazy load accessors, still much to do

parent b28389d3
No related branches found
No related tags found
No related merge requests found
import json
from pathlib import Path
from typing import List
import requests
from ..base.accessors import FileNotFoundError
from .py3 import HttpClient
class FileBatchRunnerClient(HttpClient):
def __init__(self, conf_json: Path, **kwargs):
"""
Iterate on many files on the same directory
"""
with open(conf_json, 'r') as fh:
self.conf = json.load(fh)
# JSON: 'input': {'directory': <abspath>, 'files': [list of filenames]}
# directory is from server's perspective
# server_root = Path(self.conf['input']['directory'])
# TODO: check on client-side, too?
# local_root = None
# self.fpaths = [root / fps for fps in self.conf['input']['files']]
#
# for fp in self.fpaths:
# if not fp.exists():
# raise FileNotFoundError(f'Could not find file {fp}')
#
self.input_acc_ids = []
return super().__init__(**kwargs)
def message(self, message):
print(message)
def hit(self, method, endpoint, params=None, body=None):
resp = super(FileBatchRunnerClient, self).hit(method, endpoint, params=params, body=body)
if resp.status_code != 200:
self.message(f'Non-200 response from {endpoint}:\n{resp.text}')
return resp
def verify_server(self):
try:
# test server communication
self.get('')
except Exception as e:
self.message('Could not find server at: ' + self.uri)
raise(e)
self.message('Verified server is online at: ' + self.uri)
def setup_svlt(self):
# TODO: how to handle ilastik root?
for k, v in self.conf['setup']:
resp = self.hit(**v)
def read_files(self):
where = Path(self.conf['input']['directory'])
self.hit(
'put',
'paths/watch_input',
params={'path': where.__str__()},
)
# get explicit filenames
filelist = self.conf['input'].get('files', [])
# get files by pattern
if pattern := self.conf['input']['pattern']:
for f in list(where.iterdir()):
if pattern.upper() in f.name.upper() and f.name not in filelist:
filelist.append(f)
# TODO: handle multiple chunks?
# iterate on files, assume all same directory
for fn in filelist:
self.input_acc_ids.append(
self.put(f'accessors/read_from_file/{fn}', query={'lazy': True})
)
def queue_tasks(self):
for k, v in self.conf['analyze']:
pass
def run_tasks(self):
pass
# TODO: how to export data?
def run(self):
self.verify_server()
self.read_files()
self.setup()
\ No newline at end of file
import argparse
from model_server.clients.batch_runner import FileBatchRunnerClient
def parse_args():
parser = argparse.ArgumentParser(
description='Push batch analysis of image files to server',
)
parser.add_argument(
'--json',
help='JSON file to configure batch job',
)
parser.add_argument(
'--host',
default='127.0.0.1',
help='bind socket to this host'
)
parser.add_argument(
'--port',
default=8000,
help='bind socket to this port',
)
return parser.parse_args()
def main(args):
client = FileBatchRunnerClient(conf_json=args.json, host=args.host, port=args.port)
client.run()
return
if __name__ == '__main__':
args = parse_args()
print('CLI args:\n' + str(args))
main(args)
print('Finished')
\ No newline at end of file
cs = {
'paths': {
'ilastik': '.',
},
'input': {
'directory': 'Y:/TREC_STOP_26_Porto/MobileLab/LSM900/231026_automic/20231026-152512_lowzoom_data/LowZoom',
'files': [
],
'pattern': '.czi',
},
'setup': [
{
'description': 'Load an ilastik pixel classifier for segmentation',
'method': 'PUT',
'endpoint': 'ilastik/seg/load/',
'params': {
'model_id': 'px_seg_mod',
},
'body': {
'project_file': 'ilastik/px-2d-cAF405-10x.ilp',
'duplicate': False,
},
},
],
'analyze': [
{
'description': 'Run segmentation with ilastik model',
'method': 'PUT',
'endpoint': 'chaeo/with_derived_channels/infer',
'body': {
'api': False,
'keep_interm': True,
'model_id': 'px_seg_mod',
'channel': 0,
},
},
],
'teardown': [
],
}
if __name__ == '__main__':
import json
from pathlib import Path
root = Path('C:\\Users\\rhodes\\projects\\proj0015-model-server\\dev\\dev_serverside_batch')
fp = root / 'conf.json'
with open(fp, 'w') as fh:
json.dump(cs, fh, ensure_ascii=True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment