Data Addition Trigger
This tutorial walks through creating an automation on Latch with the Data Addition trigger type. The automation runs a target workflow on every child of a target directory. We assume that you know how to write and register Workflows on Latch.
Prerequisites:
- Target directory in Latch Data: this is the folder which is watched by the automation. The automation workflow will be triggered if a child is added to this folder.
Terms:
- Automation Workflow: the workflow that the automation calls. We create this workflow in steps 3-5 of this tutorial.
- Target Workflow: the workflow that the automation workflow runs on each child of the target directory. It should contain the logic for processing the files in child directories. We create this workflow in step 1 of this tutorial.
- Registry Table: a table that records the child directories already processed by the target workflow, so that subsequent runs of the automation do not reprocess the same directories. We create this table in step 2 of this tutorial.
1: Create the Target Workflow
This example requires a separate target workflow, which is executed on every child folder when the automation workflow is triggered. Below is a simple example workflow that reads every file in a child directory and prints its Latch Path.
- Initialize a new workflow using latch init test-workflow.
- Replace __init__.py and task.py with the following sample code.
# __init__.py
from wf.task import task

from latch.resources.workflow import workflow
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.file import LatchFile
from latch.types.metadata import LatchAuthor, LatchMetadata, LatchParameter

metadata = LatchMetadata(
    display_name="Target Workflow",
    author=LatchAuthor(
        name="Your Name",
    ),
    parameters={
        "input_directory": LatchParameter(
            display_name="Input Directory",
            batch_table_column=True,  # Show this parameter in batched mode.
        ),
        "output_directory": LatchParameter(
            display_name="Output Directory",
            batch_table_column=True,  # Show this parameter in batched mode.
        ),
    },
)


@workflow(metadata)
def template_workflow(
    input_directory: LatchDir, output_directory: LatchOutputDir
) -> LatchOutputDir:
    return task(input_directory=input_directory, output_directory=output_directory)

# task.py
import os
from logging import Logger
from urllib.parse import urljoin

from latch import message
from latch.resources.tasks import small_task
from latch.types.directory import LatchDir, LatchFile, LatchOutputDir
from latch.account import Account

log = Logger("wf.task")


@small_task
def task(input_directory: LatchDir, output_directory: LatchOutputDir) -> LatchOutputDir:
    # iterate through all children of the input directory using iterdir()
    for file in input_directory.iterdir():
        # note: `error` is used here so that the message is logged at a high severity level
        log.error(f"{file} {file.remote_path}")
    return output_directory
- Register the sample target workflow with Latch using latch register --remote --yes test-workflow.
- Record the ID of your workflow from the sidebar; we will use it later in the example.
- Test the workflow by running it on Latch.
- You will need to pass parameters to your target workflow from your automation. To obtain the JSON representation of the workflow inputs, navigate to a previous execution of your workflow. Select Graph and Logs, click the square box around the first task, and select Inputs. Copy the workflow parameters inside the literals object and pass them to params, e.g.
{
  "literals": {
    # copy everything inside the brackets
  }
}
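The copy step above can be sketched in a few lines of Python. This is only an illustration: the helper name `extract_params` and the sample payload (including the `latch://example/input` URI) are hypothetical, and the payload shape simply mirrors the `params` dictionary used later in this tutorial.

```python
import json
from typing import Any, Dict


def extract_params(inputs_json: str) -> Dict[str, Any]:
    """Return the contents of the top-level "literals" object."""
    payload = json.loads(inputs_json)
    if "literals" not in payload:
        raise KeyError('expected a top-level "literals" object')
    return payload["literals"]


# Hypothetical JSON, standing in for what you copy from the Inputs view.
copied_inputs = """
{
  "literals": {
    "input_directory": {
      "scalar": {
        "blob": {
          "metadata": {"type": {"dimensionality": "MULTIPART"}},
          "uri": "latch://example/input"
        }
      }
    }
  }
}
"""

# `params` now holds everything that was inside the "literals" brackets.
params = extract_params(copied_inputs)
print(sorted(params))
```

Parsing the copied JSON instead of retyping it by hand avoids bracket mistakes when the target workflow has many parameters.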
2: Create a New Registry Table
In this example, we record all processed child directories in the Registry Table so that directories are not reprocessed when the automation workflow runs again. This example requires you to create a new table with no existing columns. The automation workflow will add a column named Processed Directory with the directory name of each processed child.
For many common use cases, Registry serves as the place to track workflow inputs and outputs, so we include an example of it here. However, a Registry Table is not required if you don't want to use Registry as a means of tracking your inputs and outputs.
To create a new table to be used with the automation:
- Go to Latch Registry.
- Select an existing project, and click New Table.
- Record the Table ID from the sidebar; we will use it later in the example.
3: Create the Automation Workflow
This is the workflow that runs when the automation is triggered. To create the automation workflow, clone the Automation Workflow Template and navigate to the automation-wf/wf directory.
$ git clone git@github.com:latchbio/automation-wf.git
Cloning into 'automation-wf'...
remote: Enumerating objects: 33, done.
remote: Counting objects: 100% (33/33), done.
remote: Compressing objects: 100% (24/24), done.
remote: Total 33 (delta 9), reused 28 (delta 6), pack-reused 0
Receiving objects: 100% (33/33), 8.52 KiB | 1.42 MiB/s, done.
Resolving deltas: 100% (9/9), done.
$ cd automation-wf/wf
File Tree:
├── Dockerfile
├── README.md
├── version
└── wf
    ├── __init__.py
    ├── automation.py
    └── util.py
- __init__.py calls the automation task defined in automation.py.
- automation.py contains the Python logic that determines how a workflow should be launched.
- util.py contains the utility function that launches the target workflow.
4. Configure the Target Workflow
To specify the target workflow and the registry table that you have just created, configure the following parameters in wf/__init__.py and specify your name in the workflow metadata:
- output_directory: The Latch Path to the output folder that this automation workflow will populate, e.g. latch://...
- target_wf_id: The ID of the target workflow that you have just created.
- params: The parameters for your workflow. Refer to Create the Target Workflow to get the parameters.
- table_id: The ID of the table that you created to store metadata for this automation. Refer to Create a New Registry Table to create a table and get the ID.
Important: Currently, automations pass only input_directory as a parameter to the automation workflow. If your automation workflow has different parameters, the automation will fail to start it.
If you need to pass more parameters to your automation workflow, we suggest hard-coding them into the workflow while we work on adding parameter support for automations.
# __init__.py
from latch.resources.workflow import workflow
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.metadata import LatchAuthor, LatchMetadata, LatchParameter

from wf.automation import automation_task

metadata = LatchMetadata(
    # MODIFY NAMING METADATA BELOW
    display_name="Automation Template",
    author=LatchAuthor(
        name="Your Name Here",
    ),
    # MODIFY NAMING METADATA ABOVE
    # IMPORTANT: these exact parameters are required for the workflow to work with automations
    parameters={
        "input_directory": LatchParameter(
            display_name="Input Directory",
        )
    },
)


@workflow(metadata)
def automation_workflow(input_directory: LatchDir) -> None:
    output_directory = LatchOutputDir(
        path="latch://FIXME"  # fixme: change to remote path of desired output directory
    )

    automation_task(
        input_directory=input_directory,
        output_directory=output_directory,
        target_wf_id="FIXME",  # fixme: change wf_id to the desired workflow id
        table_id="FIXME",  # fixme: change table_id to the desired registry table
    )
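Because the template ships with FIXME placeholders, a quick check before registering can catch values that were left unset. The sketch below is not part of the template; the helper name `find_unset_values` and the sample values (including the made-up workflow ID) are hypothetical.

```python
from typing import Dict, List


def find_unset_values(config: Dict[str, str]) -> List[str]:
    """Return the names of settings that still contain the FIXME placeholder."""
    return sorted(name for name, value in config.items() if "FIXME" in value)


# Hypothetical values mirroring the three settings in wf/__init__.py.
config = {
    "output_directory": "latch://FIXME",  # still a placeholder
    "target_wf_id": "12345",  # made-up workflow ID
    "table_id": "FIXME",  # still a placeholder
}

print(find_unset_values(config))  # ['output_directory', 'table_id']
```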
(Optional) If your target workflow takes parameters other than input_directory and output_directory, change the params object in automation.py to the parameters obtained in step 1.6:
# automation.py
...
params = {
    "input_directory": {
        "scalar": {
            "blob": {
                "metadata": {"type": {"dimensionality": "MULTIPART"}},
                "uri": input_directory.remote_path,
            }
        }
    },
    "output_directory": {
        "scalar": {
            "blob": {
                "metadata": {"type": {"dimensionality": "MULTIPART"}},
                "uri": output_directory.remote_path,
            }
        }
    },
}
...
Usage Notes:
- The input_directory refers to the child directory (i.e. the trigger directory) to be passed to the target workflow.
- The output_directory refers to the directory where the output of the target workflow will be stored.
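The params object shown above repeats the same nested structure for each directory parameter. As a minimal sketch, that structure can be built with a small helper. The function names `directory_literal` and `build_params` and the example URIs are hypothetical and not part of the template; only the dictionary shape is taken from the params object above.

```python
from typing import Any, Dict


def directory_literal(uri: str) -> Dict[str, Any]:
    """Build the nested literal for one directory parameter."""
    return {
        "scalar": {
            "blob": {
                "metadata": {"type": {"dimensionality": "MULTIPART"}},
                "uri": uri,
            }
        }
    }


def build_params(input_uri: str, output_uri: str) -> Dict[str, Any]:
    """Assemble params for a target workflow with the two directory parameters."""
    return {
        "input_directory": directory_literal(input_uri),
        "output_directory": directory_literal(output_uri),
    }


# Made-up URIs, used only to show the resulting shape.
example = build_params("latch://example/child", "latch://example/output")
print(example["input_directory"]["scalar"]["blob"]["uri"])
```

In automation.py, the URIs would come from the remote_path attributes of the input and output directories, as in the params object above.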
5. (Optional) Modify Automation Logic
The file wf/automation.py contains the logic that determines how an execution of the target workflow should be launched.
The automation_task defines the logic used to launch the workflow. The code below checks a registry table to see whether a child directory has already been recorded as processed, and launches an execution of the target workflow if it has not.
Modify the function below to change the logic for launching target workflows.
# automation.py
import uuid
from typing import Set

from latch.registry.table import Table
from latch.resources.tasks import small_task
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.file import LatchFile

from .utils import launch_workflow


@small_task
def automation_task(
    input_directory: LatchDir,
    output_directory: LatchOutputDir,
    target_wf_id: str,
    table_id: str,
) -> None:
    """
    Logic on how to process the input directory and launch the target workflows.
    """
    # fetch the table using Latch SDK
    automation_table = Table(table_id)
    processed_directory_column = "Processed Directory"

    # [PARAMS OMITTED]

    # check if the provided table contains column `Processed Directory` and create one if it isn't present
    # we use Latch SDK to get the columns of the table and try to get the column by name
    if automation_table.get_columns().get(processed_directory_column, None) is None:
        with automation_table.update() as automation_table_updater:  # create an update context for the table
            automation_table_updater.upsert_column(processed_directory_column, LatchDir)

    # fetch all the directories that have been processed and recorded in the Registry table previously
    resolved_directories: Set[str] = set()
    # list_records() returns a generator of records (rows) of the Registry Table
    for page in automation_table.list_records():
        for _, record in page.items():
            value = record.get_values()[processed_directory_column]
            assert isinstance(
                value, LatchDir
            )  # we only allow processing of child directories
            resolved_directories.add(str(value))

    assert isinstance(input_directory.remote_path, str)
    assert isinstance(output_directory.remote_path, str)

    # Launch the target workflow for each child directory which hasn't been processed yet.
    # Record the processed directory in the Registry table.
    # iterdir() returns an iterator of the child files and directories of the input directory
    for child in input_directory.iterdir():
        # skip files, output directory and directories that have been processed
        if (
            isinstance(child, LatchFile)
            or str(child) == str(output_directory)
            or str(child) in resolved_directories
        ):
            continue

        with automation_table.update() as automation_table_updater:
            # use a util function to launch the target workflow with the right parameters
            launch_workflow(
                target_wf_id=target_wf_id,
                params=params,
            )

            # update registry table with the processed directory
            automation_table_updater.upsert_record(
                str(uuid.uuid4()),
                **{
                    processed_directory_column: child,
                },
            )
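To experiment with the skip logic before touching the Latch SDK, the selection step can be simulated with plain Python objects. The sketch below is an in-memory stand-in, not the template's code: the `Child` class, the `select_children_to_launch` function, and all paths are hypothetical, and a plain set takes the place of the Registry Table.

```python
from dataclasses import dataclass
from typing import Iterable, List, Set


@dataclass(frozen=True)
class Child:
    """Stand-in for an entry returned when iterating the input directory."""

    path: str
    is_dir: bool


def select_children_to_launch(
    children: Iterable[Child], output_path: str, processed: Set[str]
) -> List[str]:
    """Return paths of child directories that still need a target workflow run."""
    to_launch: List[str] = []
    for child in children:
        # skip files, the output directory, and directories already recorded
        if not child.is_dir or child.path == output_path or child.path in processed:
            continue
        to_launch.append(child.path)
    return to_launch


# The set plays the role of the Processed Directory column.
processed: Set[str] = {"latch://example/target/run1"}
children = [
    Child("latch://example/target/run1", True),  # already recorded
    Child("latch://example/target/run2", True),  # new directory
    Child("latch://example/target/notes.txt", False),  # file, skipped
    Child("latch://example/target/output", True),  # output directory, skipped
]
output_path = "latch://example/target/output"

first_run = select_children_to_launch(children, output_path, processed)
processed.update(first_run)  # record what was launched
second_run = select_children_to_launch(children, output_path, processed)
print(first_run, second_run)
```

The second call selects nothing because every directory launched in the first call was recorded, which is the behavior the Registry Table provides across automation runs.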
5. Register Automation Workflow
Register the automation workflow to your Latch workspace.
$ latch register --remote --yes automation-wf
6. Create Automation
Navigate to the Automations tab via Workflows > Automations and click the Create Automation button.
- Input an Automation Name and Description.
- Select the Event Type as Data Added.
- Set the Follow-up Update Period to something short, like 30 seconds, to make your automation easy to test.
- Select the folder where files/folders will be uploaded using the Select Target button. Any items uploaded to this folder will trigger the automation workflow.
- Select the automation workflow that you have just registered with Latch.
7. Test Your Automation
To test your automation, go to the target directory that you specified when creating the automation, and create a couple of folders. Upload any files to the folders, and wait for the trigger timer to expire.
Go to Workflows > All Executions. There should be one automation workflow execution, and a target workflow execution for each child in your target directory. Each target workflow should print out the Latch Path of every file in its child directory.