The Runner executes tasks sequentially, making it ideal for debugging and testing workflows. Use it for simple workflows or for testing functionality before scaling up with the QueueRunner.

Basic Usage

The basic usage pattern of the Runner follows three steps:
  1. Initialize the runner
  2. Implement the logic for each stage in your Workflow you want to capture with the runner
  3. Execute the runner
The following example shows all three steps in a single file.
example_agent.py
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks import Runner

# Step 1: Initialization
# Initialize the runner
# Project hash is optional but allows you to "fail fast" if stages are misconfigured
runner = Runner(project_hash="<your_project_hash>")

# Step 2: Definition
# Define agent logic for a specific stage
@runner.stage(stage="my_stage_name")  # or stage="<stage_uuid>"
def process_task(lr: LabelRowV2) -> str | None:
    # Modify the label row as needed
    lr.set_priority(0.5)

    # Return the pathway name or UUID where the task should go next
    return "next_stage"

# Step 3: Execution
if __name__ == "__main__":
    # Option 1: expose the runner as a CLI tool
    runner.run()

    # Option 2 (alternative): call the runner directly from code
    runner(
        project_hash="<your_project_hash>",
        refresh_every=3600,  # seconds
        num_retries=1,
        task_batch_size=1,
    )
To execute the runner via the CLI, run:
# simple
python example_agent.py --project-hash <your_project_hash>
# use help for additional configurations
python example_agent.py --help

Running Agents

Basic Execution

runner.run()  # will run the runner as CLI tool
runner()      # will run the runner directly
Both options:
  1. Connect to your Encord project
  2. Poll for tasks in the configured stages
  3. Execute your agent functions on each task
  4. Move tasks according to the returned pathway
  5. Retry failed tasks up to num_retries times
See the configuration options below.

Command Line Interface

The runner exposes configuration via CLI:
python my_agent.py \
    --project-hash "<project_hash>" \
    --task-batch-size 1 \
    --num-retries 3 \
    --refresh-every 3600  # seconds

Order of execution

Suppose the runner has agents for two stages, "stage_1" and "stage_2". The runner empties the queue for "stage_1" first, then the queue for "stage_2". If you set the refresh_every argument, the runner polls both queues again after emptying the initial set, so tasks that arrived after the initial poll are picked up in the next iteration. If an iteration already took longer than refresh_every seconds, the runner polls for new tasks immediately. The pseudo-code below illustrates the order of execution.
# ⚠️  PSEUDO CODE - not intended for copying ⚠️
def execute(self, refresh_every = None):
    timestamp = datetime.now()
    while True:
        # self.agents ≈ [stage_1, stage_2]
        for agent in self.agents:
            for task in agent.get_tasks():
                # Inject params based on task
                agent.execute(solve_dependencies(task, agent))

        if refresh_every is None:
            break
        else:
            # repeat after timestamp + timedelta(seconds=refresh_every)
            # or straight away if already exceeded
            ...
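
The elided repeat step in the pseudo-code above can be sketched as a small, self-contained loop. This is only an illustration of the described ordering, not the library's implementation: run_polling_loop, its stages and handle arguments, the max_iterations cap, and the injectable clock and sleep are all hypothetical names introduced for this sketch.

```python
import time
from typing import Callable, Iterable, List, Optional, Tuple


def run_polling_loop(
    stages: Iterable[Tuple[str, Callable[[], List[str]]]],
    handle: Callable[[str, str], None],
    refresh_every: Optional[float] = None,
    max_iterations: Optional[int] = None,  # hypothetical cap so demos terminate
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Drain each stage's queue in order; optionally poll again later.

    Returns the number of completed polling iterations.
    """
    stages = list(stages)
    iterations = 0
    while True:
        started = clock()
        # Empty the first stage's queue, then the next one, and so on.
        for stage_name, get_tasks in stages:
            for task in get_tasks():
                handle(stage_name, task)
        iterations += 1

        if refresh_every is None:
            break  # single pass: stop once the queues are empty
        if max_iterations is not None and iterations >= max_iterations:
            break
        # Wait until `started + refresh_every`, or continue straight away
        # if this iteration already took longer than that.
        remaining = refresh_every - (clock() - started)
        if remaining > 0:
            sleep(remaining)
    return iterations
```

Passing clock and sleep as arguments keeps the sketch easy to exercise without waiting for real time to pass.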

Error Handling

The runner:
  • Retries failed tasks up to num_retries times (default: 3). Changes to the label row are not rolled back.
  • Logs errors for debugging
  • Continues processing other tasks if a task fails
  • Bundles updates for better performance (configurable via task_batch_size)
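
The retry behaviour listed above can be pictured with the following minimal sketch. It assumes that a task gets one initial attempt plus up to num_retries further attempts; that reading, and the names process_with_retries and runner_sketch, are assumptions made for illustration rather than the library's actual code.

```python
import logging
from typing import Callable, Dict, Iterable, Optional

logger = logging.getLogger("runner_sketch")  # hypothetical logger name


def process_with_retries(
    tasks: Iterable[str],
    agent: Callable[[str], str],
    num_retries: int = 3,
) -> Dict[str, Optional[str]]:
    """Run `agent` on every task, retrying failures and never aborting the loop."""
    results: Dict[str, Optional[str]] = {}
    for task in tasks:
        results[task] = None  # None marks a task that never succeeded
        # Assumed semantics: one initial attempt plus `num_retries` retries.
        for attempt in range(1 + num_retries):
            try:
                results[task] = agent(task)
                break  # success: move on to the next task
            except Exception:
                # Log the error for debugging. Nothing is rolled back here,
                # so side effects of the failed attempt remain in place.
                logger.exception("Task %s failed on attempt %d", task, attempt + 1)
    return results
```

Because nothing is rolled back between attempts, agent logic that mutates a label row is safest when it can be applied repeatedly without harm.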

Configuration

Initialization

See the API reference for encord_agents.tasks.runner.Runner.__init__ for the available initialization arguments.

Runtime Configuration

There are two ways to execute the runner.
  1. Either run the runner directly from your code:
runner = Runner()
runner(project_hash="<your_project_hash>")  # See available parameters below
  2. Or run it using the command-line interface (CLI) by calling runner.run().
Suppose you have an example.py file that looks like this:
example.py
...
runner = Runner()
...
if __name__ == "__main__":
    runner.run()
Then, the runner functions as a CLI tool, accepting the same arguments as when executed in code.
$ python example.py --help

 Usage: example.py [OPTIONS]

 Execute the runner.

╭─ Options ──────────────────────────────────────────────────────────╮
 --refresh-every   INTEGER  Fetch task statuses from the Encord
                            Project every `refresh_every` seconds.
                            If `None`, the runner will exit once
                            task queue is empty.
                            [default: None]
 --num-retries     INTEGER  If an agent fails on a task, how many
                            times should the runner retry it?
                            [default: 3]
 --task-batch-size INTEGER  Number of tasks for which labels are
                            loaded into memory at once.
                            [default: 300]
 --project-hash    TEXT     The project hash if not defined at
                            runner instantiation.
                            [default: None]
 --help                     Show this message and exit.
╰────────────────────────────────────────────────────────────────────╯
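
To make the mapping between CLI flags and keyword arguments concrete, here is a small stand-in built with argparse from the standard library. It only mirrors the option names and defaults shown in the help output above; the function name parse_runner_args is hypothetical, and the real runner may build its CLI differently.

```python
import argparse
from typing import Any, Dict, Optional, Sequence


def parse_runner_args(argv: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """Parse the four runner options into a dict of keyword arguments."""
    parser = argparse.ArgumentParser(description="Execute the runner.")
    # Option names and defaults are copied from the help text above.
    parser.add_argument("--refresh-every", type=int, default=None)
    parser.add_argument("--num-retries", type=int, default=3)
    parser.add_argument("--task-batch-size", type=int, default=300)
    parser.add_argument("--project-hash", type=str, default=None)
    # argparse turns "--task-batch-size" into the key "task_batch_size".
    return vars(parser.parse_args(argv))
```

For example, parse_runner_args(["--task-batch-size", "1"]) produces a dict whose task_batch_size entry is 1, with the other three options left at their defaults.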

Performance Considerations

By default, the Runner bundles task updates in batches of 300 for better performance. For debugging or when immediate updates are required, you can set task_batch_size to 1.
# Via CLI
python my_agent.py --task-batch-size 1
Or in code
runner(task_batch_size=1)
To speed up your agent's label row updates, return a TaskAgentReturnStruct instead of saving the label row yourself. When a TaskAgentReturnStruct is returned, the runner uses a bundle to save the label row updates in batches, which significantly reduces the time your agent spends writing data.
from encord_agents.tasks.models import TaskAgentReturnStruct
from encord.storage import StorageItem
from encord.objects import LabelRowV2

@runner.stage("Agent Stage")
def agent_stage(label_row: LabelRowV2, storage_item: StorageItem) -> TaskAgentReturnStruct:
    # Modify the label row in any manner
    # ...
    return TaskAgentReturnStruct(pathway="Modified label", label_row=label_row)
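
Why batching helps can be seen in a toy model: instead of one write per task, updates are collected and sent in groups of task_batch_size. The class below is a hypothetical illustration only. UpdateBatcher and its flush callback are not part of encord or encord_agents, and the real bundling mechanism may work differently.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class UpdateBatcher:
    """Collects pending updates and flushes them in groups of `batch_size`."""

    flush: Callable[[List[str]], None]  # hypothetical: performs one bulk write
    batch_size: int = 300
    _pending: List[str] = field(default_factory=list)

    def add(self, update: str) -> None:
        """Queue one update; write the whole batch once it is full."""
        self._pending.append(update)
        if len(self._pending) >= self.batch_size:
            self.close()

    def close(self) -> None:
        """Flush whatever is left, for example once the task queue is empty."""
        if self._pending:
            self.flush(self._pending)
            self._pending = []
```

With batch_size=1 every update is written immediately, which matches the debugging setting described above; larger values trade immediacy for fewer writes.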

Scaling with the QueueRunner

The QueueRunner is an advanced runner designed for parallel processing of multiple tasks, ideal for speeding up execution of large task volumes. Both the Runner and QueueRunner share the same interface. The primary distinction lies in their execution:
  • The Runner executes tasks sequentially using its run() function.
  • The QueueRunner converts your implementations into functions that accept a task specification as a JSON string and return an encord_agents.tasks.models.TaskCompletionResult as a JSON string. This stringified JSON format is necessary for passing messages over queues, which typically do not support custom object types.
With the Runner, a stage implementation is registered and executed like this:
runner = Runner()
@runner.stage("my_stage")
def my_agent(task: AgentTask, label_row: LabelRowV2):
    ...
runner()
Refer to the Celery example or Modal example for more information.
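
The JSON-in, JSON-out wrapping described above can be sketched without the library. Everything here is an assumption made for illustration: as_json_function is a hypothetical decorator, and the result keys (task_uuid, success, pathway, error) are invented for this sketch and are not the actual TaskCompletionResult schema.

```python
import json
from typing import Callable, Optional


def as_json_function(agent: Callable[[dict], Optional[str]]) -> Callable[[str], str]:
    """Wrap `agent` so it accepts a JSON task spec and returns a JSON result."""

    def wrapper(task_json: str) -> str:
        task = json.loads(task_json)  # plain strings travel safely over a queue
        result = {"task_uuid": task.get("uuid"), "success": True,
                  "pathway": None, "error": None}
        try:
            result["pathway"] = agent(task)
        except Exception as exc:
            # Report the failure in the result instead of raising in the worker.
            result["success"] = False
            result["error"] = str(exc)
        return json.dumps(result)

    return wrapper


@as_json_function
def my_agent(task: dict) -> Optional[str]:
    # Hypothetical agent logic: route the task only when it is marked ready.
    return "next_stage" if task.get("ready") else None
```

A worker in a queue system would then call my_agent with the message body it received and publish the returned string, so no custom object types ever cross the queue.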

Comparison with QueueRunner

The key differences between QueueRunner and the sequential Runner are:
| Feature | Runner | QueueRunner |
|---|---|---|
| Execution Model | Executes tasks sequentially in a single process | Designed for distributed execution across multiple processes |
| Project Hash | Optional at initialization | Required at initialization |
| Function Wrapping | Executes your function directly with injected dependencies | Additionally wraps your function to handle JSON task specifications |
| Execution Control | Handles task polling and execution | You control task distribution and execution through your queue system |
| Scaling | Not suitable for scaling | Suitable for scaling |