src/latch/resources/tasks.py
This reference documents actual definitions and their behavior as implemented, using the provided docstrings where present and concise, factual descriptions otherwise.
Module description
Latch tasks are decorators to turn functions into workflow 'nodes'.
Each task is containerized, versioned and registered with Flyte_ when a workflow is uploaded to Latch. Containerized tasks are then executed on arbitrary instances as `Kubernetes Pods`_, scheduled using flytepropeller_.
The type of instance that the task executes on (eg. number of available resources, presence of GPU) can be controlled by invoking one of the set of exported decorators:
..
   from latch import medium_task

   @medium_task
   def my_task(a: int) -> str:
       ...
.. _Kubernetes Pods:
    https://kubernetes.io/docs/concepts/workloads/pods/
.. _flytepropeller:
    https://github.com/flyteorg/flytepropeller
.. _Flyte:
    https://docs.flyte.org/en/latest/
get_v100_x1_pod() -> Pod
get_v100_x4_pod() -> Pod
get_v100_x8_pod() -> Pod
_get_large_gpu_pod() -> Pod
_get_small_gpu_pod() -> Pod
_get_large_pod() -> Pod
_get_medium_pod() -> Pod
_get_small_pod() -> Pod
_get_l40s_pod(instance_type: str, cpu: int, memory_gib: int, gpus: int) -> Pod
Parameters:
instance_type: str – taint key/value for the pod
cpu: int – CPUs requested
memory_gib: int – memory in GiB
gpus: int – number of GPUs
Returns: Pod
custom_memory_optimized_task(cpu: int, memory: int)
Parameters:
cpu: int – number of CPU cores to request
memory: int – memory in GiB to request

custom_task(cpu: Union[Callable, int], memory: Union[Callable, int], *, storage_gib: Union[Callable, int] = 500, timeout: Union[datetime.timedelta, int] = 0, **kwargs)
If cpu, memory, or storage_gib are callables, returns a dynamic task configuration using DynamicTaskConfig with a small pod config; otherwise, constructs a static _custom_task_config Pod.
Parameters:
cpu: Union[Callable, int] – CPU cores to request (integer, or callable for dynamic configuration)
memory: Union[Callable, int] – memory in GiB to request (integer or callable)
storage_gib: Union[Callable, int] (default 500) – storage in GiB (integer or callable)
timeout: Union[datetime.timedelta, int] (default 0) – timeout for the task
**kwargs – additional keyword arguments
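As an illustration of custom_task, a minimal sketch with static resources (the resource values and task body are hypothetical):
..
   from latch.resources.tasks import custom_task

   @custom_task(cpu=8, memory=32, storage_gib=100)
   def assemble(samples_dir: str) -> str:
       # Runs on an instance with 8 cores, 32 GiB memory, and 100 GiB storage.
       ...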
lustre_setup_task()

nextflow_runtime_task(cpu: int, memory: int, storage_gib: int = 50)
Task configuration for the Nextflow runtime, with its storage mounted at /nf-workdir.
Parameters:
cpu: int – CPU cores
memory: int – memory in GiB
storage_gib: int – storage in GiB (default 50)
– storage in GiB (default 50)_get_l40s_pod(instance_type: str, cpu: int, memory_gib: int, gpus: int) -> Pod
instance_type: str
– taint key/value for the podcpu: int
– CPUs requestedmemory_gib: int
– memory in GiBgpus: int
– number of GPUsPod
The following L40S GPU task decorators are defined via _get_l40s_pod: g6e_xlarge_task, g6e_2xlarge_task, g6e_4xlarge_task, g6e_8xlarge_task, g6e_12xlarge_task, g6e_16xlarge_task, g6e_24xlarge_task, g6e_48xlarge_task.
src/latch/resources/workflow.py
This module defines a workflow decorator that supports usage with or without arguments, and a nextflow_workflow helper for Nextflow-style workflows.
_generate_metadata()
Parameters:
f: Callable — Function to generate metadata for.
Returns: LatchMetadata — Generated metadata for the function.

_inject_metadata()
Parameters:
f: Callable — Function to mutate the docstring of.
metadata: LatchMetadata — Metadata to inject.

workflow()
Supports use as @workflow without arguments or with a LatchMetadata argument.
Parameters:
metadata: Union[LatchMetadata, Callable] — Either a LatchMetadata instance or the function to decorate (when used without parentheses).
Returns: Union[PythonFunctionWorkflow, Callable] — A PythonFunctionWorkflow if used directly or a decorator if used with arguments.
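A sketch of both usage forms of the workflow decorator (all metadata values are placeholders):
..
   from latch import workflow
   from latch.types import LatchAuthor, LatchMetadata, LatchParameter

   # Without arguments: metadata is generated from the function itself.
   @workflow
   def wf_a(a: int) -> str:
       ...

   # With a LatchMetadata argument.
   metadata = LatchMetadata(
       display_name="My Workflow",
       author=LatchAuthor(name="Author"),
       parameters={"a": LatchParameter(display_name="A")},
   )

   @workflow(metadata)
   def wf_b(a: int) -> str:
       ...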
nextflow_workflow()
Sets unpack_records in NextflowMetadata and delegates to workflow.
Parameters:
metadata: NextflowMetadata — Metadata for the Nextflow-style workflow.
Returns: Callable[[Callable], PythonFunctionWorkflow] — A decorator compatible with workflow.
src/latch/resources/conditional.py
This module provides create_conditional_section, which returns a ConditionalSection for conditional execution in workflows. It delegates to conditional from flytekit.core.condition to create the ConditionalSection.

create_conditional_section()
Chain as many elif clauses as desired. Conditions can be combined with the & (and) and | (or) operators, and boolean task results can be tested with result.is_true() or result.is_false().
Parameters:
name (str): The name of the conditional section, to be shown in Latch Console
Returns: ConditionalSection
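A sketch of chaining conditions inside a workflow (the tasks double and square and the bounds are hypothetical):
..
   from latch import small_task, workflow
   from latch.resources.conditional import create_conditional_section

   @small_task
   def double(n: float) -> float:
       return n * 2

   @small_task
   def square(n: float) -> float:
       return n * n

   @workflow
   def cond_wf(my_input: float) -> float:
       return (
           create_conditional_section("fractions")
           .if_((my_input > 0.1) & (my_input < 1.0)).then(double(n=my_input))
           .elif_((my_input > 1.0) & (my_input < 10.0)).then(square(n=my_input))
           .else_().fail("The input must be between 0.1 and 10")
       )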
src/latch/resources/map_tasks.py
Re-exports map_task from flytekit.core.map_task.
Parameters:
task_function: The task to be mapped, to be shown in Latch Console
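A sketch of the standard flytekit mapping pattern (the task and workflow names are placeholders):
..
   from typing import List

   from latch import small_task, workflow
   from latch.resources.map_tasks import map_task

   @small_task
   def to_str(a: int) -> str:
       return str(a)

   @workflow
   def my_wf(xs: List[int]) -> List[str]:
       # Apply to_str to every element of xs in parallel.
       return map_task(to_str)(a=xs)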
src/latch/resources/reference_workflow.py
This module provides workflow_reference to create a Flyte Launch Plan reference, using the current workspace as the project and the domain set to 'development'. It imports reference_launch_plan from flytekit.core.launch_plan and current_workspace from latch.utils.
workflow_reference()
References the launch plan with the given name and version in the current workspace under domain 'development'.
Parameters:
name (str): The name of the launch plan to reference.
version (str): The version of the launch plan to reference.
Returns: reference_launch_plan configured with:
project=current_workspace()
domain="development"
name=name
version=version
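A sketch of referencing an existing launch plan as a decorator on a stub function (the name, version, and signature are placeholders):
..
   from latch.resources.reference_workflow import workflow_reference

   @workflow_reference(
       name="wf.__init__.assembly_wf",
       version="0.0.1",
   )
   def assembly_wf(samples: str) -> None:
       # Stub body; the referenced launch plan is executed instead.
       ...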
src/latch/types/__init__.py
This package re-exports the public names of latch.types. The available exports are:
LatchDir
LatchOutputDir
LatchFile
LatchOutputFile
file_glob
DockerMetadata
Fork
ForkBranch
LatchAppearanceType
LatchAuthor
LatchMetadata
LatchParameter
LatchRule
Params
Section
Spoiler
Text
file_glob() is re-exported from latch.types.glob. No docstring is present in this module for this entry; see the original definition for details.
src/latch/types/file.py

LatchOutputFile
A LatchFile tagged as the output of a workflow: Annotated[LatchFile, FlyteAnnotation({"output": True})].

LatchFile
LatchFile inherits the implementation of __fspath__ from FlyteFile, so methods like open can retrieve a string representation of self.
..
   @task
   def task(file: LatchFile):
       with open(file, "r") as f:
           print(f.read())

       mypath = Path(file).resolve()
The remote path identifies a remote location. The remote location, either a latch or s3 URL, can be inspected from an object passed to the task to reveal its remote source.
It can also be used to deposit the file to a latch path when the object is returned from a task.
..
   @task
   def task(file: LatchFile):
       path = file.remote_path  # inspect remote location
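A sketch of the deposit pattern on return (the local and remote paths are placeholders):
..
   @task
   def task(file: LatchFile) -> LatchFile:
       # The local file is uploaded to the latch:/// path when the task returns.
       return LatchFile("./foo.txt", "latch:///foo.txt")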
size(self) -> int
Returns the size of the remote data via LPath(self.remote_path).size().

_idempotent_set_path(self, hint: Optional[str] = None) -> None
Sets the local path using the Flyte context's file access if not already set.

_create_imposters(self) -> None
Creates local filesystem placeholders (parent directories and an empty file) at the local path.

local_path (property) -> str
Local file path for the environment executing the task.

remote_path (property) -> Optional[str]
Remote URL referencing the object (LatchData or S3).

__repr__(self) -> str
String representation, differing based on whether remote_path is set.

__str__(self) -> str
Simple string representation of the LatchFile.
LatchFilePathTransformer
Converts between LatchFile instances and Flyte Literal values for file paths.
Constructor
to_literal(...) -> Literal
Converts a LatchFile to a Flyte Literal containing a Blob with the file URI.
to_python_value(...) -> LatchFile
Converts a Flyte Literal back to a LatchFile. Behavior depends on the expected Python type:
If expected_python_type is PathLike, raise TypeError.
If expected_python_type is not a subclass of LatchFile, raise TypeError.
If the URI is not remote, return a LatchFile-like instance initialized with the URI.
If the URI is remote, return a LatchFile with a local path and a downloader that fetches the data, using ctx.file_access to handle remote/local data paths and downloads.

src/latch/types/directory.py
This module defines LatchDir, a FlyteDirectory-based representation of a directory with local and remote paths; LatchOutputDir as a tagged output type; and LatchDirPathTransformer to convert between Python LatchDir objects and Flyte literals.

LatchDir
__init__(self, path: Union[str, PathLike], remote_path: Optional[PathLike] = None, **kwargs)
_idempotent_set_path(self)
iterdir(self) -> list[Union[LatchFile, "LatchDir"]]
If remote_path is not set, it enumerates the local directory contents and returns corresponding LatchDir or LatchFile objects. If remote_path is set, it queries remote metadata (via GraphQL) to construct child objects.
size_recursive(self)
Returns the recursive size of the remote data via LPath(self.remote_path).size_recursive().
_create_imposters(self)
local_path (property)
remote_path (property)
__repr__(self)
String representation of the LatchDir, showing the local path or the remote path depending on whether a remote path is set.
__str__(self)
Simple string representation of the LatchDir.
The module uses format_path and normalize_path, and integrates with LatchFile for non-directory children.

LatchOutputDir
An alias for a LatchDir annotated as an output in Flyte. It marks a LatchDir as the output of a Flyte workflow, tagging it so the console can optimize existence checks for remote paths when needed.

LatchDirPathTransformer
Converts between LatchDir instances and Flyte literals, enabling serialization/deserialization within Flyte.
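A sketch of returning a directory at a requested output location (the names and paths are placeholders):
..
   from pathlib import Path

   from latch import small_task
   from latch.types import LatchDir, LatchOutputDir

   @small_task
   def collect(output_dir: LatchOutputDir) -> LatchOutputDir:
       local = Path("/root/results")
       local.mkdir(exist_ok=True)
       # ... write result files into local ...
       # Upload the local directory to the requested remote path on return.
       return LatchDir(str(local), output_dir.remote_path)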
to_literal(self, ctx: FlyteContext, python_val: LatchDir, python_type: type[LatchDir], expected: LiteralType)
Converts a LatchDir to a Flyte Literal representing its remote path as a multipart blob.

to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Union[type[LatchDir], PathLike]) -> FlyteDirectory
Converts a Literal back to a Python LatchDir (or a subclass of LatchDir). If the expected type is a PathLike, a TypeError is raised since casting to PathLike is not supported. If the URI is not remote, the local URI is returned as the path. If remote, a LatchDir is created with a downloader to fetch data on demand.
The transformer is registered with the TypeEngine to enable automatic handling of LatchDir types in Flyte workflows.

src/latch/types/metadata.py

_samplesheet_repr(v: Any) -> str
If v is None, returns an empty string; if it is a LatchFile or LatchDir, returns its remote_path; if it is an Enum, returns its value; otherwise returns str(v).

default_samplesheet_constructor(samples: List[DC], t: DC, delim: str = ",") -> Path
Writes a samplesheet.csv using the field names of the dataclass type t as headers. Values are converted using _samplesheet_repr. Returns the path to the created file.
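A sketch of the default constructor with a hypothetical Sample dataclass (the import path is assumed from the module described above):
..
   from dataclasses import dataclass

   from latch.types import LatchFile
   from latch.types.metadata import default_samplesheet_constructor

   @dataclass
   class Sample:
       name: str
       reads: LatchFile

   samples = [
       Sample("a", LatchFile("latch:///a.fastq")),
       Sample("b", LatchFile("latch:///b.fastq")),
   ]
   sheet = default_samplesheet_constructor(samples, Sample)
   # sheet is a Path to a samplesheet.csv with the header "name,reads"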
LatchRule
regex: str — A string regular expression which inputs must match
message: str — The message to render when an input does not match the regex
dict (property) — Returns a dictionary representation of the instance
__post_init__ — Validates that regex is a valid regular expression

LatchAppearanceEnum
line — Value: "line"
paragraph — Value: "paragraph"

MultiselectOption
name: str — Display name of the option
value: object — Associated value for the option

Multiselect
options: List[MultiselectOption] — Available options
allow_custom: bool — Whether custom entries are allowed

LatchAuthor
name: Optional[str] — The name of the author
email: Optional[str] — The email of the author
github: Optional[str] — A link to the GitHub profile of the author

FlowBase
Base class for flow elements such as Section, Text, Params, etc.

Section
section: str — Title of the section
flow: List[FlowBase] — Flow displayed in the section card
__init__(section: str, *flow: FlowBase) — Initializes a section with a title and child flow elements

Text
text: str — Markdown body text

Title
title: str — Markdown title text

Params
params: List[str] — Names of parameters whose widgets will be displayed
__init__(*args: str) — Initializes with a list of parameter names

Spoiler
spoiler: str — Title of the spoiler
flow: List[FlowBase] — Flow displayed in the spoiler card
__init__(spoiler: str, *flow: FlowBase) — Initializes a spoiler with a title and child flow

ForkBranch
display_name: str — String displayed in the fork's multibutton
flow: List[FlowBase] — Child flow displayed when the branch is active
__init__(display_name: str, *flow: FlowBase) — Initializes a fork branch

Fork
fork: str — Name of a str-typed parameter to store the active branch's key
display_name: str — Title shown above the fork selector
flows: Dict[str, ForkBranch] — Mapping from branch keys to branch definitions
__init__(fork: str, display_name: str, **flows: ForkBranch) — Initializes a fork with branches
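A sketch composing the flow elements above (the parameter names are placeholders):
..
   from latch.types import Fork, ForkBranch, Params, Section, Spoiler, Text

   flow = [
       Section("Inputs", Params("read1", "read2"), Text("Paired-end reads.")),
       Spoiler("Advanced", Params("min_quality")),
       Fork(
           "mode",
           "Analysis mode",
           single=ForkBranch("Single sample", Params("sample")),
           batch=ForkBranch("Batch", Params("samplesheet")),
       ),
   ]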
LatchParameter
display_name: Optional[str] — The name used to display the parameter
description: Optional[str] — The parameter's description
hidden: bool — Whether the parameter should be hidden by default
section_title: Optional[str] — Section header for grouping
placeholder: Optional[str] — Placeholder text for input
comment: Optional[str] — Comment about the parameter
output: bool — Whether the parameter is an output
batch_table_column: bool — Show in batch table in UI
allow_dir: bool — Allow directories in UI
allow_file: bool — Allow files in UI
appearance_type: LatchAppearance — How the parameter should be rendered (line/paragraph or multiselect)
rules: List[LatchRule] — Validation rules for inputs
detail: Optional[str] — Additional detail
samplesheet: Optional[bool] — Use samplesheet input UI
Samplesheet-related fields:
allowed_tables: Optional[List[int]] — Registry Tables allowed for samplesheet
_custom_ingestion: Optional[str] — Custom ingestion hook
__str__() — YAML-like string representation including metadata
dict (property) — Returns a dictionary with an __metadata__ key containing metadata, including nested appearance details and rules

SnakemakeParameter(Generic[T], LatchParameter)
type: Optional[Type[T]] — The python type of the parameter
default: Optional[T] — Default value
Other fields are inherited from LatchParameter.

SnakemakeFileParameter(SnakemakeParameter[Union[LatchFile, LatchDir]])
Deprecated: use file_metadata in SnakemakeMetadata instead.
type: Optional[Union[Type[LatchFile], Type[LatchDir]]] — The python type of the parameter
path: Optional[Path] — Destination path for the file
config: bool — Expose the path in the Snakemake config
download: bool — Download in the JIT step

SnakemakeFileMetadata
path: Path — Local path where the file will be copied
config: bool — Expose in the Snakemake config
download: bool — Download in the JIT step

NextflowParameter(Generic[T], LatchParameter)
type: Optional[Type[T]] — The python type of the parameter
default: Optional[T] — Default value
samplesheet_type: Literal["csv", "tsv", None] — Samplesheet type (CSV/TSV)
samplesheet_constructor: Optional[Callable[[T], Path]] — Custom samplesheet constructor
results_paths: Optional[List[Path]] — Output sub-paths exposed in the UI under "Results"
__post_init__() — Validates samplesheet constraints and requires a constructor if needed, or raises errors via Click

DC (TypeVar)
Used by default_samplesheet_constructor with generic dataclass types.
_samplesheet_repr(v: Any) -> str (see Functions)
default_samplesheet_constructor (see Functions)

NextflowRuntimeResources
cpus: Optional[int] — Number of CPUs required for the task
memory: Optional[int] — Memory required (GiB)
storage_gib: Optional[int] — Storage required (GiB)
storage_expiration_hours: int — Hours to retain the workdir after failure

LatchMetadata
display_name: str — The human-readable name of the workflow
author: LatchAuthor — Metadata about the workflow author
documentation: Optional[str] — Link to workflow documentation
repository: Optional[str] — Link to the repository hosting the workflow
license: str — SPDX identifier
parameters: Dict[str, LatchParameter] — Parameter metadata map
wiki_url: Optional[str] — Wiki URL
video_tutorial: Optional[str] — Video tutorial URL
tags: List[str] — Tags for categorization
flow: List[FlowBase] — Flow elements describing UI flow
no_standard_bulk_execution: bool — Disable standard bulk execution
_non_standard: Dict[str, object] — Non-standard metadata
about_page_path: Optional[Path] — Path to a markdown about page
validate() — Validates fields (e.g., about page path type)
dict (property) — Returns a dictionary with metadata; excludes parameters
__str__() — YAML-like string with parameters expanded
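A sketch of a typical LatchMetadata declaration (all values are placeholders):
..
   from latch.types import LatchAuthor, LatchMetadata, LatchParameter, LatchRule

   metadata = LatchMetadata(
       display_name="Variant Caller",
       author=LatchAuthor(name="Jane Doe", email="jane@example.com"),
       license="MIT",
       parameters={
           "reads": LatchParameter(
               display_name="Reads",
               description="Input FASTQ file.",
               rules=[LatchRule(regex=r"\.fastq$", message="Must be a FASTQ file")],
           ),
       },
       tags=["genomics"],
   )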
DockerMetadata
username: str — Account username
secret_name: str — Secret name containing the password

EnvironmentConfig
use_conda: bool — Use the Snakemake conda directive
use_container: bool — Use the Snakemake container directive
container_args: List[str] — Additional container arguments

FileMetadata (TypeAlias)
Dict[str, Union[SnakemakeFileMetadata, "FileMetadata"]]

SnakemakeMetadata(LatchMetadata)
output_dir: Optional[LatchDir] — Directory for Snakemake outputs
name: Optional[str] — Name of the workflow
docker_metadata: Optional[DockerMetadata] — Docker credentials
env_config: EnvironmentConfig — Environment configuration
parameters: Dict[str, SnakemakeParameter] — Snakemake parameter metadata
file_metadata: FileMetadata — File metadata mappings
cores: int — Number of cores (Snakemake)
about_page_content: Optional[Path] — Path to About page content
validate() — Validates fields (e.g., about_page_content type)
dict (property) — Returns a dictionary including metadata; omits about_page_content
__post_init__() — Calls validate(), assigns a default workflow name if needed, and registers global metadata

NextflowMetadata(LatchMetadata)
name: Optional[str] — Name of the workflow
parameters: Dict[str, NextflowParameter] — Nextflow parameter metadata
runtime_resources: NextflowRuntimeResources — Runtime resources
execution_profiles: List[str] — Execution config profiles
log_dir: Optional[LatchDir] — Directory to dump Nextflow logs
upload_command_logs: bool — Upload .command.* logs after task execution
dict (property) — Returns a dictionary including metadata, omitting about_page_path
__post_init__() — Calls validate(), assigns a name if missing, and registers global metadata

_snakemake_metadata: Optional[SnakemakeMetadata] — Registry for Snakemake metadata
_nextflow_metadata: Optional[NextflowMetadata] — Registry for Nextflow metadata

Several classes validate their fields in __post_init__ or validate() methods; these validations are reflected in the class descriptions above where relevant.
src/latch/types/glob.py
This module provides file_glob, a utility to construct a list of LatchFile objects from a glob pattern. It validates the provided remote_directory URL using is_valid_url and returns an empty list if the URL is not valid. The function resolves file paths relative to the caller's working directory (or an optional target_dir if provided) and constructs LatchFile instances whose remote paths are formed by appending the matched file name to the remote_directory.
file_glob()
Constructs a list of LatchFile objects from a glob pattern. This utility helps pass collections of files between tasks. The remote location of each constructed LatchFile is created by appending the file name matched by the pattern to the directory represented by remote_directory.
Parameters:
pattern: str — A glob pattern, e.g. '*.py'. The pattern is resolved relative to the working directory of the caller unless a target_dir is provided.
remote_directory: str — A valid latch URL pointing to a directory, e.g. 'latch:///foo'. This must be a directory and not a file.
target_dir: Optional[Path] — An optional Path object to define an alternate working directory for path resolution.
Returns: List[LatchFile] — A list of instantiated LatchFile objects.
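A sketch mirroring the documented parameters (the glob pattern and remote directory are placeholders):
..
   from typing import List

   from latch import small_task
   from latch.types import LatchFile, file_glob

   @small_task
   def fastqc_task() -> List[LatchFile]:
       # ... produce *.fastq.gz files in the working directory ...
       return file_glob("*.fastq.gz", "latch:///fastqc_outputs")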
src/latch/types/json.py
Exports: JsonArray, JsonObject, JsonValue
JsonArray = List["JsonValue"]
JsonObject = Dict[str, "JsonValue"]
JsonValue = Union[JsonObject, JsonArray, str, int, float, bool, None]
src/latch/functions/messages.py
NUCLEUS_URL (str): URL derived from environment variable LATCH_CLI_NUCLEUS_URL or a default.
ADD_MESSAGE_ENDPOINT (str): Endpoint to post task execution messages.
message(typ: str, data: Dict[str, Any]) -> None: Post or print a task execution message.

message(typ: str, data: Dict[str, Any])
Parameters:
typ: str — One of 'info', 'warning', or 'error'.
data: Dict[str, Any] — The message payload, e.g. {'title': ..., 'body': ...}.
Raises:
RuntimeError — If an internal error occurs while processing the message.
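A sketch of surfacing an error in the console from inside a task (the title and body are placeholders):
..
   from latch import small_task
   from latch.functions.messages import message

   @small_task
   def qc_task():
       try:
           ...
       except ValueError:
           message(
               "error",
               {"title": "Invalid sample ID column", "body": "Details shown in the console."},
           )
           raise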
src/latch/functions/operators.py

_combine(item1: Any, item2: Any)
Parameters:
item1: Any
item2: Any
Returns: List[Any] — A new list containing the combined items according to the module's combination rules.

left_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]
Parameters:
left: Dict[str, Any]
right: Dict[str, Any]
Returns: Dict[str, Any] — Dictionary containing joined values, or original left values when no match.

right_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]
Parameters:
left: Dict[str, Any]
right: Dict[str, Any]
Returns: Dict[str, Any]

inner_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]
Parameters:
left: Dict[str, Any]
right: Dict[str, Any]
Returns: Dict[str, Any] — Dictionary containing values for keys present in both input dictionaries.

outer_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]
Parameters:
left: Dict[str, Any]
right: Dict[str, Any]
Returns: Dict[str, Any] — Dictionary containing all keys from both inputs, combining where present in both.
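An illustrative sketch of the join semantics, assuming matched values are combined into a list by _combine:
..
   left = {"a": 1, "b": 2}
   right = {"b": 20, "c": 30}

   left_join(left, right)   # {"a": 1, "b": [2, 20]}
   inner_join(left, right)  # {"b": [2, 20]}
   outer_join(left, right)  # {"a": 1, "b": [2, 20], "c": 30}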
group_tuple(channel: List[Tuple], key_index: Optional[int] = None) -> List[Tuple]
Mimics the groupTuple construct from Nextflow:
The groupTuple operator collects tuples (or lists) of values emitted by the source channel, grouping together the elements that share the same key. Finally it emits a new tuple object for each distinct key collected.
Args:
channel: List[Tuple] — A list of tuples to be grouped by key_index
key_index: Optional[int] — Which index of the tuple to match against; if not provided, defaults to 0
Returns: List[Tuple] — List of grouped tuples, one per distinct key.
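A sketch following the Nextflow groupTuple example (the output shape assumes Nextflow-like semantics):
..
   channel = [(1, "A"), (1, "B"), (2, "C"), (3, "B"), (1, "C"), (2, "A"), (3, "D")]
   group_tuple(channel)  # key_index defaults to 0
   # e.g. [(1, ["A", "B", "C"]), (2, ["C", "A"]), (3, ["B", "D"])]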
latch_filter(channel: List[Any], predicate: Union[Callable, re.Pattern, type, None]) -> List[Any]
Parameters:
channel: List[Any]
predicate: Union[Callable, re.Pattern, type, None]
Returns: List[Any] — Filtered list according to the predicate, pattern, or type. If no predicate is provided or it is unrecognized, returns the original channel.

combine(channel_0: List[Any], channel_1: List[Any], by: Optional[int] = None) -> Union[List, Dict[str, List[Any]]]
Parameters:
channel_0: List[Any] — If by is provided, all elements must be tuples of the same length
channel_1: List[Any] — If by is provided, all elements must be tuples of the same length as channel_0
by: Optional[int] — If provided, which index to group by first.
Returns: Union[List, Dict[str, List[Any]]] — The Cartesian product as a list of tuples, or a dictionary of grouped products if by is used. If by is provided, the function groups elements by the specified index before producing the product, returning a list of combined tuples per group.
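A sketch of the Cartesian product, with and without by (the ordering shown is illustrative):
..
   combine([1, 2], ["a", "b"])
   # e.g. [(1, "a"), (1, "b"), (2, "a"), (2, "b")]

   combine([("x", 1), ("y", 2)], [("x", 3)], by=0)
   # groups on index 0 before producing the product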
src/latch/functions/secrets.py

get_secret()
A utility to reference secrets stored in your workspace on Latch. When running locally or via latch develop, the only secrets you will be able to access are the ones in your personal workspace. To use secrets from a shared workspace, register your workflow and run it on Latch.
Parameters:
secret_name: str — Name of the secret to retrieve.
Raises:
ValueError — If the HTTP response status code is not 200; the error message is taken from the response JSON as {"error": ...}.
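A sketch of retrieving a secret by name (the secret name is a placeholder):
..
   from latch.functions.secrets import get_secret

   pw = get_secret("test-secret")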
src/latch/registry/project.py

Project
A registry project, containing a list of tables (:class:`tables <latch.registry.table.Table>`).
:meth:`~latch.account.Account.list_registry_projects` is the typical way to get a :class:`Project`.
Key attributes and methods:
get_display_name(load_if_missing: bool) — If load_if_missing is True and the display name is not cached, the project is loaded first.
list_tables(load_if_missing: bool) — If load_if_missing is True and the table list is not cached, the project is loaded first.
update(reload_on_commit: bool) — If reload_on_commit is True, the project will be reloaded after commit.
src/latch/account.py
This module defines the Account and AccountUpdate classes to fetch and mutate registry projects within a workspace. It uses GraphQL via the execute function to fetch project data and perform mutations. Exposed exception: AccountNotFoundError.

Account
:meth:`current` is the typical way of getting an :class:`Account`.
If the current request signer (CLI user or execution context) lacks permissions to fetch some information, the corresponding operations will act as if the information does not exist. Update operations will usually produce errors.
id: str — Unique identifier.
current() — In an execution context, this is the workspace in which the execution was run. Otherwise (e.g. when running latch commands) this is the current setting of latch workspace, which defaults to the user's default workspace. Returns: Current account.
list_registry_projects(load_if_missing: bool) — If true, :meth:`load` the project list if not in cache. If false, return None if not in cache.
update() — Update transaction context manager; uncommitted changes can be discarded by calling AccountUpdate.clear before closing the context manager.

AccountUpdate
upsert_registry_project(display_name: str) — Display name of the new project.

AccountNotFoundError
Raised when an account with the given id does not exist or when permissions are insufficient to access it.
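A sketch of fetching the current account, listing projects, and creating one (the display name is a placeholder, and the update() context-manager usage is assumed from the descriptions above):
..
   from latch.account import Account

   acct = Account.current()
   projects = acct.list_registry_projects()

   with acct.update() as updater:
       updater.upsert_registry_project("New Project")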