You can execute a workflow from python using python parameter values, defined identically to those in LaunchPlans, or using a previously registred LaunchPlan.

There are a few requirements for this feature:

  • The wf_name parameter must match the name in the .latch/workflow_name file in the workflow directory.
  • The wf_name and version must be registered to latch.

These further requirements only apply if you are using python parameter values:

  • The latch version in the workflow container must be greater than or equal to 2.62.0.
  • The python version in the workflow container should match the python version launching the execution. It may work without this requirement.
  • The types used by the workflow must be the same types as used by the workflow header (even if two types have the same structure, but are from different modules, parameter validation will fail). Thus, you should import from wf.entrypoint in the case of snakemake and nextflow workflows.

Below is an example of launching a workflow and polling for status.

import email.utils
import time
from datetime import datetime

from latch.types.file import LatchFile
from latch_cli.services.launch.launch_v2 import launch, launch_from_launch_plan
from latch_cli.tinyrequests import post
from latch_cli.utils import get_auth_header
from latch_sdk_config.latch import config

from wf.entrypoint import GenomeReference, Sample


# Using python parameter values
execution_id = launch(
    wf_name="my_workflow",
    version="0.1.0",
    params={
        "input": [
            Sample(
                patient="TestSample",
                fastq_1=LatchFile(
                    "latch:///test_L1_1.fastq.gz"
                ),
                ...
            ),
            ...
        ],
        "genome": GenomeReference.GATK_GRCh37,
        ...
    }
)

# Using a previously registered LaunchPlan
execution_id = launch_from_launch_plan(
    launch_plan_id="my_workflow",
    version="0.1.0",
    lp_name="Test Data",
)

while True:
    list_resp = post(
        url=config.api.execution.list,
        headers = {"Authorization": get_auth_header()},
        json={"ws_account_id": "XXXXX"},
    ).json()

    target_execution = list_resp[str(execution_id)]
    target_execution_status: str = target_execution.get('status')

    if target_execution_status == 'SUCCEEDED':
        break
    elif target_execution_status == 'FAILED':
        raise Exception("Execution failed")
    elif target_execution_status == 'ABORTED':
        raise Exception("Execution aborted")
    elif target_execution_status == 'UNDEFINED':
        time.sleep(5)
        continue

    start_time_str: str = target_execution.get('start_time')
    start_time_tuple = email.utils.parsedate_tz(start_time_str)
    if start_time_tuple is None:
        raise Exception("Failed to parse start time")

    start_time = datetime.fromtimestamp(email.utils.mktime_tz(start_time_tuple))
    current_time = datetime.now()

    if (current_time - start_time).total_seconds() > 300:
        raise TimeoutError("Execution has been running for more than 5 minutes")

    time.sleep(10)