Step 2: Define metadata and workflow graphical interface
The input parameters need to be explicitly defined to construct a graphical interface for a Nextflow pipeline. These parameters will be exposed to scientists in a web interface once the workflow is uploaded to Latch.
The Latch SDK provides a command to automatically generate the metadata file from an existing nextflow_schema.json file.
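In recent versions of the SDK this is the `latch generate-metadata` command; the exact flags may differ between SDK versions, so treat the invocation below as an assumption and confirm with `latch generate-metadata --help`:

```shell
# Assumed invocation; verify against `latch generate-metadata --help`
# for your installed SDK version.
latch generate-metadata nextflow_schema.json --nextflow
```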
If your workflow does not have a nextflow_schema.json file, you must manually define the Nextflow metadata and input parameters.
The first file, latch_metadata/__init__.py, holds the NextflowMetadata object, and the second file, latch_metadata/parameters.py, contains the input parameter definitions.
Before continuing, we will need to make a few updates to the generated files to ensure that the input parameters are correctly defined:
It is important to always verify that the generated input parameters and their inferred types are as expected.
After inspecting the generated parameters for nf-core/rnaseq, we notice that the data types for hisat2_index, salmon_index, and rsem_index are typing.Optional[str]
instead of typing.Optional[LatchFile]. Update these parameters to their correct types.
```python
'hisat2_index': NextflowParameter(
    type=typing.Optional[LatchFile],  # MODIFIED
    default=None,
    section_title=None,
    description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
),
'rsem_index': NextflowParameter(
    type=typing.Optional[LatchFile],  # MODIFIED
    default=None,
    section_title=None,
    description='Path to directory or tar.gz archive for pre-built RSEM index.',
),
'salmon_index': NextflowParameter(
    type=typing.Optional[LatchFile],  # MODIFIED
    default=None,
    section_title=None,
    description='Path to directory or tar.gz archive for pre-built Salmon index.',
),
```
For this tutorial, we will execute the workflow using the parameters defined in the test configuration profile.
To simplify the user interface, remove any input parameters not defined in conf/test.config from your latch_metadata/parameters.py.
After making the above updates, your latch_metadata/parameters.py file should now look like this:
Example:
latch_metadata/parameters.py
```python
from dataclasses import dataclass
import typing
import typing_extensions

from flytekit.core.annotation import FlyteAnnotation

from latch.types.metadata import NextflowParameter
from latch.types.file import LatchFile
from latch.types.directory import LatchDir

# Import these into your `__init__.py` file:
#
# from .parameters import generated_parameters

generated_parameters = {
    'input': NextflowParameter(
        type=LatchFile,
        default=None,
        section_title='Input/output options',
        description='Path to comma-separated file containing information about the samples in the experiment.',
    ),
    'outdir': NextflowParameter(
        type=LatchDir,
        default=None,
        section_title=None,
        description='The output directory where the results will be saved. You have to use absolute paths to storage on Cloud infrastructure.',
    ),
    'fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title="Genome References",
        description='Path to FASTA genome file.',
    ),
    'gtf': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to GTF annotation file.',
    ),
    'gff': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to GFF3 annotation file.',
    ),
    'transcript_fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to FASTA transcriptome file.',
    ),
    'additional_fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='FASTA file to concatenate to genome FASTA file e.g. containing spike-in sequences.',
    ),
    'bbsplit_fasta_list': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title='Read filtering options',
        description='Path to comma-separated file containing a list of reference genomes to filter reads against with BBSplit. You have to also explicitly set `--skip_bbsplit false` if you want to use BBSplit.',
    ),
    'hisat2_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
    ),
    'salmon_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built Salmon index.',
    ),
    'rsem_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built RSEM index.',
    ),
    'skip_bbsplit': NextflowParameter(
        type=typing.Optional[bool],
        default=True,
        section_title=None,
        description='Skip BBSplit for removal of non-reference genome reads.',
    ),
    'pseudo_aligner': NextflowParameter(
        type=typing.Optional[str],
        default=None,
        section_title=None,
        description="Specifies the pseudo aligner to use - available options are 'salmon'. Runs in addition to '--aligner'.",
    ),
    'umitools_bc_pattern': NextflowParameter(
        type=typing.Optional[str],
        default=None,
        section_title=None,
        description="The UMI barcode pattern to use e.g. 'NNNNNN' indicates that the first 6 nucleotides of the read are from the UMI.",
    ),
}
```
Let’s inspect the most relevant fields of the NextflowMetadata object:
display_name: The display name of the workflow, as it will appear on the Latch UI.
author: The name of the person or organization publishing the workflow.
parameters: Input parameters to the workflow, defined as NextflowParameter objects. The Latch Console will expose these parameters to scientists before they execute the workflow.
Input parameters are passed to Nextflow as command line arguments via --param-name param-value. Therefore, the key of the parameters dictionary should match the name of the parameter in the Nextflow script.
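To make this mapping concrete, here is a small sketch (not SDK code; the actual Latch runtime performs this translation internally and also handles quoting and typed values) of how a dictionary of resolved parameter values flattens into `--key value` pairs:

```python
# Illustrative sketch only: how a dictionary of resolved parameter values
# maps onto Nextflow command-line arguments.
def params_to_cli_args(params: dict) -> list:
    args = []
    for name, value in params.items():
        if value is None:
            continue  # omitted parameters fall back to pipeline defaults
        args.extend([f"--{name}", str(value)])
    return args

print(params_to_cli_args({"input": "samplesheet.csv", "outdir": "results", "gff": None}))
# → ['--input', 'samplesheet.csv', '--outdir', 'results']
```

This is why each key in the `generated_parameters` dictionary must match the corresponding `params` name in the Nextflow script.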
runtime_resources: The resources the Nextflow Runtime requires to execute the workflow. The storage_gib field will configure the storage size in GiB for the shared filesystem.
log_dir: Latch directory to dump .nextflow.log file on workflow failure.
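Putting these fields together, the NextflowMetadata object in latch_metadata/__init__.py looks roughly like the sketch below. The field names follow the descriptions above; the constructor names LatchAuthor and NextflowRuntimeResources come from the Latch SDK but should be checked against your installed version:

```python
# Sketch of a NextflowMetadata definition; verify constructor names and
# fields against your installed Latch SDK version.
from latch.types.metadata import LatchAuthor, NextflowMetadata, NextflowRuntimeResources
from latch.types.directory import LatchDir

from .parameters import generated_parameters

NextflowMetadata(
    display_name='nf-core/rnaseq',
    author=LatchAuthor(name='Your Name'),
    parameters=generated_parameters,
    runtime_resources=NextflowRuntimeResources(
        storage_gib=100,  # storage size in GiB for the shared filesystem
    ),
    log_dir=LatchDir('latch:///your_log_dir'),  # destination for .nextflow.log on failure
)
```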
Step 3 (Optional): Importing samplesheets from Latch Registry
The input parameter in our rnaseq workflow currently accepts a samplesheet as a CSV-formatted LatchFile. Generating and handling samplesheet files can be cumbersome and error-prone.
Latch Registry is a friendly table interface that allows users to fill out a sample sheet and link sequencing files for each sample.
The section below outlines how to configure a Nextflow workflow to accept a Latch Registry samplesheet (instead of a LatchFile).
Define the schema required for the input samplesheet as a Python dataclass. Each field in the dataclass represents a column in the samplesheet.
For nf-core/rnaseq, add the following snippet to your latch_metadata/parameters.py file:
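The snippet defines the samplesheet schema as a Sample dataclass with one field per column (the same definition appears in the full parameters.py listing later in this section):

```python
from dataclasses import dataclass
import typing

from latch.types.file import LatchFile

@dataclass(frozen=True)
class Sample:
    sample: str
    fastq_1: LatchFile
    fastq_2: typing.Optional[LatchFile]
    strandedness: str
```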
Update the type information for the samplesheet input parameter. Samplesheets must always be a list of dataclass objects.
Locate the input parameter in the generated_parameters dictionary in latch_metadata/parameters.py and make the following changes:
latch_metadata/parameters.py
```python
generated_parameters = {
    'input': NextflowParameter(
        type=typing.List[Sample],  # MODIFIED
        samplesheet=True,  # ADDED
        section_title='Input/output options',
        description='Path to comma-separated file containing information about the samples in the experiment.',
    ),
    ...
}
```
Next, specify how the input objects are converted to a samplesheet file. Setting samplesheet_type tells Latch to construct a CSV (or TSV) samplesheet from the list of dataclass objects:
```python
generated_parameters = {
    'input': NextflowParameter(
        type=typing.List[Sample],
        samplesheet=True,
        samplesheet_type='csv',  # ADDED, also accepts 'tsv'
        section_title='Input/output options',
        description='Path to comma-separated file containing information about the samples in the experiment.',
    ),
    ...
}
```
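To make the conversion concrete, the sketch below shows how a list of dataclass rows serializes to the CSV samplesheet the pipeline expects. It uses plain strings in place of LatchFile purely so the example is self-contained; the Latch runtime performs the equivalent conversion automatically when samplesheet_type='csv' is set:

```python
import csv
import io
from dataclasses import asdict, dataclass, fields
from typing import Optional

# Illustration only: file columns use plain str here instead of LatchFile.
@dataclass(frozen=True)
class Sample:
    sample: str
    fastq_1: str
    fastq_2: Optional[str]
    strandedness: str

def to_samplesheet(rows) -> str:
    """Serialize a list of Sample rows to CSV, one column per dataclass field."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=[f.name for f in fields(Sample)])
    writer.writeheader()
    for row in rows:
        writer.writerow(asdict(row))  # missing optional fields become empty cells
    return buf.getvalue()

print(to_samplesheet([Sample("WT_REP1", "wt_1.fastq.gz", None, "auto")]))
```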
Your latch_metadata/parameters.py should now look like this:
latch_metadata/parameters.py
```python
from dataclasses import dataclass
import typing
import typing_extensions

from flytekit.core.annotation import FlyteAnnotation

from latch.types.metadata import NextflowParameter
from latch.types.file import LatchFile
from latch.types.directory import LatchDir

# Import these into your `__init__.py` file:
#
# from .parameters import generated_parameters

@dataclass(frozen=True)
class Sample:
    sample: str
    fastq_1: LatchFile
    fastq_2: typing.Optional[LatchFile]
    strandedness: str

generated_parameters = {
    'input': NextflowParameter(
        type=typing.List[Sample],
        samplesheet=True,
        samplesheet_type='csv',  # ADDED, also accepts 'tsv'
        section_title='Input/output options',
        description='Path to comma-separated file containing information about the samples in the experiment.',
    ),
    'outdir': NextflowParameter(
        type=LatchDir,
        default=None,
        section_title=None,
        description='The output directory where the results will be saved. You have to use absolute paths to storage on Cloud infrastructure.',
    ),
    'fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title="Genome References",
        description='Path to FASTA genome file.',
    ),
    'gtf': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to GTF annotation file.',
    ),
    'gff': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to GFF3 annotation file.',
    ),
    'transcript_fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to FASTA transcriptome file.',
    ),
    'additional_fasta': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='FASTA file to concatenate to genome FASTA file e.g. containing spike-in sequences.',
    ),
    'bbsplit_fasta_list': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title='Read filtering options',
        description='Path to comma-separated file containing a list of reference genomes to filter reads against with BBSplit. You have to also explicitly set `--skip_bbsplit false` if you want to use BBSplit.',
    ),
    'hisat2_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built HISAT2 index.',
    ),
    'salmon_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built Salmon index.',
    ),
    'rsem_index': NextflowParameter(
        type=typing.Optional[LatchFile],
        default=None,
        section_title=None,
        description='Path to directory or tar.gz archive for pre-built RSEM index.',
    ),
    'skip_bbsplit': NextflowParameter(
        type=typing.Optional[bool],
        default=True,
        section_title=None,
        description='Skip BBSplit for removal of non-reference genome reads.',
    ),
    'pseudo_aligner': NextflowParameter(
        type=typing.Optional[str],
        default=None,
        section_title=None,
        description="Specifies the pseudo aligner to use - available options are 'salmon'. Runs in addition to '--aligner'.",
    ),
    'umitools_bc_pattern': NextflowParameter(
        type=typing.Optional[str],
        default=None,
        section_title=None,
        description="The UMI barcode pattern to use e.g. 'NNNNNN' indicates that the first 6 nucleotides of the read are from the UMI.",
    ),
}
```
latch register .: Searches for a Latch workflow in the current directory and registers it to Latch.
--nf-script main.nf: Specifies the Nextflow script passed to the Nextflow command at runtime. For this workflow: nextflow run main.nf
--nf-execution-profile docker,test: Defines the execution profile to use when running the workflow on Latch. We specify the docker profile to execute processes in a containerized environment and the test profile to run the pipeline with the parameters defined in conf/test.config.
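Put together, the registration command described above is:

```shell
latch register . --nf-script main.nf --nf-execution-profile docker,test
```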
After running the above command, the Latch SDK will generate two files:
latch.config - a Nextflow configuration file passed to Nextflow via the -config flag.
wf/entrypoint.py - the generated Latch SDK workflow code that executes the Nextflow pipeline.
Once the workflow is registered, click on the link provided in the output of the latch register command. This will take you to an interface like the one below:
As part of the registration process, a Docker image is built from a Dockerfile. Normally, this Dockerfile is autogenerated and stored in .latch. However, if a Dockerfile already exists in the workflow directory prior to registration, it will be used to build the image instead, which can cause errors later if that Dockerfile was not generated by Latch.
Before executing the workflow, we need to upload test data to Latch. You can find sample test data here.
Copy the test data to your Latch workspace by clicking the Copy to Workspace button in the top right corner.
Now, let’s create the samplesheet in Latch Registry.
Select the table you just created and click “Import CSV”. This will open up the Latch Data filesystem. Import the samplesheet.csv file you copied from the provided test data.
Your data is now uploaded to Latch and ready to be processed!
Navigate to the Workflows tab in the Latch Console and select the workflow you previously registered.
Then, select the appropriate input parameters from the test data you uploaded and click Launch Workflow in the bottom right corner to execute the workflow.
The workflow orchestrator will use these input parameters along with the metadata provided at registration time to construct the Nextflow command.
For example, the RNA-seq pipeline will be launched via the following command:
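The exact command depends on the parameter values chosen at launch, but given the script, config file, and profiles above, it takes roughly this shape (the parameter values shown are placeholders, not the real launch values):

```shell
# Approximate shape only; actual parameter values come from the launch form.
nextflow run main.nf \
    -config latch.config \
    -profile docker,test \
    --input <samplesheet> \
    --outdir <outdir>
```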
After launching the workflow, you can monitor progress by clicking on the appropriate execution under the Executions tab of your workflow.
Under the Graph & Logs tab, you can view the generated two-stage DAG with the initialization step and the Nextflow runtime task.
If you click on the Nextflow runtime node, you can view the runtime logs generated by Nextflow.
Once the Nextflow runtime starts executing the workflow, a Process Nodes tab will appear in the menu bar where you can monitor the status of each process in the workflow.
Each node in the DAG represents a process in the Nextflow pipeline.
To more easily navigate the graph, you can filter the process nodes by
execution status by clicking the “Filter by Status” button in the top right
corner.
Click on a process node to see details of every invocation of that process, including the resources provisioned, execution time, and logs.
Once the workflow is complete, you can view any published outputs in Latch Data. By convention, Nextflow workflows use the outdir parameter as the prefix for publishDir paths. For example, if we set our outdir parameter to latch:///nf-rnaseq/outputs, all pipeline outputs will be published to the nf-rnaseq/outputs directory in Latch Data.
As explained in Step 5, the latch register command generates a Latch workflow that runs the Nextflow workflow. In order to provide developers with flexibility
over how their Nextflow pipelines are executed on Latch, the generated workflow code can be modified to execute custom pre- and post-processing logic.
In this tutorial, we will modify the generated wf/entrypoint.py file to add a Run Name parameter that will be used to namespace the outputs of the Nextflow pipeline.
To do this, add the run_name parameter to your entrypoint.py file as follows:
We add a run_name parameter to the nf_nf_core_rnaseq workflow function. All parameters defined in the nf_nf_core_rnaseq function signature are exposed to the user in the Latch UI.
Pass the run_name parameter to the nextflow_runtime task in the body of the workflow function.
Add the run_name parameter to the nextflow_runtime task signature.
Then, add logic to the nextflow_runtime task to append the run_name parameter to the outdir parameter before executing the Nextflow pipeline.
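The appended logic amounts to a one-line path join; a minimal sketch is below (the function and variable names are illustrative, not the generated entrypoint code, so adapt them to your wf/entrypoint.py):

```python
# Sketch of namespacing pipeline outputs by run name inside the
# nextflow_runtime task. Names here are illustrative only.
def namespaced_outdir(outdir: str, run_name: str) -> str:
    """Append the run name to the output directory path."""
    return f"{outdir.rstrip('/')}/{run_name}"

print(namespaced_outdir("latch:///nf-rnaseq/outputs", "run_1"))
# → latch:///nf-rnaseq/outputs/run_1
# The resulting path is then passed to Nextflow as --outdir.
```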
We will now re-register the workflow with the above updates. We purposely exclude the --nf-script flag from the latch register command to avoid re-generating the Latch SDK workflow code (which would overwrite our updates).