The Runner executes tasks sequentially, making it ideal for debugging and testing workflows. Use it for simple workflows or to test functionality before scaling up with the QueueRunner.
Basic Usage
The basic usage pattern of the Runner follows three steps:
- Initialize the runner
- Implement the logic for each Workflow stage you want the runner to handle
- Execute the runner
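The following self-contained sketch shows roughly how the three steps fit together in code. MiniRunner, its stage decorator, the "pre-label" stage name, and the dict-based tasks are all hypothetical stand-ins invented for this example. They show the shape of the pattern, not the encord_agents API, so consult the library reference for the real Runner signature.

```python
from typing import Callable, Dict, List

AgentFn = Callable[[dict], str]


class MiniRunner:
    """Hypothetical stand-in for a runner; NOT the encord_agents Runner class."""

    def __init__(self) -> None:
        # Maps a stage name to the agent function registered for it
        self.agents: Dict[str, AgentFn] = {}

    def stage(self, name: str) -> Callable[[AgentFn], AgentFn]:
        # Decorator that registers an agent function for one Workflow stage
        def register(fn: AgentFn) -> AgentFn:
            self.agents[name] = fn
            return fn
        return register

    def run(self, tasks_by_stage: Dict[str, List[dict]]) -> List[str]:
        # Execute each registered agent on the tasks waiting in its stage
        pathways: List[str] = []
        for name, agent in self.agents.items():
            for task in tasks_by_stage.get(name, []):
                pathways.append(agent(task))
        return pathways


# 1. Initialize the runner
runner = MiniRunner()


# 2. Implement the logic for a stage (the stage name is hypothetical)
@runner.stage("pre-label")
def pre_label(task: dict) -> str:
    # Return the name of the pathway the task should follow
    return "annotate" if task["needs_review"] else "skip"


# 3. Execute the runner
if __name__ == "__main__":
    print(runner.run({"pre-label": [{"needs_review": True}, {"needs_review": False}]}))
```

Registering functions through a decorator keeps each stage's logic in its own function, so the runner only needs a name-to-function mapping to dispatch tasks.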
Running Agents
Basic Execution
When executed, the runner will:
- Connect to your Encord project
- Poll for tasks in the configured stages
- Execute your agent functions on each task
- Move tasks according to the returned pathway
- Retry failed tasks up to num_retries times
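Moving a task "according to the returned pathway" amounts to following an edge in the Workflow graph. The in-memory model below assumes a hypothetical WORKFLOW table keyed by (stage, pathway) and a hypothetical move_task helper. Encord performs this step on its side, and none of these names come from its API.

```python
from typing import Dict, List, Tuple

# Hypothetical Workflow graph: (current stage, pathway name) -> next stage
WORKFLOW: Dict[Tuple[str, str], str] = {
    ("pre-label", "annotate"): "annotate",
    ("pre-label", "skip"): "complete",
}


def move_task(queues: Dict[str, List[str]], stage: str, task: str, pathway: str) -> str:
    """Move `task` out of `stage` along `pathway` and return the destination stage."""
    destination = WORKFLOW.get((stage, pathway))
    if destination is None:
        # Unknown pathway: raise before touching the queues, so the task stays put
        raise ValueError(f"Stage {stage!r} has no pathway named {pathway!r}")
    queues[stage].remove(task)
    queues.setdefault(destination, []).append(task)
    return destination
```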
Command Line Interface
The runner exposes its configuration through a command-line interface (CLI).
Order of execution
The runner processes tasks by first emptying the queue for "stage_1" and then emptying the queue for "stage_2". If you set the refresh_every argument, the runner polls both queues again after it has emptied the initial set. This ensures that tasks arriving in a queue after the initial poll are picked up in the next iteration. If a pass through the queues already takes longer than the refresh_every threshold, the runner polls for new tasks immediately.
To illustrate the order of execution, see the pseudo-code below.
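The following runnable model captures the order of execution described above. It is a sketch under stated assumptions, not the library's implementation. The poll and handle callbacks, the injectable clock and sleep functions, and the max_passes test hook are hypothetical. Real polling talks to Encord, and refresh_every is treated here as a number of seconds, which is an assumption.

```python
import time
from typing import Callable, List, Optional


def run_sequentially(
    stage_order: List[str],
    poll: Callable[[str], List[str]],
    handle: Callable[[str, str], None],
    refresh_every: Optional[float] = None,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
    max_passes: Optional[int] = None,
) -> None:
    """Illustrative model of the runner's order of execution."""
    passes = 0
    while True:
        start = clock()
        for stage in stage_order:        # e.g. "stage_1" first, then "stage_2"
            for task in poll(stage):     # empty this stage's queue before moving on
                handle(stage, task)
        passes += 1
        # Without refresh_every the model stops after one pass;
        # max_passes is only a hook that lets tests end the loop
        if refresh_every is None or (max_passes is not None and passes >= max_passes):
            return
        elapsed = clock() - start
        if elapsed < refresh_every:
            # Wait until refresh_every has passed since this pass began
            sleep(refresh_every - elapsed)
        # Otherwise the pass already exceeded refresh_every: poll again immediately
```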
Error Handling
The runner:
- Retries failed tasks up to num_retries times (default: 3). Changes to the label row are not rolled back.
- Logs errors for debugging
- Continues processing other tasks if a task fails
- Bundles updates for better performance (configurable via task_batch_size)
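The retry behavior can be modeled as a bounded loop around each task. This sketch assumes that num_retries counts the total number of attempts per task; whether the library counts the first attempt separately is not stated on this page. The process_with_retries name is hypothetical. Side effects of a failed attempt are left in place, which mirrors the statement that label row changes are not rolled back.

```python
import logging
from typing import Callable, Dict, List, Optional

logger = logging.getLogger("runner_sketch")


def process_with_retries(
    tasks: List[str],
    agent: Callable[[str], str],
    num_retries: int = 3,
) -> Dict[str, Optional[str]]:
    """Run `agent` on each task; return task -> pathway (None if every attempt failed)."""
    results: Dict[str, Optional[str]] = {}
    for task in tasks:
        results[task] = None
        # Assumption: num_retries is the total number of attempts
        for attempt in range(1, num_retries + 1):
            try:
                results[task] = agent(task)
                break  # success: stop retrying this task
            except Exception:
                # Log the traceback and try again; the failed attempt's side effects are NOT undone
                logger.exception("Task %s failed (attempt %d/%d)", task, attempt, num_retries)
        # Succeeded or not, continue with the next task
    return results
```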
Configuration
Initialization
Runtime Configuration
There are two ways to execute the runner:
- Run the runner directly from your code.
- Run it through the command-line interface (CLI) by calling the runner.run() function, for example from an example.py script.
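The difference between the two modes can be sketched with Python's standard argparse module. Calling a function directly passes configuration as arguments, while a CLI entry point parses the same settings from the command line. execute, run_cli, and the flag names are hypothetical. The defaults of 3 retries and a batch size of 300 come from this page, and refresh_every defaulting to None is an assumption. The real runner.run() CLI may expose different options.

```python
import argparse
from typing import List, Optional


def execute(refresh_every: Optional[int] = None, num_retries: int = 3, task_batch_size: int = 300) -> dict:
    # Hypothetical stand-in for the runner's work: report the configuration it received
    return {"refresh_every": refresh_every, "num_retries": num_retries, "task_batch_size": task_batch_size}


def run_cli(argv: Optional[List[str]] = None) -> dict:
    # Hypothetical CLI: parse the same settings from the command line, then execute
    parser = argparse.ArgumentParser(description="Hypothetical runner CLI")
    parser.add_argument("--refresh-every", type=int, default=None)
    parser.add_argument("--num-retries", type=int, default=3)
    parser.add_argument("--task-batch-size", type=int, default=300)
    args = parser.parse_args(argv)  # argv=None falls back to sys.argv[1:]
    return execute(args.refresh_every, args.num_retries, args.task_batch_size)
```

In a script, you would typically call run_cli() inside an if __name__ == "__main__": guard so that importing the module does not trigger argument parsing.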
Performance Considerations
By default, the Runner bundles task updates in batches of 300 for better performance. For debugging, or when updates must be written immediately, set task_batch_size to 1.
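A minimal model of how a batch size trades write frequency for latency: updates accumulate in a buffer and are written once task_batch_size of them have collected, with a final flush for the remainder. BatchedWriter and save_batch are hypothetical names, and this is not the Runner's internal implementation.

```python
from typing import Callable, List


class BatchedWriter:
    """Hypothetical buffer that saves updates in groups of `task_batch_size`."""

    def __init__(self, save_batch: Callable[[List[str]], None], task_batch_size: int = 300) -> None:
        if task_batch_size < 1:
            raise ValueError("task_batch_size must be at least 1")
        self.save_batch = save_batch
        self.task_batch_size = task_batch_size
        self.pending: List[str] = []

    def add(self, update: str) -> None:
        self.pending.append(update)
        # Write as soon as a full batch has accumulated
        if len(self.pending) >= self.task_batch_size:
            self.flush()

    def flush(self) -> None:
        # Write whatever is left, e.g. the final partial batch
        if self.pending:
            self.save_batch(self.pending)
            self.pending = []
```

With task_batch_size=1, every add() triggers a write immediately, which is the behavior you want while debugging.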
Your agent can also return a struct object instead of saving label rows itself. Encord then saves the results in batches, which minimizes the time your agent spends writing data. To use this feature, write your code to return a struct object instead of performing individual saves.
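The difference between saving per task and returning a struct can be modeled as deferring writes and issuing one bulk save. ReturnStruct, its pathway and label_row fields, run_stage, and save_many are hypothetical stand-ins; check the actual TaskAgentReturnStruct fields in the encord_agents reference.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class ReturnStruct:
    # Hypothetical stand-in for TaskAgentReturnStruct; field names are assumptions
    pathway: Optional[str] = None
    label_row: Optional[Dict[str, str]] = None


def run_stage(
    tasks: List[str],
    agent: Callable[[str], ReturnStruct],
    save_many: Callable[[List[Dict[str, str]]], None],
) -> List[Optional[str]]:
    """Collect label rows returned by the agent and save them in one bulk call."""
    pathways: List[Optional[str]] = []
    to_save: List[Dict[str, str]] = []
    for task in tasks:
        result = agent(task)
        pathways.append(result.pathway)
        if result.label_row is not None:
            to_save.append(result.label_row)  # defer the write instead of saving per task
    if to_save:
        save_many(to_save)  # one bulk write for the whole set
    return pathways
```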
When your agent returns a TaskAgentReturnStruct, the runner uses a bundle to batch label row updates, which can significantly improve performance.
Scaling with the QueueRunner
The QueueRunner is an advanced runner designed for parallel processing of multiple tasks, ideal for speeding up execution of large task volumes.
Both the Runner and QueueRunner share the same interface. The primary distinction lies in their execution:
- The Runner executes tasks sequentially through its run() function.
- The QueueRunner converts your implementations into functions that accept a task specification as a JSON string and return an encord_agents.tasks.models.TaskCompletionResult as a JSON string. This stringified JSON format is necessary for passing messages over queues, which typically do not support custom object types.
Refer to the Celery example or Modal example for more information.
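The JSON-in, JSON-out conversion can be sketched with Python's json module and a dataclass. CompletionResult and its fields (task_uuid, success, pathway, error), as well as wrap_for_queue, are hypothetical stand-ins for encord_agents.tasks.models.TaskCompletionResult and the QueueRunner's wrapping. They only illustrate why plain strings travel well over a queue.

```python
import json
from dataclasses import asdict, dataclass
from typing import Callable, Optional


@dataclass
class CompletionResult:
    # Hypothetical stand-in for TaskCompletionResult; field names are assumptions
    task_uuid: str
    success: bool
    pathway: Optional[str] = None
    error: Optional[str] = None


def wrap_for_queue(agent: Callable[[dict], Optional[str]]) -> Callable[[str], str]:
    """Turn an agent that takes a task dict into one that takes and returns JSON strings."""
    def handler(task_spec_json: str) -> str:
        spec = json.loads(task_spec_json)
        task_uuid = spec["task_uuid"]
        try:
            result = CompletionResult(task_uuid=task_uuid, success=True, pathway=agent(spec))
        except Exception as exc:
            # Report the failure in the result instead of raising inside the worker
            result = CompletionResult(task_uuid=task_uuid, success=False, error=str(exc))
        return json.dumps(asdict(result))
    return handler
```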
Comparison with QueueRunner
The key differences between the QueueRunner and the sequential Runner are:
Feature | Runner | QueueRunner |
---|---|---|
Execution Model | Executes tasks sequentially in a single process | Designed for distributed execution across multiple processes |
Project Hash | Optional at initialization | Required at initialization |
Function Wrapping | Executes your function directly with injected dependencies | Additionally wraps your function to handle JSON task specifications |
Execution Control | Handles task polling and execution | You control task distribution and execution through your queue system |
Scaling | Not suitable for scaling | Suitable for scaling |
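The point that you control task distribution can be illustrated with Python's standard-library queue and worker threads standing in for a real message broker such as Celery. process_in_parallel and echo_handler are hypothetical. In a real deployment, each worker would call the JSON-wrapped function the QueueRunner produces, and the broker, not an in-process queue, would carry the strings. Threads only speed up I/O-bound work such as API calls; CPU-bound agents would need separate processes.

```python
import json
import queue
import threading
from typing import Callable, List


def echo_handler(task_spec_json: str) -> str:
    # Hypothetical handler: report which task it processed
    spec = json.loads(task_spec_json)
    return json.dumps({"task_uuid": spec["task_uuid"], "success": True})


def process_in_parallel(task_specs: List[str], handler: Callable[[str], str], num_workers: int = 4) -> List[str]:
    """Distribute JSON task specs across worker threads; result order is not guaranteed."""
    tasks: "queue.Queue[str]" = queue.Queue()
    results: List[str] = []
    lock = threading.Lock()

    def worker() -> None:
        while True:
            try:
                spec = tasks.get_nowait()
            except queue.Empty:
                return  # queue drained: this worker exits
            out = handler(spec)
            with lock:
                results.append(out)

    for spec in task_specs:
        tasks.put(spec)  # enqueue everything before starting the workers
    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```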