You can execute a workflow from python using python parameter values, defined identically to those in LaunchPlans.
There are a few requirements for this feature:
- The
wf_name
parameter must match the name in the .latch/workflow_name
file in the workflow directory.
- The
wf_name
and version
must be registered to latch.
- The latch version in the workflow container must be greater than or equal to
2.62.0
.
- The python version in the workflow container should match the python version launching the execution. It may work without this requirement.
- The types used by the workflow must be the same types as used by the workflow header (even if two types have the same structure, but are from different modules, parameter validation will fail). Thus, you should import from
wf.entrypoint
in the case of snakemake and nextflow workflows.
Below is an example of launching a workflow and polling for status.
import email.utils
import time
from datetime import datetime
from latch.types.file import LatchFile
from latch_cli.services.launch.launch_v2 import launch
from latch_cli.tinyrequests import post
from latch_cli.utils import get_auth_header
from latch_sdk_config.latch import config
from wf.entrypoint import GenomeReference, Sample
execution_id = launch(
wf_name="my_workflow",
version="0.1.0",
params={
"input": [
Sample(
patient="TestSample",
fastq_1=LatchFile(
"latch:///test_L1_1.fastq.gz"
),
...
),
...
],
"genome": GenomeReference.GATK_GRCh37,
...
}
)
while True:
list_resp = post(
url=config.api.execution.list,
headers = {"Authorization": get_auth_header()},
json={"ws_account_id": "XXXXX"},
).json()
target_execution = list_resp[str(execution_id)]
target_execution_status: str = target_execution.get('status')
if target_execution_status == 'SUCCEEDED':
break
elif target_execution_status == 'FAILED':
raise Exception("Execution failed")
elif target_execution_status == 'ABORTED':
raise Exception("Execution aborted")
elif target_execution_status == 'UNDEFINED':
time.sleep(5)
continue
start_time_str: str = target_execution.get('start_time')
start_time_tuple = email.utils.parsedate_tz(start_time_str)
if start_time_tuple is None:
raise Exception("Failed to parse start time")
start_time = datetime.fromtimestamp(email.utils.mktime_tz(start_time_tuple))
current_time = datetime.now()
if (current_time - start_time).total_seconds() > 300:
raise TimeoutError("Execution has been running for more than 5 minutes")
time.sleep(10)