Task Agents

models

TaskAgentReturnStruct Objects

@dataclass
class TaskAgentReturnStruct()

Return this from your agent and we will handle propagating the updates in batches

pathway

The pathway that the task will follow on task completion

label_row

The label to be saved (if present)

label_row_priority

The priority of the label row to be saved.

TaskCompletionResult Objects

class TaskCompletionResult(BaseModel)

Data model to hold information about the completion result of encord_agents.tasks.QueueRunner agents.

dependencies

dep_client

def dep_client() -> EncordUserClient

Dependency to provide an authenticated user client.

Example:

from typing_extensions import Annotated

from encord.user_client import EncordUserClient
from encord_agents.tasks import Depends
from encord_agents.tasks.dependencies import dep_client
...
@runner.stage("<my_stage_name>")
def my_agent(
    client: Annotated[EncordUserClient, Depends(dep_client)]
) -> str:
    # The client is authenticated and ready to use.
    client.get_dataset("<dataset_hash>")

dep_storage_item

def dep_storage_item(storage_item: StorageItem) -> StorageItem

Get the storage item associated with the underlying agent task.

The StorageItem is useful for multiple things like

  • Updating client metadata
  • Reading file properties like storage location, fps, duration, DICOM tags, etc.

Note: When a task agent declares the StorageItem dependency, the storage items for the tasks are fetched in bulk and then injected individually for each task. dep_storage_item itself is a trivial function kept for backwards compatibility; you can also annotate a parameter directly as storage_item: StorageItem.

Example

from encord.storage import StorageItem
from encord_agents.tasks.dependencies import dep_storage_item

@runner.stage(stage="<my_stage_name>")
def my_agent(storage_item: Annotated[StorageItem, Depends(dep_storage_item)]) -> str:
    print(storage_item.name)
    print(storage_item.client_metadata)
    ...

Arguments:

  • storage_item - StorageItem

Returns:

The storage item.

dep_single_frame

def dep_single_frame(storage_item: StorageItem) -> NDArray[np.uint8]

Dependency to inject the first frame of the underlying asset.

The downloaded asset will be named lr.data_hash.{suffix}. When the function has finished, the downloaded file will be removed from the file system.

Example:

import numpy as np
from numpy.typing import NDArray
from typing_extensions import Annotated

from encord_agents.tasks import Depends
from encord_agents.tasks.dependencies import dep_single_frame
...

@runner.stage("<my_stage_name>")
def my_agent(
    frame: Annotated[NDArray[np.uint8], Depends(dep_single_frame)]
) -> str:
    assert frame.ndim == 3, "Will work"

Arguments:

  • storage_item - The Storage item. Automatically injected (see example above).

Returns:

Numpy array of shape [h, w, 3] RGB colors.

dep_video_iterator

def dep_video_iterator(
        storage_item: StorageItem) -> Generator[Iterator[Frame], None, None]

Dependency to inject a video frame iterator for doing things over many frames. This will use OpenCV and the local backend on your machine. Decoding support may vary depending on the video format, codec, and your local configuration.

Intended use

from encord_agents import FrameData
from encord_agents.tasks.dependencies import dep_video_iterator
...

@runner.stage("<my_stage_name>")
def my_agent(
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)]
) -> str:
    for frame in video_frames:
        print(frame.frame, frame.content.shape)

Arguments:

  • storage_item - Automatically injected Storage item dependency.

Raises:

  • NotImplementedError - Raised for data types other than video.

Yields:

An iterator over the frames of the video.

dep_video_sampler

def dep_video_sampler(
    storage_item: StorageItem
) -> Generator[Callable[[float | Sequence[int]], Iterable[Frame]], None, None]

Dependency to inject a video sampler for doing things over many frames. This will use OpenCV and the local backend on your machine. Decoding support may vary depending on the video format, codec, and your local configuration.

Arguments:

  • storage_item - Automatically injected Storage item dependency.

Example:

from encord_agents.tasks.dependencies import dep_video_sampler
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<stage_name_or_uuid>")
def my_agent(
    video_sampler: Annotated[Callable[[float | Sequence[int]], Iterable[Frame]], Depends(dep_video_sampler)],
) -> str | None:
    for frame in video_sampler(1/5):
        # Get every 5th frame,
        # i.e., frames [0, 5, 10, 15, ...]
        ...
    for frame in video_sampler([1, 2, 3]):
        # Get frames 1, 2, 3
        ...
    ...
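The two call styles of the sampler can be illustrated with a small, standard-library-only sketch. The helper `sample_indices` is hypothetical and not part of encord_agents; it assumes that a fraction such as 1/5 maps to a fixed stride of 5 frames (as the comment in the example above suggests) and that out-of-range indices are simply skipped, which the library may handle differently.

```python
from typing import List, Sequence, Union


def sample_indices(spec: Union[float, Sequence[int]], num_frames: int) -> List[int]:
    """Hypothetical helper: turn a sampler argument into concrete frame indices."""
    if isinstance(spec, (int, float)):
        if not 0 < spec <= 1:
            raise ValueError("a sampling fraction must be in (0, 1]")
        # Assumption: a fraction of 1/5 means "every 5th frame".
        stride = max(1, round(1 / spec))
        return list(range(0, num_frames, stride))
    # An explicit sequence selects exactly the listed frames that exist.
    return [i for i in spec if 0 <= i < num_frames]


print(sample_indices(1 / 5, 20))      # [0, 5, 10, 15]
print(sample_indices([1, 2, 3], 20))  # [1, 2, 3]
```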

dep_asset

def dep_asset(storage_item: StorageItem) -> Generator[Path, None, None]

Get a local file path to the data asset, which is stored temporarily until the end of task execution.

This dependency will fetch the underlying data asset based on a signed URL. It will temporarily store the data on disk. Once the task is completed, the asset will be removed from disk again.

Example:

from encord_agents.tasks.dependencies import dep_asset
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<stage_name_or_uuid>")
def my_agent(
    asset: Annotated[Path, Depends(dep_asset)],
) -> str | None:
    asset.stat()  # read file stats
    ...

Returns:

The path to the asset.

Raises:

  • ValueError - if the underlying assets are not videos, images, or audio.
  • EncordException - if data type not supported by SDK yet.
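The `Generator[Path, None, None]` return type reflects a setup/teardown pattern: the dependency yields a path, the agent runs, and cleanup happens afterwards. The following is a minimal sketch of that pattern using only the standard library. The function `temp_asset` is a hypothetical stand-in and does not download anything; it only shows how a generator can guarantee that a temporary file is removed once the consumer is done.

```python
import tempfile
from pathlib import Path
from typing import Generator


def temp_asset(content: bytes, suffix: str = ".bin") -> Generator[Path, None, None]:
    """Hypothetical stand-in: store bytes on disk, yield the path, then clean up."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = Path(tmp_dir) / f"asset{suffix}"
        path.write_bytes(content)
        yield path  # the consumer works with the file here
    # Leaving the `with` block removes the directory and the file in it.


gen = temp_asset(b"fake video bytes", suffix=".mp4")
asset = next(gen)                   # setup: the file now exists on disk
size_during = asset.stat().st_size  # read file stats while the file is available
gen.close()                         # teardown: triggers the cleanup
exists_after = asset.exists()
print(size_during, exists_after)
```

Because the file only exists while the generator is suspended at `yield`, anything the agent needs beyond the task should be copied elsewhere before returning.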

Twin Objects

@dataclass(frozen=True)
class Twin()

Dataclass to hold “label twin” information.

dep_twin_label_row

def dep_twin_label_row(
        twin_project_hash: str,
        init_labels: bool = True,
        include_task: bool = False) -> Callable[[LabelRowV2], Twin | None]

Dependency to link assets between two Projects. When your Runner is running on <project_hash_a>, you can use this to get a Twin of labels and the underlying task in the “twin project” with <project_hash_b>.

This is useful in situations like:

  • When you want to transfer labels from a source project to a sink project.
  • If you want to compare labels to labels from other projects upon label submission.
  • If you want to extend an existing project with labels from another project on the same underlying data.

Example:

from encord.workflow.common import WorkflowTask
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks.dependencies import Twin, dep_twin_label_row
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<my_stage_name_in_project_a>")
def my_agent(
    project_a_label_row: LabelRowV2,
    twin: Annotated[
        Twin, Depends(dep_twin_label_row(twin_project_hash="<project_hash_b>"))
    ],
) -> str | None:
    label_row_from_project_b: LabelRowV2 = twin.label_row
    # The task is only populated when include_task=True; otherwise it is None.
    task_from_project_b: WorkflowTask | None = twin.task

Arguments:

  • twin_project_hash - The project hash of the twin project (attached to the same datasets) from which you want to load the additional data.
  • init_labels - If true, the label row will be initialized before calling the agent.
  • include_task - If true, the task field of the Twin will be populated. If population fails, e.g., for non-workflow projects, the task will also be None.

Returns:

The twin.

Raises:

  • encord.AuthorizationError - if you do not have access to the project.

dep_data_lookup

def dep_data_lookup(
        lookup: Annotated[DataLookup,
                          Depends(DataLookup.sharable)]) -> DataLookup

Get a lookup to easily retrieve data rows and storage items associated with the given task.

Warning (deprecated): dep_data_lookup is deprecated and will be removed in version 0.2.10. Use dep_storage_item instead for accessing storage items.

Migration Guide:

# Old way (deprecated)
from encord_agents.tasks.dependencies import dep_data_lookup, DataLookup

@runner.stage(stage="Agent 1")
def my_agent(
    task: AgentTask,
    lookup: Annotated[DataLookup, Depends(dep_data_lookup)]
) -> str:
    storage_item = lookup.get_storage_item(task.data_hash)
    client_metadata = storage_item.client_metadata
    ...

# New way (recommended)
from encord.storage import StorageItem
from encord_agents.tasks.dependencies import dep_storage_item

@runner.stage(stage="Agent 1")
def my_agent(
    task: AgentTask,
    storage_item: Annotated[StorageItem, Depends(dep_storage_item)]
) -> str:
    # storage_item is directly available
    client_metadata = storage_item.client_metadata

    # Update metadata
    storage_item.update(
        client_metadata={
            "new": "entry",
            **(client_metadata or {})
        }
    )
    ...

Arguments:

  • lookup - The object that you can use to lookup data rows and storage items. Automatically injected.

Returns:

The (shared) lookup object.
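The metadata update in the migration example relies on dictionary unpacking, where later entries win on key collisions. A minimal sketch of that merge, independent of the Encord SDK, is shown below; `merge_metadata` is a hypothetical helper name used only for illustration.

```python
from typing import Any, Dict, Optional


def merge_metadata(existing: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical helper mirroring the merge expression from the example."""
    # `existing or {}` guards against metadata that is None.
    # Keys unpacked later override earlier ones, so existing values win.
    return {"new": "entry", **(existing or {})}


print(merge_metadata(None))                    # {'new': 'entry'}
print(merge_metadata({"new": "old", "a": 1}))  # {'new': 'old', 'a': 1}
```

If the new entry should take precedence instead, the order can be flipped: `{**(existing or {}), "new": "entry"}`.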

runner.queue_runner

QueueRunner Objects

class QueueRunner(RunnerBase)

This class is intended to hold agent implementations. It makes it easy to put agent task specifications into a queue and then execute them in a distributed fashion.

Below is a template for how that would work.

Example:

runner = QueueRunner(project_hash="...")

@runner.stage("Agent 1")
def my_agent_implementation() -> str:
    # ... do your thing
    return "<pathway_name>"

# Populate the queue
my_queue = ...
for stage in runner.get_agent_stages():
    for task in stage.get_tasks():
        my_queue.append(task.model_dump_json())

# Execute on the queue
while my_queue:
    task_spec = my_queue.pop()
    result_json = my_agent_implementation(task_spec)
    result = TaskCompletionResult.model_validate_json(result_json)

__init__

def __init__(project_hash: str | UUID)

Initialize the QueueRunner with a project hash.

This is the hash of the project that you want to run the tasks on.

Arguments:

  • project_hash - The hash of the project to run the tasks on.

stage

def stage(
    stage: str | UUID,
    *,
    label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
    label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs
    | None = None,
    will_set_priority: bool = False
) -> Callable[[Callable[..., TaskAgentReturnType]], Callable[[str], str]]

Agent wrapper intended for queueing systems and distributed workloads.

Define your agent as you are used to, with dependencies in the function declaration, and return the pathway from the project workflow that the task should follow upon completion. The function will be wrapped in logic that does the following (in pseudo code):

@runner.stage("stage_name")
def my_function(...):
    ...

# is equivalent to

def wrapped_function(task_json_spec: str) -> str:  # returns result_json
    task = fetch_task(task_json_spec)
    resources = load_resources(task)
    pathway = my_function(resources)  # <- this is where your code goes
    task.proceed(pathway)
    return TaskCompletionResult(...).model_dump_json()

When you have an encord.workflow.stages.agent.AgentTask instance at hand, let’s call it task, you can call wrapped_function with task.model_dump_json(). Similarly, you can put task.model_dump_json() into a queue and read from that queue, e.g., from another instance/process, to execute wrapped_function there.

As the pseudo code indicates, wrapped_function understands how to take that string from the queue and resolve all your defined dependencies before calling my_function.

Arguments:

  • stage - The name or uuid of the stage that the function should be associated with.
  • label_row_metadata_include_args - Arguments to be passed to project.list_label_rows_v2(...)
  • label_row_initialise_labels_args - Arguments to be passed to label_row.initialise_labels(...)
  • will_set_priority - Indicates whether you will be returning a TaskAgentReturnStruct with a label_row_priority field set. This field is only required if you are returning the priority of the label row but not depending on the label row itself. That is, if your function signature does not include a LabelRowV2 parameter.

Returns:

The decorated function.
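The string-in, string-out contract of the wrapped function can be sketched with the standard library alone. Everything below is a hypothetical stand-in: `queue_stage` is not the real decorator, the task is a plain dictionary rather than an AgentTask, and the result keys (`task_id`, `success`, `pathway`, `error`) are illustrative and do not claim to match the fields of TaskCompletionResult.

```python
import json
from collections import deque
from typing import Callable, Deque, List


def queue_stage(agent: Callable[[dict], str]) -> Callable[[str], str]:
    """Hypothetical stand-in for a queue-style stage decorator."""

    def wrapped(task_json_spec: str) -> str:
        task = json.loads(task_json_spec)  # rebuild the task from its JSON spec
        try:
            pathway = agent(task)  # user code decides where the task goes next
            result = {"task_id": task["id"], "success": True, "pathway": pathway}
        except Exception as exc:  # report the failure instead of crashing the worker
            result = {"task_id": task["id"], "success": False, "error": str(exc)}
        return json.dumps(result)

    return wrapped


@queue_stage
def my_agent(task: dict) -> str:
    if task["kind"] != "video":
        raise ValueError("unsupported data type")
    return "<pathway_name>"


# Producer side: serialize the tasks into a queue.
queue: Deque[str] = deque(
    json.dumps(t) for t in [{"id": "a", "kind": "video"}, {"id": "b", "kind": "audio"}]
)

# Worker side: pop specs, run the wrapped agent, and parse the JSON results.
results: List[dict] = []
while queue:
    results.append(json.loads(my_agent(queue.popleft())))

print(results)
```

Keeping both the input and the output as JSON strings is what lets the producer and the worker live in different processes or machines.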

get_agent_stages

def get_agent_stages() -> Iterable[AgentStage]

Get the agent stages for which there exists an agent implementation.

This function is intended to make it easy to iterate through all current agent tasks and put the task specs into external queueing systems like Celery or Modal.

For a concrete example, please see the docstring for the class itself.

Note that if you didn’t specify an implementation (by decorating your function with @runner.stage) for a given agent stage, the stage will not show up by calling this function.

Returns:

An iterable over encord.workflow.stages.agent.AgentStage objects where the runner contains an agent implementation.

Raises:

  • AssertionError - if the runner does not have an associated project.

runner.runner_base

RunnerBase Objects

class RunnerBase()

__init__

def __init__(project_hash: str | UUID | None = None)

Initialize the runner with an optional project hash.

The project_hash will allow stricter stage validation. If left unspecified, errors will only be raised during execution of the runner.

Arguments:

  • project_hash - The project hash that the runner applies to.

    Can be left unspecified so that the same runner can be reused on multiple projects.

runner.sequential_runner

SequentialRunner Objects

class SequentialRunner(RunnerBase)

Runs agents against Workflow projects.

When called, it will iteratively run agent stages until they are empty. By default, the runner exits after finishing the tasks identified at the point of trigger. To re-run automatically, use the refresh_every keyword.

Example:

example_agent.py
from uuid import UUID
from encord.workflow.stages.agent import AgentTask
from encord_agents.tasks import Runner
runner = Runner()

@runner.stage("<workflow_node_name>")
# or
@runner.stage("<workflow_node_uuid>")
def my_agent(task: AgentTask) -> str | UUID | None:
    ...
    return "pathway name"  # or pathway uuid

runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
# or
if __name__ == "__main__":
    # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
    runner.run()

__init__

def __init__(project_hash: str | None = None,
             *,
             pre_execution_callback: Callable[[Self], None] | None = None)

Initialize the runner with an optional project hash.

The project_hash will allow stricter stage validation. If left unspecified, errors will only be raised during execution of the runner.

Arguments:

  • project_hash - The project hash that the runner applies to.

    Can be left unspecified so that the same runner can be reused on multiple projects.

  • pre_execution_callback - Callable[[Self], None]

    Allows for optional additional validation before execution, e.g., checking that the Ontology has a specific form.

stage

def stage(
    stage: str | UUID,
    *,
    label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
    label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs
    | None = None,
    overwrite: bool = False,
    will_set_priority: bool = False
) -> Callable[[DecoratedCallable], DecoratedCallable]

Decorator to associate a function with an agent stage.

A function decorated with a stage is added to the list of stages that will be handled by the runner. The runner will call the function for every task which is in that stage.

The decorated function can take any parameters that are type annotated with the following types:

  • Project: the encord.project.Project that the runner is operating on.
  • LabelRowV2: the encord.objects.LabelRowV2 that the task is associated with.
  • AgentTask: the encord.workflow.stages.agent.AgentTask that the task is associated with.
  • Any other type that is annotated with a dependency.

All those parameters will be automatically injected when the agent is called.

Example:

runner = Runner()

@runner.stage("<stage_name_or_uuid>")
def my_func() -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"

from typing import Iterator
from typing_extensions import Annotated

from encord.objects.ontology_labels_impl import LabelRowV2
from encord.project import Project
from encord_agents.tasks import Depends, Runner
from encord_agents.tasks.dependencies import dep_video_iterator
from encord.workflow.stages.agent import AgentTask

runner = Runner()

def random_value() -> float:
    import random
    return random.random()

@runner.stage("<stage_name_or_uuid>")
def my_func(
    project: Project,
    lr: LabelRowV2,
    task: AgentTask,
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
    custom: Annotated[float, Depends(random_value)]
) -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"

Arguments:

  • stage - The name or uuid of the stage that the function should be associated with.
  • label_row_metadata_include_args - Arguments to be passed to project.list_label_rows_v2(...)
  • label_row_initialise_labels_args - Arguments to be passed to label_row.initialise_labels(...)
  • overwrite - If true, overwrite the function associated with this stage if one already exists. Otherwise, an error is raised when the stage already has an associated function.
  • will_set_priority - Indicates whether you will be returning a TaskAgentReturnStruct with a label_row_priority field set. This field is only required if you are returning the priority of the label row but not depending on the label row itself. That is, if your function signature does not include a LabelRowV2 parameter.

Returns:

The decorated function.
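To make the injection rules above more concrete, here is a minimal sketch of how `Annotated` metadata can drive dependency resolution, using only the standard library. The `Depends` class, the `Task` class, and `call_with_dependencies` are hypothetical stand-ins written for this illustration; they are not the encord_agents implementation, which also handles nested dependencies, generators, and the library's own context types.

```python
import inspect
from typing import Annotated, Any, Callable, Dict, get_args, get_origin, get_type_hints


class Depends:
    """Hypothetical marker holding the function that produces a value."""

    def __init__(self, dependency: Callable[[], Any]) -> None:
        self.dependency = dependency


def call_with_dependencies(func: Callable[..., Any], context: Dict[type, Any]) -> Any:
    """Resolve each parameter by Depends marker or by plain type, then call func."""
    hints = get_type_hints(func, include_extras=True)  # keep Annotated metadata
    kwargs: Dict[str, Any] = {}
    for name in inspect.signature(func).parameters:
        hint = hints[name]
        if get_origin(hint) is Annotated:
            base, *metadata = get_args(hint)
            marker = next((m for m in metadata if isinstance(m, Depends)), None)
            if marker is not None:
                kwargs[name] = marker.dependency()  # value comes from the dependency
                continue
            hint = base
        kwargs[name] = context[hint]  # value comes from the runner's context
    return func(**kwargs)


class Task:
    def __init__(self, name: str) -> None:
        self.name = name


def fixed_value() -> float:
    return 0.5


def my_agent(task: Task, custom: Annotated[float, Depends(fixed_value)]) -> str:
    return f"{task.name}:{custom}"


result = call_with_dependencies(my_agent, {Task: Task("task-1")})
print(result)  # task-1:0.5
```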

__call__

def __call__(
    refresh_every: Annotated[
        Optional[int],
        Option(
            help=
            "Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty."
        ),
    ] = None,
    num_retries: Annotated[
        int,
        Option(
            help=
            "If an agent fails on a task, how many times should the runner retry it?"
        )] = 3,
    task_batch_size: Annotated[
        int,
        Option(
            help=
            "Number of tasks for which labels are loaded into memory at once."
        )] = 300,
    project_hash: Annotated[
        Optional[str],
        Option(help="The project hash if not defined at runner instantiation."
               )] = None,
    max_tasks_per_stage: Annotated[
        Optional[int],
        Option(
            help=
            "Max number of tasks to try to process per stage on a given run. If `None`, will attempt all",
        ),
    ] = None
) -> None

Run your task agent runner(...).

Info (self-updating/polling runner): The runner can continuously poll for new tasks in the project and execute the defined stage agents. To do so, set the refresh_every parameter. When set, the runner re-fetches tasks with at least that amount of time between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner polls again as soon as the current task queue is completed.

Arguments:

  • refresh_every - Fetch task statuses from the Encord Project every refresh_every seconds. If None, the runner will exit once task queue is empty.
  • num_retries - If an agent fails on a task, how many times should the runner retry it?
  • task_batch_size - Number of tasks for which labels are loaded into memory at once.
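  • project_hash - The project hash if not defined at runner instantiation.
  • max_tasks_per_stage - Max number of tasks to try to process per stage on a given run. If None, the runner will attempt all tasks.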

Returns:

None
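The polling rule described for refresh_every ("at least that amount of time between polls") amounts to a simple wait computation. The sketch below illustrates it with a hypothetical helper, `seconds_until_next_poll`; it is an interpretation of the description above, not code taken from the runner.

```python
def seconds_until_next_poll(refresh_every: float, elapsed: float) -> float:
    """Hypothetical helper: how long to wait before fetching tasks again.

    `elapsed` is the time spent processing the current task queue.
    """
    # Never wait a negative amount: if processing took longer than the
    # refresh interval, the next poll happens immediately.
    return max(0.0, refresh_every - elapsed)


print(seconds_until_next_poll(1, 60))  # 0.0 (queue took longer than the interval)
print(seconds_until_next_poll(10, 4))  # 6.0 (wait out the rest of the interval)
```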

run

def run() -> None

Execute the runner.

This function is intended to be called from the “main file”. It is an entry point to be able to run the agent(s) via your shell with command line arguments.

Example:

example.py
from encord_agents.tasks import Runner

runner = Runner(project_hash="<your_project_hash>")

@runner.stage(stage="...")
def your_func() -> str:
    ...

if __name__ == "__main__":
    runner.run()

You can then execute the runner with:

python example.py --help

to see the options it has (they are those from Runner.__call__).