This tutorial will outline the steps required to launch a Nextflow pipeline on Latch.

Prerequisites

  • Register for an account and log into the Latch Console
  • Install a compatible version of Python. The Latch SDK is currently only supported for Python >=3.8 and <=3.11
  • Install the Latch SDK >= 2.52.3

Example on Ubuntu:

$ python3 -m venv env
$ source env/bin/activate
$ pip install latch
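
Before installing, it can help to confirm that the interpreter inside the virtual environment falls within the supported range. The check below is a minimal sketch using only the standard library; the function name `is_supported_python` is illustrative and not part of the Latch SDK.

```python
import sys

# Bounds taken from the prerequisite above: Python >=3.8 and <=3.11.
MIN_SUPPORTED = (3, 8)
MAX_SUPPORTED = (3, 11)


def is_supported_python(version=None):
    """Return True if (major, minor) falls inside the supported range."""
    major_minor = tuple((version or sys.version_info)[:2])
    return MIN_SUPPORTED <= major_minor <= MAX_SUPPORTED


if __name__ == "__main__":
    status = "supported" if is_supported_python() else "not supported"
    print(f"Python {sys.version_info.major}.{sys.version_info.minor} is {status}")
```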

Step 1: Clone your Nextflow pipeline

We will use nf-core’s rnaseq as an example; however, feel free to follow along with any Nextflow pipeline.

git clone https://github.com/nf-core/rnaseq
cd rnaseq

Step 2: Define metadata and workflow graphical interface

The input parameters need to be explicitly defined to construct a graphical interface for a Nextflow pipeline. These parameters will be exposed to scientists in a web interface once the workflow is uploaded to Latch.

The Latch SDK provides a command to automatically generate the metadata file from an existing nextflow_schema.json file. If your workflow does not have a nextflow_schema.json file, you must manually define the Nextflow metadata and input parameters.

latch generate-metadata nextflow_schema.json --nextflow

The command parses parameters defined in the nextflow_schema.json and generates two files:

latch_metadata/__init__.py
latch_metadata/parameters.py

The first file holds the NextflowMetadata object, and the second file contains the input parameter definitions.
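
To cross-check the generated files against the schema, it can be useful to list each parameter with its declared JSON type and format. The sketch below is not how the Latch SDK parses the schema. It assumes an nf-core-style layout in which parameter groups live under `definitions` (or `$defs`) and each group has a `properties` mapping. `EXAMPLE_SCHEMA` and `list_schema_parameters` are hypothetical names used only for illustration.

```python
import json

# Tiny stand-in for a nextflow_schema.json; a real schema has many more groups.
EXAMPLE_SCHEMA = json.loads("""
{
  "definitions": {
    "input_output_options": {
      "properties": {
        "input": {"type": "string", "format": "file-path"},
        "outdir": {"type": "string", "format": "directory-path"}
      }
    },
    "alignment_options": {
      "properties": {
        "pseudo_aligner": {"type": "string"},
        "skip_bbsplit": {"type": "boolean"}
      }
    }
  }
}
""")


def list_schema_parameters(schema):
    """Return {parameter name: (JSON type, format or None)} across all groups."""
    groups = schema.get("definitions") or schema.get("$defs") or {}
    found = {}
    for group in groups.values():
        for name, spec in group.get("properties", {}).items():
            found[name] = (spec.get("type"), spec.get("format"))
    return found
```

To run it against a real pipeline, load the file with `json.load(open("nextflow_schema.json"))` and pass the result to `list_schema_parameters`. Parameters that describe paths but carry no `format` are good candidates for a manual type check in the generated parameters file.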

Before continuing, we will need to make a few updates to the generated files to ensure that the input parameters are correctly defined:

  1. It is important to always verify that the generated input parameters and their inferred types are as expected. After inspecting the generated parameters for nf-core/rnaseq, we notice that the data types for hisat2_index, salmon_index, and rsem_index are typing.Optional[str] instead of typing.Optional[LatchFile]. Update these parameters to their correct types.
'hisat2_index': NextflowParameter(
    type=typing.Optional[LatchFile], # MODIFIED
    default=None,
    section_title=None,
    description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
),
'rsem_index': NextflowParameter(
    type=typing.Optional[LatchFile], # MODIFIED
    default=None,
    section_title=None,
    description='Path to directory or tar.gz archive for pre-built RSEM index.',
),
'salmon_index': NextflowParameter(
    type=typing.Optional[LatchFile], # MODIFIED
    default=None,
    section_title=None,
    description='Path to directory or tar.gz archive for pre-built Salmon index.',
),
  2. For this tutorial, we will execute the workflow using the parameters defined in the test configuration profile. To simplify the user interface, remove any input parameters not defined in conf/test.config from your latch_metadata/parameters.py.
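
Trimming the parameter list by hand is easy to get wrong, so here is a rough sketch of how the names assigned in a config file could be collected and used as a filter. It is a regex-based approximation, not a Nextflow config parser. `EXAMPLE_TEST_CONFIG` is a made-up excerpt rather than the contents of the real conf/test.config, and the helper names are hypothetical.

```python
import re

# Hypothetical excerpt in the style of a conf/test.config params block.
EXAMPLE_TEST_CONFIG = """
params {
    input        = 'samplesheet_test.csv'
    fasta        = 'genome.fasta'
    skip_bbsplit = false
}
"""

# Matches `name = value` assignments at the start of a line.
ASSIGNMENT = re.compile(r"^\s*([A-Za-z_]\w*)\s*=", re.MULTILINE)


def names_assigned_in(config_text):
    """Collect every name that appears on the left of an assignment."""
    return set(ASSIGNMENT.findall(config_text))


def keep_only(parameters, allowed):
    """Return a new dict with only the allowed keys, preserving order."""
    return {name: value for name, value in parameters.items() if name in allowed}
```

Treat the result as a starting point rather than a final answer. The parameters file used in this tutorial also keeps outdir, so add such names to the allowed set explicitly, for example `names_assigned_in(text) | {"outdir"}`.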

After making the above updates, your latch_metadata/parameters.py file should now look like this:

Example:

latch_metadata/parameters.py
from dataclasses import dataclass
import typing
import typing_extensions

from flytekit.core.annotation import FlyteAnnotation

from latch.types.metadata import NextflowParameter
from latch.types.file import LatchFile
from latch.types.directory import LatchDir

# Import these into your `__init__.py` file:
#
# from .parameters import generated_parameters

generated_parameters = {
    'input': NextflowParameter(
        type=LatchFile,
        default=None,
        section_title='Input/output options',
        description='Path to comma-separated file containing information about the samples in the experiment.',
    ),
    'outdir': NextflowParameter(
        type=LatchDir,
        default=None,
        section_title=None,
        description='The output directory where the results will be saved. You have to use absolute paths to storage on Cloud infrastructure.',
    ),
    'fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title="Genome References",
        description='Path to FASTA genome file.',
    ),
    'gtf': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to GTF annotation file.',
    ),
    'gff': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to GFF3 annotation file.',
    ),
    'transcript_fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to FASTA transcriptome file.',
    ),
    'additional_fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='FASTA file to concatenate to genome FASTA file e.g. containing spike-in sequences.',
    ),
    'bbsplit_fasta_list': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title='Read filtering options',
        description='Path to comma-separated file containing a list of reference genomes to filter reads against with BBSplit. You have to also explicitly set `--skip_bbsplit false` if you want to use BBSplit.',
    ),
    'hisat2_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
    ),
    'salmon_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built Salmon index.',
    ),
    'rsem_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built RSEM index.',
    ),
    'skip_bbsplit': NextflowParameter(
        type=typing.Optional[bool],
        default=True,
        section_title=None,
        description='Skip BBSplit for removal of non-reference genome reads.',
    ),
    'pseudo_aligner': NextflowParameter(
        type=typing.Optional[str],
        default=None,
        section_title=None,
        description="Specifies the pseudo aligner to use - available options are 'salmon'. Runs in addition to '--aligner'.",
    ),
    'umitools_bc_pattern': NextflowParameter(
        type=typing.Optional[str],
        default=None,
        section_title=None,
        description="The UMI barcode pattern to use e.g. 'NNNNNN' indicates that the first 6 nucleotides of the read are from the UMI.",
    ),
}

Let’s inspect the most relevant fields of the NextflowMetadata object:

display_name: The display name of the workflow, as it will appear on the Latch UI.

author: Name of the person or organization that publishes the workflow.

parameters: Input parameters to the workflow, defined as NextflowParameter objects. The Latch Console will expose these parameters to scientists before they execute the workflow. Input parameters are passed to Nextflow as command line arguments via --param-name param-value. Therefore, the key of the parameters dictionary should match the name of the parameter in the Nextflow script.

runtime_resources: The resources the Nextflow Runtime requires to execute the workflow. The storage_gib field will configure the storage size in GiB for the shared filesystem.

log_dir: The Latch directory where the .nextflow.log file is saved if the workflow fails.
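
To illustrate why the dictionary keys under parameters must match the Nextflow parameter names, the sketch below flattens a mapping into `--name value` pairs. `build_flags` is a hypothetical helper, not the SDK's implementation. It skips unset values and does not cover how the SDK renders booleans or Latch file paths.

```python
def build_flags(parameters):
    """Flatten {name: value} into ['--name', 'value', ...], skipping None."""
    flags = []
    for name, value in parameters.items():
        if value is None:
            continue  # unset optional parameters are left off the command line
        flags.extend([f"--{name}", str(value)])
    return flags
```

Because each key is copied verbatim into the flag, a key that differs from the name used in the Nextflow script would produce a flag the pipeline does not recognize.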

Step 3 (Optional): Importing samplesheets from Latch Registry

The input parameter in our rnaseq workflow currently accepts a samplesheet as a CSV-formatted LatchFile. Generating and handling samplesheet files by hand can be cumbersome and error-prone.

Latch Registry is a table interface that allows users to fill out a samplesheet and link sequencing files to each sample. The section below outlines how to configure a Nextflow workflow to accept a Latch Registry samplesheet (instead of a LatchFile).

  1. Define the schema required for the input samplesheet as a Python dataclass. Each field in the dataclass represents a column in the samplesheet.

For nf-core/rnaseq, add the following snippet to your latch_metadata/parameters.py file:

latch_metadata/parameters.py
@dataclass(frozen=True)
class Sample:
    sample: str
    fastq_1: LatchFile
    fastq_2: typing.Optional[LatchFile]
    strandedness: str

generated_parameters = {
    ...
}
  2. Update the type information for the samplesheet input parameter. Samplesheets must always be a list of dataclass objects.

Locate the input parameter in the generated_parameters dictionary in latch_metadata/parameters.py and make the following changes:

latch_metadata/parameters.py
generated_parameters = {
    'input': NextflowParameter(
        type=typing.List[Sample],   # MODIFIED
        samplesheet=True,           # ADDED
        section_title='Input/output options',
        description='Path to comma-separated file containing information about the samples in the experiment.',
    ),
    ...
}
  3. Define a samplesheet constructor to convert the input objects to a samplesheet file.

Your latch_metadata/parameters.py should now look like this:

latch_metadata/parameters.py
from dataclasses import dataclass
import typing
import typing_extensions

from flytekit.core.annotation import FlyteAnnotation

from latch.types.metadata import NextflowParameter
from latch.types.file import LatchFile
from latch.types.directory import LatchDir

# Import these into your `__init__.py` file:
#
# from .parameters import generated_parameters

@dataclass(frozen=True)
class Sample:
    sample: str
    fastq_1: LatchFile
    fastq_2: typing.Optional[LatchFile]
    strandedness: str


generated_parameters = {
    'input': NextflowParameter(
        type=typing.List[Sample],
        samplesheet=True,
        samplesheet_type='csv',  # ADDED, also accepts 'tsv'
        section_title='Input/output options',
        description='Path to comma-separated file containing information about the samples in the experiment.',
    ),
    'outdir': NextflowParameter(
        type=LatchDir,
        default=None,
        section_title=None,
        description='The output directory where the results will be saved. You have to use absolute paths to storage on Cloud infrastructure.',
    ),
    'fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title="Genome References",
        description='Path to FASTA genome file.',
    ),
    'gtf': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to GTF annotation file.',
    ),
    'gff': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to GFF3 annotation file.',
    ),
    'transcript_fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to FASTA transcriptome file.',
    ),
    'additional_fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='FASTA file to concatenate to genome FASTA file e.g. containing spike-in sequences.',
    ),
    'bbsplit_fasta_list': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title='Read filtering options',
        description='Path to comma-separated file containing a list of reference genomes to filter reads against with BBSplit. You have to also explicitly set `--skip_bbsplit false` if you want to use BBSplit.',
    ),
    'hisat2_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
    ),
    'salmon_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built Salmon index.',
    ),
    'rsem_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built RSEM index.',
    ),
    'skip_bbsplit': NextflowParameter(
        type=typing.Optional[bool],
        default=True,
        section_title=None,
        description='Skip BBSplit for removal of non-reference genome reads.',
    ),
    'pseudo_aligner': NextflowParameter(
        type=typing.Optional[str],
        default=None,
        section_title=None,
        description="Specifies the pseudo aligner to use - available options are 'salmon'. Runs in addition to '--aligner'.",
    ),
    'umitools_bc_pattern': NextflowParameter(
        type=typing.Optional[str],
        default=None,
        section_title=None,
        description="The UMI barcode pattern to use e.g. 'NNNNNN' indicates that the first 6 nucleotides of the read are from the UMI.",
    ),
}
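
To make the mapping between dataclass fields and samplesheet columns concrete, the sketch below serializes a list of samples to CSV text. It is an illustration only and not the constructor the Latch SDK uses. Plain strings stand in for LatchFile so the example runs without the SDK, and `samples_to_csv` is a hypothetical name.

```python
import csv
import io
import typing
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class Sample:
    sample: str
    fastq_1: str                   # stand-in for LatchFile
    fastq_2: typing.Optional[str]  # stand-in for typing.Optional[LatchFile]
    strandedness: str


def samples_to_csv(samples):
    """Write one header row from the field names, then one row per sample."""
    columns = [f.name for f in fields(Sample)]
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)
    for s in samples:
        # Missing optional values become empty cells.
        writer.writerow(["" if getattr(s, c) is None else getattr(s, c) for c in columns])
    return buffer.getvalue()
```

Each dataclass field becomes one column, in declaration order, which is why the field names should match the column names the pipeline expects in its samplesheet.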

Step 4: Register the workflow

To register a Nextflow pipeline on Latch, type:

latch login
latch register . --nf-script main.nf --nf-execution-profile docker,test

Let's break down the above command:

latch register .: Searches for a Latch workflow in the current directory and registers it to Latch.

--nf-script main.nf: Specifies the Nextflow script passed to the Nextflow command at runtime. For this workflow: nextflow run main.nf

--nf-execution-profile docker,test: Defines the execution profiles to use when running the workflow on Latch. We specify the docker configuration profile to execute processes in a containerized environment, and the test profile to run with the parameters defined in the test configuration.

After running the above command, the Latch SDK will generate two files:

  1. latch.config - a Nextflow configuration file passed to Nextflow via the -config flag.
  2. wf/entrypoint.py - the generated Latch SDK workflow code that executes the Nextflow pipeline.

Once the workflow is registered, click on the link provided in the output of the latch register command. This will take you to the workflow's interface in the Latch Console.

As part of the registration process, Latch builds a Docker image from a Dockerfile. Normally this Dockerfile is autogenerated and stored in .latch, but if a Dockerfile already exists in the workflow directory before registering, it is used to build the image instead. This can cause errors later if that Dockerfile was not generated by Latch.
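
Since many pipeline repositories ship their own Dockerfile, a quick check before registering can avoid surprises. The helper below is a small sketch with a hypothetical name, `preexisting_dockerfile`. It only looks for a file named Dockerfile at the top level of the workflow directory and does not inspect its contents.

```python
from pathlib import Path


def preexisting_dockerfile(workflow_dir):
    """Return the path to a top-level Dockerfile, or None if there is none."""
    candidate = Path(workflow_dir) / "Dockerfile"
    return candidate if candidate.is_file() else None
```

Calling `preexisting_dockerfile(".")` from the pipeline root before running latch register shows whether an existing Dockerfile would be used for the image build.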

Step 5: Execute the workflow

Before executing the workflow, we need to upload test data to Latch. You can find sample test data here. Copy the test data to your Latch workspace by clicking the Copy to Workspace button in the top right corner.

Now, let’s create the samplesheet in Latch Registry.

  1. Navigate to the Latch Registry and create a new Table.
  2. Select the table you just created and click “Import CSV”. This will open up the Latch Data filesystem. Import the samplesheet.csv file you copied from the provided test data.

Your data is now uploaded to Latch and ready to be processed!

Navigate to the Workflows tab in the Latch Console and select the workflow you previously registered. Then, select the appropriate input parameters from the test data you uploaded and click Launch Workflow in the bottom right corner to execute the workflow.

The workflow orchestrator will use these input parameters along with the metadata provided at registration time to construct the Nextflow command. For example, the RNA-seq pipeline will be launched via the following command:

nextflow run main.nf -profile docker,test --input samplesheet.csv --outdir latch:///nf-rnaseq/outputs --fasta latch:///nf-rnaseq/inputs/reference/genome.fasta ...

Step 6: Monitoring the workflow

After launching the workflow, you can monitor progress by clicking on the appropriate execution under the Executions tab of your workflow.

Under the Graph & Logs tab, you can view the generated two-stage DAG with the initialization step and the Nextflow runtime task. If you click on the Nextflow runtime node, you can view the runtime logs generated by Nextflow.

Once the Nextflow runtime starts executing the workflow, a Process Nodes tab will appear in the menu bar where you can monitor the status of each process in the workflow. Each node in the DAG represents a process in the Nextflow pipeline.

To more easily navigate the graph, you can filter the process nodes by execution status by clicking the “Filter by Status” button in the top right corner.

Click on a process node to see details of every invocation of that process, including the resources provisioned, execution time, and logs.

Once the workflow is complete, you can view any published outputs in Latch Data. By convention, Nextflow workflows use the outdir parameter as the prefix for publishDir paths. For example, if we set our outdir parameter to latch:///nf-rnaseq/outputs, all pipeline outputs will be published to the nf-rnaseq/outputs directory in Latch Data.

Step 7 (Optional): Customizing the Workflow

As explained in Step 4, the latch register command generates a Latch workflow that runs the Nextflow workflow. To give developers flexibility over how their Nextflow pipelines are executed on Latch, the generated workflow code can be modified to execute custom pre- and post-processing logic.

In this tutorial, we will modify the generated wf/entrypoint.py file to add a Run Name parameter that will be used to namespace the outputs of the Nextflow pipeline.

To do this, add the run_name parameter to your entrypoint.py file as follows:

@nextflow_runtime_task(cpu=4, memory=8, storage_gib=100)
def nextflow_runtime(
    pvc_name: str,
    run_name: str, # -- (3)
    input: LatchFile,
    outdir: LatchDir,
    fasta: typing.Optional[LatchFile],
    gtf: typing.Optional[LatchFile],
    gff: typing.Optional[LatchFile],
    transcript_fasta: typing.Optional[LatchFile],
    additional_fasta: typing.Optional[LatchFile],
    bbsplit_fasta_list: typing.Optional[LatchFile],
    hisat2_index: typing.Optional[LatchFile],
    salmon_index: typing.Optional[LatchFile],
    rsem_index: typing.Optional[LatchFile],
    pseudo_aligner: typing.Optional[str],
    umitools_bc_pattern: typing.Optional[str],
    skip_bbsplit: typing.Optional[bool],
) -> None:
    ...


@workflow(metadata._nextflow_metadata)
def nf_nf_core_rnaseq(
    run_name: str, # -- (1)
    input: LatchFile,
    outdir: LatchDir,
    fasta: typing.Optional[LatchFile],
    gtf: typing.Optional[LatchFile],
    gff: typing.Optional[LatchFile],
    transcript_fasta: typing.Optional[LatchFile],
    additional_fasta: typing.Optional[LatchFile],
    bbsplit_fasta_list: typing.Optional[LatchFile],
    hisat2_index: typing.Optional[LatchFile],
    salmon_index: typing.Optional[LatchFile],
    rsem_index: typing.Optional[LatchFile],
    pseudo_aligner: typing.Optional[str],
    umitools_bc_pattern: typing.Optional[str],
    skip_bbsplit: typing.Optional[bool] = True,
) -> None:
    """
    nf-core/rnaseq

    Sample Description
    """

    pvc_name: str = initialize()
    nextflow_runtime(
        pvc_name=pvc_name,
        run_name=run_name, # -- (2)
        input=input,
        outdir=outdir,
        fasta=fasta,
        gtf=gtf,
        gff=gff,
        transcript_fasta=transcript_fasta,
        additional_fasta=additional_fasta,
        bbsplit_fasta_list=bbsplit_fasta_list,
        hisat2_index=hisat2_index,
        salmon_index=salmon_index,
        rsem_index=rsem_index,
        skip_bbsplit=skip_bbsplit,
        pseudo_aligner=pseudo_aligner,
        umitools_bc_pattern=umitools_bc_pattern,
    )

In the above code snippet:

  1. Add a run_name parameter to the nf_nf_core_rnaseq workflow function. All parameters defined in the nf_nf_core_rnaseq function are exposed to the user in the Latch UI.
  2. Pass the run_name parameter to the nextflow_runtime task in the body of the workflow function.
  3. Add the run_name parameter to the nextflow_runtime task signature.

Then, add logic to the nextflow_runtime task to append the run_name parameter to the outdir parameter before executing the Nextflow pipeline.

@nextflow_runtime_task(cpu=4, memory=8, storage_gib=100)
def nextflow_runtime(...) -> None:
    ...

    cmd = [
        "/root/nextflow",
        "run",
        str(shared_dir / "main.nf"),
        "-work-dir",
        str(shared_dir),
        "-profile",
        profiles,
        "-c",
        "latch.config",
        "-resume",
        *get_flag("input", input),
        *get_flag("outdir", LatchDir(f"{outdir.remote_path}/{run_name}")), # UPDATED
        *get_flag("fasta", fasta),
        *get_flag("gtf", gtf),
        *get_flag("gff", gff),
        *get_flag("transcript_fasta", transcript_fasta),
        *get_flag("additional_fasta", additional_fasta),
        *get_flag("bbsplit_fasta_list", bbsplit_fasta_list),
        *get_flag("hisat2_index", hisat2_index),
        *get_flag("salmon_index", salmon_index),
        *get_flag("rsem_index", rsem_index),
        *get_flag("skip_bbsplit", skip_bbsplit),
        *get_flag("pseudo_aligner", pseudo_aligner),
        *get_flag("umitools_bc_pattern", umitools_bc_pattern),
    ]

    ...
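
The f-string above joins the two values as-is, so a trailing slash on the output directory or an empty run name would produce an awkward path. The sketch below shows one way to normalize the join on plain strings before wrapping the result in LatchDir. `namespaced_outdir` is a hypothetical helper and not part of the generated code.

```python
def namespaced_outdir(remote_path, run_name):
    """Join a remote directory path and a run name with exactly one slash."""
    cleaned = run_name.strip().strip("/")
    if not cleaned:
        raise ValueError("run_name must not be empty")
    # Drop at most one trailing slash so a bare root such as latch:/// survives.
    base = remote_path[:-1] if remote_path.endswith("/") else remote_path
    return f"{base}/{cleaned}"
```

With this helper, the updated line could read `LatchDir(namespaced_outdir(outdir.remote_path, run_name))`, assuming `outdir.remote_path` is a string as in the snippet above.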

We will now re-register the workflow with the above updates. We purposely exclude the --nf-script flag in the latch register command to avoid re-generating the Latch SDK workflow code (which will overwrite our updates).

latch register .