Task Agents
models
TaskAgentReturnStruct Objects
Return this from your agent, and we will handle propagating the updates in batches.
pathway
The pathway that the task will follow on task completion
label_row
The label to be saved (if present)
label_row_priority
The priority of the label row to be saved.
TaskCompletionResult Objects
Data model to hold information about the completion result of encord_agents.tasks.QueueRunner agents.
dependencies
dep_client
Dependency to provide an authenticated user client.
Example:
dep_storage_item
Get the storage item associated with the underlying agent task.
The StorageItem is useful for multiple things, like:
- Updating client metadata
- Reading file properties like storage location, fps, duration, DICOM tags, etc.
Note: When a task agent declares the StorageItem dependency, we bulk fetch the storage items for the tasks and then inject them individually with each task. This is a trivial method kept for backwards compatibility; you can instead annotate a parameter directly as storage_item: StorageItem.
Example
Arguments:
storage_item
- StorageItem
Returns:
The storage item.
dep_single_frame
Dependency to inject the first frame of the underlying asset.
The downloaded asset will be named lr.data_hash.{suffix}.
When the function has finished, the downloaded file will be removed from the file system.
Example:
Arguments:
storage_item
- The Storage item. Automatically injected (see example above).
Returns:
Numpy array of shape [h, w, 3] RGB colors.
dep_video_iterator
Dependency to inject a video frame iterator for processing many frames. This will use OpenCV and the local backend on your machine. Decoding support may vary depending on the video format, codec, and your local configuration.
Intended use
Arguments:
storage_item
- Automatically injected Storage item dependency.
Raises:
NotImplementedError
- Will fail for other data types than video.
Yields:
An iterator.
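The lazy, frame-by-frame access pattern can be sketched with the standard library alone. This is only an illustration under stated assumptions: FakeFrame and fake_video_iterator are hypothetical stand-ins for the injected iterator and its frames, and no real video is decoded.

```python
from dataclasses import dataclass
from itertools import islice
from typing import Iterator


@dataclass
class FakeFrame:
    """Hypothetical stand-in for a decoded frame (index plus pixel data)."""
    frame: int
    content: bytes


def fake_video_iterator(num_frames: int) -> Iterator[FakeFrame]:
    # Stand-in for the injected iterator: frames are produced lazily.
    for i in range(num_frames):
        yield FakeFrame(frame=i, content=b"")


def every_nth_frame(frames: Iterator[FakeFrame], step: int) -> Iterator[FakeFrame]:
    # islice with a step skips frames without holding the whole video in memory.
    return islice(frames, 0, None, step)


sampled = [f.frame for f in every_nth_frame(fake_video_iterator(10), step=4)]
```

Because the frames are consumed one at a time, the same loop works for long videos without loading every frame up front.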
dep_video_sampler
Dependency to inject a video sampler for processing many frames. This will use OpenCV and the local backend on your machine. Decoding support may vary depending on the video format, codec, and your local configuration.
Arguments:
storage_item
- Automatically injected Storage item dependency.
Example:
dep_asset
Get a local file path to the data asset, which is stored temporarily until the end of task execution.
This dependency will fetch the underlying data asset based on a signed url. It will temporarily store the data on disk. Once the task is completed, the asset will be removed from disk again.
Example:
Returns:
The path to the asset.
Raises:
ValueError
- if the underlying assets are not videos, images, or audio.
EncordException
- if the data type is not supported by the SDK yet.
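The store-then-clean-up lifecycle described above can be sketched as a context manager. This is a minimal illustration, not the library's implementation: temporary_asset is a hypothetical helper, and writing a byte string stands in for downloading from a signed URL.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def temporary_asset(payload: bytes, suffix: str) -> Iterator[Path]:
    """Write `payload` to a temporary file, yield its path, then delete it."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = Path(tmp_dir) / f"asset{suffix}"
        path.write_bytes(payload)  # stands in for downloading from a signed URL
        yield path
    # Leaving the `with` block removes the directory and the file in it.


with temporary_asset(b"fake image bytes", ".png") as asset_path:
    existed_during_task = asset_path.is_file()
    size_in_bytes = asset_path.stat().st_size

exists_after_task = asset_path.exists()
```

The practical consequence is that the path is only valid while the task runs; anything you need afterwards must be copied elsewhere before the agent returns.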
Twin Objects
Dataclass to hold “label twin” information.
dep_twin_label_row
Dependency to link assets between two Projects. When your Runner is running on <project_hash_a>, you can use this to get a Twin of labels and the underlying task in the “twin project” with <project_hash_b>.
This is useful in situations like:
- When you want to transfer labels from a “source project” to a “sink project”.
- If you want to compare labels to labels from other projects upon label submission.
- If you want to extend an existing project with labels from another project on the same underlying data.
Example:
Arguments:
twin_project_hash
- The project hash of the twin project (attached to the same datasets) from which you want to load the additional data.
init_labels
- If true, the label row will be initialized before calling the agent.
include_task
- If true, the task field of the Twin will be populated. If population fails, e.g., for non-workflow projects, the task will also be None.
Returns:
The twin.
Raises:
encord.AuthorizationError
- if you do not have access to the project.
dep_data_lookup
Get a lookup to easily retrieve data rows and storage items associated with the given task.
!!! warning “Deprecated”
    dep_data_lookup is deprecated and will be removed in version 0.2.10.
    Use dep_storage_item instead for accessing storage items.
Migration Guide:
Arguments:
lookup
- The object that you can use to lookup data rows and storage items. Automatically injected.
Returns:
The (shared) lookup object.
runner.queue_runner
QueueRunner Objects
This class is intended to hold agent implementations. It makes it easy to put agent task specifications into a queue and then execute them in a distributed fashion.
Below is a template for how that would work.
Example:
__init__
Initialize the QueueRunner with a project hash.
This is the hash of the project that you want to run the tasks on.
Arguments:
project_hash
- The hash of the project to run the tasks on.
stage
Agent wrapper intended for queueing systems and distributed workloads.
Define your agent as you are used to with dependencies in the method declaration and return the pathway from the project workflow that the task should follow upon completion. The function will be wrapped in logic that does the following (in pseudo code):
When you have an encord.workflow.stages.agent.AgentTask instance at hand, let’s call it task, then you can call your wrapped_function with task.model_dump_json().
Similarly, you can put task.model_dump_json() into a queue and read from that queue, e.g., from another instance/process, to execute wrapped_function there.
As the pseudo code indicates, wrapped_function understands how to take that string from the queue and resolve all your defined dependencies before calling your_function.
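The serialize, enqueue, and execute flow described above can be sketched with the standard library. This is an illustration under stated assumptions, not the library's implementation: FakeTask stands in for AgentTask, the pathway names are made up, and the JSON result shape is a hypothetical placeholder for TaskCompletionResult.

```python
import json
import queue
from dataclasses import asdict, dataclass


@dataclass
class FakeTask:
    """Hypothetical stand-in for encord.workflow.stages.agent.AgentTask."""
    uuid: str
    data_title: str

    def model_dump_json(self) -> str:
        # Serialize by hand so the sketch needs no third-party packages.
        return json.dumps(asdict(self))


def your_function(task: FakeTask) -> str:
    # Your agent logic: return the name of the pathway to follow.
    return "video" if task.data_title.endswith(".mp4") else "other"


def wrapped_function(task_json_spec: str) -> str:
    # 1. Rebuild the task from the string taken off the queue.
    task = FakeTask(**json.loads(task_json_spec))
    # 2. Call the user function with its resolved dependencies
    #    (here the task is the only dependency).
    pathway = your_function(task)
    # 3. Report the outcome as JSON (hypothetical result shape).
    return json.dumps({"task_uuid": task.uuid, "pathway": pathway, "success": True})


# Producer side: enqueue serialized task specs.
task_queue: "queue.Queue[str]" = queue.Queue()
for t in [FakeTask("t-1", "clip.mp4"), FakeTask("t-2", "scan.png")]:
    task_queue.put(t.model_dump_json())

# Consumer side (could be another process): execute the wrapped function.
results = []
while not task_queue.empty():
    results.append(json.loads(wrapped_function(task_queue.get())))
```

Only strings cross the queue boundary, which is what lets the consumer run in a different process or on a different machine from the producer.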
Arguments:
stage
- The name or uuid of the stage that the function should be associated with.
label_row_metadata_include_args
- Arguments to be passed to project.list_label_rows_v2(...)
label_row_initialise_labels_args
- Arguments to be passed to label_row.initialise_labels(...)
will_set_priority
- Indicates whether you will be returning a TaskAgentReturnStruct with a label_row_priority field set. This field is only required if you are returning the priority of the label row but not depending on the label row itself. That is, if your function signature does not include a LabelRowV2 parameter.
Returns:
The decorated function.
get_agent_stages
Get the agent stages for which there exists an agent implementation.
This function is intended to make it easy to iterate through all current agent tasks and put the task specs into external queueing systems like Celery or Modal.
For a concrete example, please see the docstring for the class itself.
Note that if you didn’t specify an implementation (by decorating your function with @runner.stage) for a given agent stage, the stage will not show up when calling this function.
Returns:
An iterable over encord.workflow.stages.agent.AgentStage objects where the runner contains an agent implementation.
Raises:
AssertionError
- if the runner does not have an associated project.
runner.runner_base
RunnerBase Objects
__init__
Initialize the runner with an optional project hash.
The project_hash will allow stricter stage validation.
If left unspecified, errors will not be raised until the runner is executed.
Arguments:
project_hash
- The project hash that the runner applies to. Can be left unspecified to be able to reuse the same runner on multiple projects.
runner.sequential_runner
SequentialRunner Objects
Runs agents against Workflow projects.
When called, it will iteratively run agent stages till they are empty.
By default, the runner will exit after finishing the tasks identified at the point of trigger.
To automatically re-run, you can use the refresh_every keyword.
Example:
__init__
Initialize the runner with an optional project hash.
The project_hash will allow stricter stage validation.
If left unspecified, errors will not be raised until the runner is executed.
Arguments:
project_hash
- The project hash that the runner applies to. Can be left unspecified to be able to reuse the same runner on multiple projects.
pre_execution_callback
- Callable[[RunnerBase], None]. Allows for optional additional validation, e.g., checking for a specific Ontology form.
stage
Decorator to associate a function with an agent stage.
A function decorated with a stage is added to the list of stages that will be handled by the runner. The runner will call the function for every task which is in that stage.
Example:
The function declaration can be any function that takes parameters that are type annotated with the following types:
- Project: the encord.project.Project that the runner is operating on.
- LabelRowV2: the encord.objects.LabelRowV2 that the task is associated with.
- AgentTask: the encord.workflow.stages.agent.AgentTask that the task is associated with.
- Any other type: which is annotated with a dependency.
All those parameters will be automatically injected when the agent is called.
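How injection by type annotation can work in principle is sketched below. This is a generic illustration, not the runner's actual resolution logic: the Project and AgentTask classes are hypothetical stand-ins, and call_with_injection is a made-up helper.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, get_type_hints


@dataclass
class Project:  # hypothetical stand-in for encord.project.Project
    title: str


@dataclass
class AgentTask:  # hypothetical stand-in for the real AgentTask
    uuid: str


def call_with_injection(func: Callable[..., Any], available: dict) -> Any:
    """Call func, filling each parameter from `available` by its annotation."""
    hints = get_type_hints(func)
    kwargs = {}
    for name in inspect.signature(func).parameters:
        annotation = hints.get(name)
        if annotation not in available:
            raise TypeError(f"Cannot resolve parameter {name!r}: {annotation!r}")
        kwargs[name] = available[annotation]
    return func(**kwargs)


def my_agent(task: AgentTask, project: Project) -> str:
    # Parameter order and names do not matter; only the annotations do.
    return f"{project.title}:{task.uuid}"


context = {Project: Project("demo"), AgentTask: AgentTask("t-1")}
result = call_with_injection(my_agent, context)
```

The takeaway for agent authors is that the signature is the configuration: adding or removing an annotated parameter changes what gets loaded for each task.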
Example:
Arguments:
stage
- The name or uuid of the stage that the function should be associated with.
label_row_metadata_include_args
- Arguments to be passed to project.list_label_rows_v2(...)
label_row_initialise_labels_args
- Arguments to be passed to label_row.initialise_labels(...)
overwrite
- If true, overwrite the method associated with this stage if one already exists. Otherwise, an error is thrown.
will_set_priority
- Indicates whether you will be returning a TaskAgentReturnStruct with a label_row_priority field set. This field is only required if you are returning the priority of the label row but not depending on the label row itself. That is, if your function signature does not include a LabelRowV2 parameter.
Returns:
The decorated function.
__call__
Run your task agent with runner(...).
???+ info “Self-updating/Polling runner”
    The runner can continuously poll new tasks in the project and execute the defined stage agents.
    To do so, please set the refresh_every parameter.
    When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.
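One reading of this timing rule is sketched below. It assumes the interval is measured from the start of the previous poll, which the text does not state explicitly, and seconds_until_next_poll is a hypothetical helper rather than part of the library.

```python
def seconds_until_next_poll(refresh_every: float, processing_time: float) -> float:
    """Time to wait after a poll cycle that took `processing_time` seconds."""
    # Assumption: polls are at least `refresh_every` seconds apart, measured
    # from the start of the previous poll. A slow cycle means no extra wait.
    return max(0.0, refresh_every - processing_time)


fast_cycle = seconds_until_next_poll(refresh_every=10, processing_time=3)
slow_cycle = seconds_until_next_poll(refresh_every=1, processing_time=60)
```

Under this reading, refresh_every is a lower bound on the polling interval, never a guarantee of how often polling happens.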
Arguments:
refresh_every
- Fetch task statuses from the Encord Project every refresh_every seconds. If None, the runner will exit once the task queue is empty.
num_retries
- If an agent fails on a task, how many times should the runner retry it?
task_batch_size
- Number of tasks for which labels are loaded into memory at once.
project_hash
- The project hash if not defined at runner instantiation.
Returns:
None
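The interplay of task_batch_size and num_retries can be sketched as follows. This is a generic illustration, not the runner's code: batched and run_with_retries are hypothetical helpers, and it assumes that num_retries counts additional attempts after the first one.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def run_with_retries(agent: Callable[[T], str], task: T, num_retries: int) -> Optional[str]:
    """Try once, then retry up to `num_retries` more times; None on failure."""
    for _ in range(num_retries + 1):
        try:
            return agent(task)
        except Exception:
            continue
    return None


attempts: dict = {}


def flaky_agent(task: str) -> str:
    # Hypothetical agent that fails the first time it sees each task.
    attempts[task] = attempts.get(task, 0) + 1
    if attempts[task] == 1:
        raise RuntimeError("transient failure")
    return "done"


outcomes = [
    run_with_retries(flaky_agent, task, num_retries=1)
    for batch in batched(["a", "b", "c"], size=2)
    for task in batch
]
```

A smaller batch size lowers peak memory at the cost of more round trips, while retries only help with transient failures; an agent that fails deterministically will exhaust them every time.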
run
Execute the runner.
This function is intended to be called from the “main file”. It is an entry point to be able to run the agent(s) via your shell with command line arguments.
Example:
You can then execute the runner with:
to see the options it has (they are those from Runner.__call__).