The Latch SDK provides a Python API to programmatically launch workflows and monitor their status. This is useful for integrating workflows into automated test suites (e.g., GitHub CI/CD).There are two ways to launch a workflow programmatically:
Call the workflow with Python parameter values.
Use a LaunchPlan from a previously registered workflow.
Please make sure you have registered your workflow to Latch using latch register. This creates a new workflow version, which you will reference in your script.
wf_name: Name of the workflow function. This is the function defined immediately after the @workflow decorator in your code (Example)
version: Workflow version registered on Latch
params: Dictionary of parameter names and values. If the python values are not typed using the exact same types used in the workflow function signature (including the module paths), the best_effort parameter should be set to True.
best_effort (default True): (available in latch >= 2.67.8) When set to True, allows for flexible conversion of params to workflow inputs. This enables launching workflows outside of the workflow environment with compatible values (e.g., strings for enums where the string matches an enum option, or dictionaries/dataclasses with all the fields of a particular dataclass).
Copy
Ask AI
import email.utils import time from datetime import datetime from latch_cli.services.launch.launch_v2 import launch_from_launch_plan from latch_cli.tinyrequests import post from latch_cli.utils import get_auth_header from latch_sdk_config.latch import config # Using a previously registered LaunchPlan execution_id = launch_from_launch_plan( wf_name="my_workflow", version="0.1.0", lp_name="Test Data", ) # Polling execution status while True: list_resp = post( url=config.api.execution.list, headers = {"Authorization": get_auth_header()}, json={"ws_account_id": "XXXXX"}, ).json() target_execution = list_resp[str(execution_id)] target_execution_status: str = target_execution.get('status') if target_execution_status == 'SUCCEEDED': break elif target_execution_status == 'FAILED': raise Exception("Execution failed") elif target_execution_status == 'ABORTED': raise Exception("Execution aborted") elif target_execution_status == 'UNDEFINED': time.sleep(5) continue start_time_str: str = target_execution.get('start_time') start_time_tuple = email.utils.parsedate_tz(start_time_str) if start_time_tuple is None: raise Exception("Failed to parse start time") start_time = datetime.fromtimestamp(email.utils.mktime_tz(start_time_tuple)) current_time = datetime.now() if (current_time - start_time).total_seconds() > 300: raise TimeoutError("Execution has been running for more than 5 minutes") time.sleep(10)
Parameters:
wf_name: Name of the workflow function. This is the function defined immediately after the @workflow decorator in your code (Example)
Important note on Python version: If the Python version in the workflow’s
Docker image does not match the Python version running the script that calls
launch or launch_from_launch_plan, the function may fail with a dill
unpickling error. In this case, it is recommended to use the same Python
version used during workflow registration.