Commit efdaf0b9 authored by Martin Schorb's avatar Martin Schorb

Merge branch 'dev' into 'main'

Dev

See merge request !3
parents 68f06f3b c7de341b
Pipeline #34813 passed with stage
in 1 minute and 4 seconds
@@ -163,17 +163,20 @@ def pointmatch_mcown_dd_sel(mc_own_sel, new_mc, mc_dd_opt, init_match, thispage,
@app.callback([Output({'component': 'new_matchcoll', 'module': MATCH}, 'style'),
Output({'component': 'browse_mc_div', 'module': MATCH}, 'style'),
Output({'component': 'browse_mc', 'module': MATCH}, 'href')],
Input({'component': 'matchcoll_dd', 'module': MATCH}, 'value'),
[Input({'component': 'matchcoll_dd', 'module': MATCH}, 'value'),
Input({'component': 'stack_dd', 'module': MATCH}, 'value')],
[State({'component': 'mc_owner_dd', 'module': MATCH}, 'value'),
State({'component': 'store_owner', 'module': MATCH}, 'data'),
State({'component': 'store_project', 'module': MATCH}, 'data'),
State({'component': 'stack_dd', 'module': MATCH}, 'value'),
State({'component': 'owner_dd', 'module': MATCH}, 'value'),
State({'component': 'project_dd', 'module': MATCH}, 'value'),
State({'component': 'store_all_matchcolls', 'module': MATCH}, 'data')],
prevent_initial_call=True)
def new_matchcoll(mc_sel, mc_owner, owner, project, stack, all_mcs):
def new_matchcoll(mc_sel, stack, mc_owner, owner, project, all_mcs):
if not dash.callback_context.triggered:
raise PreventUpdate
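# only proceed once all required selections are made; a missing stack falls back to ''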
if None in [mc_sel, mc_owner, owner, project, all_mcs]:
raise PreventUpdate
if stack is None:
stack = ''
@@ -417,6 +417,9 @@ for idx in range(params.max_tileviews):
url += '/z/' + str(section)
if 'imurl' not in imparams.keys():
imparams['imurl'] = url + '/jpeg-image?scale=' + str(scale)
if 'zoom' in trigger:
if rectsel == {}:
raise PreventUpdate
@@ -519,8 +522,6 @@ for idx in range(params.max_tileviews):
outdims = dict()
scale = imparams['scale']
print('p2outerlims')
print(imparams)
for dim in ['X', 'Y']:
# adjust rectangle position in case slice does not cover full volume bounds
@@ -27,11 +27,13 @@ module = 'convert'
inputtypes = [
{'label': 'SBEMImage', 'value': 'SBEM'},
{'label': 'SerialEM Montage', 'value': 'SerialEM'},
# {'label': 'Image stack - FIB/SEM', 'value': 'FIBSEM'},
]
inputmodules = [
'inputtypes.sbem_conv',
'inputtypes.serialem_conv'
'inputtypes.serialem_conv',
# 'inputtypes.tifstack_conv'
]
main = html.Div(children=[html.H3("Import volume EM datasets - Choose type:", id='conv_head'),
@@ -120,6 +120,12 @@ page.append(pathbrowse)
]
)
def export_stacktodir(dir_trigger, trig2, stack_sel, owner, project, allstacks, browsedir):
if not dash.callback_context.triggered:
raise PreventUpdate
if None in [stack_sel, owner, project, allstacks, browsedir]:
raise PreventUpdate
dir_out = browsedir
trigger = hf.trigger()
@@ -206,7 +206,7 @@ def sbem_conv_gobutton(stack_sel, in_dir, click, proj_dd_sel, compute_sel, run_s
run_state['status'] = 'running'
run_state['id'] = sbem_conv_p
run_state['type'] = launch_jobs.runtype(comp_sel)
run_state['type'] = launch_jobs.runtype(compute_sel)
run_state['logfile'] = log_file
else:
@@ -67,7 +67,7 @@ us_out, us_in, us_state = render_selector.init_update_store(label, parent, comp_
@app.callback(us_out, us_in, us_state)
def tilepairs_update_store(*args):
def serialem_conv_update_store(*args):
thispage = args[-1]
args = args[:-1]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Nov 4 08:42:12 2020
@author: schorb
"""
import dash
from dash import dcc
from dash import html
from dash.dependencies import Input, Output, State
from dash.exceptions import PreventUpdate
import os
import json
import requests
import importlib
from app import app
import params
from utils import launch_jobs, pages, checks
from utils import helper_functions as hf
from utils.checks import is_bad_filename
from callbacks import filebrowse, render_selector
# element prefix
parent = "convert"
label = parent + "_tifstack"
# SELECT input directory
# get user name and main group to pre-populate input directory
group = params.group
# =============================================
# # Page content
store = pages.init_store({}, label)
# Pick source directory
directory_sel = html.Div(children=[html.H4("Select dataset root directory:"),
# html.Script(type="text/javascript",children="alert('test')"),
dcc.Input(id={'component': 'path_input', 'module': label}, type="text",
debounce=True,
value=params.default_dir,
persistence=True, className='dir_textinput')
])
pathbrowse = pages.path_browse(label)
page1 = [directory_sel, pathbrowse, html.Div(store)]
# # ===============================================
# RENDER STACK SELECTOR
# Pre-fill render stack selection from previous module
us_out, us_in, us_state = render_selector.init_update_store(label, parent, comp_in='store_render_init')
@app.callback(us_out, us_in, us_state)
def tifstack_conv_update_store(*args):
thispage = args[-1]
args = args[:-1]
thispage = thispage.lstrip('/')
if thispage == '' or thispage not in hf.trigger(key='module'):
raise PreventUpdate
return render_selector.update_store(*args)
page2 = []
page2.append(
html.Div(pages.render_selector(label, create=True, show=['stack', 'project'], header='Select target stack:'),
id={'component': 'render_seldiv', 'module': label})
)
# =============================================
# Start Button
gobutton = html.Div(children=[html.Br(),
html.Button('Start conversion', id=label + "go", disabled=True),
html.Div([], id=label + 'directory-popup', style={'color': '#E00'}),
# dcc.ConfirmDialog(
# id=label+'danger-novaliddir',displayed=False,
# message='The selected directory does not exist or is not readable!'
# ),
html.Br(),
html.Details([html.Summary('Compute location:'),
dcc.RadioItems(
options=[
{'label': 'Cluster (slurm)', 'value': 'slurm'},
{'label': 'locally (this submission node)', 'value': 'standalone'}
],
value='slurm',
labelStyle={'display': 'inline-block'},
id=label + 'compute_sel'
)],
id=label + 'compute')],
style={'display': 'inline-block'})
page2.append(gobutton)
# =============================================
# Processing status
# initialized with store
# embedded from callbacks import runstate
# # =============================================
# # PROGRESS OUTPUT
collapse_stdout = pages.log_output(label)
# ----------------
# Full page layout:
page2.append(collapse_stdout)
# =============================================
# LAUNCH CALLBACK FUNCTION
# =============================================
@app.callback([Output(label + 'go', 'disabled'),
Output(label + 'directory-popup', 'children'),
Output({'component': 'store_launch_status', 'module': label}, 'data'),
Output({'component': 'store_render_launch', 'module': label}, 'data')
],
[Input({'component': 'stack_dd', 'module': label}, 'value'),
Input({'component': 'path_input', 'module': label}, 'value'),
Input(label + 'go', 'n_clicks')
],
[State({'component': 'project_dd', 'module': label}, 'value'),
State(label + 'compute_sel', 'value'),
State({'component': 'store_run_status', 'module': label}, 'data'),
State({'component': 'store_render_launch', 'module': label}, 'data')],
prevent_initial_call=True)
def tifstack_conv_gobutton(stack_sel, in_dir, click, proj_dd_sel, compute_sel, run_state, outstore):
ctx = dash.callback_context
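# isolate the triggering component: e.g. 'convert_tifstackgo.n_clicks' reduces to 'go' for the start button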
trigger = ctx.triggered[0]['prop_id'].split('.')[0].partition(label)[2]
but_disabled = True
popup = ''
out = run_state
log_file = run_state['logfile']
# outstore = dash.no_update
outstore = dict()
outstore['owner'] = 'FIBSEM'
outstore['project'] = proj_dd_sel
outstore['stack'] = stack_sel
if trigger == 'go':
# launch procedure
# prepare parameters:
run_prefix = launch_jobs.run_prefix()
param_file = params.json_run_dir + '/' + label + '_' + run_prefix + '.json'
run_params = params.render_json.copy()
run_params['render']['owner'] = outstore['owner']
run_params['render']['project'] = proj_dd_sel
with open(os.path.join(params.json_template_dir, 'SBEMImage_importer.json'), 'r') as f:
run_params.update(json.load(f))
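# keys from the importer template overwrite the matching render defaults copied above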
run_params['image_file'] = in_dir
run_params['stack'] = stack_sel
with open(param_file, 'w') as f:
json.dump(run_params, f, indent=4)
log_file = params.render_log_dir + '/' + label + '_' + run_prefix
err_file = log_file + '.err'
log_file += '.log'
# launch
# -----------------------
conv_p = launch_jobs.run(target=compute_sel,
pyscript=params.rendermodules_dir +
'/dataimport/generate_EM_tilespecs_from_SerialEMmontage.py',
jsonfile=param_file, run_args=None, logfile=log_file, errfile=err_file)
run_state['status'] = 'running'
run_state['id'] = conv_p
run_state['type'] = launch_jobs.runtype(compute_sel)
run_state['logfile'] = log_file
else:
outstore = dash.no_update
# check launch conditions and enable/disable button
if any([in_dir == '', in_dir is None]):
if not (run_state['status'] == 'running'):
run_state['status'] = 'wait'
popup = 'No input directory chosen.'
elif is_bad_filename(in_dir):
run_state['status'] = 'wait'
popup = 'Wrong characters in input directory path. Please fix!'
elif os.path.isdir(in_dir):
# print(in_dir)
if any([stack_sel == 'newstack', proj_dd_sel == 'newproj']):
if not (run_state['status'] == 'running'):
run_state['status'] = 'wait'
else:
if not (run_state['status'] == 'running'):
run_state['status'] = 'input'
but_disabled = False
else:
if not (run_state['status'] == 'running'):
run_state['status'] = 'wait'
popup = 'Input Data not accessible.'
out['logfile'] = log_file
out['status'] = run_state['status']
return but_disabled, popup, out, outstore
@@ -5,6 +5,7 @@
import json
import os
import getpass
import glob
import subprocess
import requests
@@ -23,14 +24,15 @@ conda_dir = '/g/emcf/software/python/miniconda'
render_log_dir = '/g/emcf/software/render-logs'
rendermodules_dir = '/g/emcf/schorb/code/rendermodules-addons/rmaddons'
asap_dir = '/g/emcf/schorb/code/asap-modules/asap/'
spark_dir = '/g/emcf/software/spark-3.0.0-bin-hadoop3.2'
# derived base directories for launchers etc...
# derived directories for launchers etc...
# you can point these to other targets if desired
workdir = os.path.join(base_dir, 'dash')
workdir = os.path.join(base_dir, 'dashUI')
launch_dir = os.path.join(base_dir, 'launchers')
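# e.g. launch_dir = '/scratch/volumealign/launchers'  # hypothetical alternative target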
@@ -44,7 +46,7 @@ json_match_dir = os.path.join(base_dir, 'JSON_parameters', 'MatchTrials')
# defined at the end!
# notification and documentation
user = os.getlogin()
user = getpass.getuser()
email = '@embl.de'
doc_url = 'https://schorb.embl-community.io/volumealign/usage/'
@@ -89,7 +91,7 @@ min_chunksize = 5e5 # minimum chunk size for n5/zarr export (in bytes)
time_add_buffer = 0.2 # time buffer for job submission (relative)
n_cpu_script = 4
mem_per_cpu = 2 # GB
mem_per_cpu = 4 # GB
n_jobs_default = 8
# standalone
@@ -99,7 +101,7 @@ n_cpu_standalone = 8
# spark
n_cpu_spark = 200
cpu_pernode_spark = 15
cpu_pernode_spark = 10
spark_port = '8080'
spark_job_port = '4040'
@@ -101,6 +101,14 @@ page.append(page2)
State('url', 'pathname'),
prevent_initial_call=True)
def pointmatch_tp_dd_fill(stack, thispage):
"""
Fills the tilepair dropdown.
:param str stack: selected render stack
:param str thispage: current page URL
:return: options and value of tilepair dropdown "tp_dd"
:rtype: (list of dict, str)
"""
if stack in (None, ''):
raise PreventUpdate
@@ -160,6 +168,16 @@ compute_settings = html.Details(children=[html.Summary('Compute settings:'),
State('url', 'pathname')],
prevent_initial_call=True)
def pointmatch_update_compute_settings(*inputs):
"""
Recalculates the dynamic fields of `compute_table_cols` whenever one of its input parameters changes.
:param (list, dict, str) inputs: [:-2] input values from "compute_table_cols"<Br>
[-2] factors: factors to multiply the input values with <b>&#8592; pointmatch_comp_set</b><Br>
[-1] thispage: current page URL
:return: values that update the remaining `compute_table_cols` fields when one of them changes.
:rtype: (list, int)
"""
thispage = inputs[-1]
inputs = inputs[:-1]
thispage = thispage.lstrip('/')
@@ -197,6 +215,13 @@ def pointmatch_update_compute_settings(*inputs):
[Input({'component': 'input_' + col, 'module': module}, 'value') for col in compute_table_cols],
prevent_initial_call=True)
def pointmatch_store_compute_settings(*inputs):
"""
Updates the store of `compute_table_cols` values whenever one of the input parameters changes.
:param list inputs: Input values from "compute_table_cols"
:return: Store of all values in "compute_table_cols"
:rtype: dict
"""
storage = dict()
in_labels, in_values = hf.input_components()
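# the resulting store is e.g. {'<column>': <value>, ...} with one entry per compute_table_cols column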
@@ -85,6 +85,13 @@ page.append(pages.substack_sel(module))
Input({'component': 'pairmode', 'module': module}, 'value'),
prevent_initial_call=True)
def tilepairs_3D_status(pairmode):
"""
Toggles visibility of the 3D parameter selectors.
:param str pairmode: selected pair-mode dimensionality ("2D" or "3D") from "sec_input1"
:return: CSS display mode
:rtype: dict
"""
if pairmode == '2D':
style = {'display': 'none'}
val = 0
@@ -10,6 +10,7 @@ import os
import subprocess
import dash
import params
import time
import datetime
@@ -20,7 +21,11 @@ import requests
def args2string(args, separator='='):
"""
Converts arguments as list or dict into a tring to be issued on CLI
Converts arguments as list or dict into a string to be issued on CLI.
If a keyword argument contains a list, it will be issued multiple times.
{'arg': [1, 2, 3]} -> ' arg=1 arg=2 arg=3'
This matches what some Render CLI scripts expect when multiple input files are defined.
:param args: list, dict or str of command line arguments
:param str separator: char to separate/link arguments
@@ -31,12 +36,12 @@ def args2string(args, separator='='):
if args is None:
argstring = ''
elif type(args) == list:
argstring = " ".join(map(str, args))
argstring = ' ' + " ".join(map(str, args))
elif type(args) == dict:
argstring = str()
for item in args.items():
if type(item[1]) is list:
argstring += ' ' + ' '.join([str(item[0]) + separator + currit for currit in item[1]])
argstring += ' ' + ' '.join([str(item[0]) + separator + str(currit) for currit in item[1]])
else:
argstring += ' ' + separator.join(map(str, item))
elif type(args) == str:
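A minimal usage sketch of the behaviour documented above (argument values are hypothetical):

# list arguments are joined by blanks, with a leading blank
args2string(['--a', 1])                # ' --a 1'
# dict entries use the separator; list values repeat the keyword
args2string({'arg': [1, 2, 3]})        # ' arg=1 arg=2 arg=3'
# scalar dict entries are linked by the separator
args2string({'n': 4}, separator=' ')   # ' n 4'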
@@ -69,6 +74,7 @@ def status(run_state):
:return: string describing the global processing status and link to status page if available
:rtype: (str, str)
"""
run_state = dict(run_state)
(res_status, link), logfile, jobs = checkstatus(run_state)
@@ -125,7 +131,7 @@ def checkstatus(run_state):
jobs = j_id
if type(j_id) == dict:
if type(j_id) is dict:
if 'par' in j_id.keys():
runvars = [job for job in j_id['par']]
jobs = runvars
@@ -145,7 +151,8 @@ def checkstatus(run_state):
if 'done' in checkstatus(newrunstate)[0][0]:
if 'logfile' not in runjob.keys():
runjob['logfile'] = os.path.splitext(logfile)[0] + '_' + str(idx) + os.path.splitext(logfile)[-1]
runjob['logfile'] = os.path.splitext(logfile)[0] + '_' + str(idx) \
+ os.path.splitext(logfile)[-1]
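# e.g. a shared 'conv.log' becomes 'conv_0.log' for the first job in the sequence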
# start next job with these parameters
@@ -165,14 +172,20 @@ def checkstatus(run_state):
newrunstate['id'] = runjob
return checkstatus(newrunstate)
elif not list(j_id.keys())[0] in params.remote_compute:
elif not 'localhost' in j_id.keys() and not list(j_id.keys())[0] in params.remote_compute:
# parameter list for next sequential job
return 'pending', '', logfile, jobs
if type(j_id) is list:
elif type(j_id) is list:
runvars = j_id
if run_state['type'] in ['standalone', 'generic']:
elif type(j_id) is int:
runvars = [j_id]
elif type(j_id) is str:
runvars = [j_id]
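# j_id may arrive as a plain PID (int or str), as {'<host>': pid} for remote calls,
# or wrapped in {'par': [...]} / {'seq': [...]} containers for parallel/sequential jobs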
if run_state['type'] in ['standalone', 'generic', 'localhost']:
if run_state['status'] in ['running', 'launch']:
for runvar in runvars:
if type(runvar) is dict:
@@ -181,7 +194,7 @@
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
remotehost = list(runvar.keys())[0]
ssh.connect(remotehost, username=remote_user(remotehost))
ssh.connect(remotehost, username=remote_user(remotehost), timeout=10)
command = 'ps aux | grep "' + params.user + ' *' + str(runvar[remotehost]) + ' "'
@@ -197,7 +210,7 @@
outstat.append('launch')
continue
else:
elif type(runvar) is int:
if psutil.pid_exists(runvar):
p = psutil.Process(runvar)
@@ -206,6 +219,8 @@
if not p.status() == 'zombie':
outstat.append('running')
continue
else:
raise TypeError('JOB ID for standalone jobs needs to be dict (host:id) or int for local call.')
if os.path.exists(run_state['logfile'] + '_exit'):
outstat.append('error')
@@ -411,6 +426,9 @@ def find_activejob(run_state):
:return: single JobID, path to associated log file
"""
if 'seq' not in run_state['id'].keys():
raise TypeError('Jobs need to be sequential!')
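# e.g. run_state['id'] == {'seq': [job0, job1, ...]}  # hypothetical sequential chain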
for idx, job in enumerate(run_state['id']['seq']):
thisstate = run_state.copy()
thisstate['id'] = job
@@ -477,8 +495,8 @@ def run(target='standalone',
run_args='',
target_args=None,
special_args=None,
logfile=os.path.join(params.render_log_dir, 'render.out'),
errfile=os.path.join(params.render_log_dir, 'render.err'),
logfile=os.path.join(params.render_log_dir, 'render.log'),
errfile='',
inputs={}):
"""
Launcher of a processing task.
@@ -529,11 +547,18 @@
return {'par': outids}
# check target format
if type(target) is not str:
raise TypeError('Target needs to be string.')
my_env = os.environ.copy()
logbase = os.path.basename(logfile).split('.log')[0]
logbase = os.path.splitext(os.path.basename(logfile))[0]
logdir = os.path.dirname(logfile)
if errfile == '':
errfile = os.path.join(logdir, logbase + '.err')
runscriptfile = os.path.join(logdir, logbase + '.sh')
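# companion files are derived from the log name, e.g. 'conv.log' -> 'conv.err' and 'conv.sh'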
if run_args is None:
@@ -550,7 +575,7 @@
print('launching - ')
print(target)
if target == 'standalone' or target in params.remote_compute:
if target in ['standalone', 'localhost'] or target in params.remote_compute:
command = 'bash ' + runscriptfile
runscript = runscript.replace('#launch message', 'echo "Launching Render standalone processing script on " `hostname`')
@@ -561,12 +586,12 @@