#!/usr/bin/env bash
##
# Simple helper script to submit multiple jobs as arrays to SGE, SLURM and LSF
#
# Copyright 2016-2017 Renato Alves
# Licensed under MIT (https://en.wikipedia.org/wiki/MIT_License)
##

ERROR="ERROR:"
WARNING="WARNING:"
NAME="Job"
LOGFILE="job.log"
CORES="1"
RESERVE=""
NODES="1"
MEM="4"  # 4 GB by default
TIME=""
MEMALL="$MEM"
ACTIVE="10"
QUEUE=""
CHARGE=""
PREEMPT="normal"
TARGET=""
FEATURES=""
EMAIL=""
EMAILWHEN=""
WAITFOR=""
RAW=""
FATAL="fatal"
BEGIN=""
PRE_CMDS=""
TASKS="1"
SGE_MEM_REQ="h_vmem"
SGE_MEMFREE=""
VERBOSE="0"
_NO_BORK=1
_DEBUG=0
PARSE_JOBID="awk 'match(\$0,/[0-9]+/){print substr(\$0, RSTART, RLENGTH)}'"

# Use engine information from filename as default
SYSTEM="$(basename "$0" | tr "[:lower:]" "[:upper:]" | cut -d'-' -f2)"
_SELF="$(basename "$0" | tr "[:lower:]" "[:upper:]")"

# Find location of current script on disk so we can find helper scripts
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ]; do  # resolve $SOURCE until the file is no longer a symlink
    DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
    SOURCE="$(readlink "$SOURCE")"
    # if $SOURCE was a relative symlink, resolve it relative to the path
    # where the symlink file was located
    [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE"
done
_SELF_LOCATION="$( cd -P "$( dirname "$SOURCE" )" && pwd )"

# If running the base script, default to SLURM
[ "$SYSTEM" == "$_SELF" ] && SYSTEM="SLURM"

usage() {
    echo >&2 ""
    echo >&2 "This command takes a file containing one job per line and queues them as an array job"
    echo >&2 ""
    echo >&2 "Usage:"
    echo >&2 "    $0 [parameters] jobs"
    echo >&2 ""
    echo >&2 "Parameters:"
    echo >&2 "  Required:"
    echo >&2 "    jobs           = a file containing one job per line. Comments and empty lines are ignored."
    echo >&2 ""
    echo >&2 "  Conditionally optional:"
    echo >&2 "    -s --system    = queueing system to use. Defaults to ${SYSTEM}"
    echo >&2 "                     Supported options: SGE, SLURM, LSF"
    echo >&2 ""
    echo >&2 "  Standard options:"
    echo >&2 "    -n --name      = name to give to the job. Defaults to ${NAME}"
    echo >&2 "    -c --cores     = how many cores/slots for each job. Defaults to ${CORES}"
    echo >&2 "  * -N --nodes     = how many nodes to spread across (LSF/SLURM). Defaults to ${NODES}"
    echo >&2 "    -m --mem       = how much RAM per job (in GB). Defaults to ${MEM}G/job"
    echo >&2 "                     for less than 1G/job use fractionals such as 0.1 (100M/job)"
    echo >&2 "    -a --active    = limit number of simultaneously active jobs. Defaults to ${ACTIVE}"
    echo >&2 "    -q --queue     = which queue to use. Uses cluster's default if unspecified"
    echo >&2 "  * -k --time      = maximum execution time. Unlimited if unspecified. (hh:mm[:ss])"
    echo >&2 "  * -p --preempt   = specify preemption level (aka QoS) (SLURM). Defaults to ${PREEMPT}"
    echo >&2 "  * -t --target    = restricts to running on the given host(s) [comma separated] (SGE)"
    echo >&2 "  * -F --features  = request specific resources to constrain your jobs (SLURM)"
    echo >&2 "    -l --logfile   = filename to use as stdout/stderr of each job. Defaults to ${LOGFILE}"
    echo >&2 "    -e --email     = email to deliver job notifications. Defaults to no notifications"
    echo >&2 "  * -E --emailwhen = when to send an email (SGE/SLURM). Defaults to end and fail"
    echo >&2 "  * -w --waitfor   = waits for another job to finish before starting (SGE/SLURM)."
    echo >&2 "                     on SGE this can be either '--name' or the job_id. On others only job_id"
    echo >&2 "  * -C --charge    = charge computation to this account"
    echo >&2 "  * -b --begin     = delay the start of a job by providing a starting timestamp."
    echo >&2 "    -f --nonfatal  = if set, non-zero exits do *not* abort the job. Defaults to ${FATAL}"
    echo >&2 ""
    echo >&2 "  Advanced:"
    echo >&2 "    -d --debug     = dry-run and debug: 1 = Prints the job payload to screen and exits. (dry-run)"
    echo >&2 "                                        2 = Echoes the queued commands to log. Submits a dummy job. (dry-run)"
    echo >&2 "                                        3 = Runs the script locally without queue interaction. (no-queue)"
    echo >&2 "    -v --verbose   = submits the job with high verbosity (set -x) in the executing shell"
    echo >&2 "                     useful to debug issues with % commands and .bashrc settings."
    echo >&2 "  * -r --raw       = raw options passed directly to the queue system. (e.g. --gres on SLURM)"
    echo >&2 "                     useful to use any option that is not directly supported by this script."
    echo >&2 "                     for more than one option use quotes (e.g. -r '--arg1 --arg2')"
    echo >&2 "  * -T --tasks     = request a specific number of tasks per job (SLURM). Defaults to ${TASKS}"
    echo >&2 "  * -M --memrss    = request RSS memory instead of VMEM (SGE)"
    echo >&2 "  * -R --memfree   = in addition to VMEM/RSS also set mem_free to the same value (SGE)"
    echo >&2 "  * -B --bork      = load Bork Group specific modulefiles"
    echo >&2 ""
    echo >&2 "  Notes:"
    echo >&2 "    Options marked with * have system specific syntax or restrictions."
    echo >&2 "    Check the manpage of qsub/bsub/sbatch for additional information."
    echo >&2 "    The job_id is usually printed to the screen when the job is queued. Use it for job dependencies."
    echo >&2 "    \${Q_MEM} holds the computed value in MB. On SGE/SLURM this is MB/core, on LSF MB/job."
    echo >&2 "    \${Q_MEMALL} holds the total job memory in MB. On LSF Q_MEM = Q_MEMALL."
    echo >&2 ""
    echo >&2 "  Lines starting with the following characters have special meanings:"
    echo >&2 "    cmd; cmd  = normal commands pipelined as a single job."
    echo >&2 "    # comment = comments will be ignored."
    echo >&2 "    % cmd     = commands that will run before the actual job starts (for all jobs)."
    echo >&2 "                use this to 'module add conda' or 'export MYDB=/path/to/db'"
    echo >&2 "    @# cmd    = repeated/multiplexed jobs - where # is a number with 1-n digits."
    echo >&2 "                e.g. @5 command - will run 'command' as 5 jobs."
    echo >&2 "                This is useful with MPI jobs, worker threads/processes and"
    echo >&2 "                parallelizable pipelines such as ngless and jug"
    echo >&2 ""
    echo >&2 "  In addition to standard SGE/LSF/SLURM environment variables, the following"
    echo >&2 "  can be used in the jobs file to refer to the values specified/used."
    echo >&2 "    \${Q_NAME} \${Q_CORES} \${Q_MEM} \${Q_MEMALL} \${Q_TIME} \${Q_ACTIVE}"
    echo >&2 "    \${Q_QUEUE} \${Q_LOGFILE} \${Q_WAITFOR}"
    echo >&2 ""
    echo >&2 ""
}

required_arg() {
    usage
    echo >&2 "${ERROR} '$1' is a required argument."
    echo >&2 ""
    exit 1
}

invalid_pattern() {
    usage
    echo >&2 "${ERROR} Line '$1' has an invalid repeat pattern"
    echo >&2 ""
    exit 1
}

generic_warning() {
    echo >&2 "${WARNING} $1"
    echo >&2 ""
}

generic_error() {
    usage
    echo >&2 "${ERROR} $1"
    echo >&2 ""
    exit 1
}

to_int() {
    # Convert a float to int by truncating on the decimal character
    echo "${1%%.*}"
}

float_eval() {
    # Evaluate a floating point number expression.
    local stat=0
    local result=0.0
    if [[ $# -gt 0 ]]; then
        result=$(awk "BEGIN { print $* }" 2>/dev/null)
        stat=$?
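        # awk may exit 0 while printing nothing for a malformed expression,
        # so an empty result is also treated as a failure below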
        if [[ $stat -eq 0 && -z "$result" ]]; then stat=1; fi
    fi
    echo "$result"
    return $stat
}

local_debug() {
    # Disable fatal error checking
    set +e
    if [ "$SYSTEM" == "SGE" ]; then
        export SGE_TASK_ID=""
    elif [ "$SYSTEM" == "SLURM" ]; then
        export SLURM_ARRAY_TASK_ID=""
    elif [ "$SYSTEM" == "LSF" ]; then
        export LSB_JOBINDEX=""
    fi
}

prepare_jobs() {
    pre_cmds=()
    s_jobs=()

    while read -r line; do
        if [[ $line = \@* ]]; then
            # Lines starting with @n are multiplied n times
            pattern='^@([1-9][0-9]*) *(.*)$'
            [[ $line =~ $pattern ]] || invalid_pattern "$line"
            num=${BASH_REMATCH[1]}
            cmd=${BASH_REMATCH[2]}
            for ((i=1; i<=num; i++)); do
                s_jobs+=("$cmd")
            done
            continue
        elif [[ $line = %* ]]; then
            # Lines starting with % are run for every job
            pattern='^% *(.*)$'
            [[ $line =~ $pattern ]] || invalid_pattern "$line"
            pre_cmds+=("${BASH_REMATCH[1]}")
            continue
        elif [[ $line = \#* ]]; then
            # Ignore lines with comments
            continue
        elif [[ $line = '' ]]; then
            # Ignore empty lines
            continue
        else
            s_jobs+=("$line")
        fi
    done < "${JOBFILE}"

    [ "$VERBOSE" == "1" ] && pre_cmds=("set -x" "${pre_cmds[@]}")
    PRE_CMDS="$(printf '%s\n' "${pre_cmds[@]}")"
    # sed here replaces all single ' by escaped versions '"'"'
    JOBS_TO_RUN="$(printf '%s\n' "${s_jobs[@]}" | sed "s:\x27:\x27\"\x27\"\x27:g")"

    if [ "$_NO_BORK" == "0" ]; then
        MODULE_PRELOAD="# If there are TCL modules available on the system, load them
# this should be true for most Bork servers.
if [ -f /etc/profile.d/modules.sh ]; then
    source /etc/profile.d/modules.sh
fi
# Then, if either TCL or LUA modules are set up, make Bork modulefiles available
if declare -f module > /dev/null ; then
    module use --append /g/scb2/bork/mocat/software/bork_modulefiles/
fi"
    fi
}

sge_submit() {
    # Submit job to Sun Grid Engine
    cat << EOF | ${LOCALCMD}
#!/usr/bin/env bash
#$ -t ${JOB_COUNT}
#$ -pe smp ${CORES}${RESERVE}
#$ -l ${SGE_MEM_REQ}=${MEM}M${SGE_MEMFREE}${TARGET}
#$ -cwd
#$ -o ${LOGFILE}
#$ -j yes
#$ -N ${NAME}
#$ -S /bin/bash
#$ -tc ${ACTIVE}
${BANG_BEGIN}
${BANG_TIME}
${BANG_QUEUE}
${BANG_EMAIL}
${BANG_EMAIL_WHEN}
${BANG_WAITFOR}
${BANG_CHARGE}
${BANG_RAW}

export Q_NAME="${NAME}"
export Q_LOGFILE="${LOGFILE}"
export Q_CORES="${CORES}"
export Q_MEM="${MEM}"
export Q_MEMALL="${MEMALL}"
export Q_TIME="${TIME}"
export Q_ACTIVE="${ACTIVE}"
export Q_QUEUE="${QUEUE}"
export Q_WAITFOR="${WAITFOR}"

# Source any local profile
[ -f "$HOME/.bashrc" ] && . "$HOME/.bashrc"

${MODULE_PRELOAD}

# Commands run before the actual main work
${PRE_CMDS}

# If requested, enable tracebacks only after pre-commands to avoid bash-strictness failures
${FATAL_CMD}

# Comments and empty lines are removed from the job submission script
SELECTED_JOB=\$(echo '${JOBS_TO_RUN}' | sed -n "\${SGE_TASK_ID}p")
${REMOTECMD} "\${SELECTED_JOB}"
EOF
}

slurm_submit() {
    # Submit job to SLURM
    cat << EOF | ${LOCALCMD}
#!/usr/bin/srun /usr/bin/bash
#SBATCH --array=${JOB_COUNT}%${ACTIVE}
#SBATCH --cpus-per-task=${CORES}
#SBATCH --nodes=${NODES}
#SBATCH --ntasks=${TASKS}
#SBATCH --mem-per-cpu=${MEM}M
#SBATCH --output=${LOGFILE}
#SBATCH --open-mode=append
#SBATCH --job-name=${NAME}
#SBATCH --qos=${PREEMPT}
${BANG_BEGIN}
${BANG_TIME}
${BANG_QUEUE}
${BANG_EMAIL}
${BANG_EMAIL_WHEN}
${BANG_WAITFOR}
${BANG_CHARGE}
${BANG_FEATURES}
${BANG_RAW}
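
# These Q_* variables let jobfile commands refer to the requested
# resources, e.g. use \${Q_CORES} as a thread count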
export Q_NAME="${NAME}"
export Q_LOGFILE="${LOGFILE}"
export Q_CORES="${CORES}"
export Q_MEM="${MEM}"
export Q_MEMALL="${MEMALL}"
export Q_TIME="${TIME}"
export Q_ACTIVE="${ACTIVE}"
export Q_QUEUE="${QUEUE}"
export Q_WAITFOR="${WAITFOR}"

# Source any local profile
[ -f "$HOME/.bashrc" ] && . "$HOME/.bashrc"

${MODULE_PRELOAD}

# Commands run before the actual main work
${PRE_CMDS}

# If requested, enable tracebacks only after pre-commands to avoid bash-strictness failures
${FATAL_CMD}

# Comments and empty lines are removed from the job submission script
SELECTED_JOB=\$(echo '${JOBS_TO_RUN}' | sed -n "\${SLURM_ARRAY_TASK_ID}p")
${REMOTECMD} "\${SELECTED_JOB}"
EOF
}

lsf_submit() {
    # Submit job to LSF
    cat << EOF | ${LOCALCMD}
#!/usr/bin/env bash
#BSUB -n ${CORES}
#BSUB -M ${MEM}
#BSUB -o ${LOGFILE}
#BSUB -J ${NAME}[${JOB_COUNT}]%${ACTIVE}
#BSUB -R "select[(mem>=${MEM})] rusage[mem=${MEM}] span[hosts=${NODES}]"
${BANG_BEGIN}
${BANG_TIME}
${BANG_QUEUE}
${BANG_EMAIL}
${BANG_WAITFOR}
${BANG_RAW}

${EXPORT_PROJECT}
export Q_NAME="${NAME}"
export Q_LOGFILE="${LOGFILE}"
export Q_CORES="${CORES}"
export Q_MEM="${MEM}"
export Q_MEMALL="${MEMALL}"
export Q_TIME="${TIME}"
export Q_ACTIVE="${ACTIVE}"
export Q_QUEUE="${QUEUE}"
export Q_WAITFOR="${WAITFOR}"

# Source any local profile
[ -f "$HOME/.bashrc" ] && . "$HOME/.bashrc"

${MODULE_PRELOAD}

# Commands run before the actual main work
${PRE_CMDS}

# If requested, enable tracebacks only after pre-commands to avoid bash-strictness failures
${FATAL_CMD}

# Comments and empty lines are removed from the job submission script
SELECTED_JOB=\$(echo '${JOBS_TO_RUN}' | sed -n "\${LSB_JOBINDEX}p")
${REMOTECMD} "\${SELECTED_JOB}"
EOF
}

ARG_PARSE="getopt -o s:n:c:N:m:a:q:k:p:t:F:l:e:E:w:C:b:d:vr:T:MRBfh -l system:,name:,cores:,nodes:,mem:,active:,queue:,time:,preempt:,target:,features:,logfile:,email:,emailwhen:,waitfor:,charge:,begin:,debug:,verbose,raw:,tasks:,memrss,memfree,bork,nonfatal,help -n $0 --"

# We process arguments twice to handle any argument parsing error:
if ! ARG_ERROR=$($ARG_PARSE "$@" 2>&1 1>/dev/null); then
    generic_error "$ARG_ERROR"
fi

# Abort on any errors from this point onwards
set -e

# Parse args using getopt (instead of getopts) to allow arguments before options
ARGS=$($ARG_PARSE "$@")
# reorganize arguments as returned by getopt
eval set -- "$ARGS"

while true; do
    # Shift before to throw away the option name
    # Shift after if the option has a required positional argument
    case "$1" in
        -s|--system) shift; SYSTEM="$1"; shift ;;
        -n|--name) shift; NAME="$1"; shift ;;
        -c|--cores) shift; CORES="$1"; shift ;;
        -N|--nodes) shift; NODES="$1"; shift ;;
        -m|--mem) shift; MEM="$1"; shift ;;
        -T|--tasks) shift; TASKS="$1"; shift ;;
        -M|--memrss) shift; SGE_MEM_REQ="h_rss" ;;
        -R|--memfree) shift; SGE_MEMFREE="1" ;;
        -B|--bork) shift; _NO_BORK="0" ;;
        -a|--active) shift; ACTIVE="$1"; shift ;;
        -q|--queue) shift; QUEUE="$1"; shift ;;
        -k|--time) shift; TIME="$1"; shift ;;
        -p|--preempt) shift; PREEMPT="$1"; shift ;;
        -t|--target) shift; TARGET="$1"; shift ;;
        -F|--features) shift; FEATURES="$1"; shift ;;
        -l|--logfile) shift; LOGFILE="$1"; shift ;;
        -e|--email) shift; EMAIL="$1"; shift ;;
        -E|--emailwhen) shift; EMAILWHEN="$1"; shift ;;
        -w|--waitfor) shift; WAITFOR="$1"; shift ;;
        -C|--charge) shift; CHARGE="$1"; shift ;;
        -b|--begin) shift; BEGIN="$1"; shift ;;
        -d|--debug) shift; _DEBUG="$1"; shift ;;
        -v|--verbose) shift; VERBOSE="1" ;;
        -r|--raw) shift; RAW="$1"; shift ;;
        -f|--nonfatal) shift; FATAL="" ;;
        -h|--help) shift; usage; exit 1 ;;
        --) shift; break ;;
    esac
done

# Software dependencies
command -v awk &>/dev/null || generic_error "The command 'awk' was not found on the system and is required for execution."
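
# For reference, a minimal jobfile might look like this (commands are
# hypothetical; '%' lines run before every job, '@4' queues the line 4 times):
#   % module add bwa
#   bwa mem ref.fa sample1.fq > sample1.sam
#   @4 ./worker.sh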
-d "$JOBFILE" ] || generic_error "Path '$JOBFILE' cannot be a directory" { [ -e "$JOBFILE" ] && [ -r "$JOBFILE" ]; } || generic_error "File '$JOBFILE' doesn't exist or is not readable" [ "$CORES" -gt 0 ] || generic_error "Number of cores must be an integer and greater than 0" # Special variables. FATAL sets +e which causes any error to exit the script if [ "$FATAL" == "fatal" ]; then _TRACEBACK="${_SELF_LOCATION}/../modules/helpers/traceback.bash" if [ ! -f "$_TRACEBACK" ]; then generic_warning "Couldn't find helper module at '$_TRACEBACK'. For extra debugging information you may wish to download it from https://git.embl.de/ralves/embltilities/-/tree/master/modules/helpers" FATAL_CMD="set -eu" else FATAL_CMD=". ${_TRACEBACK}" fi fi case $MEM in ''|*[!.0-9]*|*..*) generic_error "Invalid memory specification - integer or float only (bad: -m 10G; good: -m 10)" ;; *) ;; esac prepare_jobs # defines PRE_CMDS and JOBS_TO_RUN JOB_COUNT=$(echo "${JOBS_TO_RUN}" | wc -l) # Set array range based on number of jobs. A single job is not an array if [ "$JOB_COUNT" == "0" ]; then generic_error "The jobfile contains no job lines" elif [ "$JOB_COUNT" != "1" ]; then JOB_COUNT="1-$JOB_COUNT" fi if [ "$SYSTEM" == "SGE" ]; then SCHEDULE_CMD="qsub" SCHEDULE_FUNC="sge_submit" [ "$QUEUE" != "" ] && BANG_QUEUE="#$ -q ${QUEUE}" [ "$WAITFOR" != "" ] && BANG_WAITFOR="#$ -hold_jid ${WAITFOR}" [ "$BEGIN" != "" ] && BANG_BEGIN="#$ -a ${BEGIN}" [ "$TIME" != "" ] && BANG_TIME="#$ -l h_rt=${TIME}" [ "$RAW" != "" ] && BANG_RAW="#$ ${RAW}" [ "$CHARGE" != "" ] && BANG_CHARGE="#$ -A ${CHARGE}" [ "$TARGET" != "" ] && TARGET=",h=${TARGET}" [ "$CORES" -gt "1" ] && RESERVE=" -R y" if [ "$EMAIL" != "" ]; then BANG_EMAIL="#$ -M ${EMAIL}" if [ "$EMAILWHEN" != "" ]; then BANG_EMAIL_WHEN="#$ -m ${EMAILWHEN}" else # End (e), Abort (a) and Suspend (s) BANG_EMAIL_WHEN="#$ -m eas" fi fi MEMALL=$(to_int "$(float_eval "$MEM * 1024")") # On SGE memory needs to be provided per/slot or core MEM=$(to_int "$(float_eval "$MEM / $CORES * 1024")") [ "$SGE_MEMFREE" != "" ] && SGE_MEMFREE=",mem_free=${MEM}M" elif [ "$SYSTEM" == "SLURM" ]; then SCHEDULE_CMD="sbatch" SCHEDULE_FUNC="slurm_submit" [ "$QUEUE" != "" ] && BANG_QUEUE="#SBATCH --partition=${QUEUE}" [ "$WAITFOR" != "" ] && BANG_WAITFOR="#SBATCH --depend=${WAITFOR}" [ "$BEGIN" != "" ] && BANG_BEGIN="#SBATCH --begin=${BEGIN}" [ "$TIME" != "" ] && BANG_TIME="#SBATCH --time=${TIME}" [ "$FEATURES" != "" ] && BANG_FEATURES="#SBATCH --constraint=${FEATURES}" [ "$RAW" != "" ] && BANG_RAW="#SBATCH ${RAW}" [ "$CHARGE" != "" ] && BANG_CHARGE="#SBATCH -A ${CHARGE}" if [ "$EMAIL" != "" ]; then BANG_EMAIL="#SBATCH --mail-user=${EMAIL}" if [ "$EMAILWHEN" != "" ]; then BANG_EMAIL_WHEN="#SBATCH --mail-type=${EMAILWHEN}" else BANG_EMAIL_WHEN="#SBATCH --mail-type=END,FAIL,REQUEUE" fi fi MEMALL=$(to_int "$(float_eval "$MEM * 1024")") # On SLURM memory also needs to be provided per CPU and we use MB so we can use an integer MEM=$(to_int "$(float_eval "$MEM / $CORES * 1024")") elif [ "$SYSTEM" == "LSF" ]; then SCHEDULE_CMD="bsub" SCHEDULE_FUNC="lsf_submit" [ "$QUEUE" != "" ] && BANG_QUEUE="#BSUB -q ${QUEUE}" [ "$WAITFOR" != "" ] && BANG_WAITFOR="#BSUB -w ${WAITFOR}" [ "$BEGIN" != "" ] && BANG_BEGIN="#BSUB -b ${BEGIN}" [ "$TIME" != "" ] && BANG_TIME="#BSUB -W ${TIME}" [ "$RAW" != "" ] && BANG_RAW="#BSUB ${RAW}" [ "$CHARGE" != "" ] && EXPORT_PROJECT="export LSB_DEFAULTPROJECT='${CHARGE}'" if [ "$EMAIL" != "" ]; then BANG_EMAIL="#BSUB -u ${EMAIL}" # LSF doesn't have different email options fi # On LSF, it needs to be 
    # On LSF, memory needs to be provided as MB and strictly as an integer
    MEMALL=$(to_int "$(float_eval "$MEM * 1024")")
    # LSF MEM and MEMALL are the same
    MEM=${MEMALL}
else
    generic_error "$SYSTEM system not supported"
fi

LOCALCMD="$SCHEDULE_CMD"
REMOTECMD="eval"
[ "$_DEBUG" == "1" ] && PARSE_JOBID="cat" && LOCALCMD="cat"  # Prints the job payload to screen and exits
[ "$_DEBUG" == "2" ] && REMOTECMD="echo"  # Echoes the queued commands to log. Submits a dummy job
[ "$_DEBUG" == "3" ] && local_debug && PARSE_JOBID="cat" && LOCALCMD="source /dev/stdin"  # Runs the script locally

eval "$SCHEDULE_FUNC" | eval "$PARSE_JOBID"
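
# Example invocation (illustrative only; script and file names are hypothetical):
# queue every line of jobs.txt as a SLURM array job, 4 cores and 8G per job,
# with at most 20 jobs running at once:
#   ./submit.sh -s SLURM -n align -c 4 -m 8 -a 20 jobs.txt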