This tutorial will outline the steps required to launch a Nextflow pipeline on Latch.
Prerequisites
- Register for an account and log into the Latch Console.
- Install a compatible version of Python. The Latch SDK currently supports Python >=3.8 and <=3.11.
- Install the Latch SDK >=2.52.3.
Example on Ubuntu:
$ python3 -m venv env
$ source env/bin/activate
$ pip install latch
Step 1: Clone your Nextflow pipeline
We will use nf-core’s rnaseq as an example; however, feel free to follow along with any Nextflow pipeline.
git clone https://github.com/nf-core/rnaseq
cd rnaseq
Step 2: Define the workflow metadata
The input parameters need to be explicitly defined to construct a graphical interface for a Nextflow pipeline. These parameters will be exposed to scientists in a web interface once the workflow is uploaded to Latch.
The Latch SDK provides a command to automatically generate the metadata file from an existing `nextflow_schema.json` file. If your workflow does not have a `nextflow_schema.json` file, you must manually define the Nextflow metadata and input parameters.
latch generate-metadata nextflow_schema.json --nextflow
The command parses the parameters defined in `nextflow_schema.json` and generates two files:
latch_metadata/__init__.py
latch_metadata/parameters.py
The first file holds the `NextflowMetadata` object, and the second contains the input parameter definitions.
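For reference, the generated `latch_metadata/__init__.py` will look something like the sketch below. The exact values (display name, author, resource sizes, log directory) are illustrative placeholders, not necessarily what the generator emits for your pipeline:

```python
from latch.types.directory import LatchDir
from latch.types.metadata import LatchAuthor, NextflowMetadata, NextflowRuntimeResources

from .parameters import generated_parameters

NextflowMetadata(
    display_name="nf-core/rnaseq",  # illustrative
    author=LatchAuthor(name="Your Name"),  # illustrative
    parameters=generated_parameters,
    runtime_resources=NextflowRuntimeResources(storage_gib=100),
    log_dir=LatchDir("latch:///nf_core_rnaseq_logs"),  # illustrative path
)
```

The most relevant fields of this object are discussed below.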
Before continuing, we will need to make a few updates to the generated files to ensure that the input parameters are correctly defined:
- Always verify that the generated input parameters and their inferred types are as expected. After inspecting the generated parameters for nf-core/rnaseq, we notice that the data types for `hisat2_index`, `salmon_index`, and `rsem_index` are `typing.Optional[str]` instead of `typing.Optional[LatchFile]`. Update these parameters to their correct types:
'hisat2_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
),
'rsem_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built RSEM index.',
),
'salmon_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built Salmon index.',
),
- For this tutorial, we will execute the workflow using the parameters defined in the `test` configuration profile. To simplify the user interface, remove any input parameters not defined in `conf/test.config` from your `latch_metadata/parameters.py`.
After making the above updates, your `latch_metadata/parameters.py` file should now look like this:
Example:
latch_metadata/parameters.py
from dataclasses import dataclass
import typing
import typing_extensions
from flytekit.core.annotation import FlyteAnnotation
from latch.types.metadata import NextflowParameter
from latch.types.file import LatchFile
from latch.types.directory import LatchDir
generated_parameters = {
'input': NextflowParameter(
type=LatchFile,
default=None,
section_title='Input/output options',
description='Path to comma-separated file containing information about the samples in the experiment.',
),
'outdir': NextflowParameter(
type=LatchDir,
default=None,
section_title=None,
description='The output directory where the results will be saved. You have to use absolute paths to storage on Cloud infrastructure.',
),
'fasta': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title="Genome References",
description='Path to FASTA genome file.',
),
'gtf': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to GTF annotation file.',
),
'gff': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to GFF3 annotation file.',
),
'transcript_fasta': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to FASTA transcriptome file.',
),
'additional_fasta': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='FASTA file to concatenate to genome FASTA file e.g. containing spike-in sequences.',
),
'bbsplit_fasta_list': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title='Read filtering options',
description='Path to comma-separated file containing a list of reference genomes to filter reads against with BBSplit. You have to also explicitly set `--skip_bbsplit false` if you want to use BBSplit.',
),
'hisat2_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
),
'salmon_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built Salmon index.',
),
'rsem_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built RSEM index.',
),
'skip_bbsplit': NextflowParameter(
type=typing.Optional[bool],
default=True,
section_title=None,
description='Skip BBSplit for removal of non-reference genome reads.',
),
'pseudo_aligner': NextflowParameter(
type=typing.Optional[str],
default=None,
section_title=None,
description="Specifies the pseudo aligner to use - available options are 'salmon'. Runs in addition to '--aligner'.",
),
'umitools_bc_pattern': NextflowParameter(
type=typing.Optional[str],
default=None,
section_title=None,
description="The UMI barcode pattern to use e.g. 'NNNNNN' indicates that the first 6 nucleotides of the read are from the UMI.",
),
}
Let’s inspect the most relevant fields of the `NextflowMetadata` object:
display_name: The display name of the workflow, as it will appear in the Latch UI.
author: Name of the person or organization that publishes the workflow.
parameters: Input parameters to the workflow, defined as `NextflowParameter` objects. The Latch Console will expose these parameters to scientists before they execute the workflow. Input parameters are passed to Nextflow as command-line arguments via `--param-name param-value`; therefore, each key of the `parameters` dictionary should match the name of the corresponding parameter in the Nextflow script.
runtime_resources: The resources the Nextflow runtime requires to execute the workflow. The `storage_gib` field configures the storage size in GiB for the shared filesystem.
log_dir: Latch directory where the `.nextflow.log` file is written on workflow failure.
Step 3 (Optional): Importing samplesheets from Latch Registry
The `input` parameter in our rnaseq workflow currently accepts a samplesheet as a CSV-formatted LatchFile. Generating and handling samplesheet files can be cumbersome and error-prone.
Latch Registry is a friendly table interface that allows users to fill out a samplesheet and link sequencing files for each sample.
The section below outlines how to configure a Nextflow workflow to accept a Latch Registry samplesheet (instead of a LatchFile).
- Define the schema required for the input samplesheet as a Python dataclass. Each field in the dataclass represents a column in the samplesheet. For nf-core/rnaseq, add the following snippet to your `latch_metadata/parameters.py` file:
latch_metadata/parameters.py
@dataclass(frozen=True)
class Sample:
sample: str
fastq_1: LatchFile
fastq_2: typing.Optional[LatchFile]
strandedness: str
generated_parameters = {
...
}
- Update the type information for the samplesheet input parameter. Samplesheets must always be a list of dataclass objects. Locate the `input` parameter in the `generated_parameters` dictionary in `latch_metadata/parameters.py` and make the following changes:
latch_metadata/parameters.py
generated_parameters = {
'input': NextflowParameter(
type=typing.List[Sample],
samplesheet=True,
section_title='Input/output options',
description='Path to comma-separated file containing information about the samples in the experiment.',
),
...
}
- Define a samplesheet constructor to convert the input objects to a samplesheet file. Your `latch_metadata/parameters.py` should now look like this:
latch_metadata/parameters.py
from dataclasses import dataclass
import typing
import typing_extensions
from flytekit.core.annotation import FlyteAnnotation
from latch.types.metadata import NextflowParameter
from latch.types.file import LatchFile
from latch.types.directory import LatchDir
@dataclass(frozen=True)
class Sample:
sample: str
fastq_1: LatchFile
fastq_2: typing.Optional[LatchFile]
strandedness: str
generated_parameters = {
'input': NextflowParameter(
type=typing.List[Sample],
samplesheet=True,
samplesheet_type='csv',
section_title='Input/output options',
description='Path to comma-separated file containing information about the samples in the experiment.',
),
'outdir': NextflowParameter(
type=LatchDir,
default=None,
section_title=None,
description='The output directory where the results will be saved. You have to use absolute paths to storage on Cloud infrastructure.',
),
'fasta': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title="Genome References",
description='Path to FASTA genome file.',
),
'gtf': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to GTF annotation file.',
),
'gff': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to GFF3 annotation file.',
),
'transcript_fasta': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to FASTA transcriptome file.',
),
'additional_fasta': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='FASTA file to concatenate to genome FASTA file e.g. containing spike-in sequences.',
),
'bbsplit_fasta_list': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title='Read filtering options',
description='Path to comma-separated file containing a list of reference genomes to filter reads against with BBSplit. You have to also explicitly set `--skip_bbsplit false` if you want to use BBSplit.',
),
'hisat2_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
),
'salmon_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built Salmon index.',
),
'rsem_index': NextflowParameter(
type=typing.Optional[LatchFile],
default=None,
section_title=None,
description='Path to directory or tar.gz archive for pre-built RSEM index.',
),
'skip_bbsplit': NextflowParameter(
type=typing.Optional[bool],
default=True,
section_title=None,
description='Skip BBSplit for removal of non-reference genome reads.',
),
'pseudo_aligner': NextflowParameter(
type=typing.Optional[str],
default=None,
section_title=None,
description="Specifies the pseudo aligner to use - available options are 'salmon'. Runs in addition to '--aligner'.",
),
'umitools_bc_pattern': NextflowParameter(
type=typing.Optional[str],
default=None,
section_title=None,
description="The UMI barcode pattern to use e.g. 'NNNNNN' indicates that the first 6 nucleotides of the read are from the UMI.",
),
}
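Under the hood, the constructor's job is simply to serialize the list of `Sample` objects into the CSV text the pipeline expects. The sketch below is illustrative only, not the SDK's actual implementation, and uses plain string paths in place of `LatchFile` objects so it stands alone:

```python
import csv
import io
from dataclasses import dataclass, fields
from typing import List, Optional


@dataclass(frozen=True)
class Sample:
    sample: str
    fastq_1: str  # stand-in for LatchFile: real code would carry remote paths
    fastq_2: Optional[str]
    strandedness: str


def construct_samplesheet(samples: List[Sample]) -> str:
    """Render a list of Sample rows as the CSV text nf-core/rnaseq expects."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    # Header row comes straight from the dataclass field names.
    writer.writerow([f.name for f in fields(Sample)])
    for s in samples:
        # Optional fastq_2 (single-end samples) becomes an empty CSV field.
        writer.writerow([s.sample, s.fastq_1, s.fastq_2 or "", s.strandedness])
    return buf.getvalue()
```

With `samplesheet=True` and `samplesheet_type='csv'` set on the parameter, the SDK performs this conversion for you at runtime; the sketch is only meant to show what that conversion produces.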
Step 4: Register the workflow
To register a Nextflow pipeline on Latch, run:
latch login
latch register . --nf-script main.nf --nf-execution-profile docker,test
Let's break down the above command:
`latch register .`: Searches for a Latch workflow in the current directory and registers it to Latch.
`--nf-script main.nf`: Specifies the Nextflow script passed to the Nextflow command at runtime. For this workflow: nextflow run main.nf
`--nf-execution-profile docker,test`: Defines the execution profiles to use when running the workflow on Latch. We specify the `docker` configuration profile to execute processes in a containerized environment and the `test` profile to supply the pipeline's test parameters.
After running the above command, the Latch SDK will generate two files:
`latch.config` - a Nextflow configuration file passed to Nextflow via the `-config` flag.
`wf/entrypoint.py` - the generated Latch SDK workflow code that executes the Nextflow pipeline.
Once the workflow is registered, click on the link provided in the output of the `latch register` command. This will take you to an interface like the one below:
As part of the registration process, we build a Docker image specified by a `Dockerfile`. Normally, this `Dockerfile` is autogenerated and stored in `.latch`, but if a `Dockerfile` already exists in the workflow directory before registering, it will be used to build the image instead. This can cause errors down the line if that `Dockerfile` was not generated by Latch.
Step 5: Execute the workflow
Before executing the workflow, we need to upload test data to Latch. You can find sample test data here.
Copy the test data to your Latch workspace by clicking the Copy to Workspace button in the top right corner.
Now, let’s create the samplesheet in Latch Registry.
- Navigate to Latch Registry and create a new Table.
- Select the table you just created and click "Import CSV". This will open the Latch Data filesystem. Import the `samplesheet.csv` file you copied from the provided test data.
Your data is now uploaded to Latch and ready to be processed!
Navigate to the Workflows tab in the Latch Console and select the workflow you previously registered. Then, select the appropriate input parameters from the test data you uploaded and click Launch Workflow in the bottom right corner to execute the workflow.
The workflow orchestrator will use these input parameters, along with the metadata provided at registration time, to construct the Nextflow command. For example, the RNA-seq pipeline will be launched via the following command:
nextflow run main.nf -profile docker,test --input samplesheet.csv --outdir latch:///nf-rnaseq/outputs --fasta latch:///nf-rnaseq/inputs/reference/genome.fasta ...
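To make the parameter-to-flag mapping concrete, here is a simplified, hypothetical stand-in for the `get_flag` helper used in the generated entrypoint (the real helper also handles Latch paths and other SDK types): set parameters become `--name value` pairs, while unset optional parameters produce no flag at all.

```python
from typing import Any, List, Optional


def get_flag(name: str, value: Optional[Any]) -> List[str]:
    """Simplified, illustrative stand-in for the SDK helper: None yields no
    flag; a boolean becomes a bare --name (or nothing when False); everything
    else becomes a --name value pair."""
    if value is None:
        return []
    if isinstance(value, bool):
        return [f"--{name}"] if value else []
    return [f"--{name}", str(value)]


# Assembling a command the way the generated entrypoint does:
cmd = [
    "nextflow", "run", "main.nf", "-profile", "docker,test",
    *get_flag("input", "samplesheet.csv"),
    *get_flag("outdir", "latch:///nf-rnaseq/outputs"),
    *get_flag("hisat2_index", None),  # optional parameter left unset: no flag emitted
]
```

The boolean handling shown here is an assumption for illustration; the key point is that each entry in the `parameters` dictionary maps onto one `--param-name` flag.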
Step 6: Monitoring the workflow
After launching the workflow, you can monitor progress by clicking on the appropriate execution under the Executions tab of your workflow.
Under the Graph & Logs tab, you can view the generated two-stage DAG with the initialization step and the Nextflow runtime task. If you click on the Nextflow runtime node, you can view the runtime logs generated by Nextflow.
Once the Nextflow runtime starts executing the workflow, a Process Nodes tab will appear in the menu bar, where you can monitor the status of each process in the workflow. Each node in the DAG represents a process in the Nextflow pipeline. To navigate the graph more easily, you can filter the process nodes by execution status using the "Filter by Status" button in the top right corner.
Click on a process node to see details of every invocation of that process, including the resources provisioned, execution time, and logs.
Once the workflow is complete, you can view any published outputs in Latch Data. By convention, Nextflow workflows use the `outdir` parameter to prepend `publishDir` paths. For example, if we set our `outdir` parameter to `latch:///nf-rnaseq/outputs`, all pipeline outputs will be published to the `nf-rnaseq/outputs` directory in Latch Data.
Step 7 (Optional): Customizing the workflow
As explained in Step 4, the `latch register` command generates a Latch workflow that runs the Nextflow workflow. To give developers flexibility over how their Nextflow pipelines are executed on Latch, the generated workflow code can be modified to execute custom pre- and post-processing logic.
In this tutorial, we will modify the generated `wf/entrypoint.py` file to add a Run Name parameter that will be used to namespace the outputs of the Nextflow pipeline. To do this, add the `run_name` parameter to your `entrypoint.py` file as follows:
@nextflow_runtime_task(cpu=4, memory=8, storage_gib=100)
def nextflow_runtime(
pvc_name: str,
run_name: str,
input: LatchFile,
outdir: LatchDir,
fasta: typing.Optional[LatchFile],
gtf: typing.Optional[LatchFile],
gff: typing.Optional[LatchFile],
transcript_fasta: typing.Optional[LatchFile],
additional_fasta: typing.Optional[LatchFile],
bbsplit_fasta_list: typing.Optional[LatchFile],
hisat2_index: typing.Optional[LatchFile],
salmon_index: typing.Optional[LatchFile],
rsem_index: typing.Optional[LatchFile],
pseudo_aligner: typing.Optional[str],
umitools_bc_pattern: typing.Optional[str],
skip_bbsplit: typing.Optional[bool],
) -> None:
...
@workflow(metadata._nextflow_metadata)
def nf_nf_core_rnaseq(
run_name: str,
input: LatchFile,
outdir: LatchDir,
fasta: typing.Optional[LatchFile],
gtf: typing.Optional[LatchFile],
gff: typing.Optional[LatchFile],
transcript_fasta: typing.Optional[LatchFile],
additional_fasta: typing.Optional[LatchFile],
bbsplit_fasta_list: typing.Optional[LatchFile],
hisat2_index: typing.Optional[LatchFile],
salmon_index: typing.Optional[LatchFile],
rsem_index: typing.Optional[LatchFile],
pseudo_aligner: typing.Optional[str],
umitools_bc_pattern: typing.Optional[str],
skip_bbsplit: typing.Optional[bool] = True,
) -> None:
"""
nf-core/rnaseq
Sample Description
"""
pvc_name: str = initialize()
nextflow_runtime(
pvc_name=pvc_name,
run_name=run_name,
input=input,
outdir=outdir,
fasta=fasta,
gtf=gtf,
gff=gff,
transcript_fasta=transcript_fasta,
additional_fasta=additional_fasta,
bbsplit_fasta_list=bbsplit_fasta_list,
hisat2_index=hisat2_index,
salmon_index=salmon_index,
rsem_index=rsem_index,
skip_bbsplit=skip_bbsplit,
pseudo_aligner=pseudo_aligner,
umitools_bc_pattern=umitools_bc_pattern,
)
In the above code snippet:
- We add a `run_name` parameter to the `nf_nf_core_rnaseq` workflow function. All parameters defined in the `nf_nf_core_rnaseq` function are exposed to the user in the Latch UI.
- We pass the `run_name` parameter to the `nextflow_runtime` task in the body of the workflow function.
- We add the `run_name` parameter to the `nextflow_runtime` task signature.
Then, add logic to the `nextflow_runtime` task to append the `run_name` parameter to the `outdir` parameter before executing the Nextflow pipeline:
@nextflow_runtime_task(cpu=4, memory=8, storage_gib=100)
def nextflow_runtime(...) -> None:
...
cmd = [
"/root/nextflow",
"run",
str(shared_dir / "main.nf"),
"-work-dir",
str(shared_dir),
"-profile",
profiles,
"-c",
"latch.config",
"-resume",
*get_flag("input", input),
*get_flag("outdir", LatchDir(f"{outdir.remote_path}/{run_name}")),
*get_flag("fasta", fasta),
*get_flag("gtf", gtf),
*get_flag("gff", gff),
*get_flag("transcript_fasta", transcript_fasta),
*get_flag("additional_fasta", additional_fasta),
*get_flag("bbsplit_fasta_list", bbsplit_fasta_list),
*get_flag("hisat2_index", hisat2_index),
*get_flag("salmon_index", salmon_index),
*get_flag("rsem_index", rsem_index),
*get_flag("skip_bbsplit", skip_bbsplit),
*get_flag("pseudo_aligner", pseudo_aligner),
*get_flag("umitools_bc_pattern", umitools_bc_pattern),
]
...
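The effect of this change is plain path concatenation: the run name is appended to the chosen output directory, so each run publishes into its own subdirectory. A minimal sketch (paths illustrative):

```python
def namespace_outdir(remote_path: str, run_name: str) -> str:
    """Mirror the f"{outdir.remote_path}/{run_name}" expression in the task above."""
    return f"{remote_path}/{run_name}"


# Two executions with different run names publish to disjoint directories:
a = namespace_outdir("latch:///nf-rnaseq/outputs", "run-a")
b = namespace_outdir("latch:///nf-rnaseq/outputs", "run-b")
```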
We will now re-register the workflow with the above updates. We purposely exclude the `--nf-script` flag from the `latch register` command to avoid re-generating the Latch SDK workflow code, which would overwrite our updates.