diff --git a/model_server/conf/testing.py b/model_server/conf/testing.py index f247638f96699002d850dfb5873b20c1e10f7cdb..10a2eafbb03f87fd00d305b47c12e1b67a79e92d 100644 --- a/model_server/conf/testing.py +++ b/model_server/conf/testing.py @@ -89,23 +89,42 @@ class TestServerBaseClass(unittest.TestCase): sesh.mount('http://', requests.adapters.HTTPAdapter(max_retries=retries)) return sesh - def _get(self, endpoint): - return self._get_sesh().get(self.uri + endpoint) + def assertGetSuccess(self, endpoint): + resp = self._get_sesh().get(self.uri + endpoint) + self.assertEqual(resp.status_code, 200, resp.text) + return resp.json() + + def assertGetFailure(self, endpoint, code): + resp = self._get_sesh().get(self.uri + endpoint) + self.asserEqual(resp.status_code, code) + return resp + + def assertPutSuccess(self, endpoint, query=None, body=None): + resp = self._get_sesh().put( + self.uri + endpoint, + params=query, + data=json.dumps(body) + ) + self.assertEqual(resp.status_code, 200, resp.text) + return resp.json() - def _put(self, endpoint, query=None, body=None): - return self._get_sesh().put( + def assertPutFailure(self, endpoint, code, query=None, body=None): + resp = self._get_sesh().put( self.uri + endpoint, params=query, data=json.dumps(body) ) + self.assertEqual(resp.status_code, code) + return resp + def tearDown(self) -> None: self.server_process.terminate() self.server_process.join() def copy_input_file_to_server(self): - resp = self._get('paths') - pa = resp.json()['inbound_images'] + r = self.assertGetSuccess('paths') + pa = r['inbound_images'] copyfile( self.input_data['path'], Path(pa) / self.input_data['name'] @@ -113,14 +132,12 @@ class TestServerBaseClass(unittest.TestCase): return self.input_data['name'] def get_accessor(self, accessor_id, filename=None): - resp = self._put(f'/accessors/write_to_file/{accessor_id}', query={'filename': filename}) - where_out = self._get('paths').json()['outbound_images'] - fp_out = (Path(where_out) / resp.json()) + r = self.assertPutSuccess(f'/accessors/write_to_file/{accessor_id}', query={'filename': filename}) + where_out = self.assertGetSuccess('paths').json()['outbound_images'] + fp_out = (Path(where_out) / r) self.assertTrue(fp_out.exists()) return generate_file_accessor(fp_out) - # TODO: assert 200, return JSON if true, print(resp.text) if false - def setup_test_data(): """ diff --git a/tests/base/test_api.py b/tests/base/test_api.py index 74c6a79e5dda0df8073335ce38205b22c9d570d2..0698d701b6c37018b54c9724b00404b6568cc0ee 100644 --- a/tests/base/test_api.py +++ b/tests/base/test_api.py @@ -11,89 +11,77 @@ class TestApiFromAutomatedClient(TestServerBaseClass): input_data = czifile def test_trivial_api_response(self): - resp = self._get('') - self.assertEqual(resp.status_code, 200) + self.assertGetSuccess('') def test_bounceback_parameters(self): - resp = self._put('testing/bounce_back', body={'par1': 'hello', 'par2': ['ab', 'cd']}) - self.assertEqual(resp.status_code, 200, resp.content) - self.assertEqual(resp.json()['params']['par1'], 'hello', resp.json()) - self.assertEqual(resp.json()['params']['par2'], ['ab', 'cd'], resp.json()) + r = self.assertPutSuccess('testing/bounce_back', body={'par1': 'hello', 'par2': ['ab', 'cd']}) + self.assertEqual(r['params']['par1'], 'hello', r) + self.assertEqual(r['params']['par2'], ['ab', 'cd'], r) def test_default_session_paths(self): import model_server.conf.defaults - resp = self._get('paths') + r = self.assertGetSuccess('paths') conf_root = model_server.conf.defaults.root for p in ['inbound_images', 'outbound_images', 'logs']: - self.assertTrue(resp.json()[p].startswith(conf_root.__str__())) + self.assertTrue(r[p].startswith(conf_root.__str__())) suffix = Path(model_server.conf.defaults.subdirectories[p]).__str__() - self.assertTrue(resp.json()[p].endswith(suffix)) + self.assertTrue(r[p].endswith(suffix)) + # TODO: was this even passing before? def test_list_empty_loaded_models(self): - resp = self._get('models') - self.assertEqual(resp.status_code, 200) - self.assertEqual(resp.content, b'{}') + r = self.assertGetSuccess('models') + # self.assertEqual(resp.status_code, 200) + self.assertEqual(r, b'{}') def test_load_dummy_semantic_model(self): - resp_load = self._put(f'testing/models/dummy_semantic/load') - model_id = resp_load.json()['model_id'] - self.assertEqual(resp_load.status_code, 200, resp_load.json()) - resp_list = self._get('models') - self.assertEqual(resp_list.status_code, 200) - rj = resp_list.json() - self.assertEqual(rj[model_id]['class'], 'DummySemanticSegmentationModel') - return model_id + mid = self.assertPutSuccess(f'testing/models/dummy_semantic/load')['model_id'] + rl = self.assertGetSuccess('models') + self.assertEqual(rl[mid]['class'], 'DummySemanticSegmentationModel') + return mid def test_load_dummy_instance_model(self): - resp_load = self._put(f'testing/models/dummy_instance/load') - model_id = resp_load.json()['model_id'] - self.assertEqual(resp_load.status_code, 200, resp_load.json()) - resp_list = self._get('models') - self.assertEqual(resp_list.status_code, 200) - rj = resp_list.json() - self.assertEqual(rj[model_id]['class'], 'DummyInstanceSegmentationModel') - return model_id + mid = self.assertPutSuccess(f'testing/models/dummy_instance/load')['model_id'] + rl = self.assertGetSuccess('models') + self.assertEqual(rl[mid]['class'], 'DummyInstanceSegmentationModel') + return mid def test_respond_with_error_when_invalid_filepath_requested(self): - model_id = self.test_load_dummy_semantic_model() - - resp = self._put( + self.assertPutFailure( f'infer/from_image_file', - query={'model_id': model_id, 'input_filename': 'not_a_real_file.name'} + 404, + query={ + 'model_id': self.test_load_dummy_semantic_model(), + 'input_filename': 'not_a_real_file.name', + } ) - self.assertEqual(resp.status_code, 404, resp.content.decode()) def test_pipeline_errors_when_ids_not_found(self): fname = self.copy_input_file_to_server() - model_id = self._put(f'testing/models/dummy_semantic/load').json()['model_id'] - in_acc_id = self._put(f'accessors/read_from_file/{fname}').json() + model_id = self.assertPutSuccess(f'testing/models/dummy_semantic/load')['model_id'] + in_acc_id = self.assertPutSuccess(f'accessors/read_from_file/{fname}') # respond with 409 for invalid accessor_id - self.assertEqual( - self._put( - f'pipelines/segment', - body={'model_id': model_id, 'accessor_id': 'fake'} - ).status_code, - 409 + self.assertPutFailure( + f'pipelines/segment', + 409, + body={'model_id': model_id, 'accessor_id': 'fake'}, ) # respond with 409 for invalid model_id - self.assertEqual( - self._put( - f'pipelines/segment', - body={'model_id': 'fake', 'accessor_id': in_acc_id} - ).status_code, - 409 + self.assertPutFailure( + f'pipelines/segment', + 409, + body={'model_id': 'fake', 'accessor_id': in_acc_id} ) - + # TODO: this errors because it's overwriting, seems maybe session isn't refreshing each time def test_i2i_dummy_inference_by_api(self): fname = self.copy_input_file_to_server() - model_id = self._put(f'testing/models/dummy_semantic/load').json()['model_id'] - in_acc_id = self._put(f'accessors/read_from_file/{fname}').json() + model_id = self.assertPutSuccess(f'testing/models/dummy_semantic/load')['model_id'] + in_acc_id = self.assertPutSuccess(f'accessors/read_from_file/{fname}') # run segmentation pipeline on preloaded accessor - resp_infer = self._put( + r = self.assertPutSuccess( f'pipelines/segment', body={ 'accessor_id': in_acc_id, @@ -102,62 +90,56 @@ class TestApiFromAutomatedClient(TestServerBaseClass): 'keep_interm': True, }, ) - self.assertEqual(resp_infer.status_code, 200, resp_infer.content.decode()) - out_acc_id = resp_infer.json()['output_accessor_id'] - self.assertTrue(self._get(f'accessors/{out_acc_id}').json()['loaded']) + out_acc_id = r['output_accessor_id'] + self.assertTrue(self.assertGetSuccess(f'accessors/{out_acc_id}')['loaded']) acc_out = self.get_accessor(out_acc_id, 'dummy_semantic_output.tif') self.assertEqual(acc_out.shape_dict['C'], 1) # validate intermediate data - resp_list = self._get(f'accessors').json() + resp_list = self.assertGetSuccess(f'accessors') self.assertEqual(len([k for k in resp_list.keys() if '_step' in k]), 2) def test_restarting_session_clears_loaded_models(self): - resp_load = self._put(f'testing/models/dummy_semantic/load',) - self.assertEqual(resp_load.status_code, 200, resp_load.json()) - resp_list_0 = self._get('models') - self.assertEqual(resp_list_0.status_code, 200) - rj0 = resp_list_0.json() - self.assertEqual(len(rj0), 1, f'Unexpected models in response: {rj0}') - resp_restart = self._get('session/restart') - resp_list_1 = self._get('models') - rj1 = resp_list_1.json() - self.assertEqual(len(rj1), 0, f'Unexpected models in response: {rj1}') + self.assertPutSuccess(f'testing/models/dummy_semantic/load') + rl0 = self.assertGetSuccess('models') + self.assertEqual(len(rl0), 1, f'Unexpected models in response: {rl0}') + self.assertGetSuccess('session/restart') + rl1 = self.assertGetSuccess('models') + self.assertEqual(len(rl1), 0, f'Unexpected models in response: {rl1}') def test_change_inbound_path(self): - resp_inpath = self._get('paths') - resp_change = self._put( + r_inpath = self.assertGetSuccess('paths') + self.assertPutSuccess( f'paths/watch_output', - query={'path': resp_inpath.json()['inbound_images']} + query={'path': r_inpath['inbound_images']} ) - self.assertEqual(resp_change.status_code, 200) - resp_check = self._get('paths') - self.assertEqual(resp_check.json()['inbound_images'], resp_check.json()['outbound_images']) + r_check = self.assertGetSuccess('paths') + self.assertEqual(r_check['inbound_images'], r_check['outbound_images']) def test_exception_when_changing_inbound_path(self): - resp_inpath = self._get('paths') + r_inpath = self.assertGetSuccess('paths') fakepath = 'c:/fake/path/to/nowhere' - resp_change = self._put( + r_change = self.assertPutFailure( f'paths/watch_output', - query={'path': fakepath} + 404, + query={'path': fakepath}, ) - self.assertEqual(resp_change.status_code, 404) - self.assertIn(fakepath, resp_change.json()['detail']) - resp_check = self._get('paths') - self.assertEqual(resp_inpath.json()['outbound_images'], resp_check.json()['outbound_images']) + self.assertIn(fakepath, r_change.json()['detail']) + r_check = self.assertGetSuccess('paths') + self.assertEqual(r_inpath['outbound_images'], r_check['outbound_images']) def test_no_change_inbound_path(self): - resp_inpath = self._get('paths') - resp_change = self._put( + resp_inpath = self.assertGetSuccess('paths') + resp_change = self.assertPutSuccess( f'paths/watch_output', query={'path': resp_inpath.json()['outbound_images']} ) self.assertEqual(resp_change.status_code, 200) - resp_check = self._get('paths') + resp_check = self.assertGetSuccess('paths') self.assertEqual(resp_inpath.json()['outbound_images'], resp_check.json()['outbound_images']) def test_get_logs(self): - resp = self._get('session/logs') + resp = self.assertGetSuccess('session/logs') self.assertEqual(resp.status_code, 200) self.assertEqual(resp.json()[0]['message'], 'Initialized session') @@ -165,14 +147,14 @@ class TestApiFromAutomatedClient(TestServerBaseClass): fname = self.copy_input_file_to_server() # add accessor to session - resp_add_acc = self._put( + resp_add_acc = self.assertPutSuccess( f'accessors/read_from_file/{fname}', ) acc_id = resp_add_acc.json() self.assertTrue(acc_id.startswith('auto_')) # confirm that accessor is listed in session context - resp_list_acc = self._get( + resp_list_acc = self.assertGetSuccess( f'accessors', ) self.assertEqual(len(resp_list_acc.json()), 1) @@ -180,31 +162,31 @@ class TestApiFromAutomatedClient(TestServerBaseClass): self.assertTrue(resp_list_acc.json()[acc_id]['loaded']) # delete and check that its 'loaded' state changes - self.assertTrue(self._get(f'accessors/{acc_id}').json()['loaded']) - self.assertEqual(self._get(f'accessors/delete/{acc_id}').json(), acc_id) - self.assertFalse(self._get(f'accessors/{acc_id}').json()['loaded']) + self.assertTrue(self.assertGetSuccess(f'accessors/{acc_id}').json()['loaded']) + self.assertEqual(self.assertGetSuccess(f'accessors/delete/{acc_id}').json(), acc_id) + self.assertFalse(self.assertGetSuccess(f'accessors/{acc_id}').json()['loaded']) # and try a non-existent accessor ID - resp_wrong_acc = self._get('accessors/auto_123456') + resp_wrong_acc = self.assertGetSuccess('accessors/auto_123456') self.assertEqual(resp_wrong_acc.status_code, 404) # load another... then remove all - self._put(f'accessors/read_from_file/{fname}') - self.assertEqual(sum([v['loaded'] for v in self._get('accessors').json().values()]), 1) - self.assertEqual(len(self._get(f'accessors/delete/*').json()), 1) - self.assertEqual(sum([v['loaded'] for v in self._get('accessors').json().values()]), 0) + self.assertPutSuccess(f'accessors/read_from_file/{fname}') + self.assertEqual(sum([v['loaded'] for v in self.assertGetSuccess('accessors').json().values()]), 1) + self.assertEqual(len(self.assertGetSuccess(f'accessors/delete/*').json()), 1) + self.assertEqual(sum([v['loaded'] for v in self.assertGetSuccess('accessors').json().values()]), 0) def test_empty_accessor_list(self): - resp_list_acc = self._get( + resp_list_acc = self.assertGetSuccess( f'accessors', ) self.assertEqual(len(resp_list_acc.json()), 0) def test_write_accessor(self): - acc_id = self._put('/testing/accessors/dummy_accessor/load').json() - self.assertTrue(self._get(f'accessors/{acc_id}').json()['loaded']) - sd = self._get(f'accessors/{acc_id}').json()['shape_dict'] - self.assertEqual(self._get(f'accessors/{acc_id}').json()['filepath'], '') + acc_id = self.assertPutSuccess('/testing/accessors/dummy_accessor/load').json() + self.assertTrue(self.assertGetSuccess(f'accessors/{acc_id}').json()['loaded']) + sd = self.assertGetSuccess(f'accessors/{acc_id}').json()['shape_dict'] + self.assertEqual(self.assertGetSuccess(f'accessors/{acc_id}').json()['filepath'], '') acc_out = self.get_accessor(accessor_id=acc_id, filename='test_output.tif') self.assertEqual(sd, acc_out.shape_dict) \ No newline at end of file diff --git a/tests/test_ilastik/test_ilastik.py b/tests/test_ilastik/test_ilastik.py index e7744acb0c53e02353654602e7916b69de61679e..a78875f26781f5533b76454edd8636b6754b9c35 100644 --- a/tests/test_ilastik/test_ilastik.py +++ b/tests/test_ilastik/test_ilastik.py @@ -209,7 +209,7 @@ class TestServerTestCase(conf.TestServerBaseClass): class TestIlastikOverApi(TestServerTestCase): def test_httpexception_if_incorrect_project_file_loaded(self): - resp_load = self._put( + resp_load = self.assertPutSuccess( 'ilastik/seg/load/', body={'project_file': 'improper.ilp'}, ) @@ -217,13 +217,13 @@ class TestIlastikOverApi(TestServerTestCase): def test_load_ilastik_pixel_model(self): - resp_load = self._put( + resp_load = self.assertPutSuccess( 'ilastik/seg/load/', body={'project_file': str(ilastik_classifiers['px']['path'])}, ) self.assertEqual(resp_load.status_code, 200, resp_load.json()) model_id = resp_load.json()['model_id'] - resp_list = self._get('models') + resp_list = self.assertGetSuccess('models') self.assertEqual(resp_list.status_code, 200) rj = resp_list.json() self.assertEqual(rj[model_id]['class'], 'IlastikPixelClassifierModel') @@ -231,19 +231,19 @@ class TestIlastikOverApi(TestServerTestCase): def test_load_another_ilastik_pixel_model(self): model_id = self.test_load_ilastik_pixel_model() - resp_list_1st = self._get('models').json() + resp_list_1st = self.assertGetSuccess('models').json() self.assertEqual(len(resp_list_1st), 1, resp_list_1st) - resp_load_2nd = self._put( + resp_load_2nd = self.assertPutSuccess( 'ilastik/seg/load/', body={'project_file': str(ilastik_classifiers['px']['path']), 'duplicate': True}, ) - resp_list_2nd = self._get('models').json() + resp_list_2nd = self.assertGetSuccess('models').json() self.assertEqual(len(resp_list_2nd), 2, resp_list_2nd) - resp_load_3rd = self._put( + resp_load_3rd = self.assertPutSuccess( 'ilastik/seg/load/', body={'project_file': str(ilastik_classifiers['px']['path']), 'duplicate': False}, ) - resp_list_3rd = self._get('models').json() + resp_list_3rd = self.assertGetSuccess('models').json() self.assertEqual(len(resp_list_3rd), 2, resp_list_3rd) def test_load_ilastik_pixel_model_with_params(self): @@ -252,26 +252,26 @@ class TestIlastikOverApi(TestServerTestCase): 'px_class': 0, 'px_prob_threshold': 0.5 } - resp_load = self._put( + resp_load = self.assertPutSuccess( 'ilastik/seg/load/', body=params, ) self.assertEqual(resp_load.status_code, 200, resp_load.json()) model_id = resp_load.json()['model_id'] - mods = self._get('models').json() + mods = self.assertGetSuccess('models').json() self.assertEqual(len(mods), 1) self.assertEqual(mods[model_id]['params']['px_prob_threshold'], 0.5) def test_load_ilastik_pxmap_to_obj_model(self): - resp_load = self._put( + resp_load = self.assertPutSuccess( 'ilastik/pxmap_to_obj/load/', body={'project_file': str(ilastik_classifiers['pxmap_to_obj']['path'])}, ) model_id = resp_load.json()['model_id'] self.assertEqual(resp_load.status_code, 200, resp_load.json()) - resp_list = self._get('models') + resp_list = self.assertGetSuccess('models') self.assertEqual(resp_list.status_code, 200) rj = resp_list.json() self.assertEqual(rj[model_id]['class'], 'IlastikObjectClassifierFromPixelPredictionsModel') @@ -279,7 +279,7 @@ class TestIlastikOverApi(TestServerTestCase): def test_load_ilastik_model_with_model_id(self): mid = 'new_model_id' - resp_load = self._put( + resp_load = self.assertPutSuccess( 'ilastik/pxmap_to_obj/load/', body={ 'project_file': str(ilastik_classifiers['pxmap_to_obj']['path']), @@ -290,14 +290,14 @@ class TestIlastikOverApi(TestServerTestCase): self.assertEqual(res_mid, mid) def test_load_ilastik_seg_to_obj_model(self): - resp_load = self._put( + resp_load = self.assertPutSuccess( 'ilastik/seg_to_obj/load/', body={'project_file': str(ilastik_classifiers['seg_to_obj']['path'])}, ) model_id = resp_load.json()['model_id'] self.assertEqual(resp_load.status_code, 200, resp_load.json()) - resp_list = self._get('models') + resp_list = self.assertGetSuccess('models') self.assertEqual(resp_list.status_code, 200) rj = resp_list.json() self.assertEqual(rj[model_id]['class'], 'IlastikObjectClassifierFromSegmentationModel') @@ -306,9 +306,9 @@ class TestIlastikOverApi(TestServerTestCase): def test_ilastik_infer_pixel_probability(self): fname = self.copy_input_file_to_server() model_id = self.test_load_ilastik_pixel_model() - in_acc_id = self._put(f'accessors/read_from_file/{fname}').json() + in_acc_id = self.assertPutSuccess(f'accessors/read_from_file/{fname}').json() - resp_infer = self._put( + resp_infer = self.assertPutSuccess( f'pipelines/segment', body={'model_id': model_id, 'accessor_id': in_acc_id, 'channel': 0}, ) @@ -320,9 +320,9 @@ class TestIlastikOverApi(TestServerTestCase): px_model_id = self.test_load_ilastik_pixel_model() ob_model_id = self.test_load_ilastik_pxmap_to_obj_model() - in_acc_id = self._put(f'accessors/read_from_file/{fname}').json() + in_acc_id = self.assertPutSuccess(f'accessors/read_from_file/{fname}').json() - resp_infer = self._put( + resp_infer = self.assertPutSuccess( 'ilastik/pipelines/pixel_then_object_classification/infer/', body={ 'px_model_id': px_model_id, @@ -398,19 +398,19 @@ class TestIlastikOnMultichannelInputs(TestServerTestCase): """ copyfile( self.pa_input_image, - Path(self._get('paths').json()['inbound_images']) / self.pa_input_image.name + Path(self.assertGetSuccess('paths').json()['inbound_images']) / self.pa_input_image.name ) - in_acc_id = self._put(f'accessors/read_from_file/{self.pa_input_image.name}').json() + in_acc_id = self.assertPutSuccess(f'accessors/read_from_file/{self.pa_input_image.name}').json() - resp_load_px = self._put( + resp_load_px = self.assertPutSuccess( 'ilastik/seg/load/', body={'project_file': str(self.pa_px_classifier)}, ) self.assertEqual(resp_load_px.status_code, 200, resp_load_px.json()) px_model_id = resp_load_px.json()['model_id'] - resp_load_ob = self._put( + resp_load_ob = self.assertPutSuccess( 'ilastik/pxmap_to_obj/load/', body={'project_file': str(self.pa_ob_pxmap_classifier)}, ) @@ -418,7 +418,7 @@ class TestIlastikOnMultichannelInputs(TestServerTestCase): ob_model_id = resp_load_ob.json()['model_id'] # run the pipeline - resp_infer = self._put( + resp_infer = self.assertPutSuccess( 'ilastik/pipelines/pixel_then_object_classification/infer/', body={ 'accessor_id': in_acc_id, diff --git a/tests/test_ilastik/test_roiset_workflow.py b/tests/test_ilastik/test_roiset_workflow.py index cec05dbdbdb1be3c97c1ed53cccf3d57984fa34c..1b824feee589952774368f6841b333af7788c25a 100644 --- a/tests/test_ilastik/test_roiset_workflow.py +++ b/tests/test_ilastik/test_roiset_workflow.py @@ -136,15 +136,15 @@ class TestRoiSetWorkflowOverApi(conf.TestServerBaseClass, BaseTestRoiSetMonoProd return conf.TestServerBaseClass.setUp(self) def test_trivial_api_response(self): - resp = self._get('') + resp = self.assertGetSuccess('') self.assertEqual(resp.status_code, 200) def test_load_input_accessor(self): fname = self.copy_input_file_to_server() - return self._put(f'accessors/read_from_file/{fname}').json() + return self.assertPutSuccess(f'accessors/read_from_file/{fname}').json() def test_load_pixel_classifier(self): - resp = self._put( + resp = self.assertPutSuccess( 'ilastik/seg/load/', body={'project_file': self._get_models()['pixel_classifier_segmentation']['project_file']}, ) @@ -153,7 +153,7 @@ class TestRoiSetWorkflowOverApi(conf.TestServerBaseClass, BaseTestRoiSetMonoProd return model_id def test_load_object_classifier(self): - resp = self._put( + resp = self.assertPutSuccess( 'ilastik/seg_to_obj/load/', body={'project_file': self._get_models()['object_classifier']['project_file']}, ) @@ -162,7 +162,7 @@ class TestRoiSetWorkflowOverApi(conf.TestServerBaseClass, BaseTestRoiSetMonoProd return model_id def _object_map_workflow(self, ob_classifer_id): - resp = self._put( + resp = self.assertPutSuccess( 'pipelines/roiset_to_obmap/infer', body={ 'accessor_id': self.test_load_input_accessor(), @@ -176,8 +176,8 @@ class TestRoiSetWorkflowOverApi(conf.TestServerBaseClass, BaseTestRoiSetMonoProd ) self.assertEqual(resp.status_code, 200, resp.json()) oid = resp.json()['output_accessor_id'] - obmap_fn = self._put(f'/accessors/write_to_file/{oid}').json() - where_out = self._get('paths').json()['outbound_images'] + obmap_fn = self.assertPutSuccess(f'/accessors/write_to_file/{oid}').json() + where_out = self.assertGetSuccess('paths').json()['outbound_images'] obmap_fp = Path(where_out) / obmap_fn self.assertTrue(obmap_fp.exists()) return generate_file_accessor(obmap_fp)