Custom Workflow Agents
models
TaskAgentReturnStruct Objects
@dataclass
class TaskAgentReturnStruct()
Return this from your agent and the runner will handle propagating the updates in batches.
pathway
The pathway that the task will follow on task completion
label_row
The label to be saved (if present)
label_row_priority
The priority of the label row to be saved.
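As a minimal sketch of how an agent might use such a return value, the block below defines a stand-in dataclass that mirrors only the three fields documented above. The real class ships with encord_agents and may differ in field types and defaults; the class name, the agent, the pathway names, and the priority value here are all hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ReturnStructSketch:
    """Stand-in mirroring the three documented fields (not the real class)."""

    pathway: Optional[str] = None  # pathway the task follows on completion
    label_row: Optional[Any] = None  # label to be saved, if present
    label_row_priority: Optional[float] = None  # priority to be saved


def my_agent(confidence: float) -> ReturnStructSketch:
    # Hypothetical rule: low-confidence tasks go to review with a high priority.
    if confidence < 0.5:
        return ReturnStructSketch(pathway="review", label_row_priority=1.0)
    return ReturnStructSketch(pathway="done")


result = my_agent(0.2)
print(result.pathway, result.label_row_priority)
```

Returning one object per task, rather than saving inside the agent, is what lets a runner collect the updates and apply them in batches.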
TaskCompletionResult Objects
class TaskCompletionResult(BaseModel)
Data model to hold information about the completion result of
encord_agents.tasks.QueueRunner agents.
dependencies
dep_client
def dep_client() -> EncordUserClient
Dependency to provide an authenticated user client.
Example:
from encord.user_client import EncordUserClient
from encord_agents.tasks.dependencies import dep_client
...
@runner.stage("<my_stage_name>")
def my_agent(
    client: Annotated[EncordUserClient, Depends(dep_client)]
) -> str:
    # Client will be authenticated and ready to use.
    client.get_dataset("")
dep_storage_item
def dep_storage_item(storage_item: StorageItem) -> StorageItem
Get the storage item associated with the underlying agent task.
The StorageItem is useful for several things, such as:
- Updating client metadata
- Reading file properties like storage location, fps, duration, DICOM tags, etc.
When an agent declares the StorageItem dependency, the storage items for the tasks are fetched in bulk
and then injected individually with each task. This function is a trivial pass-through kept for backwards compatibility; you can also annotate the parameter directly as storage_item: StorageItem.
Example:
from encord.storage import StorageItem
from encord_agents.tasks.dependencies import dep_storage_item
@runner.stage(stage="<my_stage_name>")
def my_agent(storage_item: Annotated[StorageItem, Depends(dep_storage_item)]) -> str:
    print(storage_item.name)
    print(storage_item.client_metadata)
    ...
Arguments:
storage_item - StorageItem
Returns:
The storage item.
dep_single_frame
def dep_single_frame(storage_item: StorageItem) -> NDArray[np.uint8]
Dependency to inject the first frame of the underlying asset.
The downloaded asset will be named lr.data_hash.{suffix}.
When the function has finished, the downloaded file will be removed from the file system.
Example:
from encord_agents import FrameData
from encord_agents.tasks.dependencies import dep_single_frame
...
@runner.stage("<my_stage_name>")
def my_agent(
    frame: Annotated[NDArray[np.uint8], Depends(dep_single_frame)]
) -> str:
    assert frame.ndim == 3, "Will work"
Arguments:
storage_item - The Storage item. Automatically injected (see example above).
Returns:
Numpy array of shape [h, w, 3] RGB colors.
dep_video_iterator
def dep_video_iterator(
storage_item: StorageItem) -> Generator[Iterator[Frame], None, None]
Dependency to inject a video frame iterator for doing things over many frames.
This will use OpenCV and the local backend on your machine.
Decoding support may vary depending on the video format, codec, and your local configuration.
Intended use:
from encord_agents import FrameData
from encord_agents.tasks.dependencies import dep_video_iterator
...
@runner.stage("<my_stage_name>")
def my_agent(
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)]
) -> str:
    for frame in video_frames:
        print(frame.frame, frame.content.shape)
Arguments:
storage_item - Automatically injected Storage item dependency.
Raises:
NotImplementedError - Will fail for other data types than video.
Yields:
An iterator.
dep_video_sampler
def dep_video_sampler(
storage_item: StorageItem
) -> Generator[Callable[[float | Sequence[int]], Iterable[Frame]], None, None]
Dependency to inject a video sampler for doing things over many frames.
This will use OpenCV and the local backend on your machine.
Decoding support may vary depending on the video format, codec, and your local configuration.
Arguments:
storage_item - Automatically injected Storage item dependency.
Example:
from encord_agents.tasks.dependencies import dep_video_sampler
...
runner = Runner(project_hash="<project_hash_a>")
@runner.stage("<stage_name_or_uuid>")
def my_agent(
    video_sampler: Annotated[Callable[[float | Sequence[int]], Iterable[Frame]], Depends(dep_video_sampler)],
) -> str | None:
    for frame in video_sampler(1/5):
        # Get every 5th frame
        # i.e: [0,5,10,15,...]
        ...
    for frame in video_sampler([1, 2, 3]):
        # Get frames 1, 2, 3
        ...
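To make the two argument forms concrete, here is a minimal sketch of how a float rate or an explicit index list could be turned into frame numbers. It follows only the comments in the example above (1/5 selects frames 0, 5, 10, ...); resolve_frame_indices is a hypothetical helper, and the real sampler's rounding and out-of-range handling are not documented here, so the choices below are assumptions.

```python
from typing import List, Sequence, Union


def resolve_frame_indices(
    spec: Union[float, Sequence[int]], num_frames: int
) -> List[int]:
    """Turn a sampler argument into concrete frame indices (illustrative only)."""
    if isinstance(spec, (int, float)):
        if not 0 < spec <= 1:
            raise ValueError("sampling rate must be in (0, 1]")
        step = round(1 / spec)  # a rate of 1/5 means every 5th frame
        return list(range(0, num_frames, step))
    # Explicit frame numbers: assumed here to skip indices outside the video.
    return [i for i in spec if 0 <= i < num_frames]


print(resolve_frame_indices(1 / 5, 20))
print(resolve_frame_indices([1, 2, 3], 20))
```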
dep_asset
def dep_asset(storage_item: StorageItem) -> Generator[Path, None, None]
Get a local file path to the data asset, which is stored temporarily until the end of task execution.
This dependency will fetch the underlying data asset based on a signed url.
It will temporarily store the data on disk. Once the task is completed, the
asset will be removed from disk again.
Example:
from encord_agents.tasks.dependencies import dep_asset
...
runner = Runner(project_hash="<project_hash_a>")
@runner.stage("<stage_name_or_uuid>")
def my_agent(
    asset: Annotated[Path, Depends(dep_asset)],
) -> str | None:
    asset.stat()  # read file stats
    ...
Returns:
The path to the asset.
Raises:
ValueError - if the underlying assets are not videos, images, or audio.
EncordException - if data type not supported by SDK yet.
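The Generator[Path, None, None] return type suggests a setup/teardown pattern: the dependency yields a path while the task runs and cleans up afterwards. The sketch below shows that pattern with the standard library only. temp_asset is a hypothetical stand-in; writing a byte string replaces the real download from a signed URL, and the file naming is an assumption.

```python
import tempfile
from pathlib import Path
from typing import Generator


def temp_asset(payload: bytes, suffix: str = ".bin") -> Generator[Path, None, None]:
    """Yield a path to a temporary copy of `payload`, then delete it."""
    tmp_dir = Path(tempfile.mkdtemp())
    path = tmp_dir / f"asset{suffix}"
    path.write_bytes(payload)  # stands in for downloading the asset
    try:
        yield path
    finally:
        # Runs when the consumer is done, even if the task raised.
        path.unlink(missing_ok=True)
        tmp_dir.rmdir()


gen = temp_asset(b"hello")
asset = next(gen)  # setup: the file exists while the "task" runs
size = asset.stat().st_size
gen.close()  # teardown: triggers the finally block
```

Putting the cleanup in a finally block means the file is removed whether the agent returns normally or fails.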
Twin Objects
@dataclass(frozen=True)
class Twin()
Dataclass to hold “label twin” information.
dep_twin_label_row
def dep_twin_label_row(
twin_project_hash: str,
init_labels: bool = True,
include_task: bool = False) -> Callable[[LabelRowV2], Twin | None]
Dependency to link assets between two Projects. When your Runner is running on
<project_hash_a>, you can use this to get a Twin of labels and the underlying
task in the “twin project” with <project_hash_b>.
This is useful in situations like:
- When you want to transfer labels from a “source project” to a sink project.
- If you want to compare labels to labels from other projects upon label submission.
- If you want to extend an existing project with labels from another project on the same underlying data.
Example:
from encord.workflow.common import WorkflowTask
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks.dependencies import Twin, dep_twin_label_row
...
runner = Runner(project_hash="<project_hash_a>")
@runner.stage("<my_stage_name_in_project_a>")
def my_agent(
    project_a_label_row: LabelRowV2,
    twin: Annotated[
        Twin, Depends(dep_twin_label_row(twin_project_hash="<project_hash_b>"))
    ],
) -> str | None:
    label_row_from_project_b: LabelRowV2 = twin.label_row
    # The task field is only populated when include_task=True is passed.
    task_from_project_b: WorkflowTask = twin.task
Arguments:
twin_project_hash - The project hash of the twin project (attached to the same datasets)
from which you want to load the additional data.
init_labels - If true, the label row will be initialized before calling the agent.
include_task - If true, the task field of the Twin will be populated. If population
fails, e.g., for non-workflow projects, the task will be None.
Returns:
The twin.
Raises:
encord.AuthorizationError - if you do not have access to the project.
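Since both projects are attached to the same datasets, one plausible way to picture the linking is a lookup keyed on the shared data item. The sketch below is an assumption about the idea, not the library's implementation: LabelRowSketch and find_twin are hypothetical stand-ins, and matching on data_hash is assumed. It also shows why the result can be None when the twin project has no matching row.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class LabelRowSketch:
    """Stand-in for a label row: one data item plus its labels."""

    data_hash: str
    labels: Tuple[str, ...]


def find_twin(
    row: LabelRowSketch, twin_rows: List[LabelRowSketch]
) -> Optional[LabelRowSketch]:
    # Assumed matching rule: same underlying data item in both projects.
    by_hash = {r.data_hash: r for r in twin_rows}
    return by_hash.get(row.data_hash)


project_b = [LabelRowSketch("hash-1", ("car",)), LabelRowSketch("hash-2", ())]
twin = find_twin(LabelRowSketch("hash-1", ()), project_b)
missing = find_twin(LabelRowSketch("hash-9", ()), project_b)
```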
dep_data_lookup
def dep_data_lookup(
lookup: Annotated[DataLookup,
Depends(DataLookup.sharable)]) -> DataLookup
Get a lookup to easily retrieve data rows and storage items associated with the given task.
Warning (deprecated): dep_data_lookup is deprecated and will be removed in version 0.2.10.
Use dep_storage_item instead for accessing storage items.
Migration guide:
# Old way (deprecated)
from encord_agents.tasks.dependencies import dep_data_lookup, DataLookup
@runner.stage(stage="Agent 1")
def my_agent(
    task: AgentTask,
    lookup: Annotated[DataLookup, Depends(dep_data_lookup)]
) -> str:
    storage_item = lookup.get_storage_item(task.data_hash)
    client_metadata = storage_item.client_metadata
    ...
# New way (recommended)
from encord.storage import StorageItem
from encord_agents.tasks.dependencies import dep_storage_item
@runner.stage(stage="Agent 1")
def my_agent(
    task: AgentTask,
    storage_item: Annotated[StorageItem, Depends(dep_storage_item)]
) -> str:
    # storage_item is directly available
    client_metadata = storage_item.client_metadata
    # Update metadata
    storage_item.update(
        client_metadata={
            "new": "entry",
            **(client_metadata or {})
        }
    )
    ...
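One detail of the metadata update above is worth spelling out: in a Python dict literal, later entries win. Because the existing metadata is unpacked last, an existing "new" key would keep its old value. The sketch below uses plain dictionaries (the key names are only illustrative) to show both orderings and the None case that the `or {}` guards against.

```python
existing = {"new": "old value", "camera": "front"}

# Same ordering as in the migration example: existing keys take precedence.
merged = {"new": "entry", **(existing or {})}

# Reversed ordering: the new entry overrides an existing key of the same name.
prefer_new = {**(existing or {}), "new": "entry"}

# `or {}` keeps the merge working when no metadata has been set yet.
from_none = {"new": "entry", **(None or {})}

print(merged["new"], prefer_new["new"], from_none)
```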
Arguments:
lookup - The object that you can use to lookup data rows and storage items. Automatically injected.
Returns:
The (shared) lookup object.
runner.queue_runner
QueueRunner Objects
class QueueRunner(RunnerBase)
This class is intended to hold agent implementations.
It makes it easy to put agent task specifications into
a queue and then execute them in a distributed fashion.
Below is a template for how that would work.
Example:
runner = QueueRunner(project_hash="...")
@runner.stage("Agent 1")
def my_agent_implementation() -> str:
    # ... do your thing
    return "<pathway_name>"
# Populate the queue
my_queue = ...
for stage in runner.get_agent_stages():
    for task in stage.get_tasks():
        my_queue.append(task.model_dump_json())
# Execute on the queue
while my_queue:
    task_spec = my_queue.pop()
    result_json = my_agent_implementation(task_spec)
    result = TaskCompletionResult.model_validate_json(result_json)
__init__
def __init__(project_hash: str | UUID)
Initialize the QueueRunner with a project hash.
This is the hash of the project that you want to run the tasks on.
Arguments:
project_hash - The hash of the project to run the tasks on.
stage
def stage(
stage: str | UUID,
*,
label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs
| None = None,
will_set_priority: bool = False
) -> Callable[[Callable[..., TaskAgentReturnType]], Callable[[str], str]]
Agent wrapper intended for queueing systems and distributed workloads.
Define your agent as you are used to with dependencies in the method declaration and
return the pathway from the project workflow that the task should follow upon completion.
The function will be wrapped in logic that does the following (in pseudo code):
@runner.stage("stage_name")
def your_function(...)
    ...
# is equivalent to
def wrapped_function(task_json_spec: str) -> str (result_json):
    task = fetch_task(task_json_spec)
    resources = load_resources(task)
    pathway = your_function(resources)  # <- this is where your code goes
    task.proceed(pathway)
    return TaskCompletionResult.model_dump_json()
When you have an encord.workflow.stages.agent.AgentTask instance at hand, let’s call
it task, then you can call your wrapped_function with task.model_dump_json().
Similarly, you can put task.model_dump_json() into a queue and read from that queue, e.g.,
from another instance/process, to execute wrapped_function there.
As the pseudo code indicates, wrapped_function understands how to take that string from
the queue and resolve all your defined dependencies before calling your_function.
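The string-in, string-out wrapping described here can be sketched with the standard library alone. This is an illustration of the pattern under stated assumptions, not the library's code: stage_sketch, the task dictionary, and the result fields (task_uuid, success, pathway, error) are hypothetical, and catching exceptions to report a failed result is an assumed design choice.

```python
import json
from collections import deque
from typing import Callable, Deque, List


def stage_sketch(fn: Callable[[dict], str]) -> Callable[[str], str]:
    """Wrap `fn` so it takes a JSON task spec and returns a JSON result."""

    def wrapped(task_json_spec: str) -> str:
        task = json.loads(task_json_spec)  # stands in for fetching the task
        try:
            pathway = fn(task)  # <- user code runs here
            result = {"task_uuid": task["uuid"], "success": True, "pathway": pathway}
        except Exception as exc:  # report the failure instead of raising
            result = {"task_uuid": task["uuid"], "success": False, "error": str(exc)}
        return json.dumps(result)

    return wrapped


@stage_sketch
def my_agent(task: dict) -> str:
    return "approve" if task["score"] > 0.5 else "reject"


# Producer side: serialize task specs into a queue.
queue: Deque[str] = deque()
for uuid, score in [("a", 0.9), ("b", 0.1)]:
    queue.append(json.dumps({"uuid": uuid, "score": score}))

# Consumer side: could run in another process, since only strings cross the queue.
results: List[dict] = []
while queue:
    results.append(json.loads(my_agent(queue.popleft())))
```

Keeping both the input and the output as plain strings is what makes the wrapped function easy to hand to an external queueing system.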
Arguments:
stage - The name or uuid of the stage that the function should be
associated with.
label_row_metadata_include_args - Arguments to be passed to
project.list_label_rows_v2(...)
label_row_initialise_labels_args - Arguments to be passed to
label_row.initialise_labels(...)
will_set_priority - Indicates whether you will be returning a TaskAgentReturnStruct
with a label_row_priority field set. This field is only required if you are
returning the priority of the label row but not depending on the label row itself.
That is, if your function signature does not include a LabelRowV2 parameter.
Returns:
The decorated function.
get_agent_stages
def get_agent_stages() -> Iterable[AgentStage]
Get the agent stages for which there exist an agent implementation.
This function is intended to make it easy to iterate through all current
agent tasks and put the task specs into external queueing systems like
Celery or Modal.
For a concrete example, please see the docstring for the class itself.
Note that if you didn’t specify an implementation (by decorating your
function with @runner.stage) for a given agent stage, the stage will
not show up by calling this function.
Returns:
An iterable over encord.workflow.stages.agent.AgentStage objects
where the runner contains an agent implementation.
Raises:
AssertionError - if the runner does not have an associated project.
runner.runner_base
RunnerBase Objects
__init__
def __init__(project_hash: str | UUID | None = None)
Initialize the runner with an optional project hash.
The project_hash will allow stricter stage validation.
If left unspecified, errors will not be raised until the runner is executed.
Arguments:
project_hash - The project hash that the runner applies to.
Can be left unspecified to be able to reuse the same runner on multiple projects.
runner.sequential_runner
SequentialRunner Objects
class SequentialRunner(RunnerBase)
Runs agents against Workflow projects.
When called, it will iteratively run agent stages until they are empty. By default, the runner exits after finishing the tasks identified at the point of trigger. To re-run automatically, use the refresh_every keyword.
Example:
from uuid import UUID
from encord_agents.tasks import Runner
runner = Runner()
@runner.stage("<workflow_node_name>")
# or: @runner.stage("<workflow_node_uuid>")
def my_agent(task: AgentTask) -> str | UUID | None:
    ...
    return "pathway name"  # or pathway uuid
runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
# or
if __name__ == "__main__":
    # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
    runner.run()
__init__
def __init__(project_hash: str | None = None,
*,
pre_execution_callback: Callable[[Self], None] | None = None)
Initialize the runner with an optional project hash.
The project_hash will allow stricter stage validation.
If left unspecified, errors will not be raised until the runner is executed.
Arguments:
project_hash - The project hash that the runner applies to.
Can be left unspecified to be able to reuse the same runner on multiple projects.
pre_execution_callback - Callable[[Self], None]
Allows for optional additional validation, e.g., checking that the Ontology has a specific form.
stage
def stage(
stage: str | UUID,
*,
label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs
| None = None,
overwrite: bool = False,
will_set_priority: bool = False
) -> Callable[[DecoratedCallable], DecoratedCallable]
Decorator to associate a function with an agent stage.
A function decorated with a stage is added to the list of stages
that will be handled by the runner.
The runner will call the function for every task which is in that
stage.
The function declaration can be any function that takes parameters
that are type annotated with the following types:
- Project: the encord.project.Project that the runner is operating on.
- LabelRowV2: the encord.objects.LabelRowV2 that the task is associated with.
- AgentTask: the encord.workflow.stages.agent.AgentTask that the task is associated with.
- Any other type: which is annotated with a dependency
All those parameters will be automatically injected when the agent is called.
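To illustrate how injection driven by type annotations can work in general, the sketch below resolves each parameter either from a table of known context objects or from a marker attached via Annotated. It is not the library's resolver: DependsSketch, the Project stand-in, and call_with_injection are hypothetical names, and real dependencies may themselves take injected parameters, which this sketch does not handle.

```python
import inspect
from typing import Annotated, Any, Callable, Dict, get_args, get_origin, get_type_hints


class DependsSketch:
    """Hypothetical marker holding the callable that produces a value."""

    def __init__(self, dependency: Callable[[], Any]) -> None:
        self.dependency = dependency


class Project:
    """Stand-in for a context type that the runner knows how to provide."""

    def __init__(self, title: str) -> None:
        self.title = title


def call_with_injection(fn: Callable[..., Any], context: Dict[type, Any]) -> Any:
    """Build keyword arguments for `fn` from its type annotations."""
    hints = get_type_hints(fn, include_extras=True)
    kwargs: Dict[str, Any] = {}
    for name in inspect.signature(fn).parameters:
        hint = hints[name]
        if get_origin(hint) is Annotated:
            # Annotated[T, marker]: call the marker's dependency for the value.
            _, *extras = get_args(hint)
            marker = next(e for e in extras if isinstance(e, DependsSketch))
            kwargs[name] = marker.dependency()
        else:
            # Plain type: look the object up by its class.
            kwargs[name] = context[hint]
    return fn(**kwargs)


def fixed_value() -> float:
    return 0.25


def my_func(
    project: Project, custom: Annotated[float, DependsSketch(fixed_value)]
) -> str:
    return f"{project.title}:{custom}"


output = call_with_injection(my_func, {Project: Project("demo")})
```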
Example:
# Minimal agent without dependencies
runner = Runner()
@runner.stage("<stage_name_or_uuid>")
def my_func() -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"
# Agent with injected parameters and dependencies
from typing import Iterator
from typing_extensions import Annotated
from encord.objects.ontology_labels_impl import LabelRowV2
from encord.project import Project
from encord_agents.tasks import Depends, Runner
from encord_agents.tasks.dependencies import dep_video_iterator
from encord.workflow.stages.agent import AgentTask
runner = Runner()
def random_value() -> float:
    import random
    return random.random()
@runner.stage("<stage_name_or_uuid>")
def my_func(
    project: Project,
    lr: LabelRowV2,
    task: AgentTask,
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
    custom: Annotated[float, Depends(random_value)]
) -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"
Arguments:
stage - The name or uuid of the stage that the function should be
associated with.
label_row_metadata_include_args - Arguments to be passed to
project.list_label_rows_v2(...)
label_row_initialise_labels_args - Arguments to be passed to
label_row.initialise_labels(...)
overwrite - Overwrite the method associated with this stage if one already exists.
If false, an error is thrown instead.
will_set_priority - Indicates whether you will be returning a TaskAgentReturnStruct
with a label_row_priority field set. This field is only required if you are
returning the priority of the label row but not depending on the label row itself.
That is, if your function signature does not include a LabelRowV2 parameter.
Returns:
The decorated function.
__call__
def __call__(
    refresh_every: Annotated[
        Optional[int],
        Option(
            help="Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty."
        ),
    ] = None,
    num_retries: Annotated[
        int,
        Option(
            help="If an agent fails on a task, how many times should the runner retry it?"
        ),
    ] = 3,
    task_batch_size: Annotated[
        int,
        Option(
            help="Number of tasks for which labels are loaded into memory at once."
        ),
    ] = 300,
    project_hash: Annotated[
        Optional[str],
        Option(help="The project hash if not defined at runner instantiation."),
    ] = None,
    max_tasks_per_stage: Annotated[
        Optional[int],
        Option(
            help="Max number of tasks to try to process per stage on a given run. If `None`, will attempt all",
        ),
    ] = None,
) -> None
Run your agent runner(...).
Info: self-updating/polling runner
The runner can continuously poll new tasks in the project and execute the defined stage agents.
To do so, please set the refresh_every parameter.
When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.
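The "at least that amount of time between polls" rule can be sketched as waiting only for whatever is left of the interval once the queue has been emptied. This is a minimal model of the described behaviour, not the runner's implementation: seconds_until_next_poll and run_polling are hypothetical, max_polls exists only so the sketch terminates, and the clock is simulated so that nothing actually sleeps.

```python
import time
from typing import Callable, List, Optional


def seconds_until_next_poll(refresh_every: float, elapsed: float) -> float:
    """Wait only for the part of the interval that has not passed yet."""
    return max(0.0, refresh_every - elapsed)


def run_polling(
    process_queue: Callable[[], None],
    refresh_every: Optional[float],
    max_polls: int,  # hypothetical cap so the sketch terminates
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    polls = 0
    while True:
        start = clock()
        process_queue()  # empty the current task queue
        polls += 1
        if refresh_every is None or polls >= max_polls:
            return polls  # without refresh_every, exit after one pass
        sleep(seconds_until_next_poll(refresh_every, clock() - start))


# Simulated clock: emptying the queue "takes" 60 seconds per poll.
now = [0.0]
waits: List[float] = []


def slow_queue() -> None:
    now[0] += 60.0


polls_done = run_polling(
    slow_queue, refresh_every=1, max_polls=2,
    clock=lambda: now[0], sleep=waits.append,
)
```

With a 1 second interval and 60 seconds of work, the recorded wait is zero, which matches the statement that the runner polls again as soon as the current queue is done.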
Arguments:
refresh_every - Fetch task statuses from the Encord Project every refresh_every seconds.
If None, the runner will exit once task queue is empty.
num_retries - If an agent fails on a task, how many times should the runner retry it?
task_batch_size - Number of tasks for which labels are loaded into memory at once.
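project_hash - The project hash if not defined at runner instantiation.
max_tasks_per_stage - Max number of tasks to try to process per stage on a given run.
If None, will attempt all.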
Returns:
None
run
Execute the runner.
This function is intended to be called from the “main file”.
It is an entry point to be able to run the agent(s) via your shell
with command line arguments.
Example:
runner = Runner(project_hash="<your_project_hash>")
@runner.stage(stage="...")
def your_func() -> str:
    ...
if __name__ == "__main__":
    runner.run()
You can then execute the runner from your shell; the command line options it accepts are those of Runner.__call__.