From 224f2ea2d13af44124edb419236ea8a6f504d073 Mon Sep 17 00:00:00 2001
From: Constantin Pape <constantin.pape@iwr.uni-heidelberg.de>
Date: Thu, 13 Jun 2019 10:11:59 +0200
Subject: [PATCH] Wrap morphology calculation in cluster task; parallelize it

---
 scripts/attributes/master.py               |  18 +-
 scripts/attributes/morphology.py           | 327 ++++---------------
 scripts/extension/attributes/__init__.py   |   1 +
 scripts/extension/attributes/morphology.py | 349 +++++++++++++++++++++
 scripts/extension/attributes/workflow.py   |  82 +++++
 test/attributes/test_genes.py              |   2 +-
 test/attributes/test_morphology.py         |  44 ++-
 7 files changed, 541 insertions(+), 282 deletions(-)
 create mode 100644 scripts/extension/attributes/morphology.py
 create mode 100644 scripts/extension/attributes/workflow.py

diff --git a/scripts/attributes/master.py b/scripts/attributes/master.py
index 2f3129a..67ff44e 100644
--- a/scripts/attributes/master.py
+++ b/scripts/attributes/master.py
@@ -55,7 +55,10 @@ def make_cell_tables(folder, name, tmp_folder, resolution,
 
     # make table with morphology
     morpho_out = os.path.join(table_folder, 'morphology.csv')
-    write_morphology_cells(seg_path, base_out, map_out, morpho_out)
+    n_labels = len(label_ids)
+    write_morphology_cells(seg_path, base_out, map_out, morpho_out,
+                           n_labels, resolution, tmp_folder,
+                           target, max_jobs)
 
     # TODO additional tables:
     # regions / semantics
@@ -73,13 +76,18 @@ def make_nucleus_tables(folder, name, tmp_folder, resolution,
 
     # make the basic attributes table
     base_out = os.path.join(table_folder, 'default.csv')
-    base_attributes(seg_path, seg_key, base_out, resolution,
-                    tmp_folder, target=target, max_jobs=max_jobs,
-                    correct_anchors=True)
+    label_ids = base_attributes(seg_path, seg_key, base_out, resolution,
+                                tmp_folder, target=target, max_jobs=max_jobs,
+                                correct_anchors=True)
 
+    xml_raw = os.path.join(folder, 'images', 'em-raw-full-res.xml')
+    raw_path = get_h5_path_from_xml(xml_raw, return_absolute_path=True)
     # make the morphology attribute table
     morpho_out = os.path.join(table_folder, 'morphology.csv')
-    write_morphology_nuclei(seg_path, base_out, morpho_out)
+    n_labels = len(label_ids)
+    write_morphology_nuclei(seg_path, raw_path, base_out, morpho_out,
+                            n_labels, resolution, tmp_folder,
+                            target, max_jobs)
 
     # TODO additional tables:
     # ???
diff --git a/scripts/attributes/morphology.py b/scripts/attributes/morphology.py
index 0095e6d..f81ec10 100644
--- a/scripts/attributes/morphology.py
+++ b/scripts/attributes/morphology.py
@@ -1,271 +1,52 @@
-# calculate morphology stats for cells / nuclei
+import luigi
+import os
+import json
+from ..extension.attributes import MorphologyWorkflow
 
-import pandas as pd
-import h5py
-import numpy as np
-from skimage.measure import regionprops, marching_cubes_lewiner, mesh_surface_area
-from skimage.transform import resize
-from skimage.util import pad
 
+def write_config(config_folder, config):
+    # something eats a lot of threads, let's at least reserve a few ...
+    config.update({'mem_limit': 24, 'threads_per_job': 4})
+    with open(os.path.join(config_folder, 'morphology.config'), 'w') as f:
+        json.dump(config, f)
 
-def calculate_slice(scale, mins, maxs):
-    """
-    Takes a scale (get this from xml file of object), and list of min and
-    max coordinates in microns [min_coord_x_microns, min_coord_y_microns,
-    min_coord_z_microns, max_coord_x_microns, max_coord_y_microns,
-    max_coord_z_microns].
-    Return a slice to use to extract
-    that bounding box from the original image.
-    """
-
-    # convert min and max in microns to pixel ranges
-    mins = [int(mi / sca) for mi, sca in zip(mins, scale)]
-    maxs = [int(ma / sca) + 1 for ma, sca in zip(maxs, scale)]
-
-    return tuple(slice(mi, ma) for mi, ma in zip(mins, maxs))
-
-
-def calculate_stats_binary(mask, scale):
-    """
-    Calculate morphology statistics from the binary mask of an object
-    1 in object, 0 outside.
-    mask - numpy array for image, should be binary, 1 in object, 0 outside
-    scale - list of form [x, y, z] stating scale in microns of image
-    """
-    # Calculate stats from skimage
-    ski_morph = regionprops(mask)
-
-    # volume in pixels
-    volume_in_pix = ski_morph[0]['area']
-
-    # extent
-    extent = ski_morph[0]['extent']
-
-    # The mesh calculation below fails if an edge of the segmentation is right up against the
-    # edge of the volume - gives an open, rather than a closed surface
-    # Pad by a few pixels to avoid this
-    mask = pad(mask, 10, mode='constant')
-
-    # surface area of mesh around object (other ways to calculate better?)
-    # spacing z, y , x to match mask dimensions
-    verts, faces, normals, values = marching_cubes_lewiner(mask, spacing=(scale[2], scale[1], scale[0]))
-    surface_area = mesh_surface_area(verts, faces)
-
-    # volume in microns
-    volume_in_microns = scale[0]*scale[1]*scale[2]*volume_in_pix
-
-    # sphericity (as in morpholibj)
-    # Should run from zero to one
-    sphericity = (36*np.pi*(float(volume_in_microns)**2))/(float(surface_area)**3)
-
-    return volume_in_microns, extent, surface_area, sphericity
-
-
-def calculate_stats_intensity(raw, mask):
-    """
-    Calculate intensity stats on the raw EM data for the region covered by the
-    provided binary mask. Mask and raw must have the same dimensions!
-
-    raw - numpy array of raw image data
-    mask - numpy array of binary mask, 1 in object, 0 outside
-    """
-
-    intensity_vals_in_mask = raw[mask == 1]
-
-    # mean and stdev - use float64 to avoid silent overflow errors
-    mean_intensity = np.mean(intensity_vals_in_mask, dtype=np.float64)
-    st_dev = np.std(intensity_vals_in_mask, dtype=np.float64)
-
-    return mean_intensity, st_dev
-
-
-# TODO - possibility that downscaled version
-# of cell segmentation may not include some of the smallest cells in the table -
-# will fail if this happens.
-def calculate_row_stats(row, seg_path, input_type, morphology, intensity):
-    """
-    Calculate all morphology stats for one row of the table
-    row - named tuple of values from that row
-    input_type - 'nucleus' or 'cell'
-    morphology - whether to calculate stats based on shape of segmentation: volume, extent, surface area, sphericity
-    intensity - whether to calculate stats based on raw data covered by segmentation: mean_intensity, st_dev_intensity
-    """
-    if row.label_id % 100 == 0:
-        print('Processing ' + str(row.label_id))
-
-    # tuple to hold result
-    result = (row.label_id,)
-
-    if input_type == 'nucleus':
-        full_seg_scale = [0.1, 0.08, 0.08]
-        # use full res of segmentation - 0.08, 0.08, 0.1
-        seg_level = 0
-        # Use the /t00000/s00/3/cells for raw data - gives pixel size similar to the nuclear segmentation
-        raw_level = 3
-
-    elif input_type == 'cell':
-        full_seg_scale = [0.025, 0.02, 0.02]
-        # look at 4x downsampled segmentation - close to 0.08, 0.08, 0.1
-        seg_level = 2
-
-    bb_min = [row.bb_min_z, row.bb_min_y, row.bb_min_x]
-    bb_max = [row.bb_max_z, row.bb_max_y, row.bb_max_x]
-    with h5py.File(seg_path, 'r') as f:
-
-        # get shape of full data & downsampled
-        full_data_shape = f['t00000/s00/0/cells'].shape
-        down_data_shape = f['t00000/s00/' + str(seg_level) + '/cells'].shape
-
-        # scale for downsampled
-        down_scale = [full_scale * (full_shape / down_shape)
-                      for full_scale, full_shape, down_shape in zip(full_seg_scale,
-                                                                    full_data_shape,
-                                                                    down_data_shape)]
-        # slice for seg file
-        seg_slice = calculate_slice(down_scale, bb_min, bb_max)
-
-        # get specified layer in bdv file
-        data = f['t00000/s00/' + str(seg_level) + '/cells']
-        img_array = data[seg_slice]
-
-    # make into a binary mask
-    binary = img_array == int(row.label_id)
-    binary = binary.astype('uint8')
-
-    # we need to skip for empty labels
-    n_fg = binary.sum()
-    if n_fg == 0:
-        print("Skipping label", row.label_id, "due to empty segmentation mask")
-        n_stats = 0
-        if morphology:
-            n_stats += 4
-        if intensity:
-            n_stats += 2
-        dummy_result = (0.,) * n_stats
-        result = result + dummy_result
-        return result
 
-    img_array = None
-
-    # calculate morphology stats straight from segmentation
-    if morphology:
-        result = result + calculate_stats_binary(binary, down_scale)
-
-    # calculate intensity statistics
-    if intensity:
-
-        # scale for full resolution raw data
-        full_raw_scale = [0.025, 0.01, 0.01]
-
-        # FIXME don't hardcode this path here
-        with h5py.File('/g/arendt/EM_6dpf_segmentation/EM-Prospr/em-raw-full-res.h5', 'r') as f:
-
-            # get shape of full data & downsampled
-            full_data_shape = f['t00000/s00/0/cells'].shape
-            down_data_shape = f['t00000/s00/' + str(raw_level) + '/cells'].shape
-
-            # scale for downsampled
-            down_scale = [full_scale * (full_shape / down_shape)
-                          for full_scale, full_shape, down_shape in zip(full_raw_scale,
-                                                                        full_data_shape,
-                                                                        down_data_shape)]
-
-            # slice for raw file
-            raw_slice = calculate_slice(down_scale, bb_min, bb_max)
-
-            # get specified layer in bdv file
-            data = f['t00000/s00/' + str(raw_level) + '/cells']
-            img_array = data[raw_slice]
-
-        if img_array.shape != binary.shape:
-
-            # rescale the binary to match the raw, using nearest neighbour interpolation (order = 0)
-            binary = resize(binary, img_array.shape, order=0, mode='reflect', anti_aliasing=True, preserve_range=True)
-            binary = binary.astype('uint8')
-
-        # Calculate statistics on raw data
-        result = result + calculate_stats_intensity(img_array, binary)
-
-    return result
-
-
-def calculate_morphology_stats(table, seg_path, input_type, morphology=True, intensity=True):
-    '''
-    Calculate statistics based on EM & segmentation.
-    table - input table (pandas dataframe)
-    input_type - 'nucleus' or 'cell'
-    morphology - whether to calculate stats based on shape of segmentation:
-        volume_in_microns, extent, surface area, sphericity
-    intensity - whether to calculate stats based on raw data covered by segmentation: mean_intensity, st_dev_intensity
-    '''
-
-    # size cutoffs (at the moment just browsed through segmentation and chose sensible values)
-    # not very stringent, still some rather large / small things included
-    if input_type == 'nucleus':
-        table = table.loc[table['n_pixels'] >= 18313.0, :]
-
-    elif input_type == 'cell':
-        criteria = np.logical_and(table['n_pixels'] > 88741.0, table['n_pixels'] < 600000000.0)
-        table = table.loc[criteria, :]
-
-    stats = [calculate_row_stats(row, seg_path, input_type, morphology, intensity)
-             for row in table.itertuples(index=False)]
-
-    # convert back to pandas dataframe
-    stats = pd.DataFrame(stats)
-
-    # names for columns
-    columns = ['label_id']
-
-    if morphology:
-        columns = columns + ['shape_volume_in_microns', 'shape_extent', 'shape_surface_area', 'shape_sphericity']
-
-    if intensity:
-        columns = columns + ['intensity_mean', 'intensity_st_dev']
-
-    # set column names
-    stats.columns = columns
-
-    return stats
-
-
-def load_cell_nucleus_mapping(cell_nuc_mapping_path):
-    # read in numpy array of mapping of cells to nuclei - first column cell id, second nucleus id
-    cell_nucleus_mapping = np.genfromtxt(cell_nuc_mapping_path, skip_header=1, delimiter='\t')[:, :2]
-    cell_nucleus_mapping = cell_nucleus_mapping.astype('uint64')
-    # remove zero labels from this table too, if exist
-    cell_nucleus_mapping = cell_nucleus_mapping[np.logical_and(cell_nucleus_mapping[:, 0] != 0,
-                                                               cell_nucleus_mapping[:, 1] != 0)]
-    return cell_nucleus_mapping
-
-
-# TODO wrap this in a luigi task / cluster tools task, so we have caching and can move computation
-# to the cluster
-def write_morphology_nuclei(seg_path, table_in_path, table_out_path):
+def write_morphology_nuclei(seg_path, raw_path, table_in_path, table_out_path,
+                            n_labels, resolution, tmp_folder, target, max_jobs):
     """
-    Write csv files of morphology stats for both the nucleus and cell segmentation
+    Write csv files of morphology stats for the nucleus segmentation
 
     seg_path - string, file path to nucleus segmentation
+    raw_path - string, file path to raw data
     cell_table_path - string, file path to cell table
     table_in_path - string, file path to nucleus table
     table_out_path - string, file path to save new nucleus table
+    n_labels - int, number of labels
+    resolution - list or tuple, resolution of segmentation at scale 0
+    tmp_folder - string, temporary folder
+    target - string, computation target (slurm or local)
+    max_jobs - maximal number of jobs
     """
+    task = MorphologyWorkflow
+    config_folder = os.path.join(tmp_folder, 'configs')
+    write_config(config_folder, task.get_config()['morphology'])
+
+    seg_scale = 0
+    min_size = 18313  # Kimberly's lower size cutoff for nuclei
+    t = task(tmp_folder=tmp_folder, max_jobs=max_jobs,
+             config_dir=config_folder, target=target,
+             segmentation_path=seg_path, in_table_path=table_in_path,
+             output_path=table_out_path, resolution=list(resolution),
+             seg_scale=seg_scale, raw_scale=3, min_size=min_size, max_size=None,
+             raw_path=raw_path, prefix='nuclei', number_of_labels=n_labels)
+    ret = luigi.build([t], local_scheduler=True)
+    if not ret:
+        raise RuntimeError("Nucleus morphology computation failed")
+
+
+def write_morphology_cells(seg_path, table_in_path, cell_nuc_mapping_path, table_out_path,
+                           n_labels, resolution, tmp_folder, target, max_jobs):
 
-    nuclei_table = pd.read_csv(table_in_path, sep='\t')
-
-    # remove zero label if it exists
-    nuclei_table = nuclei_table.loc[nuclei_table['label_id'] != 0, :]
-
-    # calculate stats for both tables and save results to csv
-    result_nuclei = calculate_morphology_stats(nuclei_table, seg_path, 'nucleus',
-                                               morphology=True, intensity=True)
-    result_nuclei.to_csv(table_out_path, index=False, sep='\t')
-
-
-# TODO wrap this in a luigi task / cluster tools task, so we have caching and can move computation
-# to the cluster
-def write_morphology_cells(seg_path, table_in_path,
-                           cell_nuc_mapping_path, table_out_path):
     """
     Write csv files of morphology stats for both the nucleus and cell segmentation
 
@@ -274,16 +55,26 @@ def write_morphology_cells(seg_path, table_in_path,
     cell_nuc_mapping_path - string, file path to numpy array mapping cells to nuclei
         (first column cell id, second nucleus id)
     table_out_path - string, file path to save new cell table
+    n_labels - int, number of labels
+    resolution - list or tuple, resolution of segmentation at scale 0
+    tmp_folder - string, temporary folder
+    target - string, computation target (slurm or local)
+    max_jobs - maximal number of jobs
     """
-
-    cell_table = pd.read_csv(table_in_path, sep='\t')
-
-    # remove zero label if it exists
-    cell_table = cell_table.loc[cell_table['label_id'] != 0, :]
-    cell_nucleus_mapping = load_cell_nucleus_mapping(cell_nuc_mapping_path)
-
-    # only keep cells in the cell_nuc_mapping (i.e those that have assigned nuclei)
-    cell_table = cell_table.loc[np.isin(cell_table['label_id'], cell_nucleus_mapping[:, 0]), :]
-
-    result_cells = calculate_morphology_stats(cell_table, seg_path, 'cell', morphology=True, intensity=False)
-    result_cells.to_csv(table_out_path, index=False, sep='\t')
+    task = MorphologyWorkflow
+    config_folder = os.path.join(tmp_folder, 'configs')
+    write_config(config_folder, task.get_config()['morphology'])
+
+    seg_scale = 2
+    min_size = 88741  # Kimberly's lower size cutoff for cells
+    max_size = 600000000  # Kimberly's upper size cutoff for cells
+    t = task(tmp_folder=tmp_folder, max_jobs=max_jobs,
+             config_dir=config_folder, target=target,
+             segmentation_path=seg_path, in_table_path=table_in_path,
+             output_path=table_out_path, resolution=list(resolution),
+             seg_scale=seg_scale, min_size=min_size, max_size=max_size,
+             mapping_path=cell_nuc_mapping_path, prefix='cells',
+             number_of_labels=n_labels)
+    ret = luigi.build([t], local_scheduler=True)
+    if not ret:
+        raise RuntimeError("Cell morphology computation failed")
diff --git a/scripts/extension/attributes/__init__.py b/scripts/extension/attributes/__init__.py
index 330dcad..4a52cda 100644
--- a/scripts/extension/attributes/__init__.py
+++ b/scripts/extension/attributes/__init__.py
@@ -1 +1,2 @@
 from .genes import GenesLocal, GenesSlurm
+from .workflow import MorphologyWorkflow
diff --git a/scripts/extension/attributes/morphology.py b/scripts/extension/attributes/morphology.py
new file mode 100644
index 0000000..d38be4f
--- /dev/null
+++ b/scripts/extension/attributes/morphology.py
@@ -0,0 +1,349 @@
+#! /bin/python
+
+import os
+import sys
+import json
+
+import luigi
+import numpy as np
+import pandas as pd
+import nifty.tools as nt
+from skimage.measure import regionprops, marching_cubes_lewiner, mesh_surface_area
+from skimage.transform import resize
+from skimage.util import pad
+
+import cluster_tools.utils.volume_utils as vu
+import cluster_tools.utils.function_utils as fu
+from cluster_tools.utils.task_utils import DummyTask
+from cluster_tools.cluster_tasks import SlurmTask, LocalTask
+
+#
+# Morphology Attribute Tasks
+#
+
+
+# FIXME something in here uses a lot of threads
+class MorphologyBase(luigi.Task):
+    """ Morphology base class
+    """
+
+    task_name = 'morphology'
+    src_file = os.path.abspath(__file__)
+    allow_retry = False
+
+    # input volumes and graph
+    segmentation_path = luigi.Parameter()
+    in_table_path = luigi.Parameter()
+    output_prefix = luigi.Parameter()
+    # resolution of the segmentation at full scale
+    resolution = luigi.ListParameter()
+    # scales of segmentation and raw data used for the computation
+    seg_scale = luigi.IntParameter()
+    raw_scale = luigi.IntParameter(default=3)
+    # prefix
+    prefix = luigi.Parameter()
+    number_of_labels = luigi.IntParameter()
+    # minimum and maximum sizes for objects
+    min_size = luigi.IntParameter()
+    max_size = luigi.IntParameter(default=None)
+    # path for cell nucleus mapping, that is used for additional
+    # table filtering
+    mapping_path = luigi.IntParameter(default='')
+    # input path for intensity calcuation
+    # if '', intensities will not be calculated
+    raw_path = luigi.Parameter(default='')
+    dependency = luigi.TaskParameter(default=DummyTask())
+
+    def requires(self):
+        return self.dependency
+
+    def run_impl(self):
+        # get the global config and init configs
+        shebang = self.global_config_values()[0]
+        self.init(shebang)
+
+        # load the task config
+        config = self.get_task_config()
+        # we hard-code the chunk-size to 1000 for now
+        block_list = vu.blocks_in_volume([self.number_of_labels], [1000])
+
+        # update the config with input and graph paths and keys
+        # as well as block shape
+        config.update({'segmentation_path': self.segmentation_path,
+                       'output_prefix': self.output_prefix,
+                       'in_table_path': self.in_table_path,
+                       'raw_path': self.raw_path,
+                       'mapping_path': self.mapping_path,
+                       'seg_scale': self.seg_scale,
+                       'raw_scale': self.raw_scale,
+                       'resolution': self.resolution,
+                       'min_size': self.min_size,
+                       'max_size': self.max_size})
+
+        # prime and run the job
+        n_jobs = min(len(block_list), self.max_jobs)
+        self.prepare_jobs(n_jobs, block_list, config, self.prefix)
+        self.submit_jobs(n_jobs, self.prefix)
+
+        # wait till jobs finish and check for job success
+        self.wait_for_jobs()
+        self.check_jobs(n_jobs, self.prefix)
+
+    def output(self):
+        out_path = os.path.join(self.tmp_folder,
+                                '%s_%s.log' % (self.task_name, self.prefix))
+        return luigi.LocalTarget(out_path)
+
+
+class MorphologyLocal(MorphologyBase, LocalTask):
+    """ Morphology on local machine
+    """
+    pass
+
+
+class MorphologySlurm(MorphologyBase, SlurmTask):
+    """ Morphology on slurm cluster
+    """
+    pass
+
+
+#
+# Implementation
+#
+
+
+# get shape of full data & downsampling factor
+def get_scale_factor(path, key_full, key, resolution):
+    with vu.file_reader(path, 'r') as f:
+        full_shape = f[key_full].shape
+        shape = f[key].shape
+
+    # scale factor for downsampling
+    scale_factor = [res * (fs / sh)
+                    for res, fs, sh in zip(resolution,
+                                           full_shape,
+                                           shape)]
+    return scale_factor
+
+
+def filter_table(table, min_size, max_size):
+    if max_size is None:
+        table = table.loc[table['n_pixels'] >= min_size, :]
+    else:
+        criteria = np.logical_and(table['n_pixels'] > min_size, table['n_pixels'] < max_size)
+        table = table.loc[criteria, :]
+    return table
+
+
+def filter_table_from_mapping(table, mapping_path):
+    # read in numpy array of mapping of cells to nuclei - first column cell id, second nucleus id
+    mapping = np.genfromtxt(mapping_path, skip_header=1, delimiter='\t')[:, :2].astype('uint64')
+    # remove zero labels from this table too, if exist
+    mapping = mapping[np.logical_and(mapping[:, 0] != 0,
+                                     mapping[:, 1] != 0)]
+    table = table.loc[np.isin(table['label_id'], mapping[:, 0]), :]
+    return table
+
+
+def load_data(ds, row, scale):
+    # compute the bounding box from the row information
+    mins = [row.bb_min_z, row.bb_min_y, row.bb_min_x]
+    maxs = [row.bb_max_z, row.bb_max_y, row.bb_max_x]
+    mins = [int(mi / sca) for mi, sca in zip(mins, scale)]
+    maxs = [int(ma / sca) + 1 for ma, sca in zip(maxs, scale)]
+    bb = tuple(slice(mi, ma) for mi, ma in zip(mins, maxs))
+    # load the data from the bounding box
+    return ds[bb]
+
+
+def morphology_row_features(mask, scale):
+
+    # Calculate stats from skimage
+    ski_morph = regionprops(mask.astype('uint8'))
+
+    # volume in pixels
+    volume_in_pix = ski_morph[0]['area']
+
+    # extent
+    extent = ski_morph[0]['extent']
+
+    # The mesh calculation below fails if an edge of the segmentation is right up against the
+    # edge of the volume - gives an open, rather than a closed surface
+    # Pad by a few pixels to avoid this
+    mask = pad(mask, 10, mode='constant')
+
+    # surface area of mesh around object (other ways to calculate better?)
+    verts, faces, normals, values = marching_cubes_lewiner(mask, spacing=tuple(scale))
+    surface_area = mesh_surface_area(verts, faces)
+
+    # volume in microns
+    volume_in_microns = np.prod(scale)*volume_in_pix
+
+    # sphericity (as in morpholibj)
+    # Should run from zero to one
+    sphericity = (36*np.pi*(float(volume_in_microns)**2))/(float(surface_area)**3)
+
+    return [volume_in_microns, extent, surface_area, sphericity]
+
+
+def intensity_row_features(raw, mask):
+    intensity_vals_in_mask = raw[mask]
+    # mean and stdev - use float64 to avoid silent overflow errors
+    mean_intensity = np.mean(intensity_vals_in_mask, dtype=np.float64)
+    st_dev = np.std(intensity_vals_in_mask, dtype=np.float64)
+    return mean_intensity, st_dev
+
+
+# compute morphology (and intensity features) for label range
+def morphology_features_for_label_range(table, ds, ds_raw,
+                                        scale_factor_seg, scale_factor_raw,
+                                        label_begin, label_end):
+    n_features = 4 if ds_raw is None else 6
+    label_range = np.logical_and(table['label_id'] >= label_begin, table['label_id'] < label_end)
+    sub_table = table.loc[label_range, :]
+    stats = []
+    for row in sub_table.itertuples(index=False):
+        label_id = int(row.label_id)
+
+        # load the segmentation data from the bounding box corresponding
+        # to this row
+        seg = load_data(ds, row, scale_factor_seg)
+
+        # compute the segmentation mask and check that we have
+        # foreground in the mask
+        seg_mask = seg == label_id
+        if seg_mask.sum() == 0:
+            result = [float(label_id)] + [0.] * n_features
+            stats.append(result)
+            continue
+
+        # compute the morphology features from the segmentation mask
+        result = [float(label_id)] + morphology_row_features(seg_mask, scale_factor_seg)
+
+        # compute the intensiry features from raw data and segmentation mask
+        if ds_raw is not None:
+            raw = load_data(ds_raw, row, scale_factor_raw)
+            # resize the segmentation mask if it does not fit the raw data
+            if seg_mask.shape != raw.shape:
+                seg_mask = resize(seg_mask, raw.shape,
+                                  order=0, mode='reflect',
+                                  anti_aliasing=True, preserve_range=True).astype('bool')
+            result += intensity_row_features(raw, seg_mask)
+        stats.append(result)
+    return stats
+
+
+def compute_morphology_features(table, segmentation_path, raw_path,
+                                seg_key, raw_key,
+                                scale_factor_seg, scale_factor_raw,
+                                blocking, block_list):
+
+    if raw_path != '':
+        assert raw_key is not None and scale_factor_raw is not None
+        f_raw = vu.file_reader(raw_path, 'r')
+        ds_raw = f_raw[raw_key]
+    else:
+        f_raw = ds_raw = None
+
+    with vu.file_reader(segmentation_path, 'r') as f:
+        ds = f[seg_key]
+
+        stats = []
+        for block_id in block_list:
+            block = blocking.getBlock(block_id)
+            label_a, label_b = block.begin[0], block.end[0]
+            fu.log("Computing features from label-id %i to %i" % (label_a, label_b))
+            stats.extend(morphology_features_for_label_range(table, ds, ds_raw,
+                                                             scale_factor_seg, scale_factor_raw,
+                                                             label_a, label_b))
+    if f_raw is not None:
+        f_raw.close()
+
+    # convert to pandas table and add column names
+    stats = pd.DataFrame(stats)
+    columns = ['label_id',
+               'shape_volume_in_microns', 'shape_extent', 'shape_surface_area', 'shape_sphericity']
+    if raw_path != '':
+        columns += ['intensity_mean', 'intensity_st_dev']
+    stats.columns = columns
+    return stats
+
+
+def morphology(job_id, config_path):
+
+    fu.log("start processing job %i" % job_id)
+    fu.log("reading config from %s" % config_path)
+
+    # get the config
+    with open(config_path) as f:
+        config = json.load(f)
+    segmentation_path = config['segmentation_path']
+    in_table_path = config['in_table_path']
+    raw_path = config['raw_path']
+    mapping_path = config['mapping_path']
+    output_prefix = config['output_prefix']
+
+    min_size = config['min_size']
+    max_size = config['max_size']
+
+    resolution = config['resolution']
+    raw_scale = config['raw_scale']
+    seg_scale = config['seg_scale']
+
+    block_list = config['block_list']
+
+    # keys to segmentation and raw data for the different scales
+    seg_key_full = 't00000/s00/0/cells'
+    seg_key = 't00000/s00/%i/cells' % seg_scale
+    raw_key_full = 't00000/s00/0/cells'
+    raw_key = 't00000/s00/%i/cells' % raw_scale
+
+    # get scale factor for the segmentation
+    scale_factor_seg = get_scale_factor(segmentation_path, seg_key_full, seg_key, resolution)
+
+    # get scale factor for raw data (if it's given)
+    if raw_path != '':
+        fu.log("Have raw path; compute intensity features")
+        # NOTE for now we can hard-code the resolution for the raw data here,
+        # but we might need to change this if we get additional dataset(s)
+        raw_resolution = [0.025, 0.01, 0.01]
+        scale_factor_raw = get_scale_factor(raw_path, raw_key_full, raw_key, raw_resolution)
+    else:
+        fu.log("Don't have raw path; do not compute intensity features")
+        raw_resolution = scale_factor_raw = None
+
+    # read the base table
+    table = pd.read_csv(in_table_path, sep='\t')
+    n_labels = table.shape[0]
+    fu.log("Initial number of labels: %i" % n_labels)
+    # remove zero label if it exists
+    table = table.loc[table['label_id'] != 0, :]
+
+    # if we have a mappin, only keep objects in the mapping
+    # (i.e cells that have assigned nuclei)
+    if mapping_path != '':
+        fu.log("Have mapping path %s" % mapping_path)
+        table = filter_table_from_mapping(table, mapping_path)
+        fu.log("Number of labels after filter with mapping: %i" % table.shape[0])
+    # filter by size
+    table = filter_table(table, min_size, max_size)
+    fu.log("Number of labels after size filte: %i" % table.shape[0])
+
+    blocking = nt.blocking([0], [n_labels], [1000])
+
+    fu.log("Computing morphology features")
+    stats = compute_morphology_features(table, segmentation_path, raw_path,
+                                        seg_key, raw_key, scale_factor_seg, scale_factor_raw,
+                                        blocking, block_list)
+
+    output_path = output_prefix + '_job%i.csv' % job_id
+    fu.log("Save result to %s" % output_path)
+    stats.to_csv(output_path, index=False, sep='\t')
+    fu.log_job_success(job_id)
+
+
+if __name__ == '__main__':
+    path = sys.argv[1]
+    assert os.path.exists(path), path
+    job_id = int(os.path.split(path)[1].split('.')[0].split('_')[-1])
+    morphology(job_id, path)
diff --git a/scripts/extension/attributes/workflow.py b/scripts/extension/attributes/workflow.py
new file mode 100644
index 0000000..9c344cc
--- /dev/null
+++ b/scripts/extension/attributes/workflow.py
@@ -0,0 +1,82 @@
+import os
+import pandas as pd
+import luigi
+from cluster_tools.cluster_tasks import WorkflowBase
+
+from . import morphology as morpho_tasks
+
+
+class MergeTables(luigi.Task):
+    output_prefix = luigi.Parameter()
+    output_path = luigi.Parameter()
+    max_jobs = luigi.IntParameter()
+    dependency = luigi.TaskParameter()
+
+    def requires(self):
+        return self.dependency
+
+    def run(self):
+        # load all job sub results
+        tables = []
+        for job_id in range(self.max_jobs):
+            path = self.output_prefix + '_job%i.csv' % job_id
+            # NOTE: not all jobs might have been scheduled, so
+            # we neeed to check if the result actually exists
+            if not os.path.exists(path):
+                continue
+            sub_table = pd.read_csv(path, sep='\t')
+            tables.append(sub_table)
+
+        table = pd.concat(tables)
+        table.sort_values('label_id', inplace=True)
+        table.to_csv(self.output_path, index=False, sep='\t')
+
+    def output(self):
+        return luigi.LocalTarget(self.output_path)
+
+
+class MorphologyWorkflow(WorkflowBase):
+    # input volumes and graph
+    segmentation_path = luigi.Parameter()
+    in_table_path = luigi.Parameter()
+    output_path = luigi.Parameter()
+    # resolution of the segmentation at full scale
+    resolution = luigi.ListParameter()
+    # scales of segmentation and raw data used for the computation
+    seg_scale = luigi.IntParameter()
+    raw_scale = luigi.IntParameter(default=3)
+    # prefix
+    prefix = luigi.Parameter()
+    number_of_labels = luigi.IntParameter()
+    # minimum and maximum sizes for objects
+    min_size = luigi.IntParameter()
+    max_size = luigi.IntParameter(default=None)
+    # path for cell nucleus mapping, that is used for additional
+    # table filtering
+    mapping_path = luigi.IntParameter(default='')
+    # input path for intensity calcuation
+    # if '', intensities will not be calculated
+    raw_path = luigi.Parameter(default='')
+
+    def requires(self):
+        out_prefix = os.path.join(self.tmp_folder, 'sub_table_%s' % self.prefix)
+        morpho_task = getattr(morpho_tasks,
+                              self._get_task_name('Morphology'))
+        dep = morpho_task(tmp_folder=self.tmp_folder, config_dir=self.config_dir,
+                          dependency=self.dependency, max_jobs=self.max_jobs,
+                          segmentation_path=self.segmentation_path,
+                          in_table_path=self.in_table_path, output_prefix=out_prefix,
+                          resolution=self.resolution, seg_scale=self.seg_scale, raw_scale=self.raw_scale,
+                          prefix=self.prefix, number_of_labels=self.number_of_labels,
+                          min_size=self.min_size, max_size=self.max_size, mapping_path=self.mapping_path,
+                          raw_path=self.raw_path)
+        dep = MergeTables(output_prefix=out_prefix, output_path=self.output_path,
+                          max_jobs=self.max_jobs, dependency=dep)
+
+        return dep
+
+    @staticmethod
+    def get_config():
+        configs = super(MorphologyWorkflow, MorphologyWorkflow).get_config()
+        configs.update({'morphology': morpho_tasks.MorphologyLocal.default_task_config()})
+        return configs
diff --git a/test/attributes/test_genes.py b/test/attributes/test_genes.py
index ff8fe80..a0a1ca6 100644
--- a/test/attributes/test_genes.py
+++ b/test/attributes/test_genes.py
@@ -12,7 +12,7 @@ class TestGeneAttributes(unittest.TestCase):
     tmp_folder = 'tmp_genes'
     test_file = 'tmp_genes/test_table.csv'
 
-    def _tearDown(self):
+    def tearDown(self):
         try:
             rmtree(self.tmp_folder)
         except OSError:
diff --git a/test/attributes/test_morphology.py b/test/attributes/test_morphology.py
index f92f87e..93207c7 100644
--- a/test/attributes/test_morphology.py
+++ b/test/attributes/test_morphology.py
@@ -1,17 +1,19 @@
 import unittest
 import sys
 import os
+import json
 import numpy as np
+from shutil import rmtree
 sys.path.append('../..')
 
 
 # check new version of gene mapping against original
 class TestMorphologyAttributes(unittest.TestCase):
-    test_file = 'test_table.csv'
+    tmp_folder = 'tmp_morpho'
 
-    def tearDown(self):
+    def _tearDown(self):
         try:
-            os.remove(self.test_file)
+            rmtree(self.tmp_folder)
         except OSError:
             pass
 
@@ -20,15 +22,34 @@ class TestMorphologyAttributes(unittest.TestCase):
                               dtype='float32')
         return table
 
+    def write_global_config(self, conf):
+        config_folder = os.path.join(self.tmp_folder, 'configs')
+        os.makedirs(config_folder, exist_ok=True)
+        shebang = '#! /g/kreshuk/pape/Work/software/conda/miniconda3/envs/cluster_env37/bin/python'
+        conf.update({'shebang': shebang})
+        with open(os.path.join(config_folder, 'global.config'), 'w') as f:
+            json.dump(conf, f)
+
     def test_nucleus_morphology(self):
         from scripts.attributes.morphology import write_morphology_nuclei
+        from scripts.extension.attributes import MorphologyWorkflow
+        from scripts.files import get_h5_path_from_xml
+
+        self.write_global_config(MorphologyWorkflow.get_config()['global'])
+
+        raw_path = '../../data/0.0.0/images/em-raw-full-res.xml'
+        raw_path = get_h5_path_from_xml(raw_path)
 
         # compute and load the morpho table
         seg_path = '../../data/0.0.0/segmentations/em-segmented-nuclei-labels.h5'
         table_in_path = '../../data/0.0.0/tables/em-segmented-nuclei-labels/default.csv'
-        table_out_path = self.test_file
+        table_out_path = os.path.join(self.tmp_folder, 'table_nuclei.csv')
+        res = [.1, .08, .08]
+        n_labels = np.genfromtxt(table_in_path, delimiter='\t', skip_header=1).shape[0]
+
         print("Start computation ...")
-        write_morphology_nuclei(seg_path, table_in_path, table_out_path)
+        write_morphology_nuclei(seg_path, raw_path, table_in_path, table_out_path,
+                                n_labels, res, self.tmp_folder, 'local', 32)
         table = self.load_table(table_out_path)
 
         # load original table, make sure new and old table agree
@@ -39,17 +60,24 @@ class TestMorphologyAttributes(unittest.TestCase):
 
     def test_cell_morphology(self):
         from scripts.attributes.morphology import write_morphology_cells
+        from scripts.extension.attributes import MorphologyWorkflow
+
+        self.write_global_config(MorphologyWorkflow.get_config()['global'])
 
         # compute and load the morpho table
         seg_path = '../../data/0.0.0/segmentations/em-segmented-cells-labels.h5'
         mapping_path = '../../data/0.0.0/tables/em-segmented-cells-labels/objects.csv'
         table_in_path = '../../data/0.0.0/tables/em-segmented-cells-labels/default.csv'
-        table_out_path = self.test_file
+        table_out_path = os.path.join(self.tmp_folder, 'table_cells.csv')
+        res = [.025, .02, .02]
+        n_labels = np.genfromtxt(table_in_path, delimiter='\t', skip_header=1).shape[0]
+
         print("Start computation ...")
-        write_morphology_cells(seg_path, table_in_path, mapping_path, table_out_path)
+        write_morphology_cells(seg_path, table_in_path, mapping_path, table_out_path,
+                               n_labels, res, self.tmp_folder, 'local', 32)
         table = self.load_table(table_out_path)
 
-        # load original table, make sure new and old table agree
+        # make sure new and old table agree
         original_table_file = '../../data/0.0.0/tables/em-segmented-cells-labels/morphology.csv'
         original_table = self.load_table(original_table_file)
         self.assertEqual(table.shape, original_table.shape)
-- 
GitLab