Example: Creating Agents

❗️

CRITICAL INFORMATION

We strongly recommend that highly technical users (for example, IT professionals, software developers, or system administrators) perform the steps outlined in this process.

ℹ️

Note

Third-party products, such as OpenAI's API, come with their own terms and conditions.

The following example uses OpenAI's GPT-4o model to automatically update classifications in Encord each time the Agent is triggered. The example uses FastAPI as the server that hosts and runs the code. On success, the server returns an empty 200 response, which indicates that the request succeeded and causes the Label Editor to refresh.

ℹ️

Note

Only HTTPS endpoints are supported.

Assumptions

This example makes the following assumptions:

  • The payload from Encord includes the project hash, data hash, and frame number in the following format:
{
  "dataHash": "038ed92d-dbe8-4991-a3aa-6ede35d6e62d",
  "projectHash": "027e0c65-c53f-426d-9f02-04fafe8bd80e",
  "frame": 10
}
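
As a minimal sketch of how a server might read this payload, the snippet below parses the JSON body with the standard library and maps the camelCase keys to snake_case fields. The `AgentPayload` dataclass and the `parse_payload` helper are hypothetical names used only for illustration; the example in Step 1 uses a pydantic model (`FrameData`) for the same purpose.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentPayload:
    """Hypothetical container mirroring the payload fields shown above."""

    project_hash: str
    data_hash: str
    frame: int


def parse_payload(raw_body: str) -> AgentPayload:
    """Parse the JSON body and map camelCase keys to snake_case attributes."""
    body = json.loads(raw_body)
    return AgentPayload(
        project_hash=body["projectHash"],
        data_hash=body["dataHash"],
        frame=int(body["frame"]),
    )


example_body = """
{
  "dataHash": "038ed92d-dbe8-4991-a3aa-6ede35d6e62d",
  "projectHash": "027e0c65-c53f-426d-9f02-04fafe8bd80e",
  "frame": 10
}
"""
payload = parse_payload(example_body)
print(payload.project_hash, payload.data_hash, payload.frame)
```

A missing key raises `KeyError` in this sketch, whereas the pydantic model in Step 1 lets FastAPI reject malformed payloads with a validation error.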

Step 1: Create a directory to host your code

Create a directory containing the following files.

  • requirements.txt: Lists all the dependencies necessary for the example to work.
  • openai_api_agent.py: Makes the call to the OpenAI API. Ensure that you replace <classification_name> with the name of the classification you want GPT-4o to update.
  • data_models.py: Ensures that the response from OpenAI has the specific format required by Encord.
  • dependencies.py: Retrieves the information necessary for the query to be made and the label row to be updated.
  • main.py: Runs the program in the correct order.

# File: openai_api_agent.py
import base64
import logging
from pathlib import Path

from encord.objects.classification import Classification
from encord.objects.classification_instance import ClassificationInstance
from encord.objects.ontology_labels_impl import LabelRowV2
from encord.objects.ontology_structure import OntologyStructure
from openai import OpenAI
from openai.types.chat import (
    ChatCompletionContentPartImageParam,
    ChatCompletionContentPartTextParam,
    ChatCompletionSystemMessageParam,
    ChatCompletionUserMessageParam,
)
from openai.types.chat.chat_completion_content_part_image_param import ImageURL
from openai.types.chat.completion_create_params import ResponseFormat
from pydantic import BaseModel, Field


def to_image_completion_content_part(
    image_path: Path,
) -> ChatCompletionContentPartImageParam:
    """
    Read an image from disk and wrap it as a base64 data URL to be sent to GPT-4o.
    """
    # Extracted video frames are saved as PNG; everything else is labelled as JPEG.
    mime_type = "image/png" if image_path.suffix.lower() == ".png" else "image/jpeg"
    with image_path.open("rb") as image_file:
        content = base64.b64encode(image_file.read()).decode("utf-8")
    return ChatCompletionContentPartImageParam(
        image_url=ImageURL(url=f"data:{mime_type};base64,{content}", detail="auto"),
        type="image_url",
    )


def get_ontology_classification(ontology: OntologyStructure) -> Classification:
    """
    Replace <classification_name> with the name of the classification in your Ontology you want to update.
    """
    return ontology.get_child_by_title("<classification_name>", type_=Classification)


"""
Below is an example of how to define a pydantic model for extracting text.
GPT also understands if you use list types and enums. For more examples,
have a look at these notebooks:
    - [GPT-4o example with videos](https://colab.research.google.com/drive/1ctV-Zpoks7PDEXisVvpP1NeocyBkWXzp?usp=sharing)
    - [Gemini 1.5 Pro with advanced pydantic models](https://colab.research.google.com/drive/1jeCCZrumLnCwdVHbn-wK46xUPQQ9KCtf?usp=sharing)
"""


class DescriptionModel(BaseModel):
    description: str = Field(
        min_length=25,
        max_length=1000,
        description="A detailed description of the scene",
    )


def describe_scene(label_row: LabelRowV2, asset: Path) -> ClassificationInstance | None:
    system_instruction = f"""
You are an image analysis expert. Your task is to extract the most relevant description of the image content provided.

You are expected to only respond in the form of the following JSON schema.
```json
{DescriptionModel.model_json_schema()}
```

Ensure that you do not wrap the object in a list. Only a single object conforming to the JSON schema is allowed.
"""
    # The OpenAI client reads the API key from the OPENAI_API_KEY environment variable.
    client = OpenAI()

    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            ChatCompletionSystemMessageParam(role="system", content=system_instruction),
            ChatCompletionUserMessageParam(
                role="user",
                content=[to_image_completion_content_part(asset)]
                + [
                    ChatCompletionContentPartTextParam(
                        type="text",
                        text="Please build a JSON object with respect to this visual data. Follow the JSON schema provided to fill in the schema as accurately as you can.",
                    )
                ],
            ),
        ],
        # A plain dict is used here because ResponseFormat is not callable in every openai 1.x release.
        response_format={"type": "json_object"},
        max_tokens=1000,
    )

    raw_text = completion.choices[0].message.content
    if raw_text is None:
        logging.error("No response")
        raise ValueError("Missing response from GPT-4o")

    # Validate the response against the pydantic model before touching the label row.
    try:
        labels = DescriptionModel.model_validate_json(raw_text)
    except Exception:
        logging.error("Unable to parse text")
        logging.error(raw_text)
        return None

    ontology_classification = get_ontology_classification(label_row.ontology_structure)
    instance = ontology_classification.create_instance()
    instance.set_answer(labels.description)
    return instance


# File: main.py
from pathlib import Path
from typing import Annotated

from encord.objects.ontology_labels_impl import LabelRowV2
from fastapi import Depends, FastAPI, status
from fastapi.middleware.cors import CORSMiddleware

from data_models import FrameData
from dependencies import dep_asset, dep_label_row
from openai_api_agent import describe_scene

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.encord.com"],
)


@app.post("/", status_code=status.HTTP_200_OK)
def custom_agent(
    label_row: Annotated[LabelRowV2, Depends(dep_label_row)],
    frame_data: FrameData,
    asset: Annotated[Path, Depends(dep_asset)],
) -> None:

    classification_instance = describe_scene(label_row, asset)

    if classification_instance is None:
        raise ValueError("Couldn't describe the scene")

    frame_view = label_row.get_frame_view(frame_data.frame)
    frame_view.add_classification_instance(classification_instance)
    label_row.save()


# File: data_models.py
from pydantic import BaseModel, Field


class FrameData(BaseModel):
    project_hash: str = Field(
        description="The project_hash of the Project from within which the label editor triggered the agent",
        validation_alias="projectHash",
    )
    data_hash: str = Field(
        description="The data_hash for the data item that the label editor has open when triggering the agent",
        validation_alias="dataHash",
    )
    frame: int = Field(description="The current frame in the label editor")


# File: dependencies.py
import mimetypes
from pathlib import Path
from typing import Annotated

import cv2
import numpy as np
import requests
from encord.constants.enums import DataType
from encord.objects.ontology_labels_impl import LabelRowV2
from encord.user_client import EncordUserClient
from fastapi import Depends

from data_models import FrameData


def dep_client():
    # Assumes that the `ENCORD_SSH_KEY_FILE` env variable is set.
    return EncordUserClient.create_with_ssh_private_key()


def _guess_file_suffix(url: str, lr: LabelRowV2) -> str:
    """
    Best effort attempt to guess file suffix based on information in following order:

        0. `url`
        1. `lr.data_title`
        2. `lr.data_type` (fallback)

    args:
        - url: the data url
        - lr: the associated label row

    returns:
        A file suffix that can be used to store the file. For example, ".jpg" or ".mp4".

    """
    fallback_mimetype = "video/mp4" if lr.data_type == DataType.VIDEO else "image/png"
    mimetype, _ = next(
        (
            t
            for t in (
                mimetypes.guess_type(url),
                mimetypes.guess_type(lr.data_title),
                (fallback_mimetype, None),
            )
            if t[0] is not None
        )
    )
    if mimetype is None:
        raise ValueError("This should not have happened")

    file_type, suffix = mimetype.split("/")[:2]

    if file_type == "video" and lr.data_type != DataType.VIDEO:
        raise ValueError(
            f"Mimetype {mimetype} and lr data type {lr.data_type} did not match"
        )
    elif file_type == "image" and lr.data_type not in {
        DataType.IMG_GROUP,
        DataType.IMAGE,
    }:
        raise ValueError(
            f"Mimetype {mimetype} and lr data type {lr.data_type} did not match"
        )
    elif file_type not in {"image", "video"}:
        raise ValueError("File type not video or image. Only images or videos are supported.")

    return f".{suffix}"


def dep_label_row(
    user_client: Annotated[EncordUserClient, Depends(dep_client)], frame_data: FrameData
) -> LabelRowV2:
    """
    Match a unique label row in a Project based on data_hash.
    Additionally, initialise the label row to download the label data.
    :param user_client: An EncordUserClient to access data.
    :param frame_data: The FrameData payload containing the project_hash and data_hash to match against.
    :return: An initialised label row matched on data_hash.
    """
    project = user_client.get_project(frame_data.project_hash)
    matched_lrs = project.list_label_rows_v2(data_hashes=[frame_data.data_hash])
    num_matches = len(matched_lrs)
    if num_matches > 1:
        raise Exception(f"Non unique match: matched {num_matches} label rows!")
    elif num_matches == 0:
        raise Exception("No label rows were matched!")
    lr = matched_lrs.pop()
    lr.initialise_labels()
    return lr


def get_frame(video_path: Path, desired_frame: int) -> np.ndarray:
    """
    Extract a given frame from a downloaded video.
    :param video_path: The path to which the video was downloaded.
    :param desired_frame: The frame which you would like to extract.
    :return: The extracted frame.
    """
    cap = cv2.VideoCapture(video_path.as_posix())
    if not cap.isOpened():
        raise Exception("Unable to open video file.")

    try:
        cap.set(cv2.CAP_PROP_POS_FRAMES, desired_frame)
        ret, frame = cap.read()
    finally:
        # Release the capture even if seeking or reading fails.
        cap.release()

    if not ret:
        raise Exception("Unable to retrieve frame.")
    return frame


def dep_asset(lr: Annotated[LabelRowV2, Depends(dep_label_row)], frame_data: FrameData):
    """
    Download the underlying asset being annotated (video, image) in a specific label row to disk.
    The downloaded asset is named after `lr.data_hash` plus the guessed file suffix.
    For videos, the requested frame is also extracted and saved as a PNG file.
    When the request has been handled, the downloaded files are removed.
    :param lr: The label row whose asset should be downloaded.
    :param frame_data: The payload containing the frame number to extract.
    :return: The path to which the asset (or the extracted frame) was downloaded.
    """
    video_item, images_list = lr._project_client.get_data(
        lr.data_hash, get_signed_url=True
    )
    if lr.data_type in [DataType.VIDEO, DataType.IMAGE] and video_item:
        url = video_item["file_link"]
    elif lr.data_type == DataType.IMG_GROUP and images_list:
        url = images_list[frame_data.frame]["file_link"]
    else:
        raise ValueError("Couldn't load asset")

    response = requests.get(url)
    response.raise_for_status()

    suffix = _guess_file_suffix(url, lr)
    file_path = Path(lr.data_hash).with_suffix(suffix)
    with open(file_path, "wb") as f:
        f.write(response.content)

    files_to_unlink = [file_path]
    if lr.data_type == DataType.VIDEO:  # Get that exact frame
        frame = get_frame(file_path, frame_data.frame)
        # Build the name from the stem so that the frame number is kept in the file name.
        frame_file = file_path.with_name(f"{file_path.stem}_{frame_data.frame}.png")
        cv2.imwrite(frame_file.as_posix(), frame)
        files_to_unlink.append(frame_file)
        file_path = frame_file

    try:
        yield file_path
    finally:
        # Remove the downloaded asset and any extracted frame once the request is done.
        for f in files_to_unlink:
            f.unlink(missing_ok=True)
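

# File: requirements.txt
encord
# The "standard" extra provides the `fastapi dev` command used in Step 2.
fastapi[standard]
pydantic>=2.7
requests
opencv-python
openai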
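
The suffix-guessing order described in dependencies.py (the URL first, then the data title, then a fallback based on the data type) can be sketched with the standard library alone. This is an illustrative simplification, not the code above: `guess_suffix` and its `is_video` flag are hypothetical, the explicit `urlparse` step that drops the query string of a signed URL is an addition made for this sketch, and the consistency checks between MIME type and data type are omitted.

```python
import mimetypes
from urllib.parse import urlparse


def guess_suffix(url: str, data_title: str, is_video: bool) -> str:
    """Return a file suffix using the first source that yields a MIME type."""
    # Signed URLs usually carry a query string, so only the path is inspected.
    url_path = urlparse(url).path
    fallback = "video/mp4" if is_video else "image/png"
    candidates = (
        mimetypes.guess_type(url_path)[0],    # 1. the URL
        mimetypes.guess_type(data_title)[0],  # 2. the data title
        fallback,                             # 3. fallback by data type
    )
    mimetype = next(m for m in candidates if m is not None)
    # "image/jpeg" becomes ".jpeg", "video/mp4" becomes ".mp4".
    return "." + mimetype.split("/")[1]


print(guess_suffix("https://example.com/bucket/cat.jpg?signature=abc", "cat", False))
print(guess_suffix("https://example.com/asset", "clip.mp4", True))
print(guess_suffix("https://example.com/asset", "untitled", False))
```

Because the fallback is always present, the generator passed to `next` never runs dry, so no default value is needed.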

Step 2: Test your Agent locally

We strongly recommend you test your Agent locally before deploying it to your server.

  1. Create a new virtual environment and install all the requirements using the following terminal commands:
$ python -m venv venv
$ source venv/bin/activate
$ python -m pip install -r requirements.txt
  2. Run your Agent and server locally using the following terminal command. Ensure that you replace:
  • <private_key_path> with the path to your private SSH key file for Encord.
  • <openai_api_key> with your OpenAI API key.
ENCORD_SSH_KEY_FILE=<private_key_path> OPENAI_API_KEY='<openai_api_key>' fastapi dev main.py

❗️

CRITICAL INFORMATION

Keep this terminal window open as long as you want to run your server locally. The following steps should be performed in a new terminal window.

  3. In a new terminal window, send a request to the server that points to a specific data asset in Encord. This step replicates the triggering of the Agent in the Encord platform. Ensure that you:
  • Replace <project_hash> with the hash of your Project.
  • Replace <data_hash> with the hash of the data unit you want to run your Agent on.

👍

Tip

Both the <project_hash> and the <data_hash> can be found in the URL of the Label Editor. The URL is structured like this:
https://app.encord.com/label_editor/<project_hash>/<data_hash>

curl localhost:8000 -H "Content-type: application/json" -d '{
    "projectHash": "<project_hash>",
    "dataHash": "<data_hash>",
    "frame": 0
}'
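
As a rough Python equivalent of the curl command, the sketch below extracts both hashes from a Label Editor URL and builds the same POST request. The helper names `hashes_from_editor_url` and `build_trigger_request` are hypothetical, and the sketch assumes that the URL follows exactly the structure shown in the tip and that the local server listens on port 8000.

```python
import json
from urllib.parse import urlparse
from urllib.request import Request


def hashes_from_editor_url(editor_url: str) -> tuple[str, str]:
    """Extract (project_hash, data_hash) from a Label Editor URL."""
    parts = [p for p in urlparse(editor_url).path.split("/") if p]
    # Expected path: /label_editor/<project_hash>/<data_hash>
    if len(parts) < 3 or parts[0] != "label_editor":
        raise ValueError(f"Unexpected Label Editor URL: {editor_url}")
    return parts[1], parts[2]


def build_trigger_request(
    editor_url: str, frame: int = 0, server: str = "http://localhost:8000"
) -> Request:
    """Build the POST request that mimics the Agent being triggered."""
    project_hash, data_hash = hashes_from_editor_url(editor_url)
    body = json.dumps(
        {"projectHash": project_hash, "dataHash": data_hash, "frame": frame}
    ).encode("utf-8")
    return Request(
        server,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_trigger_request(
    "https://app.encord.com/label_editor/"
    "027e0c65-c53f-426d-9f02-04fafe8bd80e/038ed92d-dbe8-4991-a3aa-6ede35d6e62d",
    frame=10,
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# To send the request while the local server is running:
#     from urllib.request import urlopen
#     with urlopen(request) as response:
#         print(response.status)
```

Building the request separately from sending it makes it possible to inspect the payload without a running server.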

Step 3: Deploy the Agent to your server

The Agent is deployed to your server as a container image, for example a Docker image. The exact method of deployment varies depending on your choice of server.

  • Install the requirements from the requirements.txt file. The FastAPI documentation describes how to create a Docker image for a FastAPI application.

  • Ensure that you set up environment variables to authenticate with Encord and OpenAI. The OpenAI documentation describes how to authenticate with the OpenAI API.

    • ENCORD_SSH_KEY_FILE=/path/to/your/private_key_file
    • OPENAI_API_KEY=<your_openai_api_key>
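
A deployment can fail late and confusingly when one of these variables is missing, so it may help to check for them before starting the server. The following is a minimal sketch of such a check; `REQUIRED_ENV_VARS` and `missing_env_vars` are hypothetical names and are not part of Encord, FastAPI, or the OpenAI library.

```python
import os
from collections.abc import Mapping

# The two variables that the example relies on.
REQUIRED_ENV_VARS = ("ENCORD_SSH_KEY_FILE", "OPENAI_API_KEY")


def missing_env_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


# Example with an explicit mapping instead of the real environment.
print(missing_env_vars({"OPENAI_API_KEY": "example-key"}))
```

Calling `missing_env_vars()` without arguments inspects the real environment; a container entrypoint could exit with an error message if the returned list is not empty.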