############################################
# Libraries, versions, authors and license #
############################################
from snakemake.utils import min_version
from snakemake.exceptions import WorkflowError
import subprocess
import os
import glob
import pandas
import numpy

# Enforce a minimum Snakemake version because of various features
min_version("5.3")  # NOTE: the exact minimum version is an assumption; adjust to what the workflow actually requires

__license__ = "MIT"
############################################
# Working directory and configuration file #
############################################
# Not set here: both are provided on the command line when invoking Snakemake
########
# MISC #
########
# Make the output nicer and easier to follow
ruleDisplayMessage = "\n\n########################\n# START EXECUTING RULE #\n########################\n"
# The onsuccess handler is executed if the workflow finished without error.
onsuccess:
    print("\n\n###############################\n# Workflow finished, no error #\n###############################\n\n")

# Otherwise, the onerror handler is executed.
onerror:
    print("\n\n#####################\n# An error occurred #\n#####################\n\n")
    #shell("mail -s 'an error occurred' carnold@embl.de < {log}")

# The onstart handler is executed before the workflow starts. Note that dry-runs do not trigger any of the handlers.
onstart:
    print("Reading samples and metadata....\n")
    print("Running workflow for the following " + str(len(allTF)) + " TF:\n " + ' \n '.join(map(str, allTF)))
    print("Running workflow for the following BAM files:\n " + ' \n '.join(map(str, allBamFiles)))
def read_samplesTable(samplesSummaryFile, consensusPeaks):
    """Read the TAB-separated samples summary file and check that all required columns are present."""
    data = pandas.read_table(samplesSummaryFile)
    # Sanity check: the 'Peaks' column is only required when no consensus peak file is provided
    if not consensusPeaks:
        if not {'SampleID', 'bamReads', 'conditionSummary', 'Peaks'}.issubset(data.columns.values):
            raise KeyError("The samples file must contain at least the following named columns (TAB-separated!): 'SampleID', 'bamReads', 'conditionSummary', 'Peaks'")
    else:
        if not {'SampleID', 'bamReads', 'conditionSummary'}.issubset(data.columns.values):
            raise KeyError("The samples file must contain at least the following named columns (TAB-separated!): 'SampleID', 'bamReads', 'conditionSummary'")
    return data
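
# A minimal example of the expected samples summary file (TAB-separated; the 'Peaks'
# column may be omitted when a consensus peak file is provided). All paths and sample
# names below are purely illustrative:
#
# SampleID  bamReads            conditionSummary  Peaks
# sample1   /data/sample1.bam   wildtype          /data/sample1.peaks.bed
# sample2   /data/sample2.bam   mutant            /data/sample2.peaks.bed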
#############################
# DIRECTORIES AND VARIABLES #
#############################
# Maximum number of threads per rule. The effective number of CPUs per rule is the minimum of this value and the --cores parameter, so a rule never receives more cores than Snakemake was given.
threadsMax = 16
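# Example: with "threads: threadsMax" in a rule and an invocation of "snakemake --cores 8",
# Snakemake scales the rule down to min(16, 8) = 8 threads.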
# Input files
samplesSummaryFile = config["samples"]["summaryFile"]
extDir = "extension" + str(config["par_general"]["regionExtension"])
ROOT_DIR = config["par_general"]["outdir"]
TF_DIR = ROOT_DIR + "/TF-SPECIFIC"
PEAKS_DIR = ROOT_DIR + "/PEAKS"
FINAL_DIR = ROOT_DIR + "/FINAL_OUTPUT/" + extDir
LOG_BENCHMARK_DIR = ROOT_DIR + "/LOGS_AND_BENCHMARKS"
TEMP_DIR = ROOT_DIR + "/TEMP"
TEMP_EXTENSION_DIR = TEMP_DIR + "/" + extDir
# The following names are referenced by the rules below but were not defined anywhere;
# the values are assumptions and may need adjusting:
dir_scripts = config["par_general"]["dir_scripts"]
peakFileEndingPattern = ".allPeaks.overlaps.bed.gz"
name_outputTSV = "output.tsv"
samplesData = read_samplesTable(config["samples"]["summaryFile"], config["peaks"]["consensusPeaks"])
allBamFiles = samplesData.loc[:, "bamReads"]
allBamFilesBasename = []
for fileCur in allBamFiles:
    if not os.path.isfile(fileCur):
        raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") not found.")
    if not os.path.isfile(fileCur + ".bai"):
        raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") does not have an index (corresponding *.bam.bai). All BAM files must be indexed (e.g., use samtools index).")
    basename = os.path.splitext(os.path.basename(fileCur))[0]
    allBamFilesBasename.append(basename)

# Add the basename as a new column
samplesData = samplesData.assign(basename = allBamFilesBasename)
# Check the existence of all specified peak files
if not config["peaks"]["consensusPeaks"]:
    allPeakFiles = samplesData.loc[:, "Peaks"]
    for fileCur in allPeakFiles:
        if not os.path.isfile(fileCur):
            raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") not found.")
else:
    allPeakFiles = []
regionExt = config["par_general"]["regionExtension"]
nPerm = config["par_general"]["nPermutations"]
nBootstraps = config["par_general"]["nBootstraps"]
nCGBins = config["par_general"]["nCGBins"]
if config["par_general"]["comparisonType"] != "":
compType = config["par_general"]["comparisonType"] + "."
if config["par_general"]["TFs"] == "all":
PWM_FILES = os.popen("ls " + config["additionalInputFiles"]["dir_PWMScan"]).readlines()
for TFCur in PWM_FILES:
TFCurBasename = os.path.basename(TFCur.replace('\n', '').replace('_pwmscan.bed', ''))
allTF.append(TFCurBasename)
else:
TFArray = config["par_general"]["TFs"].replace(" ", "").split(',')
for TFCur in TFArray:
fileCur = config["additionalInputFiles"]["dir_PWMScan"] + "/" + TFCur + "_pwmscan.bed"
if not os.path.isfile(fileCur):
raise IOError("The TF " + TFCur + " is in the list of TFs to process, but the file \"" + fileCur + "\" is missing. Check the folder " + config["additionalInputFiles"]["dir_PWMScan"] + " and \"par_general\": \"TFs\")")
allTF.append(TFCur)
if len(allTF) == 0:
raise WorkflowError("The list of TFs is empty. Adjust the parameter \"par_general\": \"TFs\".")
if not os.path.isfile(config["additionalInputFiles"]["refGenome_fasta"]):
raise IOError("File \"" + config["additionalInputFiles"]["refGenome_fasta"] + "\" not found.")
filenameCur = config["additionalInputFiles"]["HOCOMOCO_mapping"]
if not os.path.isfile(filenameCur):
raise IOError("File \"" + filenameCur + "\" not found.")
filenameCur = config["additionalInputFiles"]["RNASeqCounts"]
if not os.path.isfile(filenameCur):
raise IOError("File \"" + filenameCur + "\" not found.")
if config["peaks"]["consensusPeaks"]:
filenameCur = config["peaks"]["consensusPeaks"]
if not os.path.isfile(filenameCur):
raise IOError("File \"" + filenameCur + "\" not found.")
# Check if it contains scientific notification
# the || true in the end ensures the exit status of grep is 0, because this would raise an error otherwise
nHits = int(subprocess.check_output('grep -c "e+" ' + config["peaks"]["consensusPeaks"] + " || true", shell=True))
if nHits > 0:
raise AssertionError("File " + config["peaks"]["consensusPeaks"] + " contains at least one line with the scientific notation (e+). This will cause errors in subsequent steps. Check the file and transform all \"e+\" coordinates.")
script_createConsensusPeaks = dir_scripts + "/1.createConsensusPeaks.R"
script_DESeqPeaks = dir_scripts + "/2.DESeqPeaks.R"
script_analyzeTF = dir_scripts + "/3.analyzeTFNew.R"
script_summary1 = dir_scripts + "/4.summary1.R"
script_prepareBinning = dir_scripts + "/5.prepareBinning.R"
script_binningTF = dir_scripts + "/6.binningTF.R"
script_summaryFinal = dir_scripts + "/7.summaryFinal.R"
for scriptNameCur in [script_createConsensusPeaks, script_DESeqPeaks, script_analyzeTF, script_summary1, script_prepareBinning, script_binningTF, script_summaryFinal]:
    if not os.path.isfile(scriptNameCur):
        raise IOError("File " + scriptNameCur + " not found.")
#########
# RULES #
#########
# For cluster usage: the localrules directive marks rules as local, so they are not submitted to the cluster but executed directly on the host node
localrules: all, link_inputFiles
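# A typical cluster invocation might look like this (illustrative only; adapt the submission command to your scheduler):
# snakemake --snakefile Snakefile --jobs 100 --cluster "sbatch --ntasks {threads}"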
rule all:
    input:
        perm = FINAL_DIR + "/" + compType + "summary.circular.pdf",
        summaryLogs = LOG_BENCHMARK_DIR + "/" + compType + "all.warnings.log"
rule produceConsensusPeaks:
    input:
        peaks = allPeakFiles
    output:
        consensusPeaks_bed = PEAKS_DIR + "/" + compType + "consensusPeaks.bed",
        summaryPlot = PEAKS_DIR + "/" + compType + "consensusPeaks_lengthDistribution.pdf"
    log: expand('{dir}/1.produceConsensusPeaks.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Calculate consensus peaks for all peak files with the script {script_createConsensusPeaks}..."
    threads: 1
    script: script_createConsensusPeaks
# Forces the execution of the rule above only when the user did not provide a consensus peak file
def retrieveConsensusPeakFile(par_consensusPeaks):
    if not par_consensusPeaks:
        return rules.produceConsensusPeaks.output.consensusPeaks_bed
    else:
        return config["peaks"]["consensusPeaks"]

consensusPeaks = retrieveConsensusPeakFile(config["peaks"]["consensusPeaks"])
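# For example: with "consensusPeaks": "" in the config, the BED file produced by rule
# produceConsensusPeaks is used; with a path set, that file is used directly and the rule is skipped.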
rule filterSexChromosomesAndSortPeaks:
    input:
        consensusPeaks = consensusPeaks
    output:
        consensusPeaks_filtered = TEMP_DIR + "/" + compType + "consensusPeaks.filtered.bed",
        consensusPeaks_sorted = PEAKS_DIR + "/" + compType + "consensusPeaks.filtered.sorted.bed"
    message: "{ruleDisplayMessage}Filter sex and unassembled chromosomes..."
    threads: 1
    shell: """
        grep -vP "^chrX|^chrY|^chrM|^chrUn|random|hap|_gl" {input.consensusPeaks} > {output.consensusPeaks_filtered} &&
        sort -k1,1 -k2,2n {output.consensusPeaks_filtered} > {output.consensusPeaks_sorted}
        """
rule sortPWM:
    input:
        bed = config["additionalInputFiles"]["dir_PWMScan"] + "/{TF}_pwmscan.bed"
    output:
        bedSorted = TEMP_DIR + "/" + compType + "{TF}_pwmscan.sorted.bed.gz"
    message: "{ruleDisplayMessage} Sort bed file {input.bed}..."
    threads: 1
    shell:
        """sort -k1,1 -k2,2n {input.bed} | gzip > {output.bedSorted}"""
def getBamFileFromBasename(basename):
    """Retrieve the BAM file that corresponds to the given basename from the samples table."""
    hit = numpy.asarray(samplesData.loc[samplesData["basename"] == basename, "bamReads"])
    if len(hit) != 1:
        raise KeyError("Could not uniquely retrieve the BAM file for the basename \"" + basename + "\" from the file " + config["samples"]["summaryFile"])
    return hit
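
# Example (hypothetical path): for a samples table row with bamReads = "/data/sample1.bam",
# getBamFileFromBasename("sample1") returns that path.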
rule idxstatsBAM:
    input:
        bam = lambda wildcards: getBamFileFromBasename(wildcards.basename)
    output:
        idxoutput = temp(TEMP_DIR + "/" + compType + "{basename}.idxstats"),
        genomeFile = TEMP_DIR + "/" + compType + "{basename}.chrOrder1",
        genomeFile2 = TEMP_DIR + "/" + compType + "{basename}.chrOrder2"
    message: "{ruleDisplayMessage} Run idxstats for file {input.bam}..."
    threads: 1
    shell:
        """
        samtools idxstats {input.bam} > {output.idxoutput} &&
        grep -v "^*" {output.idxoutput} | awk '{{print $1"\\t"$2}}' > {output.genomeFile} &&
        grep -v "^*" {output.idxoutput} | awk '{{print $1}}' > {output.genomeFile2}
        """
rule intersectPeaksAndBAM:
    input:
        consensusPeaks = rules.filterSexChromosomesAndSortPeaks.output.consensusPeaks_sorted,
        genomeFile = rules.idxstatsBAM.output.genomeFile,
        genomeFile2 = rules.idxstatsBAM.output.genomeFile2,
        bam = lambda wildcards: getBamFileFromBasename(wildcards.basename)
    output:
        peaksOverlap = PEAKS_DIR + '/' + compType + '{basename}' + peakFileEndingPattern,
        resortedBed = TEMP_DIR + '/' + compType + '{basename}.resorted.gz'
    message: "{ruleDisplayMessage} Run intersect and count for file {input.consensusPeaks} with {input.bam}..."
    threads: 1
    shell:
        """
        bedtools sort -faidx {input.genomeFile2} -i {input.consensusPeaks} | gzip > {output.resortedBed} &&
        bedtools intersect \
        -a {output.resortedBed} \
        -b {input.bam} \
        -c \
        -sorted -g {input.genomeFile} \
        | gzip > {output.peaksOverlap}
        """
# TF-specific part:
rule intersectPeaksAndPWM:
    input:
        consensusPeaks = rules.filterSexChromosomesAndSortPeaks.output.consensusPeaks_sorted,
        allPWMs = expand('{dir}/{compType}{TF}_pwmscan.sorted.bed.gz', dir = TEMP_DIR, compType = compType, TF = allTF)
    output:
        TFBSinPeaks_bed = expand('{dir}/{compType}allPWM.peaks.bed.gz', dir = TEMP_DIR, compType = compType),
        TFBSinPeaksMod_bed = expand('{dir}/{compType}allPWM.peaks.extension.bed.gz', dir = TEMP_EXTENSION_DIR, compType = compType)
    message: "{ruleDisplayMessage} Obtain binding sites from peaks: Intersect files {input.allPWMs} and {input.consensusPeaks}..."
    threads: 1
    params:
        extension = config["par_general"]["regionExtension"]
    # The -b, -wa and -wb flags below are assumptions: -filenames requires -b files, and the
    # downstream cut expects columns from both the peak and the PWM hit
    shell:
        """
        bedtools intersect \
        -a {input.consensusPeaks} \
        -b {input.allPWMs} \
        -wa -wb \
        -sorted \
        -filenames \
        | gzip > {output.TFBSinPeaks_bed} &&
        zcat {output.TFBSinPeaks_bed} | cut -f4,5,6,7,8,9,12 | uniq | awk '{{OFS="\\t"}};{{ print $4, $5-{params.extension}, $6+{params.extension},$1,$2,$7,$3}}' | gzip > {output.TFBSinPeaksMod_bed}
        """
rule intersectTFBSAndBAM:
    input:
        bed = rules.intersectPeaksAndPWM.output.TFBSinPeaksMod_bed,
        allBAMs = allBamFiles
    output:
        peaksOverlap = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}{peakFileEndingPattern}', dir = TF_DIR, extension = extDir, compType = compType, peakFileEndingPattern = peakFileEndingPattern)
    message: "{ruleDisplayMessage} Run bedtools intersect for file {input.bed} against all BAM files..."
    threads: 1
    shell:
        """
        zgrep -P "{wildcards.TF}_pwmscan\.sorted" {input.bed} |
        bedtools multicov \
        -bed stdin \
        -bams {input.allBAMs} \
        | gzip > {output.peaksOverlap}
        """
name_plots = PEAKS_DIR + "/" + compType + "diagnosticPlots.peaks.pdf"
rule DESeqPeaks:
    input:
        sampleData = config["samples"]["summaryFile"],
        peaks = expand('{dir}/{compType}{basenameBAM}{peakFileEndingPattern}', dir = PEAKS_DIR, basenameBAM = allBamFilesBasename, compType = compType, peakFileEndingPattern = peakFileEndingPattern)
    output:
        sampleDataR = PEAKS_DIR + "/" + compType + "sampleMetadata.rds",
        peakFile = PEAKS_DIR + "/" + compType + "peaks.rds",
        peaks_tsv = PEAKS_DIR + "/" + compType + "peaks.tsv",
        condComp = TEMP_EXTENSION_DIR + "/" + compType + "conditionComparison.rds",
        normFacs = PEAKS_DIR + "/" + compType + "normFacs.rds",
        plots = name_plots,
        plotsPerm = expand('{dir}/{compType}diagnosticPlots.peaks_permutation{perm}.pdf', dir = PEAKS_DIR, compType = compType, perm = range(1, nPerm + 1, 1)),
        DESeqObj = PEAKS_DIR + "/" + compType + "DESeq.object.rds"
    log: expand('{dir}/2.DESeqPeaks.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_DESeqPeaks}"
    threads: min(threadsMax, config["par_general"]["nPermutations"] + 1)
    params:
        doCyclicLoess = "true"
    script: script_DESeqPeaks
name_plotsDiag = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}" + ".diagnosticPlots.pdf"
name_TFSummary = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}" + ".summaryPlots.pdf"

rule analyzeTF:
    input:
        overlapFile = rules.intersectTFBSAndBAM.output.peaksOverlap,
        sampleDataR = rules.DESeqPeaks.output.sampleDataR,
        peakFile = rules.DESeqPeaks.output.peakFile,
        peakFile2 = rules.DESeqPeaks.output.peaks_tsv,
        normFacs = rules.DESeqPeaks.output.normFacs,
        plotsPerm = rules.DESeqPeaks.output.plotsPerm
    output:
        outputTSV = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + name_outputTSV,
        outputRDS = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + "summary.rds",
        plot_diagnostic = name_plotsDiag,
        plot_TFSummary = name_TFSummary,
        #plot_diagnosticPerm = dynamic(TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + "diagnosticPlots_permutation{perm}.pdf"),
        plot_diagnosticPerm = expand('{dir}/{{TF}}/{ext}/{compType}{{TF}}.diagnosticPlots_permutation{perm}.pdf', dir = TF_DIR, ext = extDir, compType = compType, perm = range(1, nPerm + 1, 1)),
        DESeqObj = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + "DESeq.object.rds"
    log: expand('{dir}/3.analyzeTF.{{TF}}.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_analyzeTF} for TF {wildcards.TF}..."
    threads: 1
    params:
        doCyclicLoess = "true",
        allBAMS = list(allBamFiles)
    script: script_analyzeTF
rule summary1:
    input:
        peaks = rules.DESeqPeaks.output.peaks_tsv,
        TF = expand('{dir}/{TF}/{ext}/{compType}{TF}.summary.rds', dir = TF_DIR, TF = allTF, ext = extDir, compType = compType)
    output:
        outputTable = FINAL_DIR + "/" + compType + "TF_vs_peak_distribution.tsv"
    log: expand('{dir}/4.summary1.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_summary1} ..."
    threads: 1
    script: script_summary1
rule concatenateMotifs:
    input:
        diagnosticPlots = rules.summary1.output,
        TFMotifes = expand('{dir}/{TF}/{extension}/{compType}{TF}.{patternTSV}', dir = TF_DIR, TF = allTF, extension = extDir, compType = compType, patternTSV = name_outputTSV)
    output:
        allMotifs_tsv = FINAL_DIR + "/" + compType + "allMotifs.tsv.gz"
    message: "{ruleDisplayMessage}Concatenate all motifs for file {input.TFMotifes}..."
    threads: 1
    params:
        folder = TF_DIR,
        pattern = "*" + extDir + "/" + compType + "*." + name_outputTSV
    # The awk expression keeps the header line only from the first file
    shell:
        """
        find {params.folder} -type f -path "{params.pattern}" -exec awk 'NR==1 || FNR!=1' {{}} + | gzip > {output.allMotifs_tsv}
        """
# Rule name reconstructed (assumption): compute the nucleotide content of the motifs for each permutation
rule calcNucleotideContent:
    input:
        motifsBed = rules.concatenateMotifs.output.allMotifs_tsv
    output:
        bedTemp = TEMP_EXTENSION_DIR + "/" + compType + "motifs.coord.permutation{perm}.bed.gz",
        bed = TEMP_EXTENSION_DIR + "/" + compType + "motifs.coord.nucContent.permutation{perm}.bed.gz"
    message: "{ruleDisplayMessage}Calculate nucleotide content via bedtools nuc for file {input} ..."
    threads: 1
    params: refGenome = config["additionalInputFiles"]["refGenome_fasta"]
    shell:
        """
        zcat {input.motifsBed} | grep -P "^{wildcards.perm}\\t" | awk '{{OFS="\\t"}};{{print $3,$4,$5,$6,$2}}' | gzip > {output.bedTemp} &&
        bedtools nuc -fi {params.refGenome} -bed {output.bedTemp} | gzip > {output.bed}
        """
rule prepareBinning:
    input:
        nucContent = expand('{dir}/{compType}motifs.coord.nucContent.permutation{perm}.bed.gz', dir = TEMP_EXTENSION_DIR, compType = compType, perm = range(0, nPerm + 1, 1)),
        motifes = rules.concatenateMotifs.output.allMotifs_tsv
    output:
        allTFData = TEMP_EXTENSION_DIR + "/" + compType + "allTFData_processedForPermutations.rds",
        allTFUniqueData = TEMP_EXTENSION_DIR + "/" + compType + "allTFUniqueData_processedForPermutations.rds"
    log: expand('{dir}/5.prepareBinning.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_prepareBinning} ..."
    threads: 1
    params:
        nCGBins = nCGBins
    script: script_prepareBinning
rule binningTF:
    input:
        allTFData = rules.prepareBinning.output.allTFData,
        allTFUniqueData = rules.prepareBinning.output.allTFUniqueData
    output:
        permResults = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.permutationResults.rds', dir = TF_DIR, extension = extDir, compType = compType),
        summary = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.permutationSummary.tsv', dir = TF_DIR, extension = extDir, compType = compType),
        covResults = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.covarianceResults.rds', dir = TF_DIR, extension = extDir, compType = compType)
    log: expand('{dir}/6.binningTF.{{TF}}.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_binningTF} ..."
    threads: 1
    params:
        nBootstraps = nBootstraps
    script: script_binningTF
rule summaryFinal:
    input:
        allPermutationResults = expand('{dir}/{TF}/{extension}/{compType}{TF}.permutationSummary.tsv', dir = TF_DIR, TF = allTF, extension = extDir, compType = compType),
        condComp = rules.DESeqPeaks.output.condComp,
        DeSeqObj = rules.DESeqPeaks.output.DESeqObj
    output:
        summary = FINAL_DIR + "/" + compType + "summary.tsv",
        circularPlot = FINAL_DIR + "/" + compType + "summary.circular.pdf",
        diagnosticPlots = FINAL_DIR + "/" + compType + "diagnosticPlots.pdf"
    log: expand('{dir}/7.summaryFinal.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_summaryFinal} ..."
    threads: 1
    params: TFs = ",".join(allTF)
    script: script_summaryFinal
rule cleanUpLogFiles:
    input: rules.summaryFinal.output
    output:
        warnLog = LOG_BENCHMARK_DIR + "/" + compType + "all.warnings.log",
        errorLog = LOG_BENCHMARK_DIR + "/" + compType + "all.errors.log"
    message: "{ruleDisplayMessage}Clean and summarize Logs_and_Benchmark directory..."
    threads: 1
    params: dir = LOG_BENCHMARK_DIR
    shell:
        """
        grep -iP "^WARN" {params.dir}/*.log > {output.warnLog} || true &&
        grep -iP "^FATAL" {params.dir}/*.log > {output.errorLog} || true &&
        rm {params.dir}/*.out {params.dir}/*.err || true
        """