############################################
# Libraries, versions, authors and license #
############################################

from snakemake.utils import min_version
from snakemake.exceptions import WorkflowError
import subprocess
import os
import pandas
import numpy
import socket

# Enforce a minimum Snakemake version because of various features
min_version("4.3")

__author__  = "Christian Arnold & Ivan Berest"
__license__ = "MIT"

############################################
# Working directory and configuration file #
############################################

# Not needed here; both are provided via the command line when invoking Snakemake

########
# MISC #
########

# Make the output nicer and easier to follow
ruleDisplayMessage = "\n\n########################\n# START EXECUTING RULE #\n########################\n"

# The onsuccess handler is executed if the workflow finished without error.
onsuccess:
    print("\n\n###############################\n# Workflow finished, no error #\n###############################\n\n")

# Otherwise, the onerror handler is executed.
onerror:
    print("\n\n#####################\n# An error occurred #\n#####################\n\n")
    #shell("mail -s 'an error occurred' carnold@embl.de < {log}")

# The onstart handler is executed before the workflow starts. Note that dry runs do not trigger any of the handlers.
onstart:
    print("Reading samples and metadata....\n")
    print("Running workflow for the following " + str(len(allTF)) + " TF:\n " + ' \n '.join(map(str, allTF)))
    print("Running workflow for the following BAM files:\n " + ' \n '.join(map(str, allBamFiles)))


def read_samplesTable(samplesSummaryFile, consensusPeaks):
    """Read the samples summary table and check that all required columns are present."""

    data = pandas.read_table(samplesSummaryFile)

    # Expect a particular set of columns, do a sanity check here
    if not consensusPeaks:
        if not {'SampleID', 'bamReads', 'conditionSummary', 'Peaks'}.issubset(data.columns.values):
            raise KeyError("The samples file must contain at least the following named columns (TAB-separated!): 'SampleID', 'bamReads', 'conditionSummary', 'Peaks'")
    else:
        if not {'SampleID', 'bamReads', 'conditionSummary'}.issubset(data.columns.values):
            raise KeyError("The samples file must contain at least the following named columns (TAB-separated!): 'SampleID', 'bamReads', 'conditionSummary'")

    return data
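# Example of a minimal samples summary file, for illustration only (TAB-separated;
# all paths and condition labels below are placeholders, not shipped defaults):
#
#   SampleID  bamReads            conditionSummary  Peaks
#   sample1   /data/sample1.bam   wildtype          /data/sample1.peaks.bed
#   sample2   /data/sample2.bam   mutant            /data/sample2.peaks.bed
#
# The 'Peaks' column may be omitted when a consensus peaks file is provided via
# config["peaks"]["consensusPeaks"] (see the check above).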
#############################
# DIRECTORIES AND VARIABLES #
#############################

# Maximum number of threads per rule. The effective thread count of a rule is the minimum of this value and the --cores parameter given on the command line.
threadsMax = 16

# Input files
samplesSummaryFile = config["samples"]["summaryFile"]

extDir             = "extension" + str(config["par_general"]["regionExtension"])
ROOT_DIR           = config["par_general"]["outdir"]
FINAL_DIR          = ROOT_DIR + "/FINAL_OUTPUT/" + extDir
TF_DIR             = ROOT_DIR + "/TF-SPECIFIC"
PEAKS_DIR          = ROOT_DIR + "/PEAKS"
LOG_BENCHMARK_DIR  = ROOT_DIR + "/LOGS_AND_BENCHMARKS"
TEMP_DIR           = ROOT_DIR + "/TEMP"
TEMP_EXTENSION_DIR = ROOT_DIR + "/TEMP/" + extDir

global samplesData
samplesData = read_samplesTable(config["samples"]["summaryFile"], config["peaks"]["consensusPeaks"])

# Check that all BAM files exist and are indexed, and derive their basenames
allBamFiles = samplesData.loc[:, "bamReads"]
allBamFilesBasename = []
for fileCur in allBamFiles:
    if not os.path.isfile(fileCur):
        raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") not found.")
    if not os.path.isfile(fileCur + ".bai"):
        raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") does not have an index (corresponding *.bam.bai file). All BAM files must be indexed (e.g., use samtools index).")
    basename = os.path.splitext(os.path.basename(fileCur))[0]
    allBamFilesBasename.append(basename)

# Add new column
samplesData = samplesData.assign(basename = allBamFilesBasename)

# Check the existence of all specified peak files
if not config["peaks"]["consensusPeaks"]:
    allPeakFiles = samplesData.loc[:, "Peaks"]
    for fileCur in allPeakFiles:
        if not os.path.isfile(fileCur):
            raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") not found.")
else:
    allPeakFiles = []

regionExt   = config["par_general"]["regionExtension"]
nPerm       = config["par_general"]["nPermutations"]
nBootstraps = config["par_general"]["nBootstraps"]
nCGBins     = config["par_general"]["nCGBins"]

if config["par_general"]["comparisonType"] != "":
    compType = config["par_general"]["comparisonType"] + "."
else:
    compType = ""

# Collect the TFs to run the workflow for: either all TFs with a *_pwmscan.bed file in dir_PWMScan, or a user-defined comma-separated list
allTF = []
if config["par_general"]["TFs"] == "all":
    for fileCur in sorted(os.listdir(config["additionalInputFiles"]["dir_PWMScan"])):
        if fileCur.endswith("_pwmscan.bed"):
            allTF.append(fileCur.replace("_pwmscan.bed", ""))
else:
    TFArray = config["par_general"]["TFs"].replace(" ", "").split(',')
    for TFCur in TFArray:
        fileCur = config["additionalInputFiles"]["dir_PWMScan"] + "/" + TFCur + "_pwmscan.bed"
        if not os.path.isfile(fileCur):
            raise IOError("The TF " + TFCur + " is in the list of TFs to process, but the file \"" + fileCur + "\" is missing. Check the folder " + config["additionalInputFiles"]["dir_PWMScan"] + " and the parameter \"par_general\": \"TFs\".")
        allTF.append(TFCur)

if len(allTF) == 0:
    raise WorkflowError("The list of TFs is empty. Adjust the parameter \"par_general\": \"TFs\".")

if not os.path.isfile(config["additionalInputFiles"]["refGenome_fasta"]):
    raise IOError("File \"" + config["additionalInputFiles"]["refGenome_fasta"] + "\" not found.")

if config["par_general"]["RNASeqIntegration"]:
    filenameCur = config["additionalInputFiles"]["HOCOMOCO_mapping"]
    if not os.path.isfile(filenameCur):
        raise IOError("File \"" + filenameCur + "\" not found.")
    filenameCur = config["additionalInputFiles"]["RNASeqCounts"]
    if not os.path.isfile(filenameCur):
        raise IOError("File \"" + filenameCur + "\" not found.")

if config["peaks"]["consensusPeaks"]:
    filenameCur = config["peaks"]["consensusPeaks"]
    if not os.path.isfile(filenameCur):
        raise IOError("File \"" + filenameCur + "\" not found.")

    # Check whether the file contains coordinates in scientific notation.
    # The || true at the end ensures that the exit status of grep is 0 even when nothing is found, because a non-zero exit status would raise an error otherwise.
    nHits = int(subprocess.check_output('grep -c "e+" ' + config["peaks"]["consensusPeaks"] + " || true", shell=True))
    if nHits > 0:
        raise AssertionError("File " + config["peaks"]["consensusPeaks"] + " contains at least one line with coordinates in scientific notation (e+). This will cause errors in subsequent steps. Check the file and transform all \"e+\" coordinates.")
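# A possible way to fix such a file (a sketch, assuming a BED-like layout with
# the start and end coordinates in columns 2 and 3): let awk reprint the
# coordinates as plain integers, e.g.
#
#   awk 'BEGIN{OFS="\t"} {$2 = sprintf("%d", $2); $3 = sprintf("%d", $3); print}' \
#       consensusPeaks.bed > consensusPeaks.fixed.bed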
dir_scripts = config["par_general"]["dir_scripts"]

script_createConsensusPeaks = dir_scripts + "/1.createConsensusPeaks.R"
script_DESeqPeaks           = dir_scripts + "/2.DESeqPeaks.R"
script_analyzeTF            = dir_scripts + "/3.analyzeTFNew.R"
script_summary1             = dir_scripts + "/4.summary1.R"
script_prepareBinning       = dir_scripts + "/5.prepareBinning.R"
script_binningTF            = dir_scripts + "/6.binningTF.R"
script_summaryFinal         = dir_scripts + "/7.summaryFinal.R"

for scriptNameCur in [script_createConsensusPeaks, script_DESeqPeaks, script_analyzeTF, script_summary1,
                      script_prepareBinning, script_binningTF, script_summaryFinal]:
    if not os.path.isfile(scriptNameCur):
        raise IOError("File " + scriptNameCur + " not found.")

#########
# RULES #
#########

# For cluster usage: the localrules directive marks rules as local, so that they are not submitted to the cluster but executed on the host node instead
localrules: all, link_inputFiles

rule all:
    input:
        perm        = FINAL_DIR + "/" + compType + "summary.circular.pdf",
        summaryLogs = LOG_BENCHMARK_DIR + "/" + compType + "all.warnings.log"

rule produceConsensusPeaks:
    input:
        peaks = allPeakFiles
    output:
        consensusPeaks_bed = PEAKS_DIR + "/" + compType + "consensusPeaks.bed",
        summaryPlot        = PEAKS_DIR + "/" + compType + "consensusPeaks_lengthDistribution.pdf"
    log: expand('{dir}/1.produceConsensusPeaks.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Calculate consensus peaks for all peak files with the script {script_createConsensusPeaks}..."
    threads: 1
    script: script_createConsensusPeaks

# Force execution of the rule above only when the user did not provide a consensus peaks file
def retrieveConsensusPeakFile(par_consensusPeaks):
    if not par_consensusPeaks:
        return rules.produceConsensusPeaks.output.consensusPeaks_bed
    else:
        return config["peaks"]["consensusPeaks"]

rule filterSexChromosomesAndSortPeaks:
    input:
        consensusPeaks = retrieveConsensusPeakFile(config["peaks"]["consensusPeaks"])
    output:
        consensusPeaks_filtered = TEMP_DIR + "/" + compType + "consensusPeaks.filtered.bed",
        consensusPeaks_sorted   = PEAKS_DIR + "/" + compType + "consensusPeaks.filtered.sorted.bed"
    message: "{ruleDisplayMessage}Filter sex, mitochondrial and unassembled chromosomes..."
    threads: 1
    shell:
        """
        grep -vP "^chrX|^chrY|^chrM|^chrUn|random|hap|_gl" {input.consensusPeaks} > {output.consensusPeaks_filtered} &&
        sort -k1,1 -k2,2n {output.consensusPeaks_filtered} > {output.consensusPeaks_sorted}
        """

peakFileEndingPattern = ".overlapPeaks.bed.gz"

rule sortPWM:
    input:
        bed = config["additionalInputFiles"]["dir_PWMScan"] + "/{TF}_pwmscan.bed"
    output:
        bedSorted = TEMP_DIR + "/" + compType + "{TF}_pwmscan.sorted.bed.gz"
    message: "{ruleDisplayMessage} Sort bed file {input.bed}..."
    threads: 1
    shell:
        """sort -k1,1 -k2,2n {input.bed} | gzip > {output.bedSorted}"""

def getBamFileFromBasename(basename):
    """Retrieve the BAM file that corresponds to the given basename from the samples table."""

    hit = numpy.asarray(samplesData.loc[samplesData["basename"] == basename, "bamReads"])
    if len(hit) != 1:
        raise KeyError("Could not uniquely retrieve the BAM file for the basename \"" + basename + "\" from the file " + config["samples"]["summaryFile"])
    return hit
rule idxstatsBAM:
    input:
        bam = lambda wildcards: getBamFileFromBasename(wildcards.basename)
    output:
        idxoutput   = temp(TEMP_DIR + "/" + compType + "{basename}.idxstats"),
        # chrOrder1: chromosome name and length (genome file for bedtools -g); chrOrder2: chromosome names only (for bedtools sort -faidx)
        genomeFile  = TEMP_DIR + "/" + compType + "{basename}.chrOrder1",
        genomeFile2 = TEMP_DIR + "/" + compType + "{basename}.chrOrder2"
    message: "{ruleDisplayMessage} Run idxstats for file {input.bam}..."
    threads: 1
    shell:
        """
        samtools idxstats {input.bam} > {output.idxoutput} &&
        grep -v "^*" {output.idxoutput} | awk '{{print $1"\\t"$2}}' > {output.genomeFile} &&
        grep -v "^*" {output.idxoutput} | awk '{{print $1}}' > {output.genomeFile2}
        """

rule intersectPeaksAndBAM:
    input:
        consensusPeaks = rules.filterSexChromosomesAndSortPeaks.output.consensusPeaks_sorted,
        genomeFile     = rules.idxstatsBAM.output.genomeFile,
        genomeFile2    = rules.idxstatsBAM.output.genomeFile2,
        bam            = lambda wildcards: getBamFileFromBasename(wildcards.basename)
    output:
        peaksOverlap = PEAKS_DIR + '/' + compType + '{basename}' + peakFileEndingPattern,
        resortedBed  = TEMP_DIR + '/' + compType + '{basename}.resorted.gz'
    message: "{ruleDisplayMessage} Run intersect and count for file {input.consensusPeaks} with {input.bam}..."
    threads: 1
    shell:
        """
        bedtools sort -faidx {input.genomeFile2} -i {input.consensusPeaks} | gzip > {output.resortedBed} &&
        bedtools intersect \
            -a {output.resortedBed} \
            -b {input.bam} \
            -c \
            -sorted -g {input.genomeFile} \
            | gzip > {output.peaksOverlap}
        """

# TF-specific part:
rule intersectPeaksAndPWM:
    input:
        consensusPeaks = rules.filterSexChromosomesAndSortPeaks.output.consensusPeaks_sorted,
        allPWMs        = expand('{dir}/{compType}{TF}_pwmscan.sorted.bed.gz', dir = TEMP_DIR, compType = compType, TF = allTF)
    output:
        TFBSinPeaks_bed    = expand('{dir}/{compType}allPWM.peaks.bed.gz', dir = TEMP_DIR, compType = compType),
        TFBSinPeaksMod_bed = expand('{dir}/{compType}allPWM.peaks.extension.bed.gz', dir = TEMP_EXTENSION_DIR, compType = compType)
    message: "{ruleDisplayMessage} Obtain binding sites from peaks: Intersect files {input.allPWMs} and {input.consensusPeaks}..."
    threads: 1
    params:
        extension = config["par_general"]["regionExtension"]
    shell:
        """
        bedtools intersect \
            -a {input.consensusPeaks} \
            -b {input.allPWMs} \
            -wa -wb \
            -sorted \
            -filenames \
            | gzip > {output.TFBSinPeaks_bed} &&
        zcat {output.TFBSinPeaks_bed} | cut -f4,5,6,7,8,9,12 | uniq | awk '{{OFS="\\t"}};{{print $4, $5-{params.extension}, $6+{params.extension}, $1, $2, $7, $3}}' | gzip > {output.TFBSinPeaksMod_bed}
        """
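# Worked example for the awk call above: with "regionExtension" set to 100, a
# motif hit at chr1:1000-1010 is written out as chr1:900-1110, i.e., each
# binding site is extended symmetrically by params.extension bp on both sides.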
rule intersectTFBSAndBAM:
    input:
        bed     = rules.intersectPeaksAndPWM.output.TFBSinPeaksMod_bed,
        allBAMs = allBamFiles
    output:
        peaksOverlap = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}{peakFileEndingPattern}', dir = TF_DIR, extension = extDir, compType = compType, peakFileEndingPattern = peakFileEndingPattern)
    message: "{ruleDisplayMessage} Run bedtools intersect for file {input.bed} against all BAM files..."
    threads: 1
    shell:
        """
        zgrep -P "{wildcards.TF}_pwmscan\\.sorted" {input.bed} | bedtools multicov \
            -bed stdin \
            -bams {input.allBAMs} \
            | gzip > {output.peaksOverlap}
        """

name_plots = PEAKS_DIR + "/" + compType + "diagnosticPlots.peaks.pdf"

rule DESeqPeaks:
    input:
        sampleData = config["samples"]["summaryFile"],
        peaks      = expand('{dir}/{compType}{basenameBAM}{peakFileEndingPattern}', dir = PEAKS_DIR, basenameBAM = allBamFilesBasename, compType = compType, peakFileEndingPattern = peakFileEndingPattern)
    output:
        sampleDataR = PEAKS_DIR + "/" + compType + "sampleMetadata.rds",
        peakFile    = PEAKS_DIR + "/" + compType + "peaks.rds",
        peaks_tsv   = PEAKS_DIR + "/" + compType + "peaks.tsv",
        condComp    = TEMP_EXTENSION_DIR + "/" + compType + "conditionComparison.rds",
        normFacs    = PEAKS_DIR + "/" + compType + "normFacs.rds",
        plots       = name_plots,
        plotsPerm   = expand('{dir}/{compType}diagnosticPlots.peaks_permutation{perm}.pdf', dir = PEAKS_DIR, compType = compType, perm = range(1, nPerm + 1)),
        DESeqObj    = PEAKS_DIR + "/" + compType + "DESeq.object.rds"
    log: expand('{dir}/2.DESeqPeaks.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_DESeqPeaks}..."
    # One thread for the real data plus one per permutation, capped at threadsMax
    threads: min(threadsMax, config["par_general"]["nPermutations"] + 1)
    params:
        doCyclicLoess = "true"
    script: script_DESeqPeaks
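# Note on the 'script:' directive used above and below: inside the R scripts,
# Snakemake exposes an S4 object called 'snakemake', so a script can access the
# rule's files and settings via, e.g., snakemake@input$sampleData,
# snakemake@output$peakFile, snakemake@params$doCyclicLoess and snakemake@threads
# (see the Snakemake documentation on external scripts).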
rule summary1:
    input:
        peaks = rules.DESeqPeaks.output.peaks_tsv,
        TF    = expand('{dir}/{TF}/{ext}/{compType}{TF}.summary.rds', dir = TF_DIR, TF = allTF, ext = extDir, compType = compType)
    output:
        outputTable = FINAL_DIR + "/" + compType + "TF_vs_peak_distribution.tsv"
    log: expand('{dir}/4.summary1.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_summary1}..."
    threads: 1
    script: script_summary1

name_outputTSV = "output.tsv"
name_plotsDiag = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}" + ".diagnosticPlots.pdf"
name_TFSummary = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}" + ".summaryPlots.pdf"

rule analyzeTF:
    input:
        overlapFile = rules.intersectTFBSAndBAM.output.peaksOverlap,
        sampleDataR = rules.DESeqPeaks.output.sampleDataR,
        peakFile    = rules.DESeqPeaks.output.peakFile,
        peakFile2   = rules.DESeqPeaks.output.peaks_tsv,
        normFacs    = rules.DESeqPeaks.output.normFacs,
        plotsPerm   = rules.DESeqPeaks.output.plotsPerm
    output:
        outputTSV       = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + name_outputTSV,
        outputRDS       = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + "summary.rds",
        plot_diagnostic = name_plotsDiag,
        plot_TFSummary  = name_TFSummary,
        #plot_diagnosticPerm = dynamic(TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + "diagnosticPlots_permutation{perm}.pdf"),
        plot_diagnosticPerm = expand('{dir}/{{TF}}/{ext}/{compType}{{TF}}.diagnosticPlots_permutation{perm}.pdf', dir = TF_DIR, ext = extDir, compType = compType, perm = range(1, nPerm + 1)),
        DESeqObj        = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + "DESeq.object.rds"
    log: expand('{dir}/3.analyzeTF.{{TF}}.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_analyzeTF} for TF {wildcards.TF}..."
    threads: 1
    params:
        doCyclicLoess = "true",
        allBAMS       = list(allBamFiles)
    script: script_analyzeTF

rule concatenateMotifs:
    input:
        diagnosticPlots = rules.summary1.output,
        TFMotifs        = expand('{dir}/{TF}/{extension}/{compType}{TF}.{patternTSV}', dir = TF_DIR, TF = allTF, extension = extDir, compType = compType, patternTSV = name_outputTSV)
    output:
        allMotifs_tsv = FINAL_DIR + "/" + compType + "allMotifs.tsv.gz"
    message: "{ruleDisplayMessage}Concatenate all motifs for files {input.TFMotifs}..."
    threads: 1
    params:
        folder  = TF_DIR,
        pattern = "*" + extDir + "/" + compType + "*." + name_outputTSV
    shell:
        """
        find {params.folder} -type f -path "{params.pattern}" -exec awk 'NR==1 || FNR!=1' {{}} + | gzip > {output.allMotifs_tsv}
        """

rule calcNucleotideContent:
    input:
        motifsBed = rules.concatenateMotifs.output.allMotifs_tsv
    output:
        bedTemp = TEMP_EXTENSION_DIR + "/" + compType + "motifs.coord.permutation{perm}.bed.gz",
        bed     = TEMP_EXTENSION_DIR + "/" + compType + "motifs.coord.nucContent.permutation{perm}.bed.gz"
    message: "{ruleDisplayMessage}Calculate nucleotide content via bedtools nuc for file {input}..."
    threads: 1
    params:
        refGenome = config["additionalInputFiles"]["refGenome_fasta"]
    shell:
        """
        zcat {input.motifsBed} | grep -P "^{wildcards.perm}\\t" | awk '{{OFS="\\t"}};{{print $3,$4,$5,$6,$2}}' | gzip > {output.bedTemp} &&
        bedtools nuc -fi {params.refGenome} -bed {output.bedTemp} | gzip > {output.bed}
        """

rule prepareBinning:
    input:
        nucContent = expand('{dir}/{compType}motifs.coord.nucContent.permutation{perm}.bed.gz', dir = TEMP_EXTENSION_DIR, compType = compType, perm = range(0, nPerm + 1)),
        motifes    = rules.concatenateMotifs.output.allMotifs_tsv
    output:
        allTFData       = TEMP_EXTENSION_DIR + "/" + compType + "allTFData_processedForPermutations.rds",
        allTFUniqueData = TEMP_EXTENSION_DIR + "/" + compType + "allTFUniqueData_processedForPermutations.rds"
    log: expand('{dir}/5.prepareBinning.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_prepareBinning}..."
    threads: 1
    params:
        nCGBins = nCGBins
    script: script_prepareBinning

rule binningTF:
    input:
        allTFData       = rules.prepareBinning.output.allTFData,
        allTFUniqueData = rules.prepareBinning.output.allTFUniqueData
    output:
        permResults = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.permutationResults.rds', dir = TF_DIR, extension = extDir, compType = compType),
        summary     = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.permutationSummary.tsv', dir = TF_DIR, extension = extDir, compType = compType),
        covResults  = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.covarianceResults.rds', dir = TF_DIR, extension = extDir, compType = compType)
    log: expand('{dir}/6.binningTF.{{TF}}.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_binningTF}..."
    threads: 1
    params:
        nBootstraps = nBootstraps
    script: script_binningTF
rule summaryFinal:
    input:
        allPermutationResults = expand('{dir}/{TF}/{extension}/{compType}{TF}.permutationSummary.tsv', dir = TF_DIR, TF = allTF, extension = extDir, compType = compType),
        condComp              = rules.DESeqPeaks.output.condComp,
        DeSeqObj              = rules.DESeqPeaks.output.DESeqObj
    output:
        summary         = FINAL_DIR + "/" + compType + "summary.tsv",
        circularPlot    = FINAL_DIR + "/" + compType + "summary.circular.pdf",
        diagnosticPlots = FINAL_DIR + "/" + compType + "diagnosticPlots.pdf"
    log: expand('{dir}/7.summaryFinal.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_summaryFinal}..."
    threads: 1
    params:
        TFs = ",".join(allTF)
    script: script_summaryFinal

rule cleanUpLogFiles:
    input: rules.summaryFinal.output
    output:
        warnLog  = LOG_BENCHMARK_DIR + "/" + compType + "all.warnings.log",
        errorLog = LOG_BENCHMARK_DIR + "/" + compType + "all.errors.log"
    message: "{ruleDisplayMessage}Clean and summarize the LOGS_AND_BENCHMARKS directory..."
    threads: 1
    params:
        dir = LOG_BENCHMARK_DIR
    shell:
        """
        grep -iP "^WARN" {params.dir}/*.log > {output.warnLog} || true &&
        grep -iP "^FATAL" {params.dir}/*.log > {output.errorLog} || true &&
        rm {params.dir}/*.out {params.dir}/*.err || true
        """
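# Example invocation (a sketch; file names and cluster settings are illustrative
# placeholders). The working directory and the configuration file are provided
# on the command line, as noted at the top of this Snakefile:
#
#   snakemake --snakefile Snakefile --configfile config.json --cores 16
#
# or, for cluster execution (cf. the localrules directive above), for example:
#
#   snakemake --snakefile Snakefile --configfile config.json --jobs 100 \
#             --cluster "sbatch --ntasks 1 --cpus-per-task {threads}"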