############################################
# Libraries, versions, authors and license #
############################################
from snakemake.utils import min_version
import subprocess
import os
import pandas
import numpy
# Enforce a minimum Snakemake version because of various features

min_version("4.0")
########
# MISC #
########
# Make the output nicer and easier to follow
ruleDisplayMessage = "\n\n########################\n# START EXECUTING RULE #\n########################\n"
# The onsuccess handler is executed if the workflow finished without error.
onsuccess:
print("\n\n###############################\n# Workflow finished, no error #\n###############################\n\n")
# Else, the onerror handler is executed.
onerror:
print("\n\n#####################\n# An error occurred #\n#####################\n\n")
#shell("mail -s "an error occurred" carnold@embl.de < {log}")
# onstart handler will be executed before the workflow starts. Note that dry-runs do not trigger any of the handlers
onstart:
print("Reading samples and metadata....\n")
print ("Running workflow for the following " + str(len(allTF)) + " TF:\n " + ' \n '.join(map(str, allTF)))
print ("Running workflow for the following BAM files:\n " + ' \n '.join(map(str, allBamFiles)))
def read_samplesTable(samplesSummaryFile, consensusPeaks):
    """Read the samples summary table and verify that all required columns are present."""
    data = pandas.read_table(samplesSummaryFile)
    # Expect a particular set of columns, do a sanity check here
    if not consensusPeaks:
        if not {'SampleID', 'bamReads', 'conditionSummary', 'Peaks'}.issubset(data.columns.values):
            raise KeyError("The samples file must contain at least the following named columns (TAB separated!): 'SampleID', 'bamReads', 'conditionSummary', 'Peaks'")
    else:
        if not {'SampleID', 'bamReads', 'conditionSummary'}.issubset(data.columns.values):
            raise KeyError("The samples file must contain at least the following named columns (TAB separated!): 'SampleID', 'bamReads', 'conditionSummary'")
    return data
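# An illustrative (hypothetical) samples summary file; the 'Peaks' column is only
# required when no consensus peak file is provided in the config:
#
#   SampleID   bamReads            conditionSummary   Peaks
#   sample1    /data/sample1.bam   WT                 /data/sample1_peaks.bed
#   sample2    /data/sample2.bam   mutant             /data/sample2_peaks.bed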

###############################################
# Check if all parameters have been specified #
###############################################
requiredSections = ["samples", "par_general", "peaks", "additionalInputFiles"]
for sectionCur in requiredSections:
    if sectionCur not in config:
        raise KeyError("Could not find section \"" + sectionCur + "\" in the config file or no config file has been specified.")
# 1. Section par_general
missingParameters = []
requiredPar = ["outdir", "regionExtension", "comparisonType", "designContrast", "designVariableTypes", "nPermutations", "nBootstraps", "nCGBins", "TFs", "dir_scripts", "RNASeqIntegration"]
for parCur in requiredPar:
    if parCur not in config["par_general"]:
        missingParameters.append(parCur)
if len(missingParameters) > 0:
    missingParStr = ",".join(missingParameters)
    raise KeyError("Could not find parameter(s) \"" + missingParStr + "\" in section \"par_general\" in the config file.")
# 2. Section samples
if "summaryFile" not in config["samples"]:
    raise KeyError("Could not find parameter \"summaryFile\" in section \"samples\" in the config file.")
# 3. Section peaks
if "consensusPeaks" not in config["peaks"]:
    raise KeyError("Could not find parameter \"consensusPeaks\" in section \"peaks\" in the config file.")
if "peakType" not in config["peaks"]:
    raise KeyError("Could not find parameter \"peakType\" in section \"peaks\" in the config file.")
if "minOverlap" not in config["peaks"]:
    raise KeyError("Could not find parameter \"minOverlap\" in section \"peaks\" in the config file.")
# 4. Section additionalInputFiles
if "refGenome_fasta" not in config["additionalInputFiles"]:
    raise KeyError("Could not find parameter \"refGenome_fasta\" in section \"additionalInputFiles\" in the config file.")
if "dir_PWMScan" not in config["additionalInputFiles"]:
    raise KeyError("Could not find parameter \"dir_PWMScan\" in section \"additionalInputFiles\" in the config file.")
if "RNASeqCounts" not in config["additionalInputFiles"]:
    raise KeyError("Could not find parameter \"RNASeqCounts\" in section \"additionalInputFiles\" in the config file.")
if "HOCOMOCO_mapping" not in config["additionalInputFiles"]:
    raise KeyError("Could not find parameter \"HOCOMOCO_mapping\" in section \"additionalInputFiles\" in the config file.")
#############################
# DIRECTORIES AND VARIABLES #
#############################
# Maximum number of cores per rule. The effective number of CPUs for a rule is the minimum of this value and the --cores parameter, so this cap may never actually be reached.
threadsMax = 16
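# Illustrative example: invoking `snakemake --cores 8` caps a rule that declares
# `threads: threadsMax` at min(16, 8) = 8 threads.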
# Input files
samplesSummaryFile = config["samples"]["summaryFile"]
extDir = "extension" + str(config["par_general"]["regionExtension"])
ROOT_DIR = config["par_general"]["outdir"]
TF_DIR = ROOT_DIR + "/TF-SPECIFIC"
PEAKS_DIR = ROOT_DIR + "/PEAKS"
LOG_BENCHMARK_DIR = ROOT_DIR + "/LOGS_AND_BENCHMARKS"
# Temporary and final output directories (assumed layout; these variables are used by the rules below)
TEMP_DIR = ROOT_DIR + "/TEMP"
TEMP_EXTENSION_DIR = TEMP_DIR + "/" + extDir
FINAL_DIR = ROOT_DIR + "/FINAL_OUTPUT/" + extDir
samplesData = read_samplesTable(config["samples"]["summaryFile"], config["peaks"]["consensusPeaks"])
allBamFiles = samplesData.loc[:,"bamReads"]
allBamFilesBasename = []
for fileCur in allBamFiles:
    if not os.path.isfile(fileCur):
        raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") not found.")
    if not os.path.isfile(fileCur + ".bai"):
        raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") does not have an index (corresponding *.bam.bai). All BAM files must be indexed (e.g., use samtools index)")
    basename = os.path.splitext(os.path.basename(fileCur))[0]
    allBamFilesBasename.append(basename)

#print(allBamFilesBasename)
# Add new column
samplesData = samplesData.assign(basename = allBamFilesBasename)
# Check existence of all specified peak files
if not config["peaks"]["consensusPeaks"]:
    allPeakFiles = samplesData.loc[:,"Peaks"]
    for fileCur in allPeakFiles:
        if not os.path.isfile(fileCur):
            raise IOError("File \"" + fileCur + "\" (defined in " + config["samples"]["summaryFile"] + ") not found.")
else:
    allPeakFiles = []
regionExt = config["par_general"]["regionExtension"]
nPerm = config["par_general"]["nPermutations"]
nBootstraps = config["par_general"]["nBootstraps"]
nCGBins = config["par_general"]["nCGBins"]
if config["par_general"]["comparisonType"] != "":
compType = config["par_general"]["comparisonType"] + "."
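# compType is used as a file name prefix throughout. Illustrative example:
# comparisonType = "mutVsWT" yields output names like "mutVsWT.consensusPeaks.bed".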
if config["par_general"]["TFs"] == "all":
PWM_FILES = os.popen("ls " + config["additionalInputFiles"]["dir_PWMScan"]).readlines()
for TFCur in PWM_FILES:
TFCurBasename = os.path.basename(TFCur.replace('\n', '').replace('_pwmscan.bed', ''))
allTF.append(TFCurBasename)
else:
TFArray = config["par_general"]["TFs"].replace(" ", "").split(',')
for TFCur in TFArray:
fileCur = config["additionalInputFiles"]["dir_PWMScan"] + "/" + TFCur + "_pwmscan.bed"
if not os.path.isfile(fileCur):
raise IOError("The TF " + TFCur + " is in the list of TFs to process, but the file \"" + fileCur + "\" is missing. Check the folder " + config["additionalInputFiles"]["dir_PWMScan"] + " and \"par_general\": \"TFs\")")
allTF.append(TFCur)
if len(allTF) == 0:
    raise WorkflowError("The list of TFs is empty. Adjust the parameter \"par_general\": \"TFs\".")
if not os.path.isfile(config["additionalInputFiles"]["refGenome_fasta"]):
    raise IOError("File \"" + config["additionalInputFiles"]["refGenome_fasta"] + "\" not found.")
filenameCur = config["additionalInputFiles"]["HOCOMOCO_mapping"]
if not os.path.isfile(filenameCur):
    raise IOError("File \"" + filenameCur + "\" not found.")
filenameCur = config["additionalInputFiles"]["RNASeqCounts"]
if not os.path.isfile(filenameCur):
    raise IOError("File \"" + filenameCur + "\" not found.")
if config["peaks"]["consensusPeaks"]:
    filenameCur = config["peaks"]["consensusPeaks"]
    if not os.path.isfile(filenameCur):
        raise IOError("File \"" + filenameCur + "\" not found.")
    # Check if the file contains scientific notation
    # The || true at the end forces the exit status of grep to 0, because grep without a match would raise an error otherwise
    nHits = int(subprocess.check_output('grep -c "e+" ' + config["peaks"]["consensusPeaks"] + " || true", shell=True))
    if nHits > 0:
        raise AssertionError("File " + config["peaks"]["consensusPeaks"] + " contains at least one line with the scientific notation (e+). This will cause errors in subsequent steps. Check the file and transform all \"e+\" coordinates.")
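# Illustrative example of the problem: a BED line such as
#   chr1    1e+06    1000100
# (as written by some tools for large coordinates) is not a valid integer
# coordinate and would break bedtools in later steps.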

# Default directory for the R scripts, specified relative to the Snakefile
# Not to be confused with the dir_scripts parameter in the config file, which is only used to locate the correct functions.R file within the R scripts
dir_scripts = "R/"

script_createConsensusPeaks = "1.createConsensusPeaks.R"
script_DESeqPeaks = "2.DESeqPeaks.R"
script_analyzeTF = "3.analyzeTFNew.R"
script_summary1 = "4.summary1.R"
script_prepareBinning = "5.prepareBinning.R"
script_binningTF = "6.binningTF.R"
script_summaryFinal = "7.summaryFinal.R"
#########
# RULES #
#########
# For cluster usage: the localrules keyword marks rules as local, so they are not submitted to the cluster but executed on the host node instead
localrules: all, link_inputFiles
rule all:
    input:
        perm = FINAL_DIR + "/" + compType + "summary.circular.pdf",
        summaryLogs = LOG_BENCHMARK_DIR + "/" + compType + "all.warnings.log"
rule produceConsensusPeaks:
    input:
        peaks = allPeakFiles
    output:
        consensusPeaks_bed = PEAKS_DIR + "/" + compType + "consensusPeaks.bed",
        summaryPlot = PEAKS_DIR + "/" + compType + "consensusPeaks_lengthDistribution.pdf"
    log: expand('{dir}/1.produceConsensusPeaks.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Calculate consensus peaks for all peak files with the script {script_createConsensusPeaks}..."

    script: dir_scripts + script_createConsensusPeaks
# Forces execution of the rule above only when the user did not provide a consensus peak file
def retrieveConsensusPeakFile(par_consensusPeaks):
    if not par_consensusPeaks:
        return rules.produceConsensusPeaks.output.consensusPeaks_bed
    else:
        return config["peaks"]["consensusPeaks"]
consensusPeaks = retrieveConsensusPeakFile(config["peaks"]["consensusPeaks"])
rule filterSexChromosomesAndSortPeaks:
    input:
        consensusPeaks = consensusPeaks
    output:
        consensusPeaks_filtered = TEMP_DIR + "/" + compType + "consensusPeaks.filtered.bed",
        consensusPeaks_sorted = PEAKS_DIR + "/" + compType + "consensusPeaks.filtered.sorted.bed"
    message: "{ruleDisplayMessage}Filter sex and unassembled chromosomes..."
    shell: """
        grep -vP "^chrX|^chrY|^chrM|^chrUn|random|hap|_gl" {input.consensusPeaks} > {output.consensusPeaks_filtered} &&
        sort -k1,1 -k2,2n {output.consensusPeaks_filtered} > {output.consensusPeaks_sorted}
        """

peakFileEndingPattern = "overlapPeaks.bed.gz"
rule sortPWM:
    input:
        bed = config["additionalInputFiles"]["dir_PWMScan"] + "/{TF}_pwmscan.bed"
    output:
        bedSorted = TEMP_DIR + "/" + compType + "{TF}_pwmscan.sorted.bed.gz"
    message: "{ruleDisplayMessage} Sort bed file {input.bed}..."
    threads: 1
    shell:
        """sort -k1,1 -k2,2n {input.bed} | gzip > {output.bedSorted}"""
def getBamFileFromBasename(basename):
    """Retrieve the BAM file that corresponds to the given basename from the samples table."""
    hit = numpy.asarray(samplesData.loc[samplesData["basename"] == basename, "bamReads"])
    if len(hit) != 1:
        raise KeyError("Could not uniquely retrieve the BAM file for the basename \"" + basename + "\" from the file " + config["samples"]["summaryFile"])
    return hit
rule idxstatsBAM:
    input:
        bam = lambda wildcards: getBamFileFromBasename(wildcards.basename)
    output:
        idxoutput = temp(TEMP_DIR + "/" + compType + "{basename}.idxstats"),
        genomeFile = TEMP_DIR + "/" + compType + "{basename}.chrOrder1",
        genomeFile2 = TEMP_DIR + "/" + compType + "{basename}.chrOrder2"
    message: "{ruleDisplayMessage} Run idxstats for file {input.bam}..."
    threads: 1
    shell:
        """
        samtools idxstats {input.bam} > {output.idxoutput} &&
        cat {output.idxoutput} | grep -v "^*" | awk '{{print $1"\\t"$2}}' > {output.genomeFile} &&
        cat {output.idxoutput} | grep -v "^*" | awk '{{print $1}}' > {output.genomeFile2}
        """
rule intersectPeaksAndBAM:
    input:
        consensusPeaks = rules.filterSexChromosomesAndSortPeaks.output.consensusPeaks_sorted,
        genomeFile = rules.idxstatsBAM.output.genomeFile,
        genomeFile2 = rules.idxstatsBAM.output.genomeFile2,
        bam = lambda wildcards: getBamFileFromBasename(wildcards.basename)
    output:
        peaksOverlap = PEAKS_DIR + '/' + compType + '{basename}.' + peakFileEndingPattern,
        resortedBed = TEMP_DIR + '/' + compType + '{basename}.' + "resorted.gz"
    message: "{ruleDisplayMessage} Run intersect and count for file {input.consensusPeaks} with {input.bam}..."
    threads: 1
    shell:
        """bedtools sort -faidx {input.genomeFile2} -i {input.consensusPeaks} | gzip > {output.resortedBed} &&
        bedtools intersect \
            -a {output.resortedBed} \
            -b {input.bam} \
            -c \
            -sorted -g {input.genomeFile} \
            | gzip > {output.peaksOverlap}
        """
# TF-specific part:
rule intersectPeaksAndPWM:
    input:
        consensusPeaks = rules.filterSexChromosomesAndSortPeaks.output.consensusPeaks_sorted,
        allPWMs = expand('{dir}/{compType}{TF}_pwmscan.sorted.bed.gz', dir = TEMP_DIR, compType = compType, TF = allTF)
    output:
        TFBSinPeaks_bed = expand('{dir}/{compType}allPWM.peaks.bed.gz', dir = TEMP_DIR, compType = compType),
        TFBSinPeaksMod_bed = expand('{dir}/{compType}allPWM.peaks.extension.bed.gz', dir = TEMP_EXTENSION_DIR, compType = compType)
    message: "{ruleDisplayMessage} Obtain binding sites from peaks: Intersect files {input.allPWMs} and {input.consensusPeaks}..."
    threads: 1
    params:
        extension = config["par_general"]["regionExtension"]
    shell:
        """bedtools intersect \
            -a {input.consensusPeaks} \
            -b {input.allPWMs} \
            -wa -wb \
            -filenames \
            | gzip > {output.TFBSinPeaks_bed} &&
            zcat {output.TFBSinPeaks_bed} | cut -f4,5,6,7,8,9,12 | uniq | awk '{{OFS="\\t"}};{{ print $4, $5-{params.extension}, $6+{params.extension},$1,$2,$7,$3}}' | gzip > {output.TFBSinPeaksMod_bed}
        """
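# Illustrative effect of the awk call: with regionExtension = 100, a binding
# site at chr1:500-510 is written out as chr1:400-610, i.e. each motif is
# extended by the configured number of bp on both sides.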
rule intersectTFBSAndBAM:
    input:
        bed = rules.intersectPeaksAndPWM.output.TFBSinPeaksMod_bed,
        allBAMs = allBamFiles
    output:
        peaksOverlap = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.{peakFileEndingPattern}', dir = TF_DIR, extension = extDir, compType = compType, peakFileEndingPattern = peakFileEndingPattern)
    message: "{ruleDisplayMessage} Run bedtools intersect for file {input.bed} against all BAM files..."
    threads: 1
    shell:
        """ zgrep -P "{wildcards.TF}_pwmscan\.sorted" {input.bed} |
        bedtools multicov \
            -bed stdin \
            -bams {input.allBAMs} \
            | gzip > {output.peaksOverlap}
        """
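# bedtools multicov appends one read-count column per BAM file, in the order the
# files are passed to -bams (here presumably matching the samples table order).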
name_plots = PEAKS_DIR + "/" + compType + "diagnosticPlots.peaks.pdf"
rule DESeqPeaks:
    input:
        sampleData = config["samples"]["summaryFile"],
        peaks = expand('{dir}/{compType}{basenameBAM}.{peakFileEndingPattern}', dir = PEAKS_DIR, basenameBAM = allBamFilesBasename, compType = compType, peakFileEndingPattern = peakFileEndingPattern)
    output:
        sampleDataR = PEAKS_DIR + "/" + compType + "sampleMetadata.rds",
        peakFile = PEAKS_DIR + "/" + compType + "peaks.rds",
        peaks_tsv = PEAKS_DIR + "/" + compType + "peaks.tsv",
        condComp = TEMP_EXTENSION_DIR + "/" + compType + "conditionComparison.rds",
        normFacs = PEAKS_DIR + "/" + compType + "normFacs.rds",
        plots = name_plots,
        plotsPerm = expand('{dir}/{compType}diagnosticPlots.peaks_permutation{perm}.pdf', dir = PEAKS_DIR, compType = compType, perm = range(1, nPerm + 1, 1)),
        DESeqObj = PEAKS_DIR + "/" + compType + "DESeq.object.rds"
    log: expand('{dir}/2.DESeqPeaks.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_DESeqPeaks}"
    threads: min(threadsMax, config["par_general"]["nPermutations"] + 1)

    script: dir_scripts + script_DESeqPeaks
# File name suffix for the per-TF output tables (assumed value; also used in rule concatenateMotifs)
name_outputTSV = "output.tsv"
name_plotsDiag = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}" + ".diagnosticPlots.pdf"
name_TFSummary = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}" + ".summaryPlots.pdf"
rule analyzeTF:
    input:
        overlapFile = rules.intersectTFBSAndBAM.output.peaksOverlap,
        sampleDataR = rules.DESeqPeaks.output.sampleDataR,
        peakFile = rules.DESeqPeaks.output.peakFile,
        peakFile2 = rules.DESeqPeaks.output.peaks_tsv,
        normFacs = rules.DESeqPeaks.output.normFacs,
        plotsPerm = rules.DESeqPeaks.output.plotsPerm
    output:
        outputTSV = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + name_outputTSV,
        outputRDS = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + "summary.rds",
        plot_diagnostic = name_plotsDiag,
        plot_TFSummary = name_TFSummary,
        DESeqObj = TF_DIR + "/{TF}/" + extDir + "/" + compType + "{TF}." + "DESeq.object.rds"
    log: expand('{dir}/3.analyzeTF.{{TF}}.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_analyzeTF} for TF {wildcards.TF}..."
    params:
        doCyclicLoess = "true",
        allBAMS = list(allBamFiles)

    script: dir_scripts + script_analyzeTF
rule summary1:
    input:
        peaks = rules.DESeqPeaks.output.peaks_tsv,
        TF = expand('{dir}/{TF}/{ext}/{compType}{TF}.summary.rds', dir = TF_DIR, TF = allTF, ext = extDir, compType = compType)
    output:
        outputTable = FINAL_DIR + "/" + compType + "TF_vs_peak_distribution.tsv"
    log: expand('{dir}/4.summary1.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_summary1} ..."

    script: dir_scripts + script_summary1
rule concatenateMotifs:
    input:
        diagnosticPlots = rules.summary1.output,
        TFMotifes = expand('{dir}/{TF}/{extension}/{compType}{TF}.{patternTSV}', dir = TF_DIR, TF = allTF, extension = extDir, compType = compType, patternTSV = name_outputTSV)
    output:
        allMotifs_tsv = FINAL_DIR + "/" + compType + "allMotifs.tsv.gz"
    message: "{ruleDisplayMessage}Concatenate all motifs for file {input.TFMotifes}..."
    threads: 1
    params:
        folder = TF_DIR,
        pattern = "*" + extDir + "/" + compType + "*." + name_outputTSV
    shell:
        """
        find {params.folder} -type f -path "{params.pattern}" -exec awk 'NR==1 || FNR!=1' {{}} + | gzip > {output.allMotifs_tsv}
        """
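# The awk expression 'NR==1 || FNR!=1' keeps only the first file's header line:
# NR==1 holds only for the very first line across all files, while FNR!=1 skips
# the first (header) line of each subsequent file.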
rule calcNucleotideContent:
    input:
        motifsBed = rules.concatenateMotifs.output.allMotifs_tsv
    output:
        bedTemp = TEMP_EXTENSION_DIR + "/" + compType + "motifs.coord.permutation{perm}.bed.gz",
        bed = TEMP_EXTENSION_DIR + "/" + compType + "motifs.coord.nucContent.permutation{perm}.bed.gz"
    message: "{ruleDisplayMessage}Calculate nucleotide content via bedtools nuc for file {input} ..."
    threads: 1
    params:
        refGenome = config["additionalInputFiles"]["refGenome_fasta"]
    shell:
        """
        zcat {input.motifsBed} | grep -P "^{wildcards.perm}\\t" | awk '{{OFS="\\t"}};{{print $3,$4,$5,$6,$2}}' | gzip > {output.bedTemp} &&
        bedtools nuc -fi {params.refGenome} -bed {output.bedTemp} | gzip > {output.bed}
        """
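# bedtools nuc appends per-interval sequence composition columns (including the
# AT and GC fractions) computed from the reference FASTA; the GC content is
# presumably what the nCGBins binning parameter downstream refers to.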
rule prepareBinning:
    input:
        nucContent = expand('{dir}/{compType}motifs.coord.nucContent.permutation{perm}.bed.gz', dir = TEMP_EXTENSION_DIR, compType = compType, perm = range(0, nPerm + 1, 1)),
        motifes = rules.concatenateMotifs.output.allMotifs_tsv
    output:
        allTFData = TEMP_EXTENSION_DIR + "/" + compType + "allTFData_processedForPermutations.rds",
        allTFUniqueData = TEMP_EXTENSION_DIR + "/" + compType + "allTFUniqueData_processedForPermutations.rds"
    log: expand('{dir}/5.prepareBinning.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_prepareBinning} ..."

    script: dir_scripts + script_prepareBinning
rule binningTF:
    input:
        allTFData = rules.prepareBinning.output.allTFData,
        allTFUniqueData = rules.prepareBinning.output.allTFUniqueData
    output:
        permResults = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.permutationResults.rds', dir = TF_DIR, extension = extDir, compType = compType),
        summary = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.permutationSummary.tsv', dir = TF_DIR, extension = extDir, compType = compType),
        covResults = expand('{dir}/{{TF}}/{extension}/{compType}{{TF}}.covarianceResults.rds', dir = TF_DIR, extension = extDir, compType = compType)
    log: expand('{dir}/6.binningTF.{{TF}}.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_binningTF} ..."

    script: dir_scripts + script_binningTF
rule summaryFinal:
    input:
        allPermutationResults = expand('{dir}/{TF}/{extension}/{compType}{TF}.permutationSummary.tsv', dir = TF_DIR, TF = allTF, extension = extDir, compType = compType),
        condComp = rules.DESeqPeaks.output.condComp,
        DeSeqObj = rules.DESeqPeaks.output.DESeqObj
    output:
        summary = FINAL_DIR + "/" + compType + "summary.tsv",
        circularPlot = FINAL_DIR + "/" + compType + "summary.circular.pdf",
        diagnosticPlots = FINAL_DIR + "/" + compType + "diagnosticPlots.pdf"
    log: expand('{dir}/7.summaryFinal.R.log', dir = LOG_BENCHMARK_DIR)
    message: "{ruleDisplayMessage}Run R script {script_summaryFinal} ..."

    script: dir_scripts + script_summaryFinal
rule cleanUpLogFiles:
    input: rules.summaryFinal.output
    output:
        warnLog = LOG_BENCHMARK_DIR + "/" + compType + "all.warnings.log",
        errorLog = LOG_BENCHMARK_DIR + "/" + compType + "all.errors.log"
    message: "{ruleDisplayMessage}Clean and summarize the Logs_and_Benchmark directory..."
    threads: 1
    params: dir = LOG_BENCHMARK_DIR
    shell:
        """
        grep -iP "^WARN" {params.dir}/*.log > {output.warnLog} || true &&
        grep -iP "^FATAL" {params.dir}/*.log > {output.errorLog} || true &&
        rm {params.dir}/*.out {params.dir}/*.err || true
        """