Newer
Older

Christopher Randolph Rhodes
committed
from pathlib import Path
from model_server.conf.testing import output_path, roiset_test_data
from model_server.base.roiset import RoiSetExportParams, RoiSetMetaParams
from model_server.base.roiset import _get_label_ids, RoiSet
from model_server.base.accessors import generate_file_accessor, InMemoryDataAccessor, write_accessor_data_to_file
from model_server.base.models import DummyInstanceSegmentationModel
class BaseTestRoiSetMonoProducts(object):
    """Fixture mixin that loads the test z-stack, one of its channels and its segmentation mask."""

    # NOTE(review): the `def setUp` line was not visible in this view; it is restored here because
    # the statements below use `self` and a subclass calls `super().setUp()` — confirm the signature.
    def setUp(self) -> None:
        """Read the multichannel z-stack, its patches channel and its mask from the test data paths."""
        # set up test raw data and segmentation from file
        self.stack = generate_file_accessor(roiset_test_data['multichannel_zstack']['path'])
        self.stack_ch_pa = self.stack.get_one_channel_data(roiset_test_data['pipeline_params']['patches_channel'])
        self.seg_mask = generate_file_accessor(roiset_test_data['multichannel_zstack']['mask_path'])
class TestRoiSetMonoProducts(BaseTestRoiSetMonoProducts, unittest.TestCase):
def _make_roi_set(self, mask_type='boxes', **kwargs):
    """Build a RoiSet from the fixture's single-channel stack and the label map of its mask.

    :param mask_type: forwarded to RoiSetMetaParams
    :param kwargs: an optional 'filters' entry replaces the default area filter
    :return: the RoiSet returned by RoiSet.from_2d_obj_ids
    """
    id_map = _get_label_ids(self.seg_mask)
    # NOTE(review): `id_map` and `mask_type=mask_type` were absent from the visible call although
    # both were computed/accepted; restored by analogy with the other from_2d_obj_ids and
    # RoiSetMetaParams calls in this file — confirm against the original.
    roiset = RoiSet.from_2d_obj_ids(
        self.stack_ch_pa,
        id_map,
        params=RoiSetMetaParams(
            mask_type=mask_type,
            filters=kwargs.get('filters', {'area': {'min': 1e3, 'max': 1e4}}),
            expand_box_by=(64, 2)
        )
    )
    return roiset
def test_roi_mask_shape(self, **kwargs):
    """Check that the RoiSet's z-mask is a non-trivial mask with the raw stack's depth.

    :param kwargs: forwarded unchanged to _make_roi_set
    """
    roiset = self._make_roi_set(**kwargs)
    zmask = roiset.get_zmask()
    zmask_acc = InMemoryDataAccessor(zmask)
    self.assertTrue(zmask_acc.is_mask())

    # assert dimensionality of zmask
    self.assertEqual(zmask_acc.nz, roiset.acc_raw.nz)
    self.assertEqual(zmask_acc.chroma, 1)
    write_accessor_data_to_file(output_path / 'zmask.tif', zmask_acc)

    # mask values are not just all True or all False
    self.assertTrue(np.any(zmask))
    self.assertFalse(np.all(zmask))

    # assert non-trivial meta info in boxes
    self.assertGreater(roiset.count, 1)
    second_roi = roiset.get_df().iloc[1]  # fetch the row once instead of rebuilding the frame twice
    sh = second_roi['mask'].shape
    ar = second_roi['area']
    # the recorded area cannot exceed the footprint of the mask array that holds it
    self.assertGreaterEqual(sh[0] * sh[1], ar)
# NOTE(review): the statements below read as the bodies of several separate tests whose `def`
# lines are not visible in this view — confirm the grouping against the real file.

# Fragment: build a RoiSet from index 0 of the stack's last axis and check its z-mask is a mask.
acc_zstack_slice = InMemoryDataAccessor(self.stack_ch_pa.data[:, :, :, 0])
self.assertEqual(acc_zstack_slice.nz, 1)
id_map = _get_label_ids(self.seg_mask)

Christopher Randolph Rhodes
committed
roiset = RoiSet.from_2d_obj_ids(acc_zstack_slice, id_map, params=RoiSetMetaParams(mask_type='boxes'))
zmask = roiset.get_zmask()
zmask_acc = InMemoryDataAccessor(zmask)
self.assertTrue(zmask_acc.is_mask())
# Fragment: every slice from get_slices() must index a 4-D, non-empty region of the raw data.
roiset = self._make_roi_set()
for s in roiset.get_slices():
ebb = roiset.acc_raw.data[s]
self.assertEqual(len(ebb.shape), 4)
self.assertTrue(np.all([si >= 1 for si in ebb.shape]))
# Fragment: same check per ROI, then again after applying roi.relative_slice to that region.
roiset = self._make_roi_set()
for roi in roiset:
ebb = roiset.acc_raw.data[roi.slice]
self.assertEqual(len(ebb.shape), 4)
self.assertTrue(np.all([si >= 1 for si in ebb.shape]))
rbb = ebb[roi.relative_slice]
self.assertEqual(len(rbb.shape), 4)
self.assertTrue(np.all([si >= 1 for si in rbb.shape]))
# Fragment: export patches with a bounding box drawn; at least one file record is expected.
files = roiset.export_patches(
output_path / '2d_patches',
draw_bounding_box=True,
)
self.assertGreaterEqual(len(files), 1)
# Fragment: export patches with make_3d=True; at least one file record is expected.
files = roiset.export_patches(
output_path / '3d_patches',
make_3d=True)
self.assertGreaterEqual(len(files), 1)
def test_export_annotated_zstack(self):
    """Export an annotated z-stack and check the written file has the raw stack's shape."""
    # NOTE(review): `roiset` was used without being defined in the visible code; built here the
    # same way the sibling tests build theirs — confirm.
    roiset = self._make_roi_set()
    file = roiset.export_annotated_zstack(
        output_path / 'annotated_zstack',
    )
    # the export record carries the directory and file name of what was written
    result = generate_file_accessor(Path(file['location']) / file['filename'])
    self.assertEqual(result.shape, roiset.acc_raw.shape)

Christopher Randolph Rhodes
committed
def test_flatten_image(self):
    """Project the full stack from the ROIs' centroids and zi values, then write the result.

    Asserts that the first two dimensions of the projected image match those of the stack.
    """
    id_map = _get_label_ids(self.seg_mask)
    roiset = RoiSet(self.stack_ch_pa, id_map, params=RoiSetMetaParams(mask_type='boxes'))
    df = roiset.get_df()

    from model_server.base.roiset import project_stack_from_focal_points

    img = project_stack_from_focal_points(
        df['centroid-0'].to_numpy(),
        df['centroid-1'].to_numpy(),
        df['zi'].to_numpy(),
        self.stack,
        degree=4,
    )
    self.assertEqual(img.shape[0:2], self.stack.shape[0:2])
    # the closing parenthesis of this call was missing in the visible code
    write_accessor_data_to_file(
        output_path / 'flattened.tif',
        InMemoryDataAccessor(img)
    )
# NOTE(review): orphaned test body — its `def` line and the statement that builds `roiset` are not
# visible here; presumably a separate patch-mask export test. Restore them before running.
# Exports patch masks and expects at least one file record back.
files = roiset.export_patch_masks(output_path / '2d_mask_patches', )
self.assertGreaterEqual(len(files), 1)

Christopher Randolph Rhodes
committed
def test_classify_by(self):
    """Classify ROIs with the dummy model and check the resulting classes and class map.

    :return: the classified RoiSet, which test_export_object_classes reuses
    """
    roiset = self._make_roi_set()
    roiset.classify_by('dummy_class', 0, DummyInstanceSegmentationModel())
    # array_equal compares shape as well as values, so an empty or over-long set of unique
    # values fails here instead of broadcasting against the expected list
    self.assertTrue(np.array_equal(roiset.get_df()['classify_by_dummy_class'].unique(), [1]))
    self.assertTrue(np.array_equal(np.unique(roiset.object_class_maps['dummy_class'].data), [0, 1]))
    return roiset
def test_export_object_classes(self):
    """Run exports with object_classes enabled and check the written class map holds only 0 and 1."""
    record = self.test_classify_by().run_exports(
        output_path / 'object_class_maps',
        0,
        'obmap',
        RoiSetExportParams(object_classes=True)
    )
    # the export record maps 'object_classes_dummy_class' to the path of the written class map
    opa = record['object_classes_dummy_class']
    self.assertTrue(Path(opa).exists())
    acc = generate_file_accessor(opa)
    # array_equal also rejects a wrong number of distinct values rather than broadcasting
    self.assertTrue(np.array_equal(np.unique(acc.data), [0, 1]))

Christopher Randolph Rhodes
committed
def test_raw_patches_are_correct_shape(self):
    """Check raw patches form a 5-D array with one entry per ROI and the raw stack's channel count."""
    roiset = self._make_roi_set()
    patches = roiset.get_raw_patches()
    # unpacking enforces exactly five dimensions; the count is named n_patches rather than `np`
    # so the numpy alias used elsewhere in this module is not shadowed
    n_patches, _h, _w, nc, _nz = patches.shape
    self.assertEqual(n_patches, roiset.count)
    self.assertEqual(nc, roiset.acc_raw.chroma)
def test_patch_masks_are_correct_shape(self):
    """Check patch masks form a 5-D array with one entry per ROI and a single channel."""
    roiset = self._make_roi_set()
    patch_masks = roiset.get_patch_masks()
    # unpacking enforces exactly five dimensions; the count is named n_patches rather than `np`
    # so the numpy alias used elsewhere in this module is not shadowed
    n_patches, _h, _w, nc, _nz = patch_masks.shape
    self.assertEqual(n_patches, roiset.count)
    self.assertEqual(nc, 1)

Christopher Randolph Rhodes
committed
class TestRoiSetMultichannelProducts(BaseTestRoiSetMonoProducts, unittest.TestCase):
def setUp(self) -> None:
    """Run the base fixture, then build a box-mask RoiSet over the full multichannel stack."""
    super().setUp()
    id_map = _get_label_ids(self.seg_mask)
    # NOTE(review): `id_map` was computed but not passed in the visible call; restored to match
    # the other `RoiSet(acc, id_map, params=...)` construction in this file — confirm.
    self.roiset = RoiSet(
        self.stack,
        id_map,
        params=RoiSetMetaParams(
            expand_box_by=(128, 2),
            mask_type='boxes',
            filters={'area': {'min': 1e3, 'max': 1e4}},
        )
    )
def test_multichannel_to_mono_2d_patches(self):
    """Export patches from one white channel with a bounding box; the first patch must be single-channel."""
    files = self.roiset.export_patches(
        output_path / 'multichannel' / 'mono_2d_patches',
        white_channel=3,
        draw_bounding_box=True,
    )
    # each export record carries the directory and patch file name of what was written
    result = generate_file_accessor(Path(files[0]['location']) / files[0]['patch_filename'])
    self.assertEqual(result.chroma, 1)

Christopher Randolph Rhodes
committed
def test_multichannnel_to_mono_2d_patches_rgb_bbox(self):
    """Export white-channel patches with the bounding box on its own channel; expect 3-channel output."""
    files = self.roiset.export_patches(
        output_path / 'multichannel' / 'mono_2d_patches_rgb_bbox',
        white_channel=3,
        draw_bounding_box=True,
        bounding_box_channel=1,
    )
    # each export record carries the directory and patch file name of what was written
    result = generate_file_accessor(Path(files[0]['location']) / files[0]['patch_filename'])
    self.assertEqual(result.chroma, 3)
def test_multichannnel_to_rgb_2d_patches_bbox(self):
    """Export patches with an RGB overlay and a drawn mask; the first patch must have 3 channels."""
    files = self.roiset.export_patches(
        output_path / 'multichannel' / 'rgb_2d_patches_bbox',
        white_channel=4,
        rgb_overlay_channels=(3, None, None),
        draw_mask=True,
        mask_channel=0,
        rgb_overlay_weights=(0.1, 1.0, 1.0)
    )
    # each export record carries the directory and patch file name of what was written
    result = generate_file_accessor(Path(files[0]['location']) / files[0]['patch_filename'])
    self.assertEqual(result.chroma, 3)

Christopher Randolph Rhodes
committed
def test_multichannnel_to_rgb_2d_patches_contour(self):
    """Export RGB-overlay patches with a drawn contour; expect 3 channels and an all-zero third channel."""
    # the closing parenthesis of this call was missing in the visible code
    files = self.roiset.export_patches(
        output_path / 'multichannel' / 'rgb_2d_patches_contour',
        rgb_overlay_channels=(3, None, None),
        draw_contour=True,
        contour_channel=1,
        rgb_overlay_weights=(0.1, 1.0, 1.0)
    )
    result = generate_file_accessor(Path(files[0]['location']) / files[0]['patch_filename'])
    self.assertEqual(result.chroma, 3)
    self.assertEqual(result.get_one_channel_data(2).data.max(), 0)  # blue channel is black

Christopher Randolph Rhodes
committed
def test_multichannel_to_multichannel_tif_patches(self):
    """Export patches with no channel options; the first patch is expected to have 5 channels."""
    # NOTE(review): only the output path is visible as an argument here — confirm no argument
    # line was lost from this call.
    files = self.roiset.export_patches(
        output_path / 'multichannel' / 'multichannel_tif_patches',
    )
    result = generate_file_accessor(Path(files[0]['location']) / files[0]['patch_filename'])
    self.assertEqual(result.chroma, 5)
def test_multichannel_annotated_zstack(self):
    """Export an annotated z-stack of the multichannel RoiSet; channel count and depth must match the stack."""
    # NOTE(review): the line opening this call was not visible; restored from the sibling
    # single-channel test and the later use of `file` — confirm.
    file = self.roiset.export_annotated_zstack(
        output_path / 'multichannel' / 'annotated_zstack',
        'test_multichannel_annotated_zstack',
    )
    result = generate_file_accessor(Path(file['location']) / file['filename'])
    self.assertEqual(result.chroma, self.stack.chroma)
    self.assertEqual(result.nz, self.stack.nz)
def test_export_single_channel_annotated_zstack(self):
    """Export the annotated z-stack for channel 3 only and compare its geometry with the raw stack."""
    export_record = self.roiset.export_annotated_zstack(output_path / 'annotated_zstack', channel=3)
    # the record holds the directory and file name of the written stack
    written_path = Path(export_record['location']) / export_record['filename']
    result = generate_file_accessor(written_path)

    raw = self.roiset.acc_raw
    self.assertEqual(result.hw, raw.hw)
    self.assertEqual(result.nz, raw.nz)
    self.assertEqual(result.chroma, 1)
class TestRoiSetFromZmask(unittest.TestCase):
def setUp(self) -> None:
    """Load the stack and its 3D mask, then build a contour-mask RoiSet from the 3D label map."""
    # set up test raw data and segmentation from file
    zstack_info = roiset_test_data['multichannel_zstack']
    self.stack = generate_file_accessor(zstack_info['path'])
    self.stack_ch_pa = self.stack.get_one_channel_data(
        roiset_test_data['pipeline_params']['segmentation_channel']
    )
    self.seg_mask_3d = generate_file_accessor(zstack_info['mask_path_3d'])

    id_map = _get_label_ids(self.seg_mask_3d, allow_3d=True)
    self.assertGreater(id_map.nz, 1)  # the label map must have more than one z-plane

    meta_params = RoiSetMetaParams(
        mask_type='contours',
        filters={'area': {'min': 1e3, 'max': 1e4}},
    )
    self.roiset = RoiSet.from_3d_obj_ids(self.stack_ch_pa, id_map, params=meta_params)
    self.zmask = InMemoryDataAccessor(self.roiset.get_zmask())

Christopher Randolph Rhodes
committed
@staticmethod
def _label_is_2d(id_map, la): # single label's zmask has same counts as its MIP
mask_3d = (id_map == la)
mask_mip = mask_3d.max(axis=-1)
return mask_3d.sum() == mask_mip.sum()
def test_id_map_connects_z(self):
    """With connect_3d=True, at least one label must fail the _label_is_2d check."""
    id_map = _get_label_ids(self.seg_mask_3d, allow_3d=True, connect_3d=True)
    labels = np.unique(id_map.data)[1:]  # drop the smallest value (presumably background 0 — confirm)
    is_2d = all(self._label_is_2d(id_map.data, la) for la in labels)
    self.assertFalse(is_2d)

Christopher Randolph Rhodes
committed
def test_id_map_disconnects_z(self):
    """With connect_3d=False, every label must pass the _label_is_2d check."""
    id_map = _get_label_ids(self.seg_mask_3d, allow_3d=True, connect_3d=False)
    labels = np.unique(id_map.data)[1:]  # drop the smallest value (presumably background 0 — confirm)
    # all() over no labels is vacuously True, so require at least one label first
    self.assertGreater(len(labels), 0)
    is_2d = all(self._label_is_2d(id_map.data, la) for la in labels)
    self.assertTrue(is_2d)