Newer
Older
#! /usr/bin/python
import os
import json
import sys
from subprocess import check_output, CalledProcessError
import luigi
import cluster_tools.utils.function_utils as fu
import cluster_tools.utils.volume_utils as vu
from cluster_tools.utils.task_utils import DummyTask
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from cluster_tools.cluster_tasks import SlurmTask, LocalTask, LSFTask
class ApplyRegistrationBase(luigi.Task):
    """ ApplyRegistration base class

    Applies the transformation in ``transformation_file`` to every input
    file listed in the json file ``input_path_file`` and writes each result
    to the path at the same position in the json file ``output_path_file``.
    The files are distributed over at most ``max_jobs`` jobs; the job
    implementation is ``apply_registration`` in this module.
    """

    task_name = 'apply_registration'
    src_file = os.path.abspath(__file__)
    allow_retry = False

    # json files holding the lists of input / output paths (same length)
    input_path_file = luigi.Parameter()
    output_path_file = luigi.Parameter()
    transformation_file = luigi.Parameter()
    fiji_executable = luigi.Parameter(default='/g/almf/software/Fiji.app/ImageJ-linux64')
    elastix_directory = luigi.Parameter(default='/g/almf/software/elastix_v4.8')
    dependency = luigi.TaskParameter(default=DummyTask())

    def requires(self):
        return self.dependency

    def run_impl(self):
        # get the global config and init configs
        shebang = self.global_config_values()[0]
        self.init(shebang)

        with open(self.input_path_file) as f:
            inputs = json.load(f)
        with open(self.output_path_file) as f:
            outputs = json.load(f)

        # validate everything up front (with informative messages), before any job is submitted
        assert len(inputs) == len(outputs), \
            "Number of inputs (%i) and outputs (%i) does not match" % (len(inputs), len(outputs))
        missing = [inp for inp in inputs if not os.path.exists(inp)]
        assert not missing, "Input files do not exist: %s" % missing
        n_files = len(inputs)

        assert os.path.exists(self.transformation_file), self.transformation_file
        assert os.path.exists(self.fiji_executable), self.fiji_executable
        assert os.path.exists(self.elastix_directory), self.elastix_directory

        # get the split of file-ids to the volume: one "block" of size 1 per file
        file_list = vu.blocks_in_volume((n_files,), (1,))

        # we don't need any additional config besides the paths
        config = {"input_path_file": self.input_path_file,
                  "output_path_file": self.output_path_file,
                  "transformation_file": self.transformation_file,
                  "fiji_executable": self.fiji_executable,
                  "elastix_directory": self.elastix_directory,
                  "tmp_folder": self.tmp_folder}

        # prime and run the jobs
        n_jobs = min(self.max_jobs, n_files)
        self.prepare_jobs(n_jobs, file_list, config)
        self.submit_jobs(n_jobs)

        # wait till jobs finish and check for job success
        self.wait_for_jobs()
        self.check_jobs(n_jobs)
class ApplyRegistrationLocal(ApplyRegistrationBase, LocalTask):
    """ Run ApplyRegistration jobs on the local machine. """
class ApplyRegistrationSlurm(ApplyRegistrationBase, SlurmTask):
    """ Run ApplyRegistration jobs on a slurm cluster. """
class ApplyRegistrationLSF(ApplyRegistrationBase, LSFTask):
    """ Run ApplyRegistration jobs on an lsf cluster. """
#
# Implementation
#
def apply_for_file(input_path, output_path,
                   transformation_file, fiji_executable,
                   elastix_directory, tmp_folder, n_threads):
    """ Apply a transformation to a single image with Fiji's Transformix plugin.

    Fiji is called headless as a subprocess (no shell involved).

    Arguments:
        input_path: path of the image to transform.
        output_path: path the plugin writes the result to; its parent
            directory must already exist.
        transformation_file: transformation parameter file passed to Transformix.
        fiji_executable: path of the Fiji launcher executable.
        elastix_directory: elastix installation passed to the plugin.
        tmp_folder: existing directory passed to the plugin as working directory.
        n_threads: number of threads passed to Transformix as ``numThreads``.

    Raises:
        AssertionError: if one of the required paths does not exist.
        RuntimeError: if the Fiji call exits with a non-zero status.
    """
    import shlex

    assert os.path.exists(elastix_directory), elastix_directory
    assert os.path.exists(tmp_folder), tmp_folder
    assert os.path.exists(input_path), input_path
    assert os.path.exists(transformation_file), transformation_file
    # go through abspath: for a bare file name the directory component is ''
    # and os.path.exists('') is False, although the current directory exists
    output_dir = os.path.dirname(os.path.abspath(output_path))
    assert os.path.exists(output_dir), output_dir

    # the argument to transformix needs to be ONE comma separated string of
    # key='value' pairs; the single quotes belong to the ImageJ option syntax
    transformix_argument = ",".join([
        "elastixDirectory='%s'" % elastix_directory,
        "workingDirectory='%s'" % tmp_folder,
        "inputImageFile='%s'" % input_path,
        "transformationFile='%s'" % transformation_file,
        "outputFile='%s'" % output_path,
        "outputModality='Save as BigDataViewer .xml/.h5'",
        # use the thread count configured for this job (threads_per_job)
        # instead of a hard-coded 1
        "numThreads='%i'" % int(n_threads)])

    # command based on https://github.com/embl-cba/fiji-plugin-elastixWrapper/issues/2:
    # srun --mem 16000 -n 1 -N 1 -c 8 -t 30:00 -o $OUT -e $ERR
    # /g/almf/software/Fiji.app/ImageJ-linux64 --ij2 --headless --run "Transformix"
    # "elastixDirectory='/g/almf/software/elastix_v4.8', workingDirectory='$TMPDIR',
    # inputImageFile='$INPUT_IMAGE',transformationFile='/g/cba/exchange/platy-trafos/linear/TransformParameters.BSpline10-3Channels.0.txt
    # outputFile='$OUTPUT_IMAGE',outputModality='Save as BigDataViewer .xml/.h5',numThreads='1'"
    # NOTE: --run is omitted here, because fiji throws a warning that it does not recognise the argument
    # The double quotes in the shell command above are SHELL quoting. The command
    # is executed as an argument list without a shell, so they must not be part of
    # the arguments; otherwise Fiji would receive the quote characters literally.
    cmd = [fiji_executable, "--ij2", "--headless", "Transformix", transformix_argument]

    # log a correctly shell-quoted version that can be copy-pasted to reproduce the call
    cmd_str = " ".join(shlex.quote(arg) for arg in cmd)
    fu.log("Calling the following command:")
    fu.log(cmd_str)

    try:
        check_output(cmd)
    except CalledProcessError as e:
        output = e.output
        if isinstance(output, bytes):
            output = output.decode('utf-8', errors='replace')
        raise RuntimeError("Fiji call failed with exit code %i:\n%s"
                           % (e.returncode, output)) from e
def apply_registration(job_id, config_path):
    """ Run one ApplyRegistration job.

    Loads the json job config from ``config_path`` and calls
    ``apply_for_file`` for every file id listed in its ``block_list``.
    Each call uses a per-job working directory ``work_dir<job_id>`` inside
    the configured ``tmp_folder``.

    Arguments:
        job_id: integer id of this job (used for logging and the working directory name).
        config_path: path of the json config for this job.
    """
    fu.log("start processing job %i" % job_id)
    fu.log("reading config from %s" % config_path)

    def _read_json(json_path):
        # load a json document from disk
        with open(json_path) as fh:
            return json.load(fh)

    config = _read_json(config_path)

    # the input and output path lists live in their own json files
    inputs = _read_json(config['input_path_file'])
    outputs = _read_json(config['output_path_file'])

    transformation_file, fiji_executable, elastix_directory, tmp_folder = (
        config[key] for key in ('transformation_file', 'fiji_executable',
                                'elastix_directory', 'tmp_folder'))

    working_dir = os.path.join(tmp_folder, 'work_dir%i' % job_id)
    os.makedirs(working_dir, exist_ok=True)

    file_list = config['block_list']
    n_threads = config.get('threads_per_job', 1)

    fu.log("Applying registration with:")
    for label, value in (("transformation_file", transformation_file),
                         ("fiji_executable", fiji_executable),
                         ("elastix_directory", elastix_directory)):
        fu.log("%s: %s" % (label, value))

    for file_id in file_list:
        fu.log("start processing block %i" % file_id)
        infile, outfile = inputs[file_id], outputs[file_id]
        fu.log("Input: %s" % infile)
        fu.log("Output: %s" % outfile)
        apply_for_file(infile, outfile, transformation_file, fiji_executable,
                       elastix_directory, working_dir, n_threads)
        fu.log_block_success(file_id)

    fu.log_job_success(job_id)
if __name__ == '__main__':
    # the only command line argument is the path of this job's config file;
    # the job id is the last '_'-separated token of the file name before its
    # first '.'
    config_path = sys.argv[1]
    assert os.path.exists(config_path), config_path
    config_name = os.path.basename(config_path)
    job_id = int(config_name.split('.')[0].split('_')[-1])
    apply_registration(job_id, config_path)