The Latch SDK introduces a construct called map_task to help parallelize a task across a list of inputs. This means you can run multiple instances of the task at the same time inside a single workflow, providing valuable performance gains.

Let’s look at a simple example below!

First, import map_task into your workflow:

from typing import List

from latch import map_task, small_task, workflow

Next, define a task to use in the map task.

A map task can only accept one input and produce one output.

@small_task
def a_mappable_task(a: int) -> str:
    inc = a + 2
    stringified = str(inc)
    return stringified

Let’s also define a task that collects the mapped output and returns a string:

@small_task
def coalesce(b: List[str]) -> str:
    coalesced = "".join(b)
    return coalesced

We can run a_mappable_task across a collection of inputs using the map_task function. map_task takes a_mappable_task and returns a mapped version of it. The mapped version accepts a list of inputs to a_mappable_task and returns a list of outputs, produced by running a_mappable_task on every element of the input list in parallel.

@workflow
def my_map_workflow(a: List[int]) -> str:
    mapped_out = map_task(a_mappable_task)(a=a)
    coalesced = coalesce(b=mapped_out)
    return coalesced

That’s it! You’ve successfully defined a_mappable_task, passed it to map_task(), and run it on a list of inputs in parallel. You have also defined a coalesce task that collects the list of outputs from the mapped task and returns a single string.
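Outside of Latch, the data flow of this workflow can be sketched in plain Python. This is only an illustration of what the mapped workflow computes; the real map_task additionally schedules each call as its own parallel task:

```python
from typing import List


def a_mappable_task(a: int) -> str:
    # Same logic as the Latch task: increment, then stringify.
    return str(a + 2)


def coalesce(b: List[str]) -> str:
    return "".join(b)


def my_map_workflow(a: List[int]) -> str:
    # map_task(a_mappable_task)(a=a) behaves like this list
    # comprehension, except each call runs as a parallel task.
    mapped_out = [a_mappable_task(x) for x in a]
    return coalesce(b=mapped_out)
```

For example, `my_map_workflow([1, 2, 3])` maps each element to `"3"`, `"4"`, `"5"` and joins them into `"345"`.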

Map a Task with Multiple Inputs

You may want to map a task with multiple inputs.

For example, the task below takes two inputs, a base and a DNA sequence, and returns the percentage of that base in the sequence:

@small_task
def count_task(base: str, dna_sequence: str) -> float:
    return dna_sequence.count(base) / len(dna_sequence) * 100

But suppose we only want to map this task over the base input while the dna_sequence stays the same. Since a map task accepts only one input, we can do this by creating a new task that prepares the map task’s inputs.

We start by putting both inputs in a dataclass decorated with dataclass_json:

from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class MapInput:
    base: str
    dna_sequence: str

Let’s also define our helper task to prepare the map task’s inputs.

@small_task
def prepare_map_inputs(list_base: List[str], dna_sequence: str) -> List[MapInput]:
    return [MapInput(base, dna_sequence) for base in list_base]

We now refactor the original count_task. Instead of two inputs, it has a single input:

@small_task
def mappable_task(input: MapInput) -> float:
    return input.dna_sequence.count(input.base) / len(input.dna_sequence) * 100

Let’s use the new mappable_task in our workflow:

@workflow
def count_wf(list_base: List[str] = ["A", "T", "C", "G"], dna_sequence: str = "AAAATTTCCGG") -> List[float]:
    prepared = prepare_map_inputs(list_base=list_base, dna_sequence=dna_sequence)
    return map_task(mappable_task)(input=prepared)

Great! Now we can use count_wf to spin up four mapped tasks in parallel. The map task returns a list of four floats, each of which is the percentage of one base in the DNA sequence.
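The same fan-out can be sketched in plain Python without the Latch scheduler; on Latch the mapped calls would run in parallel rather than in a loop:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class MapInput:
    base: str
    dna_sequence: str


def prepare_map_inputs(list_base: List[str], dna_sequence: str) -> List[MapInput]:
    # Bundle the varying input (base) with the fixed one (dna_sequence).
    return [MapInput(base, dna_sequence) for base in list_base]


def mappable_task(input: MapInput) -> float:
    return input.dna_sequence.count(input.base) / len(input.dna_sequence) * 100


def count_wf(list_base: List[str], dna_sequence: str) -> List[float]:
    prepared = prepare_map_inputs(list_base=list_base, dna_sequence=dna_sequence)
    # On Latch this line would be: map_task(mappable_task)(input=prepared)
    return [mappable_task(input=p) for p in prepared]
```

With the defaults above, "AAAATTTCCGG" has 11 characters (4 A, 3 T, 2 C, 2 G), so the four percentages sum to 100.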

Bonus: Learning through a Biological Example

Below, we walk through a practical example of using the map task construct to run FastQC on multiple samples and summarize their results in a MultiQC report.

First, we define a dataclass that contains a sample name and its associated FastQ file:

@dataclass_json
@dataclass
class Sample:
    sample_name: str
    fastq: LatchFile

Then, we create a task to run FastQC on a single sample and output the result under the FastQC Results folder on Latch.

@small_task
def fastqc_task(sample: Sample) -> LatchDir:

    outdir = Path("/root/fastqc_result").resolve()
    outdir.mkdir(exist_ok=True)

    _fastqc_cmd = [
        "/root/FastQC/fastqc",
        sample.fastq.local_path,
        f"--outdir={outdir}"
    ]

    subprocess.run(_fastqc_cmd, check=True)

    return LatchDir("/root/fastqc_result", f"latch:///FastQC Results/{sample.sample_name}")

Concept check: Note how this task will later be mapped across a list of samples. Therefore, the task is defined to accept one input and return one output.

Next, we define a second task that runs MultiQC over the given directories of analysis logs and compiles an HTML report:

@small_task
def multiqc_task(fastqc_results: List[LatchDir]) -> LatchDir:

    outdir = Path("/root/multiqc_results").resolve()
    outdir.mkdir(exist_ok=True)

    fastqc_dirs = [result.local_path for result in fastqc_results]

    _multiqc_cmd = ["multiqc"] + fastqc_dirs + ["-o", outdir]

    subprocess.run(_multiqc_cmd, check=True)

    return LatchDir(outdir, "latch:///MultiQC Results")

Concept check: Because the map task will return a list of LatchDirs, each of which contains an individual sample’s FastQC results, the multiqc_task needs to also accept a list of LatchDirs.
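This fan-out/fan-in shape can be mocked in plain Python. The fastqc_stub and multiqc_stub functions below are hypothetical stand-ins for the real subprocess calls; they exist only to show how one result directory per sample flows into a single aggregation step:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Sample:
    sample_name: str
    fastq: str  # stands in for a LatchFile


def fastqc_stub(sample: Sample) -> str:
    # One result directory per sample, mirroring fastqc_task's LatchDir output.
    return f"FastQC Results/{sample.sample_name}"


def multiqc_stub(fastqc_results: List[str]) -> str:
    # Aggregates every per-sample directory into one report, like multiqc_task.
    return f"MultiQC report over {len(fastqc_results)} result dirs"


samples = [Sample("s1", "s1.fastq"), Sample("s2", "s2.fastq")]
fastqc_results = [fastqc_stub(s) for s in samples]  # map_task fan-out
report = multiqc_stub(fastqc_results)               # single fan-in task
```

The key point is the shape: the mapped task is called once per sample, while the downstream task is called once with the whole list.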

Finally, we can specify our workflow, which accepts a list of Samples and returns a single directory with the MultiQC report:

@workflow(metadata)
def fastqc_multiqc_wf(samples: List[Sample]) -> LatchDir:
    fastqc_results = map_task(fastqc_task)(sample=samples)  # returns List[LatchDir]
    return multiqc_task(fastqc_results=fastqc_results)  # accepts a List[LatchDir] and returns a single LatchDir with the MultiQC report