When a Snakemake workflow is executed locally on a single computer or high-performance cluster, all dependencies and input/output files live on a single machine.
When the same workflow is executed on Latch, each generated job runs in a separate container on a potentially isolated machine.
You may therefore need to adapt your Snakefile to address issues that arise from this execution model but never surface during local execution:
- Add missing rule inputs that are implicitly fulfilled when executing locally.
- Make sure shared code does not rely on input files. Shared code is any code outside of a rule, and it is executed by every task.
- Optimize data transfer by merging tasks that have 1-to-1 dependencies.
Here, we will walk through examples of each of the cases outlined above.
Add missing rule inputs
When a Snakemake workflow is executed on Latch, the job generated for each Snakefile rule runs on a separate machine. Only the files and directories explicitly specified under the rule's input directive are downloaded into the task.
A typical failure mode is index files for biological data that are not explicitly specified as rule inputs: the job generated for that rule fails because the index files are missing.
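As a minimal sketch of the pattern (the rule name, paths, and samtools command here are illustrative, not part of the workflow in the example that follows), a rule that queries an indexed BAM should declare the index explicitly so it is downloaded into the task alongside the BAM:
rule subset_region:
    input:
        bam="aligned/sample.bam",
        # declare the index explicitly: only declared inputs are downloaded
        # into the task, and the region query below needs the .bai file
        bai="aligned/sample.bam.bai",
    output:
        "subsets/sample.chr1.bam"
    shell:
        "samtools view -b -o {output} {input.bam} chr1"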
Example
In the example below, there are two Snakefile rules:
- delly_s: Runs Delly to call SVs and writes an unfiltered BCF file, then applies quality filtering with bcftools filter to retain only the SV calls that pass certain filters, and finally indexes the filtered BCF file.
- delly_merge: Merges (concatenates) the BCF files containing SV calls from the delly_s rule into a single VCF file. This rule requires the index file of each corresponding BCF file to be available.
rule delly_s:
    input:
        fasta=get_fasta(),
        fai=get_faidx()[0],
        bam=get_bam("{path}/{sample}"),
        bai=get_bai("{path}/{sample}"),
        excl_opt=get_bed()
    params:
        excl_opt='-x "%s"' % get_bed() if exclude_regions() else "",
    output:
        bcf = os.path.join(
            "{path}",
            "{sample}",
            get_outdir("delly"),
            "delly-{}{}".format("{sv_type}", config.file_exts.bcf),
        )
    conda:
        "../envs/caller.yaml"
    threads: 1
    resources:
        mem_mb=config.callers.delly.memory,
        tmp_mb=config.callers.delly.tmpspace,
    shell:
        """
        set -xe
        OUTDIR="$(dirname "{output.bcf}")"
        PREFIX="$(basename "{output.bcf}" .bcf)"
        OUTFILE="${{OUTDIR}}/${{PREFIX}}.unfiltered.bcf"
        if [ "{config.echo_run}" -eq "1" ]; then
            echo "{input}" > "{output}"
        else
            export OMP_NUM_THREADS={threads}
            delly call \
                -t "{wildcards.sv_type}" \
                -g "{input.fasta}" \
                -o "${{OUTFILE}}" \
                -q 1 \
                -s 9 \
                {params.excl_opt} \
                "{input.bam}"
            bcftools filter \
                -O b \
                -o "{output.bcf}" \
                -i "FILTER == 'PASS'" \
                "${{OUTFILE}}"
            bcftools index "{output.bcf}"
        fi
        """
rule delly_merge:
    input:
        bcf = [
            os.path.join(
                "{path}",
                "{tumor}--{normal}",
                get_outdir("delly"),
                "delly-{}{}".format(sv, config.file_exts.bcf),
            )
            for sv in config.callers.delly.sv_types
        ]
        if config.mode is config.mode.PAIRED_SAMPLE
        else [
            os.path.join(
                "{path}",
                "{sample}",
                get_outdir("delly"),
                "delly-{}{}".format(sv, config.file_exts.bcf),
            )
            for sv in config.callers.delly.sv_types
        ],
    output:
        os.path.join(
            "{path}",
            "{tumor}--{normal}",
            get_outdir("delly"),
            "delly{}".format(config.file_exts.vcf),
        )
        if config.mode is config.mode.PAIRED_SAMPLE
        else os.path.join(
            "{path}",
            "{sample}",
            get_outdir("delly"),
            "delly{}".format(config.file_exts.vcf),
        ),
    conda:
        "../envs/caller.yaml"
    threads: 1
    resources:
        mem_mb=1024,
        tmp_mb=0,
    shell:
        """
        set -x
        if [ "{config.echo_run}" -eq "1" ]; then
            cat {input} > "{output}"
        else
            bcftools concat \
                -a \
                -O v \
                -o "{output}" \
                {input.bcf}
        fi
        """
The above code will fail with the error:
Failed to open: /root/workflow/data/bam/3/T3--N3/delly_out/delly-BND.bcf.csi
Solution
The task failed because the BCF index files (ending with .bcf.csi) are produced by the delly_s rule but are not explicitly specified as inputs to the delly_merge rule. Hence, the index files are not downloaded into the task that executes the delly_merge rule.
To resolve the error, we add the index files as outputs of the delly_s rule and as inputs of the delly_merge rule:
rule delly_s:
    input:
        fasta=get_fasta(),
        fai=get_faidx()[0],
        bam=get_bam("{path}/{sample}"),
        bai=get_bai("{path}/{sample}"),
        excl_opt=get_bed()
    params:
        excl_opt='-x "%s"' % get_bed() if exclude_regions() else "",
    output:
        bcf = os.path.join(
            "{path}",
            "{sample}",
            get_outdir("delly"),
            "delly-{}{}".format("{sv_type}", config.file_exts.bcf),
        ),
        bcf_index = os.path.join(
            "{path}",
            "{sample}",
            get_outdir("delly"),
            "delly-{}{}".format("{sv_type}", config.file_exts.bcf),
        ) + ".csi"
    ...

rule delly_merge:
    input:
        bcf = [
            os.path.join(
                "{path}",
                "{tumor}--{normal}",
                get_outdir("delly"),
                "delly-{}{}".format(sv, config.file_exts.bcf),
            )
            for sv in config.callers.delly.sv_types
        ]
        if config.mode is config.mode.PAIRED_SAMPLE
        else [
            os.path.join(
                "{path}",
                "{sample}",
                get_outdir("delly"),
                "delly-{}{}".format(sv, config.file_exts.bcf),
            )
            for sv in config.callers.delly.sv_types
        ],
        bcf_index = [
            os.path.join(
                "{path}",
                "{tumor}--{normal}",
                get_outdir("delly"),
                "delly-{}{}".format(sv, config.file_exts.bcf),
            ) + ".csi"
            for sv in config.callers.delly.sv_types
        ]
    ...
Make sure shared code does not rely on input files
At runtime, a task only downloads the files that its rule explicitly depends on. Shared code, i.e. Snakefile code that is not under any rule, will therefore usually fail if it tries to read input files.
Example
from pathlib import Path

samples = Path("inputs").glob("*.fastq")

rule all:
    input:
        expand("fastqc/{sample}.html", sample=samples)

rule fastqc:
    input:
        "inputs/{sample}.fastq"
    output:
        "fastqc/{sample}.html"
    shell:
        "fastqc {input} -o {output}"
Since the Path("inputs").glob(...) call is not under any rule, it runs in every task. Because the fastqc rule does not declare the inputs directory as an input, that directory is not downloaded into the task and the shared code fails.
Solution
Only access files when they are actually needed (i.e. when computing dependencies, as in this example, or inside a rule body) by moving the problematic code into the rule definition. Either inline the expression directly, or wrap it in a function and call that function in place of the variable.
Example
rule all_inline:
    input:
        expand("fastqc/{sample}.html", sample=Path("inputs").glob("*.fastq"))

def get_samples():
    samples = Path("inputs").glob("*.fastq")
    return samples

rule all_function:
    input:
        expand("fastqc/{sample}.html", sample=get_samples())
This works because the JIT step replaces the input, output, params, and other declarations with static strings when it builds the runtime workflow. Any function calls inside those declarations are replaced with their pre-computed results, so the Snakefile does not attempt to read the files again at runtime.
Same example at runtime:
rule all_inline:
    input:
        "fastqc/example.html"

def get_samples():
    samples = Path("inputs").glob("*.fastq")
    return samples

rule all_function:
    input:
        "fastqc/example.html"
Example using multiple return values:
def get_samples_data():
    # materialize the glob so the result can be iterated more than once
    samples = list(Path("inputs").glob("*.fastq"))
    return {
        "samples": samples,
        "names": [x.name for x in samples]
    }

rule all:
    input:
        expand("fastqc/{sample}.html", sample=get_samples_data()["samples"]),
        expand("reports/{name}.txt", name=get_samples_data()["names"]),
Optimize data transfer
When a Snakemake workflow runs on Latch, each rule is executed on a separate, isolated machine. As a result, all input files specified for a rule are downloaded to that machine every time the rule runs. Repeatedly downloading the same input files across multiple rules increases workflow runtime and cost, especially when the files are large.
To optimize performance and minimize costs, consolidate logic that relies on shared inputs into a single rule.
Example
An inefficient version, with multiple rules processing the same BAM file:
rule all:
    input:
        "results/final_variants.vcf"

rule mark_duplicates:
    input:
        "data/sample.bam"
    output:
        "results/dedupped_sample.bam"
    shell:
        """
        gatk MarkDuplicates \
            -I {input} \
            -O {output} \
            -M results/metrics.txt
        """

rule call_variants:
    input:
        bam = "results/dedupped_sample.bam",
        ref = "data/reference.fasta"
    output:
        "results/raw_variants.vcf"
    shell:
        """
        gatk HaplotypeCaller \
            -R {input.ref} \
            -I {input.bam} \
            -O {output}
        """

rule filter_variants:
    input:
        "results/raw_variants.vcf"
    output:
        "results/final_variants.vcf"
    shell:
        """
        gatk VariantFiltration \
            -V {input} \
            -O {output} \
            --filter-name "QD_filter" \
            --filter-expression "QD < 2.0"
        """
Solution
Instead of separate rules for marking duplicates, calling variants, and filtering variants, we consolidate the logic into a single rule, so the intermediate BAM and VCF files are not uploaded and re-downloaded between tasks.
rule process_and_call_variants:
    input:
        bam = "data/sample.bam",
        ref = "data/reference.fasta"
    output:
        vcf = "results/final_variants.vcf",
        dedupped_bam = temp("results/dedupped_sample.bam"),
        raw_vcf = temp("results/raw_variants.vcf")
    shell:
        """
        gatk MarkDuplicates \
            -I {input.bam} \
            -O {output.dedupped_bam} \
            -M results/metrics.txt

        gatk HaplotypeCaller \
            -R {input.ref} \
            -I {output.dedupped_bam} \
            -O {output.raw_vcf}

        gatk VariantFiltration \
            -V {output.raw_vcf} \
            -O {output.vcf} \
            --filter-name "QD_filter" \
            --filter-expression "QD < 2.0"
        """