The Runner classes are the core components for building task agents in Encord. They provide a simple interface for defining agent logic and moving tasks through the Encord project workflows.

The Runners manage the execution of agent logic on tasks within specific workflow stages. They:

  • Connect directly to your Encord project via the Encord SDK
  • Provide function decorators to associate your agent logic with workflow stages
  • Manage retries and error handling
  • Handle task fetching and updates
  • Optimize performance through batched updates and data loading

The following sections go through the different components of the Runners and how to use them.

Stage Decorators

Consider a workflow with three stages: start, agent, and complete. The Runner object from encord_agents.tasks.runner is used to define the logic for agent stages (the purple nodes in the workflow).

In this example, the Runner is responsible for implementing the logic for the Agent 1 stage.

The @runner.stage decorator connects your functions to specific stages in your Encord workflow. For the workflow above, the logic for the Agent 1 stage can be defined in the following manner:

@runner.stage(stage = "Agent 1")
# or @runner.stage(stage = "6011c844-fb26-438b-b465-0b0825951015")
def my_agent(lr: LabelRowV2, ...) -> str | UUID | None:
    """
    Args:
        lr: Automatically injected by the `Runner`
        ...: See the "Dependencies" section for examples of
             how to, e.g., inject assets, client metadata, and
             more.

    Returns:
        The name or UUID of the pathway where the task should go next,
        or None to leave the task in the current stage.
    """
    pass

The agent function must return the pathway that the task should follow next, specified by either the pathway name or its UUID.

If None is returned, the task remains in its current stage and will be reprocessed during the next execution of that stage.
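
As a rough illustration of this return-value contract, the sketch below routes a task based on a simple condition. `FakeLabelRow`, its fields, and the pathway names are hypothetical stand-ins chosen for the example; they are not part of the Encord SDK, and a real agent would receive a `LabelRowV2` instead.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeLabelRow:
    """Hypothetical stand-in for LabelRowV2, used only for illustration."""
    data_title: str
    num_objects: int
    labels_ready: bool = True


def route_task(lr: FakeLabelRow) -> Optional[str]:
    """Return a pathway name, or None to keep the task in its current stage."""
    if not lr.labels_ready:
        # None: the task stays put and is picked up again on the next run.
        return None
    # Pathway names are made up; use the ones defined in your own workflow.
    return "needs review" if lr.num_objects == 0 else "auto approve"
```

Returning None is a convenient way to defer a task that is not ready yet, at the cost of it being fetched again on every run until it moves on.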

You can also define multiple stages in a single runner:

@runner.stage("prelabel")
def prelabel_task(lr: LabelRowV2) -> str:
    # Add initial labels
    return "review"

@runner.stage("validate")
def validate_task(lr: LabelRowV2) -> str:
    # Validate labels
    return "complete"

If you define multiple stages, the execution logic depends on the type of Runner used. For example, if a Runner is configured with two stages:

=== "Runner"

runner = Runner()

@runner.stage("stage_1")
def stage_1():
    return "next"

@runner.stage("stage_2")
def stage_2():
    return "next"

The Runner executes the tasks in the order in which the stages were defined in the runner. That is, the tasks are processed in stage_1 first and in stage_2 next.
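
The ordering rule can be pictured with a small toy model. `ToyRunner` below is a hypothetical, heavily simplified class written for this example (it is not the library's `Runner`, and the real implementation may differ); it only shows how registering stages in a dictionary naturally preserves definition order.

```python
from typing import Callable, Dict, List


class ToyRunner:
    """Hypothetical, simplified model of a sequential runner (not the real class)."""

    def __init__(self) -> None:
        # Plain dicts preserve insertion order, so stages stay in definition order.
        self._stages: Dict[str, Callable[[str], str]] = {}

    def stage(self, name: str) -> Callable:
        def decorator(func: Callable[[str], str]) -> Callable[[str], str]:
            self._stages[name] = func
            return func
        return decorator

    def run(self, tasks_by_stage: Dict[str, List[str]]) -> List[str]:
        log = []
        for name, func in self._stages.items():
            for task in tasks_by_stage.get(name, []):
                log.append(f"{name}:{task}->{func(task)}")
        return log


runner = ToyRunner()

@runner.stage("stage_1")
def stage_1(task: str) -> str:
    return "next"

@runner.stage("stage_2")
def stage_2(task: str) -> str:
    return "next"

# stage_2's tasks are listed first, yet stage_1 is still processed first.
execution_log = runner.run({"stage_2": ["t3"], "stage_1": ["t1", "t2"]})
```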

=== "QueueRunner"

runner = QueueRunner()

@runner.stage("stage_1")
def stage_1():
    return "next"

@runner.stage("stage_2")
def stage_2():
    return "next"

The QueueRunner gives you control over the task queues for each stage. Please refer to the QueueRunner documentation for more information.

Optional arguments

When you wrap a function with @runner.stage(...), you can include the label_row_metadata_include_args: LabelRowMetadataIncludeArgs argument. This argument passes directly to the Encord Project’s list_label_rows_v2 method, enabling you to read client metadata associated with a task. If you need to update metadata, use the dep_storage_item dependency.

Here is an example:

args = LabelRowMetadataIncludeArgs(
    include_client_metadata=True,
)
@runner.stage("<my_stage_name>", label_row_metadata_include_args=args)
def my_agent(lr: LabelRowV2):
    lr.client_metadata  # will now be populated

When developing your agent in a REPL environment (like Jupyter notebooks), re-running cells or snippets can be useful. By default, re-executing the following snippet raises an error, because the library prevents defining multiple agents for the same stage.

@runner.stage("<my_stage_name>")
def stage():
    ...

To overwrite the function associated with a given stage, pass overwrite=True:

@runner.stage("<my_stage_name>", overwrite=True)
def stage():
    ...

This ensures the definition for a given stage updates on each re-run and preserves the original order of execution for stages.
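
One plausible way to picture this behaviour is a registry keyed by stage name. The `ToyRegistry` class below is a hypothetical sketch written for this example, not the library's implementation, and the exception type it raises is an assumption. It shows why overwriting an entry keeps its original position in the execution order.

```python
from typing import Callable, Dict


class ToyRegistry:
    """Hypothetical stage registry that mimics the documented overwrite behaviour."""

    def __init__(self) -> None:
        self.stages: Dict[str, Callable[[], str]] = {}

    def stage(self, name: str, overwrite: bool = False) -> Callable:
        def decorator(func: Callable[[], str]) -> Callable[[], str]:
            if name in self.stages and not overwrite:
                # The real library also raises here; the exact error type is assumed.
                raise ValueError(f"Stage '{name}' already has an agent defined")
            # Re-assigning an existing key keeps its original position in the dict.
            self.stages[name] = func
            return func
        return decorator


registry = ToyRegistry()

@registry.stage("first")
def first_v1() -> str:
    return "v1"

@registry.stage("second")
def second() -> str:
    return "done"

@registry.stage("first", overwrite=True)  # simulates re-running a notebook cell
def first_v2() -> str:
    return "v2"
```

After the re-definition, "first" still precedes "second" in the registry, but it now points at the newer function.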

Dependencies

The Runner supports dependency injection, similar to FastAPI. Dependencies are functions that provide common resources or utilities to your agent functions.

Built-in Dependencies

Example

The library provides multiple commonly used dependencies. See the References section for an explicit list. Below, we demonstrate how an agent function obtains both label rows from “twin projects” and a frame iterator for videos simply by specifying these as dependencies.

from typing import Iterator

from typing_extensions import Annotated
from encord.objects import LabelRowV2
from encord.workflow.stages.agent import AgentTask
from encord_agents.core.data_model import Frame
from encord_agents.tasks import Depends
from encord_agents.tasks.dependencies import (
    Twin,                # Access a "twin" project's labels
    dep_twin_label_row,  # Get label row from twin project
    dep_video_iterator,  # Iterate over video frames
)

@runner.stage("my_stage")
def my_agent(
    task: AgentTask,
    lr: LabelRowV2,
    twin: Annotated[Twin, Depends(dep_twin_label_row(twin_project_hash="..."))],
    frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)]
) -> str:
    # Use the dependencies
    pass

In the function body above, the task, lr, twin, and frames variables are automatically injected with their respective dependencies. This provides the flexibility to inject only necessary dependencies, allowing you to focus solely on your agent’s logic.

Annotations

You can access three core object types in your stage implementation without extensive type annotations: AgentTask, Project, and LabelRowV2.

If you type any parameter of your stage implementation (e.g., the my_agent function) with one of these types, the system automatically injects the corresponding object, matching the task at hand.

For example:

from encord.project import Project
...

@runner.stage("your_stage_name")
def my_agent(project: Project):
    ...

Here, the project variable automatically receives the workflow Project instance corresponding to the project_hash that the runner was defined or executed with.

Similarly, access the task and label_row (associated with the given task) like this:

from encord.objects import LabelRowV2
from encord.workflow.stages.agent import AgentTask

@runner.stage("your_stage_name")
def my_agent(task: AgentTask, label_row: LabelRowV2):
    ...
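
To make the idea of type-based injection more concrete, here is a minimal sketch of how a runner could match parameters to objects by annotation. Everything in it is hypothetical: the `Fake*` classes stand in for the SDK types, and `call_with_injection` is an illustrative helper, not a function of the library.

```python
import inspect
from typing import Any, Callable, Dict, get_type_hints


class FakeAgentTask:  # hypothetical stand-ins for the SDK classes
    pass

class FakeProject:
    pass

class FakeLabelRow:
    pass


def call_with_injection(func: Callable[..., Any], context: Dict[type, Any]) -> Any:
    """Call `func`, filling each parameter from `context` by its annotated type."""
    hints = get_type_hints(func)
    kwargs = {}
    for name in inspect.signature(func).parameters:
        annotation = hints.get(name)
        if annotation not in context:
            raise TypeError(f"Cannot inject parameter '{name}'")
        kwargs[name] = context[annotation]
    return func(**kwargs)


context = {
    FakeAgentTask: FakeAgentTask(),
    FakeProject: FakeProject(),
    FakeLabelRow: FakeLabelRow(),
}

def my_agent(lr: FakeLabelRow, task: FakeAgentTask) -> str:
    # Parameter names and order are free; only the type annotations matter here.
    ok = isinstance(lr, FakeLabelRow) and isinstance(task, FakeAgentTask)
    return "labeled" if ok else "error"

result = call_with_injection(my_agent, context)
```

Because matching happens on the annotation, the agent only declares what it needs, and unused objects are simply never passed.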

The remaining dependencies must be specified with an encord_agents.tasks.dependencies.Depends type annotation, using one of the following two patterns:

from typing_extensions import Annotated

from encord.storage import StorageItem
from encord_agents.tasks.dependencies import (
    Depends, 
    dep_storage_item,
)


@runner.stage("your_stage_name")
def my_agent(
    storage_item_1: Annotated[StorageItem, Depends(dep_storage_item)],
    # or storage_item_1: StorageItem = Depends(dep_storage_item)
):
    ...

Custom Dependencies

A dependency can be any function whose declaration is similar to the ones above. Specifically, its parameters must be typed with AgentTask, Project, or LabelRowV2, or be other dependencies annotated with Depends.

Your custom dependencies should not include any default values or additional arguments directly. Instead, wrap the dependency in a function to pass additional arguments or set default values:

from functools import partial

def my_dependency(arg1: str, arg2: int):
    # Your dependency logic here
    pass

def my_dependency_with_default_arg(arg2: int = 42):
    """
    Wraps 'my_dependency' to set a default value for 'arg2'.
    """
    return partial(my_dependency, arg2=arg2) 

@runner.stage("my_stage")
def my_agent(
    # Notice the call to the function to apply defaults
    dep: Annotated[MyDependency, Depends(my_dependency_with_default_arg())]  
):
    # Your agent logic here
    pass

This approach allows you to set default values for your dependency arguments when you call the wrapper function.
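
The wrapper pattern relies only on `functools.partial` from the standard library, so it can be tried in isolation. The sketch below reuses the names from the example for readability; the return values are made up so that the effect of the pre-bound argument is visible.

```python
from functools import partial
from typing import Callable, Tuple


def my_dependency(arg1: str, arg2: int) -> Tuple[str, int]:
    # Placeholder logic: simply echo the arguments back.
    return arg1, arg2


def my_dependency_with_default_arg(arg2: int = 42) -> Callable[[str], Tuple[str, int]]:
    # partial pre-binds arg2, so only arg1 remains to be supplied later.
    return partial(my_dependency, arg2=arg2)


default_dep = my_dependency_with_default_arg()        # arg2 falls back to 42
custom_dep = my_dependency_with_default_arg(arg2=7)   # arg2 overridden per use
```

Calling `default_dep("a")` and `custom_dep("a")` then differ only in the bound `arg2`, which is the effect the wrapper is meant to achieve.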

Custom dependencies can themselves declare nested dependencies, as in the following example:

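from typing_extensions import Annotated

from encord.objects import LabelRowV2
from encord.storage import StorageItem
from encord_agents.tasks.dependencies import Depends, dep_storage_item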

def my_custom_dependency(
    lr: LabelRowV2,
    storage_item: StorageItem = Depends(dep_storage_item)
) -> dict:
    """Custom dependencies can use LabelRowV2 and other dependencies"""
    return {
        "data_title": lr.data_title,
        "metadata": storage_item.client_metadata
    }

@runner.stage("my_stage")
def my_agent(
    metadata: Annotated[dict, Depends(my_custom_dependency)]
) -> str:
    # metadata is automatically injected
    return "next_stage"
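
Nested resolution of this kind can be sketched as a short recursive function. The code below is a toy model under stated assumptions: the `Depends` class, `resolve`, `FakeLabelRow`, and `dep_storage_metadata` are all hypothetical and written for this example, and the library's actual resolver may work differently.

```python
import inspect
from typing import Any, Callable, Dict


class Depends:
    """Hypothetical marker, loosely modelled on the Depends used in this guide."""

    def __init__(self, dependency: Callable[..., Any]) -> None:
        self.dependency = dependency


def resolve(func: Callable[..., Any], context: Dict[type, Any]) -> Any:
    """Call `func`, resolving Depends(...) defaults recursively and all other
    parameters by their annotated type from `context`."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, Depends):
            kwargs[name] = resolve(param.default.dependency, context)
        else:
            kwargs[name] = context[param.annotation]
    return func(**kwargs)


class FakeLabelRow:
    data_title = "video.mp4"


def dep_storage_metadata() -> dict:
    return {"camera": "front"}


def my_custom_dependency(
    lr: FakeLabelRow, meta: dict = Depends(dep_storage_metadata)
) -> dict:
    return {"data_title": lr.data_title, "metadata": meta}


def my_agent(info: dict = Depends(my_custom_dependency)) -> str:
    return "next_stage" if info["metadata"] else "skip"


outcome = resolve(my_agent, {FakeLabelRow: FakeLabelRow()})
```

The agent never calls `my_custom_dependency` or `dep_storage_metadata` itself; the resolver walks the chain from the agent's signature downwards.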

Optional Pre-Execution Agent Validation

To add validation that your runner is suitable for your Project, such as checking for an appropriate Ontology or workflow stages, pass an additional pre_execution_callback parameter when defining your Runner.

def pre_execution_callback(runner: Runner) -> None:
    assert runner.project
    project = runner.project
    # Throws if child not found
    project.ontology_structure.get_child_by_title("Car")
    assert runner.agents

runner = Runner(pre_execution_callback=pre_execution_callback)
# Won't yet validate

# Define the agent
@runner.stage("Agent stage")
def agent_stage(task: AgentTask) -> str:
    ...
    return "labeled"

if __name__ == "__main__":
    runner.run()

You can execute the script with: python agent.py --project-hash=<your-project-hash>. Before execution, we fetch the Project and run validation, which has access to the entire runner object. This validation occurs just before fetching and executing tasks, allowing you to perform arbitrary checks, including referencing agents, as shown above.
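
The order of events (fetch, validate, then process) can be mimicked without the SDK. In the sketch below, `ToyRunner`, its `ontology_titles` attribute, and `require_car_class` are hypothetical names invented for the example; the point is only that a failing callback stops the run before any task is touched.

```python
from typing import Callable, Dict, List, Optional


class ToyRunner:
    """Hypothetical runner: validates first, then processes tasks."""

    def __init__(
        self,
        pre_execution_callback: Optional[Callable[["ToyRunner"], None]] = None,
    ) -> None:
        self.pre_execution_callback = pre_execution_callback
        self.agents: Dict[str, Callable[[str], str]] = {}
        self.ontology_titles: List[str] = []
        self.processed: List[str] = []

    def stage(self, name: str) -> Callable:
        def decorator(func: Callable[[str], str]) -> Callable[[str], str]:
            self.agents[name] = func
            return func
        return decorator

    def run(self, ontology_titles: List[str], tasks: List[str]) -> None:
        self.ontology_titles = ontology_titles  # stands in for fetching the Project
        if self.pre_execution_callback is not None:
            self.pre_execution_callback(self)   # may raise and abort the run
        for func in self.agents.values():
            for task in tasks:
                func(task)
                self.processed.append(task)


def require_car_class(runner: ToyRunner) -> None:
    if "Car" not in runner.ontology_titles:
        raise ValueError("Ontology has no 'Car' object")
    assert runner.agents, "No agents registered"


runner = ToyRunner(pre_execution_callback=require_car_class)

@runner.stage("Agent stage")
def agent_stage(task: str) -> str:
    return "labeled"
```

Running this toy runner against an ontology without "Car" raises before `processed` receives any entry, which mirrors the fail-fast intent of the callback.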

Running the Runner

There are two different types of runners with different use-cases. They also have two slightly different execution interfaces.

Refer to the following pages for more information:

  1. Runner: This is a simple sequential runner to run the agent functions one after the other. It is easier to debug and understand. Use this for simple workflows or for testing out functionality before you scale it with the QueueRunner.
  2. QueueRunner: This is a more advanced runner that allows you to run the agent functions in parallel. It is useful when you have a lot of tasks to process and you want to speed up the processing time via parallel execution.

Comparison between Runner and QueueRunner

The key differences between QueueRunner and the sequential Runner are:

| Feature | Runner | QueueRunner |
| --- | --- | --- |
| Execution Model | Executes tasks sequentially in a single process | Designed for distributed execution across multiple processes |
| Project Hash | Optional at initialization | Required at initialization |
| Function Wrapping | Executes your function directly with injected dependencies | Additionally wraps your function to handle JSON task specifications |
| Execution Control | Handles task polling and execution | You control task distribution and execution through your queue system |
| Scaling | Not suitable for scaling | Suitable for scaling |
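
To give a feel for what "wrapping a function to handle JSON task specifications" can mean, here is a minimal sketch. The `wrap_for_queue` helper and the JSON field names (`task_id`, `data_title`, `pathway`) are assumptions made up for this illustration; the QueueRunner's actual message format is described in its own documentation.

```python
import json
from typing import Callable


def wrap_for_queue(func: Callable[[str], str]) -> Callable[[str], str]:
    """Hypothetical wrapper: JSON task spec in, JSON result out."""
    def wrapper(task_spec_json: str) -> str:
        spec = json.loads(task_spec_json)
        pathway = func(spec["data_title"])
        return json.dumps({"task_id": spec["task_id"], "pathway": pathway})
    return wrapper


@wrap_for_queue
def my_agent(data_title: str) -> str:
    return "video" if data_title.endswith(".mp4") else "image"


# A worker would receive a message like this from your queue system.
message = json.dumps({"task_id": "123", "data_title": "clip.mp4"})
result = json.loads(my_agent(message))
```

Because both input and output are plain strings, such a wrapped function can be handed to any queue or worker framework without that framework knowing about the agent's internals.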