Task Agents
models
TaskAgentReturnStruct Objects
pathway
The pathway that the task will follow on task completion
label_row
The label to be saved (if present)
label_row_priority
The priority of the label row to be saved.
TaskCompletionResult Objects
encord_agents.tasks.QueueRunner
agents.
dependencies
dep_client
dep_storage_item
StorageItem
is useful for multiple things like
- Updating client metadata
- Reading file properties like storage location, fps, duration, DICOM tags, etc.
storage_item
- StorageItem
dep_single_frame
lr.data_hash.{suffix}
.
When the function has finished, the downloaded file will be removed from the file system.
Example:
storage_item
- The Storage item. Automatically injected (see example above).
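The download-then-clean-up behaviour described above can be sketched with a context manager. This is only an illustration of the pattern, not the library's implementation: `temporary_asset`, `run_agent`, and the fake payload are hypothetical names, and no data is actually downloaded from Encord.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def temporary_asset(data_hash: str, suffix: str, payload: bytes) -> Iterator[Path]:
    """Write `payload` to `<data_hash>.<suffix>` and delete it afterwards.

    Hypothetical stand-in: a real dependency would download the asset instead.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = Path(tmp_dir) / f"{data_hash}.{suffix}"
        path.write_bytes(payload)
        yield path  # the agent function runs while this generator is suspended
    # Leaving the TemporaryDirectory block has removed the file.


def run_agent() -> Path:
    """Use the asset inside the block and return its (now stale) path."""
    with temporary_asset("abc123", "png", b"fake-bytes") as asset_path:
        assert asset_path.exists()  # the file is available to the agent here
        seen = asset_path
    return seen
```

Once `run_agent()` returns, the file no longer exists on disk, which mirrors the statement that the downloaded file is removed when the function has finished.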
dep_video_iterator
storage_item
- Automatically injected Storage item dependency.
NotImplementedError
- Will fail for other data types than video.
dep_video_sampler
-
storage_item
- Automatically injected Storage item dependency.
Example:
dep_asset
ValueError
- if the underlying assets are not videos, images, or audio.
EncordException
- if data type not supported by SDK yet.
Twin Objects
dep_twin_label_row
Runner
is running on
<project_hash_a>
, you can use this to get a Twin
of labels and the underlying
task in the “twin project” with <project_hash_b>
.
This is useful in situations like:
- When you want to transfer labels from a source project to a sink project.
- If you want to compare labels to labels from other projects upon label submission.
- If you want to extend an existing project with labels from another project on the same underlying data.
twin_project_hash
- The project hash of the twin project (attached to the same datasets) from which you want to load the additional data.
init_labels
- If true, the label row will be initialized before calling the agent.
include_task
- If true, the task field of the Twin will be populated. If population fails, e.g., for non-workflow projects, the task will also be None.
encord.AuthorizationError
- if you do not have access to the project.
dep_data_lookup
dep_data_lookup
is deprecated and will be removed in version 0.2.10.
Use dep_storage_item
instead for accessing storage items.
Migration Guide:
lookup
- The object that you can use to lookup data rows and storage items. Automatically injected.
runner.queue_runner
QueueRunner Objects
__init__
project_hash
- The hash of the project to run the tasks on.
stage
encord.workflow.stages.agent.AgentTask
instance at hand, let’s call
it task
, then you can call your wrapped_function
with task.model_dump_json()
.
Similarly, you can put task.model_dump_json()
into a queue and read from that queue, e.g.,
from another instance/process, to execute wrapped_function
there.
As the pseudo code indicates, wrapped_function
understands how to take that string from
the queue and resolve all your defined dependencies before calling your_function
.
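The queue hand-off described above can be sketched with the standard library alone. Everything here is a stand-in: the `wrap` helper, the task fields `uuid` and `data_title`, and the shape of the returned JSON are assumptions made for illustration, and the plain dictionaries take the place of `task.model_dump_json()` on real `AgentTask` instances.

```python
import json
import queue
from typing import Callable


def your_function(task_uuid: str, data_title: str) -> str:
    """Hypothetical agent logic; returns the name of a pathway."""
    return "review" if data_title.endswith(".mp4") else "done"


def wrap(fn: Callable[[str, str], str]) -> Callable[[str], str]:
    """Hypothetical stand-in for the wrapping done by a queue runner."""

    def wrapped_function(task_json: str) -> str:
        task = json.loads(task_json)  # the string taken from the queue
        # A real runner would resolve the declared dependencies here.
        pathway = fn(task["uuid"], task["data_title"])
        return json.dumps({"task_uuid": task["uuid"], "pathway": pathway})

    return wrapped_function


wrapped_function = wrap(your_function)

# Producer side: serialise each task to a JSON string and enqueue it.
task_queue: "queue.Queue[str]" = queue.Queue()
for task in [
    {"uuid": "t1", "data_title": "clip.mp4"},
    {"uuid": "t2", "data_title": "scan.png"},
]:
    task_queue.put(json.dumps(task))

# Consumer side: could live in another process reading from a shared queue.
results = []
while not task_queue.empty():
    results.append(json.loads(wrapped_function(task_queue.get())))
```

Because only strings cross the queue, the producer and the consumer do not need to share any Python objects, which is what makes the pattern usable across instances or processes.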
stage
- The name or uuid of the stage that the function should be associated with.
label_row_metadata_include_args
- Arguments to be passed to project.list_label_rows_v2(...)
label_row_initialise_labels_args
- Arguments to be passed to label_row.initialise_labels(...)
will_set_priority
- Indicates whether you will be returning a TaskAgentReturnStruct with a label_row_priority field set. This field is only required if you are returning the priority of the label row but not depending on the label row itself, that is, if your function signature does not include a LabelRowV2 parameter.
get_agent_stages
@runner.stage
) for a given agent stage, the stage will
not show up by calling this function.
Returns:
An iterable over encord.workflow.stages.agent.AgentStage
objects
where the runner contains an agent implementation.
Raises:
AssertionError
- if the runner does not have an associated project.
runner.runner_base
RunnerBase Objects
__init__
project_hash
will allow stricter stage validation.
If left unspecified, errors will first be raised during execution of the runner.
Arguments:
-
project_hash
- The project hash that the runner applies to. Can be left unspecified to be able to reuse the same runner on multiple projects.
runner.sequential_runner
SequentialRunner Objects
refresh_every
keyword.
Example:
example_agent.py
__init__
project_hash
will allow stricter stage validation.
If left unspecified, errors will first be raised during execution of the runner.
Arguments:
-
project_hash
- The project hash that the runner applies to. Can be left unspecified to be able to reuse the same runner on multiple projects.
-
pre_execution_callback
- Callable[[RunnerBase], None]. Allows for optional additional validation, e.g., checking for a specific Ontology form.
stage
- Project: the encord.project.Project that the runner is operating on.
- LabelRowV2: the encord.objects.LabelRowV2 that the task is associated with.
- AgentTask: the encord.workflow.stages.agent.AgentTask that the task is associated with.
- Any other type: which is annotated with a dependency
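The idea that parameters are filled in from their type annotations can be sketched as follows. This shows the general technique only and is not the library's implementation: the `Depends` marker, the `Project` class, `dep_greeting`, and `resolve_and_call` are all hypothetical stand-ins defined locally.

```python
import inspect
from typing import Annotated, Any, Callable, get_type_hints


class Depends:
    """Hypothetical marker holding a provider function for a parameter."""

    def __init__(self, provider: Callable[[], Any]) -> None:
        self.provider = provider


class Project:
    """Local stand-in for a project object; not the Encord class."""

    def __init__(self, title: str) -> None:
        self.title = title


def dep_greeting() -> str:
    """Hypothetical dependency that produces a value on demand."""
    return "hello"


def resolve_and_call(fn: Callable[..., Any], context: dict[type, Any]) -> Any:
    """Fill each parameter from its annotation, then call `fn`."""
    hints = get_type_hints(fn, include_extras=True)
    kwargs: dict[str, Any] = {}
    for name in inspect.signature(fn).parameters:
        hint = hints[name]
        # Annotated[...] types expose their extra arguments as __metadata__.
        metadata = getattr(hint, "__metadata__", ())
        marker = next((m for m in metadata if isinstance(m, Depends)), None)
        if marker is not None:
            kwargs[name] = marker.provider()  # annotated with a dependency
        else:
            kwargs[name] = context[hint]  # known type supplied by the runner
    return fn(**kwargs)


def my_agent(project: Project, greeting: Annotated[str, Depends(dep_greeting)]) -> str:
    return f"{greeting} from {project.title}"


outcome = resolve_and_call(my_agent, {Project: Project("demo")})
```

In this sketch the order of the parameters does not matter, since each one is matched by its annotation rather than its position.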
stage
- The name or uuid of the stage that the function should be associated with.
label_row_metadata_include_args
- Arguments to be passed to project.list_label_rows_v2(...)
label_row_initialise_labels_args
- Arguments to be passed to label_row.initialise_labels(...)
overwrite
- Overwrite the method associated with this stage if it already exists. An error is raised otherwise.
will_set_priority
- Indicates whether you will be returning a TaskAgentReturnStruct with a label_row_priority field set. This field is only required if you are returning the priority of the label row but not depending on the label row itself, that is, if your function signature does not include a LabelRowV2 parameter.
__call__
runner(...)
.
Self-updating/Polling runner
The runner can continuously poll new tasks in the project and execute the defined stage agents.
To do so, please set the refresh_every
parameter.
When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.
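The timing rule above can be sketched as a small loop. This is a simplified model under stated assumptions, not the runner's actual code: `run_polling`, `fetch_batch`, `handle`, and the `max_polls` cap (added only so the sketch terminates) are hypothetical, and the clock and sleep functions are injectable so the behaviour can be checked without waiting.

```python
import time
from typing import Callable, Iterable, Optional


def run_polling(
    fetch_batch: Callable[[], Iterable[str]],
    handle: Callable[[str], None],
    refresh_every: Optional[float],
    max_polls: int,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Process batches of tasks and return the number of polls performed."""
    polls = 0
    while polls < max_polls:
        started = clock()
        for task in fetch_batch():
            handle(task)
        polls += 1
        if refresh_every is None:
            break  # no polling requested: stop once the queue is empty
        remaining = refresh_every - (clock() - started)
        if remaining > 0:
            sleep(remaining)  # queue emptied quickly: wait out the interval
        # Otherwise processing outlasted the interval, so poll again right away.
    return polls
```

With `refresh_every=1` and a queue that takes 60 seconds to empty, `remaining` is negative, so the next poll starts immediately after the current queue has been processed.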
Arguments:
refresh_every
- Fetch task statuses from the Encord Project every refresh_every seconds. If None, the runner will exit once the task queue is empty.
num_retries
- If an agent fails on a task, how many times should the runner retry it?
task_batch_size
- Number of tasks for which labels are loaded into memory at once.
project_hash
- The project hash if not defined at runner instantiation.
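As a rough illustration of what a retry count like num_retries might mean, the sketch below retries a failing agent a bounded number of times. It assumes that the count refers to retries after the first attempt, which the text above does not confirm; `run_with_retries` is a hypothetical helper, not part of the library.

```python
from typing import Callable, Optional


def run_with_retries(
    agent: Callable[[str], str], task: str, num_retries: int
) -> Optional[str]:
    """Call `agent(task)`, retrying up to `num_retries` times after a failure.

    Assumption: one initial attempt plus `num_retries` further attempts.
    Returns None if every attempt raised an exception.
    """
    for attempt in range(num_retries + 1):
        try:
            return agent(task)
        except Exception as error:
            print(f"attempt {attempt + 1} failed for {task}: {error}")
    return None
```

Returning None after the final failure is a design choice of this sketch; it lets the caller skip the task and move on to the next one.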
run
Runner.__call__
).
example.py