Auto-generated API documentation from source code.

Table of Contents

  • latch.init
  • latch.resources.tasks
  • latch.resources.workflow
  • latch.resources.conditional
  • latch.resources.map_tasks
  • latch.resources.reference_workflow
  • latch.types.init
  • latch.types.file
  • latch.types.directory
  • latch.types.metadata
  • latch.types.glob
  • latch.functions.messages
  • latch.functions.operators
  • latch.functions.secrets
  • latch.ldata

latch.init

The Latch SDK is a command-line toolchain for defining and registering serverless workflows on the Latch platform. This top-level module re-exports a collection of utilities and resources from the submodules documented in the sections below.
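
Re-exported names can be imported directly from the top-level package; for example:
from latch import create_conditional_section  # also available via latch.resources.conditional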

latch.resources.tasks

Latch tasks are decorators that turn Python functions into workflow ‘nodes’. Each task is containerized, versioned, and registered with Flyte when a workflow is uploaded to Latch. Containerized tasks are then executed on arbitrary instances as Kubernetes Pods, scheduled by flytepropeller under the hood. The type of instance a task executes on (e.g. number of available resources, presence of a GPU) is controlled by applying one of the exported decorators.
from latch.resources.tasks import medium_task

@medium_task
def my_task(a: int) -> str:
  ...

Functions

Below are the functions available in the tasks module.

custom_memory_optimized_task(cpu: int, memory: int)

Description: Deprecated helper that returns a custom task configuration with the specified CPU and RAM allocations. Notes:
  • This function is deprecated and will be removed in a future release; calling it emits a deprecation warning.
Parameters:
  • cpu (int): Number of CPU cores to request
  • memory (int): Memory in GiB to request
Returns:
  • A partial Flyte task configured with the specified Pod settings
def custom_memory_optimized_task(cpu: int, memory: int):
Example:
from latch.resources.tasks import custom_memory_optimized_task

@custom_memory_optimized_task(cpu=16, memory=128)
def my_task(a: int) -> str:
  ...

custom_task(cpu: Union[Callable, int], memory: Union[Callable, int], *, storage_gib: Union[Callable, int] = 500, timeout: Union[datetime.timedelta, int] = 0, **kwargs)

Description: Returns a custom task configuration requesting the specified CPU/RAM allocations. If any of cpu, memory, or storage_gib is a callable, returns a dynamic task configuration using DynamicTaskConfig with a small Pod config; otherwise constructs a static Pod via _custom_task_config. Parameters:
  • cpu (Union[Callable, int]): CPU cores to request (integer or callable for dynamic)
  • memory (Union[Callable, int]): Memory in GiB to request (integer or callable)
  • storage_gib (Union[Callable, int], default 500): Storage in GiB (integer or callable)
  • timeout (Union[datetime.timedelta, int], default 0): Timeout for the task
  • **kwargs: Additional keyword arguments
Returns:
  • A partial Flyte task configured for either a dynamic or a static custom task
def custom_task(
    cpu: Union[Callable, int],
    memory: Union[Callable, int],
    *,
    storage_gib: Union[Callable, int] = 500,
    timeout: Union[datetime.timedelta, int] = 0,
    **kwargs,
):
Example:
from latch.resources.tasks import custom_task

@custom_task(cpu=8, memory=32, storage_gib=200)
def my_task(a: int) -> str:
  ...
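
When cpu, memory, or storage_gib is a callable, the resource request is resolved at run time from the task's inputs. A minimal sketch, assuming each callable accepts the same parameters as the task and returns an integer amount:
from latch.resources.tasks import custom_task

def compute_cpu(a: int) -> int:
    # Hypothetical sizing rule: scale CPU count with the input value.
    return max(2, a)

def compute_memory(a: int) -> int:
    # Hypothetical sizing rule: 4 GiB of RAM per CPU.
    return 4 * compute_cpu(a)

@custom_task(cpu=compute_cpu, memory=compute_memory)
def my_dynamic_task(a: int) -> str:
  ...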

lustre_setup_task()

Description: Returns a partial Flyte task configured for Lustre setup with a Nextflow work directory PVC. Parameters:
  • None
Returns:
  • A partial Flyte task
def lustre_setup_task():
Example:
import os

from latch.resources.tasks import lustre_setup_task

@lustre_setup_task()
def setup_workdir(pvc_name: str) -> str:
    os.chmod("/nf-workdir", 0o777)
    return pvc_name

nextflow_runtime_task(cpu: int, memory: int, storage_gib: int = 50)

Description: Returns a partial Flyte task configured for Nextflow runtime with a shared work directory volume mounted at /nf-workdir. Parameters:
  • cpu (int): CPU cores
  • memory (int): Memory in GiB
  • storage_gib (int, default 50): Storage in GiB
Returns:
  • A partial Flyte task
def nextflow_runtime_task(cpu: int, memory: int, storage_gib: int = 50):
Example:
# Full code example: https://github.com/latchbio-nfcore/atacseq/blob/370c45694bca54a89a7cc65902b0fb54747a87b5/wf/entrypoint.py#L98

from latch.resources.tasks import nextflow_runtime_task
from latch.types.directory import LatchDir
from latch.types.file import LatchFile

@nextflow_runtime_task(cpu=4, memory=16, storage_gib=64)
def nextflow_runtime(
    pvc_name: str,
    input: LatchFile,
    outdir: LatchDir,
    fasta: LatchFile,
) -> None:
    ...

g6e_xlarge_task, g6e_2xlarge_task, g6e_4xlarge_task, g6e_8xlarge_task, g6e_12xlarge_task, g6e_16xlarge_task, g6e_24xlarge_task, g6e_48xlarge_task

Partial Flyte tasks preconfigured for specific NVIDIA L40S GPU pod configurations (AWS g6e instance family). Parameters:
  • None
Returns:
  • A partial Flyte task for each respective instance type and resources
g6e_xlarge_task = functools.partial(
    task,
    task_config=_get_l40s_pod("g6e-xlarge", cpu=4, memory_gib=32, gpus=1)
)
Example:
from latch.resources.tasks import g6e_xlarge_task

@g6e_xlarge_task
def my_task(a: int) -> str:
  ...

v100_x1_task, v100_x4_task, v100_x8_task

Partial Flyte tasks configured for specific V100 GPU pod configurations. Parameters:
  • None
Returns:
  • A partial Flyte task for each respective instance type and resources
v100_x1_task = functools.partial(
    task,
    task_config=_get_v100_pod("v100-x1", cpu=4, memory_gib=48, gpus=1)
)
Example:
from latch.resources.tasks import v100_x1_task

@v100_x1_task
def my_task(a: int) -> str:
  ...

latch.resources.workflow

This module defines decorators and helpers to convert Python callables into Flyte PythonFunctionWorkflow objects. It includes internal utilities for metadata generation and docstring injection, a workflow decorator that supports usage with or without arguments, and a nextflow_workflow helper for Nextflow-style workflows.

Functions

workflow()

Decorator to expose a Python function as a Flyte PythonFunctionWorkflow. Can be used as @workflow without arguments or with a LatchMetadata argument.
def workflow(
    metadata: Union[LatchMetadata, Callable],
) -> Union[PythonFunctionWorkflow, Callable]:
Parameters:
  • metadata (Union[LatchMetadata, Callable]): Either a LatchMetadata instance or the function to decorate (when used without parentheses)
Returns:
  • Union[PythonFunctionWorkflow, Callable]: A PythonFunctionWorkflow if used directly or a decorator if used with arguments
Example:
from latch.resources.workflow import workflow

@workflow
def add(a: int, b: int) -> int:
    return a + b
Example (with metadata):
from latch.resources.workflow import workflow
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.metadata import LatchAuthor, LatchMetadata, LatchParameter

metadata = LatchMetadata(
    display_name="Target Workflow",
    author=LatchAuthor(
        name="Your Name",
    ),
    parameters={
        "input_directory": LatchParameter(
            display_name="Input Directory",
            batch_table_column=True,  # Show this parameter in batched mode.
        ),
        "output_directory": LatchParameter(
            display_name="Output Directory",
            batch_table_column=True,  # Show this parameter in batched mode.
        ),
    },
)


@workflow(metadata)
def template_workflow(
    input_directory: LatchDir, output_directory: LatchOutputDir
) -> LatchOutputDir:
    # `task` stands in for a task defined elsewhere in the workflow.
    return task(input_directory=input_directory, output_directory=output_directory)

nextflow_workflow()

Decorator to expose a Python function as a Nextflow-style workflow.
def nextflow_workflow(
    metadata: NextflowMetadata,
) -> Callable[[Callable], PythonFunctionWorkflow]:
Parameters:
  • metadata (NextflowMetadata): Metadata for the Nextflow-style workflow
Returns:
  • Callable[[Callable], PythonFunctionWorkflow]: A decorator compatible with workflow
Example:
from latch.resources.workflow import nextflow_workflow
from latch.types.metadata import NextflowMetadata, NextflowParameter
from latch.types.file import LatchFile
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.metadata import LatchAuthor, NextflowRuntimeResources

generated_parameters = {
    "input": NextflowParameter(
        type=LatchFile,
        display_name="Input",
        description="Input file description",
    ),
    ...
}

nextflow_metadata = NextflowMetadata(
    display_name="nf-core/atacseq",
    author=LatchAuthor(
        name="nf-core",
    ),
    repository="https://github.com/latchbio-nfcore/atacseq",
    parameters=generated_parameters,
    runtime_resources=NextflowRuntimeResources(
        cpus=4,
        memory=8,
        storage_gib=100,
    ),
    log_dir=LatchDir("latch:///your_log_dir"),
)

@nextflow_workflow(nextflow_metadata)
def nf_nf_core_atacseq(...) -> LatchOutputDir:
    ...
    return LatchOutputDir("latch:///your_output_dir")

latch.resources.conditional

This module exposes a factory function that creates a ConditionalSection for conditional execution in workflows, delegating to conditional from flytekit.core.condition.

Functions

create_conditional_section()

Creates a new conditional section in a workflow, allowing a user to conditionally execute a task based on the value of a task result.
  • The conditional sections can be n-ary with as many elif clauses as desired.
  • Outputs from conditional nodes can be consumed, and outputs from other tasks can be passed to conditional nodes.
  • Boolean expressions in the condition use & (and) and | (or) operators.
  • Unary expressions are not allowed. If a task returns a boolean, use built-in truth checks like result.is_true() or result.is_false().
Parameters:
  • name (str): The name of the conditional section, to be shown in Latch Console
Returns:
  • ConditionalSection
def create_conditional_section(name: str) -> ConditionalSection:
    ...
Example:
from latch import create_conditional_section
from latch.resources.tasks import small_task
from latch.resources.workflow import workflow

@small_task
def square(n: float) -> float:
    """
    **Parameters:**
        - `n` (float): The parameter name is derived from the input variable name, and the type is mapped automatically from the annotation
    **Returns:**
        - `float`: The label for the output is automatically assigned and the type is deduced from the annotation
    """
    return n * n


@small_task
def double(n: float) -> float:
    """
    **Parameters:**
        - `n` (float): The parameter name is derived from the input variable name and the type is mapped from the annotation
    **Returns:**
        - `float`: The label for the output is auto-assigned and the type is deduced from the annotation
    """
    return 2 * n

@workflow
def multiplier(my_input: float) -> float:
    result_1 = double(n=my_input)
    result_2 = (
        create_conditional_section("fractions")
        .if_((result_1 < 0.0)).then(double(n=result_1))
        .elif_((result_1 > 0.0)).then(square(n=result_1))
        .else_().fail("Only nonzero values allowed")
    )
    result_3 = double(n=result_2)
    return result_3

latch.resources.map_tasks

map_tasks

A map task lets you run a pod task or a regular task over a list of inputs within a single workflow node. This means you can run thousands of instances of the task without creating a node for every instance, providing valuable performance gains. Some use cases of map tasks include:
  • Several inputs must run through the same code logic
  • Multiple data batches need to be processed in parallel
  • Hyperparameter optimization
Parameters:
  • task_function: The task to be mapped
Returns:
  • A mapped task that runs over a list of inputs and returns a list of outputs
Intended Use:
import typing

from latch.resources.map_tasks import map_task
from latch.resources.tasks import small_task
from latch.resources.workflow import workflow

@small_task
def a_mappable_task(a: int) -> str:
    inc = a + 2
    stringified = str(inc)
    return stringified

@small_task
def coalesce(b: typing.List[str]) -> str:
    coalesced = "".join(b)
    return coalesced

@workflow
def my_map_workflow(a: typing.List[int]) -> str:
    mapped_out = map_task(a_mappable_task)(a=a)
    coalesced = coalesce(b=mapped_out)
    return coalesced

latch.resources.reference_workflow

This module defines workflow_reference to create a Flyte Launch Plan reference using the current workspace as the project and the domain set to 'development'. It imports reference_launch_plan from flytekit.core.launch_plan and current_workspace from latch.utils.

Functions

workflow_reference()

Returns a Flyte Launch Plan reference for the given name and version in the current workspace under domain 'development'. Parameters:
  • name (str): The name of the launch plan to reference
  • version (str): The version of the launch plan to reference
Returns:
  • The value returned by reference_launch_plan configured with:
    • project=current_workspace()
    • domain="development"
    • name=name
    • version=version
def workflow_reference(
    name: str,
    version: str,
):
    return reference_launch_plan(
        project=current_workspace(),
        domain="development",
        name=name,
        version=version,
    )

Examples

from latch.resources.reference_workflow import workflow_reference

lp = workflow_reference(name="my_workflow", version="v1")

latch.types.init

Latch types package initializer. This module re-exports several type definitions and a utility function from submodules under latch.types. The available exports are:
  • LatchDir
  • LatchOutputDir
  • LatchFile
  • LatchOutputFile
  • file_glob
  • DockerMetadata
  • Fork
  • ForkBranch
  • LatchAppearanceType
  • LatchAuthor
  • LatchMetadata
  • LatchParameter
  • LatchRule
  • Params
  • Section
  • Spoiler
  • Text
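
All of the above can be imported directly from latch.types; for example:
from latch.types import LatchDir, LatchFile, LatchMetadata, LatchParameter, file_glob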

Functions

file_glob()

Constructs a list of LatchFiles from a glob pattern. A convenient utility for passing collections of files between tasks. See Nextflow’s channels or Snakemake’s wildcards for similar functionality in other orchestration tools. The remote location of each constructed LatchFile is formed by appending the matched file name to the directory given by remote_directory.
def file_glob(
    pattern: str, 
    remote_directory: str, 
    target_dir: Optional[Path] = None
) -> List[LatchFile]:
Parameters:
  • pattern (str): A glob pattern to match a set of files, e.g. '*.py'. Will resolve paths with respect to the working directory of the caller.
  • remote_directory (str): A valid latch URL pointing to a directory, e.g. latch:///foo. This must be a directory and not a file.
  • target_dir (Optional[Path]): An optional Path object to define an alternate working directory for path resolution.
Returns:
  • List[LatchFile]: A list of instantiated LatchFile objects.
Example:
from latch.resources.tasks import small_task
from latch.types import file_glob

@small_task
def process_fastq_files():
    # Get all .fastq.gz files from the current directory
    # and map them to a remote directory
    fastq_files = file_glob("*.fastq.gz", "latch:///fastqc_outputs")
    
    for fastq_file in fastq_files:
        # Process each file
        process_file(fastq_file)
    
    return fastq_files

Classes

This module re-exports the following classes from their respective submodules. For detailed documentation, see the individual module sections:

latch.types.file

Latch types for file handling within Flyte tasks. This module defines a LatchFile class to represent a file object with both a local path and an optional remote path, a type alias for an output file, and a transformer that converts between LatchFile instances and Flyte Literals. Notes:
  • LatchOutputFile is a type alias for LatchFile annotated as an output in Flyte. It is defined as: Annotated[LatchFile, FlyteAnnotation({"output": True})]

Classes

LatchFile

Represents a file object with a local path and an optional remote path. Notable methods and properties:
  • size() -> int: Returns the size of the remote data via LPath(self.remote_path).size()
  • local_path (property) -> str: Local file path in the environment executing the task
  • remote_path (property) -> Optional[str]: Remote URL referencing the object (LatchData or S3)
Example:
from latch.types.file import LatchFile

# Basic usage: create a LatchFile with a local path
lf = LatchFile("./my_file.txt")

# With a remote path
lf_remote = LatchFile("./my_file.txt", "latch:///remote_path.txt")
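
A short sketch exercising the properties above (size() assumes the remote object exists, since it queries Latch Data):
# Local path in the current execution environment
print(lf_remote.local_path)

# Remote URL, if one was provided
print(lf_remote.remote_path)

# Size of the remote object in bytes
print(lf_remote.size())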

latch.types.directory

Module for directory handling in Latch workflows and standalone Python environments. Provides LatchDir for working with directories that can be stored locally or remotely (on Latch Data or S3).

Classes

LatchDir

Represents a directory with both local and remote path management. Constructor:
def __init__(
    self,
    path: Union[str, PathLike],
    remote_path: Optional[PathLike] = None,
    **kwargs,
) -> None:
Parameters:
  • path (Union[str, PathLike]): The local path to the directory
  • remote_path (Optional[PathLike]): The remote path (latch:// or s3:// URL) where the directory is stored
Properties:
  • local_path (str): Local directory path for the environment executing the task
  • remote_path (Optional[str]): Remote URL referencing the directory (LatchData or S3)
Methods:
  • iterdir() -> List[Union[LatchFile, LatchDir]]: Returns a list of the directory’s children (files and subdirectories)
  • size_recursive() -> int: Returns the total size of the directory and all its contents recursively
Usage Examples:
from pathlib import Path

from latch.resources.tasks import small_task
from latch.types.directory import LatchDir
from latch.types.file import LatchFile

# Create a local directory
local_dir = LatchDir("./my_directory")

# Create a directory with remote path
remote_dir = LatchDir("./my_directory", "latch:///remote_directory")

# In a workflow task
@small_task
def process_directory(input_dir: LatchDir) -> LatchDir:
    # Access local path
    local_path = Path(input_dir.local_path)
    
    # List directory contents
    for item in input_dir.iterdir():
        if isinstance(item, LatchFile):
            print(f"Found file: {item}")
        elif isinstance(item, LatchDir):
            print(f"Found subdirectory: {item}")
    
    # Get directory size
    total_size = input_dir.size_recursive()
    print(f"Directory size: {total_size} bytes")
    
    # Create output directory
    return LatchDir("./output", "latch:///output_directory")

# In a Jupyter notebook or standalone script
def explore_remote_directory():
    # Create a LatchDir pointing to a remote directory
    remote_dir = LatchDir("latch:///my_data")
    
    # List contents of the remote directory
    for item in remote_dir.iterdir():
        print(f"Item: {item}")
        if hasattr(item, 'remote_path'):
            print(f"  Remote path: {item.remote_path}")
    
    # Get directory size
    total_size = remote_dir.size_recursive()
    print(f"Total directory size: {total_size} bytes")

LatchOutputDir

A LatchDir tagged as the output of some workflow. Definition:
LatchOutputDir = Annotated[LatchDir, FlyteAnnotation({"output": True})]
Purpose: The Latch Console uses this metadata to avoid checking for existence of the directory at its remote path and displaying an error. This check is normally made to avoid launching workflows with LatchDirs that point to objects that don’t exist. Usage:
from latch.resources.tasks import small_task
from latch.types.directory import LatchDir, LatchOutputDir

@small_task
def create_output() -> LatchOutputDir:
    return LatchDir("./results", "latch:///my_workflow_output")

latch.types.metadata

Module for defining workflow metadata, parameter configurations, and UI flow elements. It provides the building blocks for creating rich, interactive workflow interfaces in the Latch Console, including parameter validation, custom UI layouts, and integration with Snakemake/Nextflow workflows.

Functions

default_samplesheet_constructor(samples: List[DC], t: DC, delim: str = ",") -> Path

Creates a CSV samplesheet from a list of dataclass instances. Parameters:
  • samples (List[DC]): List of dataclass instances to convert to CSV
  • t (DC): The dataclass type to use for column headers
  • delim (str): CSV delimiter, defaults to ","
Returns:
  • Path: Path to the created samplesheet.csv file
Example:
from dataclasses import dataclass
from latch.types.metadata import default_samplesheet_constructor

@dataclass
class SampleData:
    sample_id: str
    condition: str
    replicate: int

# Create sample data
samples = [
    SampleData("sample1", "control", 1),
    SampleData("sample2", "treatment", 1),
    SampleData("sample3", "control", 2),
]

# Generate samplesheet
csv_path = default_samplesheet_constructor(samples, t=SampleData)
print(f"Created samplesheet at: {csv_path}")

Classes

LatchRule

Defines validation rules for parameter inputs using regular expressions. Attributes:
  • regex (str): Regular expression pattern that inputs must match
  • message (str): Error message displayed when validation fails
Example:
from latch.types.metadata import LatchRule

# Validate email format
email_rule = LatchRule(
    regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
    message="Please enter a valid email address"
)

# Validate file extension
fastq_rule = LatchRule(
    regex=r"\.(fastq|fq)(\.gz)?$",
    message="File must be a FASTQ file (.fastq, .fq, .fastq.gz, or .fq.gz)"
)

LatchAppearanceEnum

Controls how text input fields are rendered in the UI. Values:
  • line: Single-line text input
  • paragraph: Multi-line text area
Example:
from latch.types.metadata import LatchAppearanceEnum

# Single line input for short text
short_input = LatchAppearanceEnum.line

# Multi-line input for longer text
long_input = LatchAppearanceEnum.paragraph

MultiselectOption

Represents a single option in a multiselect widget. Attributes:
  • name (str): Display name shown in the UI
  • value (object): Value associated with this option

Multiselect

Creates a multiselect input widget with predefined options. Attributes:
  • options (List[MultiselectOption]): List of available options
  • allow_custom (bool): Whether users can enter custom values
Example:
from latch.types.metadata import Multiselect, MultiselectOption

# Create multiselect for organism selection
organism_multiselect = Multiselect(
    options=[
        MultiselectOption("Human", "homo_sapiens"),
        MultiselectOption("Mouse", "mus_musculus"),
        MultiselectOption("Drosophila", "drosophila_melanogaster"),
    ],
    allow_custom=True  # Allow users to enter custom organism names
)

LatchAuthor

Contains metadata about the workflow author. Attributes:
  • name (Optional[str]): Author’s name
  • email (Optional[str]): Author’s email address
  • github (Optional[str]): Link to author’s GitHub profile
Example:
from latch.types.metadata import LatchAuthor

author = LatchAuthor(
    name="Dr. Jane Smith",
    email="jane.smith@university.edu",
    github="https://github.com/janesmith"
)

UI Flow Elements

These classes define the layout and organization of workflow parameters in the Latch Console UI.

FlowBase

Base class for all UI flow elements. This is a frozen dataclass that serves as the foundation for organizing workflow interfaces.

Section

Creates a card with a title containing child flow elements. Constructor:
Section(section: str, *flow: FlowBase)
Parameters:
  • section (str): Title displayed on the section card
  • *flow (FlowBase): Variable number of flow elements to display in the section

Text

Displays markdown-formatted text in the UI. Attributes:
  • text (str): Markdown content to display

Title

Displays a markdown title in the UI. Attributes:
  • title (str): Markdown title text

Params

Displays parameter input widgets for specified parameters. Constructor:
Params(*args: str)
Parameters:
  • *args (str): Names of parameters to display

Spoiler

Creates a collapsible section with a title and child flow elements. Constructor:
Spoiler(spoiler: str, *flow: FlowBase)
Parameters:
  • spoiler (str): Title of the collapsible section
  • *flow (FlowBase): Flow elements to display when expanded
Example:
from latch.types.metadata import Section, Text, Params, Spoiler

# Create a workflow UI flow
flow = [
    Text("## RNA-seq Analysis Workflow"),
    Text("This workflow performs differential gene expression analysis."),
    
    Section("Input Parameters", 
        Params("input_files", "reference_genome"),
        Text("Select your input FASTQ files and reference genome.")
    ),
    
    Section("Analysis Options",
        Params("min_reads", "p_value_threshold"),
        Spoiler("Advanced Options",
            Params("threads", "memory_limit"),
            Text("Configure advanced computational parameters.")
        )
    )
]

ForkBranch

Defines a single branch within a Fork element. Constructor:
ForkBranch(display_name: str, *flow: FlowBase)
Parameters:
  • display_name (str): Text displayed on the branch button
  • *flow (FlowBase): Flow elements to display when this branch is active

Fork

Creates a conditional UI flow where users can select between mutually exclusive options. Constructor:
Fork(fork: str, display_name: str, **flows: ForkBranch)
Parameters:
  • fork (str): Name of the string parameter that stores the selected branch key
  • display_name (str): Title shown above the fork selector
  • **flows (ForkBranch): Named branches, where keys become the parameter values
Example:
from latch.types.metadata import Fork, ForkBranch, Params, Text

# Create conditional analysis options
analysis_fork = Fork(
    fork="analysis_type",
    display_name="Analysis Type",
    differential=ForkBranch(
        "Differential Expression",
        Params("p_value_threshold", "fold_change_threshold"),
        Text("Configure parameters for differential expression analysis.")
    ),
    pathway=ForkBranch(
        "Pathway Analysis", 
        Params("pathway_database", "enrichment_method"),
        Text("Configure parameters for pathway enrichment analysis.")
    )
)

Core Metadata Classes

LatchParameter

Defines metadata and behavior for workflow parameters in the Latch Console UI. Key Attributes:
  • display_name (Optional[str]): Human-readable name for the parameter
  • description (Optional[str]): Help text describing the parameter
  • hidden (bool): Whether to hide the parameter by default
  • placeholder (Optional[str]): Placeholder text in input fields
  • output (bool): Whether this parameter represents a workflow output
  • rules (List[LatchRule]): Validation rules for the parameter
  • appearance_type (LatchAppearance): How to render the input (line/paragraph/multiselect)
Samplesheet Integration:
  • samplesheet (Optional[bool]): Enable samplesheet input UI
  • allowed_tables (Optional[List[int]]): Registry table IDs allowed for samplesheet
Example:
from latch.types.metadata import LatchParameter, LatchRule, LatchAppearanceEnum

# Basic parameter
input_file_param = LatchParameter(
    display_name="Input FASTQ Files",
    description="Select your input FASTQ files for analysis",
    placeholder="Choose files...",
    rules=[
        LatchRule(
            regex=r"\.(fastq|fq)(\.gz)?$",
            message="Please select FASTQ files"
        )
    ]
)

# Samplesheet parameter
sample_param = LatchParameter(
    display_name="Sample Information",
    description="Upload a samplesheet with sample metadata",
    samplesheet=True,
    allowed_tables=[123, 456]  # Specific registry table IDs
)

# Text parameter with validation
email_param = LatchParameter(
    display_name="Email Address",
    description="Your email for notifications",
    appearance_type=LatchAppearanceEnum.line,
    rules=[
        LatchRule(
            regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
            message="Please enter a valid email address"
        )
    ]
)

LatchMetadata

The main class for defining workflow metadata and UI configuration. Core Attributes:
  • display_name (str): Human-readable workflow name
  • author (LatchAuthor): Workflow author information
  • documentation (Optional[str]): Link to workflow documentation
  • repository (Optional[str]): Link to source code repository
  • license (str): SPDX license identifier
  • parameters (Dict[str, LatchParameter]): Parameter definitions
  • flow (List[FlowBase]): UI layout configuration
Additional Metadata:
  • tags (List[str]): Categorization tags
  • wiki_url (Optional[str]): Link to wiki documentation
  • video_tutorial (Optional[str]): Link to tutorial video
  • about_page_path (Optional[Path]): Path to markdown about page
Example:
from latch.types.metadata import (
    LatchMetadata, LatchAuthor, LatchParameter, 
    Section, Text, Params
)

# Define workflow metadata
metadata = LatchMetadata(
    display_name="RNA-seq Differential Expression Analysis",
    author=LatchAuthor(
        name="Dr. Jane Smith",
        email="jane.smith@university.edu",
        github="https://github.com/janesmith"
    ),
    documentation="https://docs.example.com/rnaseq-workflow",
    repository="https://github.com/janesmith/rnaseq-workflow",
    license="MIT",
    tags=["RNA-seq", "differential-expression", "bioinformatics"],
    
    # Define parameters
    parameters={
        "input_files": LatchParameter(
            display_name="Input FASTQ Files",
            description="Select your input FASTQ files"
        ),
        "reference_genome": LatchParameter(
            display_name="Reference Genome",
            description="Choose a reference genome"
        ),
        "p_value_threshold": LatchParameter(
            display_name="P-value Threshold",
            description="Statistical significance threshold",
            placeholder="0.05"
        )
    },
    
    # Define UI layout
    flow=[
        Text("## RNA-seq Analysis Workflow"),
        Text("This workflow performs differential gene expression analysis using DESeq2."),
        
        Section("Input Data",
            Params("input_files", "reference_genome"),
            Text("Select your input files and reference genome.")
        ),
        
        Section("Analysis Parameters",
            Params("p_value_threshold"),
            Text("Configure statistical parameters for the analysis.")
        )
    ]
)

Integration Metadata Classes

DockerMetadata

Configuration for private Docker repositories. Attributes:
  • username (str): Docker registry username
  • secret_name (str): Name of the secret containing the password
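
A minimal sketch (both values are placeholders; the password itself should be stored as a Latch secret):

from latch.types.metadata import DockerMetadata

docker_metadata = DockerMetadata(
    username="my-registry-user",    # placeholder registry username
    secret_name="DOCKER_PASSWORD",  # placeholder name of the secret holding the password
)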

SnakemakeMetadata(LatchMetadata)

Extended metadata class for Snakemake workflows with additional Snakemake-specific configuration. Additional Attributes:
  • output_dir (Optional[LatchDir]): Directory for Snakemake outputs
  • docker_metadata (Optional[DockerMetadata]): Docker registry credentials
  • cores (int): Number of cores for Snakemake execution
  • parameters (Dict[str, SnakemakeParameter]): Snakemake-specific parameter metadata
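
A minimal sketch, mirroring the NextflowMetadata example earlier in this document (the exact attribute usage is illustrative):

from latch.types.directory import LatchDir
from latch.types.file import LatchFile
from latch.types.metadata import LatchAuthor, SnakemakeMetadata, SnakemakeParameter

snakemake_metadata = SnakemakeMetadata(
    display_name="My Snakemake Workflow",
    author=LatchAuthor(name="Your Name"),
    output_dir=LatchDir("latch:///snakemake_outputs"),
    cores=4,
    parameters={
        "input": SnakemakeParameter(
            type=LatchFile,
            display_name="Input File",
        ),
    },
)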

NextflowMetadata(LatchMetadata)

Extended metadata class for Nextflow workflows with additional Nextflow-specific configuration. Additional Attributes:
  • runtime_resources (NextflowRuntimeResources): Computational resources
  • execution_profiles (List[str]): Nextflow execution profiles
  • log_dir (Optional[LatchDir]): Directory for Nextflow logs
  • parameters (Dict[str, NextflowParameter]): Nextflow-specific parameter metadata

Helper Classes

SnakemakeParameter(Generic[T], LatchParameter)

Snakemake-specific parameter with type information and default values.

NextflowParameter(Generic[T], LatchParameter)

Nextflow-specific parameter with samplesheet integration and results path configuration.

NextflowRuntimeResources

Defines computational resources for Nextflow tasks. Attributes:
  • cpus (Optional[int]): Number of CPUs
  • memory (Optional[int]): Memory in GiB
  • storage_gib (Optional[int]): Storage in GiB
  • storage_expiration_hours (int): Workdir retention time, in hours

latch.types.glob

Module for creating collections of LatchFile objects from glob patterns. This is useful for batch processing files and passing file collections between workflow tasks.

Functions

file_glob()

Creates a list of LatchFile objects by matching files with a glob pattern and mapping them to a remote directory. Signature:
def file_glob(
    pattern: str, 
    remote_directory: str, 
    target_dir: Optional[Path] = None
) -> List[LatchFile]:
Parameters:
  • pattern (str): Glob pattern to match files (e.g., "*.fastq.gz", "data/*.csv")
  • remote_directory (str): Latch URL pointing to a directory (e.g., "latch:///my_data")
  • target_dir (Optional[Path]): Alternative working directory for pattern resolution
Returns:
  • List[LatchFile]: List of LatchFile objects with local paths matching the pattern and remote paths in the specified directory
Examples:
from pathlib import Path

from latch.resources.tasks import small_task
from latch.types.glob import file_glob

# Basic usage - match all FASTQ files in current directory
fastq_files = file_glob("*.fastq.gz", "latch:///rnaseq_inputs")

# Match files in subdirectories
all_csv_files = file_glob("data/**/*.csv", "latch:///processed_data")

# Use custom working directory
custom_files = file_glob(
    "*.bam", 
    "latch:///alignment_outputs",
    target_dir=Path("/path/to/working/dir")
)

# Process multiple file types
@small_task
def process_sequencing_files():
    # Get all sequencing files
    fastq_files = file_glob("*.fastq.gz", "latch:///raw_data")
    bam_files = file_glob("*.bam", "latch:///alignments")
    
    # Process each file
    for file in fastq_files + bam_files:
        process_file(file)
    
    return fastq_files + bam_files
Notes:
  • The remote_directory must be a valid Latch URL pointing to a directory (not a file)
  • If the remote directory URL is invalid, an empty list is returned
  • Each matched file gets a remote path constructed by appending the filename to the remote directory
  • This is particularly useful for workflows that need to process multiple files of the same type

latch.functions.messages

Module for displaying messages to users during workflow execution. These messages appear prominently in the Latch Console and help communicate task status, warnings, and errors to users.

Functions

message(typ: str, data: Dict[str, Any]) -> None

Displays a message in the Latch Console during task execution. Messages are shown on the task execution page and help users understand what’s happening or if there are issues. Parameters:
  • typ (str): Message type determining display style. Options:
    • "info": Informational messages (blue styling)
    • "warning": Warning messages (yellow/orange styling)
    • "error": Error messages (red styling)
  • data (Dict[str, Any]): Message content with required keys:
    • "title" (str): Brief message title
    • "body" (str): Detailed message content
Returns:
  • None
Raises:
  • RuntimeError: If message processing fails
Examples:
from latch.functions.messages import message
from latch.resources.tasks import small_task
from latch.types.file import LatchFile

@small_task
def process_samples(input_file: LatchFile) -> str:
    # Info message - task progress
    message("info", {
        "title": "Processing Started",
        "body": f"Analyzing {input_file} with 1000 samples"
    })
    
    try:
        # Process the file
        result = analyze_file(input_file)
        
        # Success message
        message("info", {
            "title": "Analysis Complete",
            "body": f"Successfully processed {len(result)} samples"
        })
        
        return result
        
    except ValueError as e:
        # Error message with helpful guidance
        message("error", {
            "title": "Invalid File Format",
            "body": f"Expected CSV format with columns: sample_id, condition, replicate. Error: {str(e)}"
        })
        raise
        
    except FileNotFoundError:
        # Warning message
        message("warning", {
            "title": "File Not Found",
            "body": "Input file could not be located. Please check the file path and try again."
        })
        raise

@small_task
def validate_parameters(threshold: float) -> bool:
    if threshold < 0 or threshold > 1:
        message("error", {
            "title": "Invalid Parameter Value",
            "body": f"Threshold must be between 0 and 1, got {threshold}. Please adjust your input."
        })
        return False
    
    if threshold > 0.05:
        message("warning", {
            "title": "High Threshold Warning",
            "body": f"Threshold of {threshold} is quite high. Consider using a lower value (e.g., 0.05) for more sensitive results."
        })
    
    return True

latch.functions.operators

⚠️ DEPRECATED - This module is deprecated and may be removed in future versions. This module provides utilities for data manipulation operations inspired by Nextflow channel operators. It includes functions for dictionary joins, tuple grouping, filtering, and Cartesian products.

Functions

left_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]

Performs a left join on two dictionaries, keeping all keys from the left dictionary. Parameters:
  • left (Dict[str, Any]): Left dictionary (all keys preserved)
  • right (Dict[str, Any]): Right dictionary (matched keys only)
Returns:
  • Dict[str, Any]: Dictionary with all left keys, combined with right values where keys match
Example:
left = {"a": 1, "b": 2, "c": 3}
right = {"b": 20, "c": 30, "d": 40}
result = left_join(left, right)
# Result: {"a": 1, "b": [2, 20], "c": [3, 30]}

right_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]

Performs a right join on two dictionaries, keeping all keys from the right dictionary. Parameters:
  • left (Dict[str, Any]): Left dictionary (matched keys only)
  • right (Dict[str, Any]): Right dictionary (all keys preserved)
Returns:
  • Dict[str, Any]: Dictionary with all right keys, combined with left values where keys match
Example:
left = {"a": 1, "b": 2, "c": 3}
right = {"b": 20, "c": 30, "d": 40}
result = right_join(left, right)
# Result: {"b": [2, 20], "c": [3, 30], "d": 40}

inner_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]

Performs an inner join on two dictionaries, keeping only keys present in both dictionaries. Parameters:
  • left (Dict[str, Any]): Left dictionary
  • right (Dict[str, Any]): Right dictionary
Returns:
  • Dict[str, Any]: Dictionary containing only keys present in both inputs, with combined values
Example:
left = {"a": 1, "b": 2, "c": 3}
right = {"b": 20, "c": 30, "d": 40}
result = inner_join(left, right)
# Result: {"b": [2, 20], "c": [3, 30]}

outer_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]

Performs an outer join on two dictionaries, keeping all keys from both dictionaries. Parameters:
  • left (Dict[str, Any]): Left dictionary
  • right (Dict[str, Any]): Right dictionary
Returns:
  • Dict[str, Any]: Dictionary containing all keys from both inputs, with combined values where keys match
Example:
left = {"a": 1, "b": 2, "c": 3}
right = {"b": 20, "c": 30, "d": 40}
result = outer_join(left, right)
# Result: {"a": 1, "b": [2, 20], "c": [3, 30], "d": 40}

group_tuple(channel: List[Tuple], key_index: Optional[int] = None) -> List[Tuple]

Groups tuples by a specified key index, mimicking Nextflow’s groupTuple operator. Parameters:
  • channel (List[Tuple]): List of tuples to group
  • key_index (Optional[int]): Index to group by (defaults to 0)
Returns:
  • List[Tuple]: List of grouped tuples, one per distinct key
Example:
channel = [(1, 'A'), (1, 'B'), (2, 'C'), (3, 'B'), (1, 'C'), (2, 'A'), (3, 'D')]
result = group_tuple(channel)  # Group by first element (index 0)
# Result: [(1, ['A', 'B', 'C']), (2, ['C', 'A']), (3, ['B', 'D'])]

# Group by second element (index 1)
result2 = group_tuple(channel, key_index=1)
# Result: [('A', [1, 2]), ('B', [1, 3]), ('C', [2, 1]), ('D', [3])]

latch_filter(channel: List[Any], predicate: Union[Callable, re.Pattern, type, None]) -> List[Any]

Filters a list using a predicate function, regex pattern, or type check. Parameters:
  • channel (List[Any]): List to filter
  • predicate (Union[Callable, re.Pattern, type, None]): Filter criteria:
    • Callable: Function that returns True/False for each item
    • re.Pattern: Regex pattern to match against strings
    • type: Type to filter by (e.g., str, int)
    • None: Returns original list unchanged
Returns:
  • List[Any]: Filtered list based on the predicate
Examples:
import re

from latch.functions.operators import latch_filter

# Filter by function
numbers = [1, 2, 3, 4, 5, 6]
evens = latch_filter(numbers, lambda x: x % 2 == 0)
# Result: [2, 4, 6]

# Filter by regex pattern
strings = ["hello", "world", "test123", "abc"]
pattern = re.compile(r'\d')
with_numbers = latch_filter(strings, pattern)
# Result: ["test123"]

# Filter by type
mixed = [1, "hello", 2.5, "world", 3]
strings_only = latch_filter(mixed, str)
# Result: ["hello", "world"]

combine(channel_0: List[Any], channel_1: List[Any], by: Optional[int] = None) -> Union[List, Dict[str, List[Any]]]

Creates a Cartesian product of two lists, with optional grouping by a tuple index. Parameters:
  • channel_0 (List[Any]): First list to combine
  • channel_1 (List[Any]): Second list to combine
  • by (Optional[int]): If provided, group tuples by this index before combining
Returns:
  • Union[List, Dict[str, List[Any]]]:
    • If by is None: List of tuples representing the Cartesian product
    • If by is provided: Dictionary with grouped products
Examples:
# Basic Cartesian product
c0 = ['hello', 'ciao']
c1 = [1, 2, 3]
result = combine(c0, c1)
# Result: [('hello', 1), ('hello', 2), ('hello', 3), ('ciao', 1), ('ciao', 2), ('ciao', 3)]

# With grouping by tuple index
c0_tuples = [('A', 1), ('A', 2), ('B', 3)]
c1_tuples = [('A', 'x'), ('A', 'y'), ('B', 'z')]
result = combine(c0_tuples, c1_tuples, by=0)  # Group by first element
# Result: {
#   'A': [('A', 1, 'A', 'x'), ('A', 1, 'A', 'y'), ('A', 2, 'A', 'x'), ('A', 2, 'A', 'y')],
#   'B': [('B', 3, 'B', 'z')]
# }
Note: When using by, all elements in both lists must be tuples of the same length.

latch.functions.secrets

Module for securely retrieving secrets stored in Latch workspaces. Secrets are encrypted values that can be used to store sensitive information like API keys, database passwords, or authentication tokens.

Functions

get_secret(secret_name: str) -> str

Retrieves a secret value from the Latch workspace. Parameters:
  • secret_name (str): Name of the secret to retrieve
Returns:
  • str: The decrypted secret value
Examples:
from latch.functions.secrets import get_secret
from latch.resources.tasks import small_task

@small_task
def process_with_api_key() -> str:
    # Retrieve API key from secrets
    api_key = get_secret("my-api-key")
    
    # Use the API key for authentication
    response = make_api_call(api_key)
    return response

latch.ldata

Module for working with Latch Data (LData) - Latch’s cloud storage system. This module provides the LPath class for interacting with files and directories stored in Latch Data, including operations like uploading, downloading, copying, and metadata retrieval.

Classes

LPath

Represents a remote file or directory path hosted on Latch Data. Provides a pathlib-like interface for working with cloud storage. Constructor:
LPath(path: str)
Parameters:
  • path (str): The Latch path, must start with “latch://”
Key Features:
  • Lazy Loading: Metadata is fetched on-demand to minimize network requests
  • Caching: Metadata is cached after first fetch for improved performance
  • Path Operations: Supports path joining with / operator
  • File/Directory Operations: Upload, download, copy, delete, and directory listing
  • Metadata Access: Get file size, content type, version ID, and node information
Properties and Methods: Metadata Methods:
  • fetch_metadata(): Force refresh of all cached metadata
  • node_id(load_if_missing=True): Get the unique node ID
  • name(load_if_missing=True): Get the file/directory name
  • type(load_if_missing=True): Get the node type (file, directory, etc.)
  • size(load_if_missing=True): Get file size in bytes
  • size_recursive(load_if_missing=True): Get total size including subdirectories
  • content_type(load_if_missing=True): Get MIME content type
  • version_id(load_if_missing=True): Get version identifier
  • is_dir(load_if_missing=True): Check if path is a directory
Directory Operations:
  • iterdir(): List contents of directory (non-recursive)
  • mkdirp(): Create directory and all parent directories
  • rmr(): Recursively delete files and directories
File Operations:
  • upload_from(src: Path, show_progress_bar=False): Upload local file/directory
  • download(dst=None, show_progress_bar=False, cache=False): Download to local path
  • copy_to(dst: LPath): Copy to another LPath
Path Operations:
  • __truediv__(other): Join paths using / operator
Examples:
from pathlib import Path
from typing import List

from latch.ldata import LPath
from latch.resources.tasks import small_task

# Create LPath instances
data_dir = LPath("latch:///my_workspace/data")
input_file = LPath("latch:///my_workspace/inputs/sample.fastq")
output_dir = LPath("latch:///my_workspace/results")

# Check if paths exist and get metadata
if data_dir.is_dir():
    print(f"Directory size: {data_dir.size_recursive()} bytes")

# List directory contents
for item in data_dir.iterdir():
    print(f"{item.name()}: {item.type()}")

# Upload local files
local_file = Path("local_data.csv")
data_dir.upload_from(local_file)

# Download files
downloaded_path = input_file.download()
print(f"Downloaded to: {downloaded_path}")

# Copy between Latch paths
backup_file = LPath("latch:///my_workspace/backups/sample.fastq")
input_file.copy_to(backup_file)

# Create directories
new_dir = LPath("latch:///my_workspace/new_analysis")
new_dir.mkdirp()

# Path joining
results_file = output_dir / "analysis_results.txt"
print(f"Full path: {results_file.path}")

# Workflow integration
@small_task
def process_data(input_path: LPath) -> LPath:
    # Download input
    local_input = input_path.download()
    
    # Process the file
    result = analyze_file(local_input)
    
    # Upload result
    output_path = LPath("latch:///my_workspace/results/processed.txt")
    output_path.upload_from(Path(result))
    
    return output_path

# Batch operations
@small_task
def batch_upload(input_dir: Path) -> List[LPath]:
    uploaded_paths = []
    
    for file_path in input_dir.glob("*.csv"):
        # Create corresponding LPath
        latch_path = LPath(f"latch:///my_workspace/data/{file_path.name}")
        
        # Upload file
        latch_path.upload_from(file_path)
        uploaded_paths.append(latch_path)
    
    return uploaded_paths
Error Handling:
from latch.ldata import LPath, LatchPathError

try:
    # This will raise LatchPathError if path doesn't exist
    file_path = LPath("latch:///nonexistent/file.txt")
    size = file_path.size()
    
except LatchPathError as e:
    print(f"Path error: {e}")
    print(f"Remote path: {e.remote_path}")
    print(f"Account ID: {e.acc_id}")
Best Practices:
  • Use load_if_missing=False when you know metadata is already cached
  • Call fetch_metadata() to refresh stale cache when needed
  • Use cache=True in download() for repeated downloads of the same file
  • Handle LatchPathError for robust error handling
  • Use path joining with / operator for cleaner code
  • Check is_dir() before calling directory-specific methods
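
A short sketch of these practices (paths are placeholders):

from latch.ldata import LPath

p = LPath("latch:///my_workspace/data/sample.fastq")

# Refresh stale metadata explicitly, then read from the cache
p.fetch_metadata()
print(p.size(load_if_missing=False))

# Cache the download so repeated calls reuse the local copy
local = p.download(cache=True)

# Guard directory-specific operations
d = LPath("latch:///my_workspace/data")
if d.is_dir():
    for child in d.iterdir():
        print(child.name())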

LatchPathError

Exception raised when LPath operations fail. Attributes:
  • message (str): Error description
  • remote_path (Optional[str]): The Latch path that caused the error
  • acc_id (Optional[str]): Account ID associated with the error

LDataNodeType

Enum representing different types of Latch Data nodes. Values:
  • account_root: Root directory of an account
  • dir: Regular directory
  • obj: File object
  • mount: Mounted storage
  • link: Symbolic link
  • mount_gcp: Google Cloud Platform mount
  • mount_azure: Azure mount
Example:
from latch.ldata import LPath, LDataNodeType

path = LPath("latch:///my_workspace/data")
node_type = path.type()

if node_type == LDataNodeType.dir:
    print("This is a directory")
elif node_type == LDataNodeType.obj:
    print("This is a file")