Point Cloud Data (PCD) Projects are multi-modal projects that involve labeling and reviewing 3D point cloud data. Autonomous driving, robotics, and drone technology are all examples of Projects that PCD Projects were built to support.

Data for PCD Projects

PCD Projects use “Scenes”. A Scene is a bundle of images and PCD files bound together as a coherent group. Scenes are the data units that your Taskers, and possibly your Agents, work with to create and review your labels. To register/import point cloud data into Encord, the data needs to be mirrored exactly in the cloud and locally. The main.py script to create Scenes in Encord for an autonomous driving project.
main.py
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "matplotlib>=3.10.3",
#   "np>=1.0.2",
#   "nuscenes-devkit>=1.1.9",
#   "pillow>=11.3.0",
#   "pydantic>=2.11.5",
#   "pypcd4>=1.2.1",
#   "requests>=2.32.3",
#   "scipy>=1.15.3",
#   "tqdm>=4.67.1",
# ]
# ///
from __future__ import annotations

import argparse
import json
import os
import pathlib
import re
import shutil
import tarfile
from dataclasses import dataclass
from enum import StrEnum, auto
from math import floor
from typing import Annotated, Any, Literal

import numpy as np
import pypcd4
import requests
import tqdm
from nuscenes import nuscenes
from pydantic import BaseModel, ConfigDict, Field
from scipy.spatial.transform import Rotation

"""
This script processes scenes from the nuScenes (https://www.nuscenes.org/) dataset and converts them into a
the Encord upload JSON format for visualization and annotation. It can handle
lidar, radar, and camera data, as well as 3D annotations and ego-vehicle poses.
The script downloads the nuScenes minisplit if not found locally, and processes it, including:
- Converting the point cloud data from .bin to .pcd
- Timestamps are normalized to start from 0 at the beginning of the scene
- Converting positions so that the vehicle's starting position is treated as the origin (0, 0, 0)
"""


def snake2camel(snake: str, start_lower: bool = True) -> str:
    """
    Converts a snake_case string to camelCase.

    The `start_lower` argument determines whether the first letter in the generated camelcase should
    be lowercase (if `start_lower` is True), or capitalized (if `start_lower` is False).
    """
    camel = snake.title()
    camel = re.sub("([0-9A-Za-z])_(?=[0-9A-Z])", lambda m: m.group(1), camel)
    if start_lower:
        camel = re.sub("(^_*[A-Z])", lambda m: m.group(1).lower(), camel)
    return camel


class CamelModel(BaseModel):
    model_config = ConfigDict(alias_generator=snake2camel, populate_by_name=True)


@dataclass
class CameraIntrinsics:
    fx: Annotated[float, Field(description="Focal length x")]
    fy: Annotated[float, Field(description="Focal length y")]
    ox: Annotated[float, Field(description="Principal point offset x")]
    oy: Annotated[float, Field(description="Principal point offset y")]
    s: Annotated[float, Field(description="Axis skew")]


@dataclass
class CameraExtrinsics:
    rotation: Annotated[
        tuple[float, float, float, float, float, float, float, float, float],
        Field(description="Rotation matrix R"),
    ]
    position: Annotated[
        tuple[float, float, float], Field(description="Translation vector T")
    ]


@dataclass
class CameraParams:
    width_px: int
    height_px: int
    intrinsics: Annotated[CameraIntrinsics, Field(description="The intrinsic matrix K")]
    extrinsics: Annotated[
        CameraExtrinsics, Field(description="The extrinsic 4x4 matrix R|T")
    ]


@dataclass
class FrameOfReference:
    id: Annotated[str, Field(description="ID of this frame of reference")]
    parent_FOR: Annotated[
        str | None, Field(description="ID of a parent frame of reference")
    ]
    rotation: tuple[float, float, float, float, float, float, float, float, float]
    position: tuple[float, float, float]


Position = tuple[float, float, float]
EulerOrientation = tuple[float, float, float]
Size = tuple[float, float, float]


class Pose(CamelModel):
    position: Position
    orientation: EulerOrientation


class CuboidGeometry(CamelModel):
    type: Literal["cuboid"] = "cuboid"
    pose: Pose
    size: Size


@dataclass
class _FORIdMixin:
    frame_of_reference_id: Annotated[
        str | None, Field(description="ID of the frame of reference the entity is in")
    ] = None


@dataclass
class _URIMixin:
    uri: str


@dataclass
class _EventMixin:
    timestamp: float | None = None


class URIEvent(CamelModel, _EventMixin, _URIMixin):
    pass


class CameraParamsEvent(CamelModel, _EventMixin, CameraParams):
    pass


class FOREvent(CamelModel, _EventMixin, FrameOfReference):
    pass


class ModelEvent(CamelModel, _EventMixin):
    geometries: list[CuboidGeometry]


class CompositeScene(CamelModel):
    type: Literal["composite"] = "composite"
    streams: dict[str, EventStream]


class EntityType(StrEnum):
    POINT_CLOUD = auto()
    FRAME_OF_REFERENCE = auto()
    IMAGE = auto()
    MODEL = auto()
    CAMERA_PARAMETERS = auto()


class PCDStream(CamelModel, _FORIdMixin):
    entity_type: Literal[EntityType.POINT_CLOUD] = EntityType.POINT_CLOUD
    events: Annotated[list[URIEvent], Field(description="List of point cloud events")]


class CameraStream(CamelModel, _FORIdMixin):
    entity_type: Literal[EntityType.CAMERA_PARAMETERS] = EntityType.CAMERA_PARAMETERS
    events: list[CameraParamsEvent]


class ImageStream(CamelModel):
    entity_type: Literal[EntityType.IMAGE] = EntityType.IMAGE
    events: list[URIEvent]
    camera_id: Annotated[
        str | None,
        Field(
            description="ID of the camera associated with the image. Used to position the image in-scene"
        ),
    ]


class ModelStream(CamelModel):
    entity_type: Literal[EntityType.MODEL] = EntityType.MODEL
    events: list[URIEvent | ModelEvent]
    camera_id: str | None


class FORStream(CamelModel):
    entity_type: Literal[EntityType.FRAME_OF_REFERENCE] = EntityType.FRAME_OF_REFERENCE
    events: Annotated[
        list[FOREvent], Field(description="List of frame of reference events")
    ]


class EventStream(CamelModel):
    type: Literal["event"] = "event"
    id: str
    stream: Annotated[
        PCDStream | CameraStream | FORStream | ImageStream | ModelStream,
        Field(discriminator="entity_type"),
    ]


DATASET_DIR = pathlib.Path("./dataset")


class Config:
    env: str
    output_dir: pathlib.Path
    base_url: str

    def __init__(self):
        self.env = "remote"
        self.output_dir = pathlib.Path("./scenes")
        self.base_url = (
            "https://storage.cloud.google.com/my-bucket-name/scenes/nuscenes" # Replace this with the file path in your bucket to the dataset
        )


config = Config()


def ensure_scene_available(
    root_dir: pathlib.Path, dataset_version: str, scene_name: str
) -> None:
    """
    Ensure that the specified scene is available.

    Downloads minisplit into root_dir if scene_name is part of it and root_dir is empty.

    Raises ValueError if scene is not available and cannot be downloaded.
    """
    try:
        nusc = nuscenes.NuScenes(
            version=dataset_version, dataroot=str(root_dir), verbose=False
        )
    except AssertionError:  # dataset initialization failed
        if dataset_version == "v1.0-mini":
            download_minisplit(root_dir)
            nusc = nuscenes.NuScenes(
                version=dataset_version, dataroot=str(root_dir), verbose=False
            )
        else:
            print(
                f"Could not find dataset at {root_dir} and could not automatically download specified scene."
            )
            exit()

    scene_names = [s["name"] for s in nusc.scene]
    if scene_name not in scene_names:
        raise ValueError(f"{scene_name=} not found in dataset")


def nuscene_sensor_names(nusc: nuscenes.NuScenes, scene_name: str) -> list[str]:
    """Return all sensor names in the scene."""

    sensor_names = set()

    scene = next(s for s in nusc.scene if s["name"] == scene_name)
    first_sample = nusc.get("sample", scene["first_sample_token"])
    for sample_data_token in first_sample["data"].values():
        sample_data = nusc.get("sample_data", sample_data_token)
        if sample_data["sensor_modality"] == "camera":
            current_camera_token = sample_data_token
            while current_camera_token != "":
                sample_data = nusc.get("sample_data", current_camera_token)
                sensor_name = sample_data["channel"]
                sensor_names.add(sensor_name)
                current_camera_token = sample_data["next"]

    # For a known set of cameras, order the sensors in a circle.
    ordering = {
        "CAM_FRONT_LEFT": 0,
        "CAM_FRONT": 1,
        "CAM_FRONT_RIGHT": 2,
        "CAM_BACK_RIGHT": 3,
        "CAM_BACK": 4,
        "CAM_BACK_LEFT": 5,
    }
    return sorted(
        sensor_names, key=lambda sensor_name: ordering.get(sensor_name, float("inf"))
    )


# Write all uri assets required for the scene to a separate output directory
def write_asset(path: pathlib.Path):
    shutil.copyfile(path, pathlib.Path("./output") / path.name)


def write_nuscenes_json(scene: CompositeScene, name: str):
    OUTPUT_FILE = config.output_dir / "nuscenes.json"
    with open(OUTPUT_FILE, "w") as f:
        dummy_json = scene.model_dump_json(by_alias=True, indent=2)
        f.write(dummy_json)
        print("Wrote to", OUTPUT_FILE)


def write_upload_json(scenes: list[tuple[CompositeScene, str]]):
    scenes_final = []
    for scene, name in scenes:
        streams = list(scene.model_dump(by_alias=True)["streams"].values())
        scenes_final.append(
            {
                "title": name,
                "streams": streams,
            }
        )

    final = {"scenes": scenes_final}

    OUTPUT_FILE = config.output_dir / "upload.json"
    with open(OUTPUT_FILE, "w") as f:
        json.dump(final, f, indent=2)
        print("Wrote to", OUTPUT_FILE)


first_timestamp = 0
first_position = [0, 0, 0]
hz = 0


def sub(a, b) -> tuple[float, float, float]:
    return [a[i] - b[i] for i in range(len(a))]


def log_nuscenes(
    nusc: nuscenes.NuScenes, scene_name: str, max_time_sec: float, sample_hz: float
) -> CompositeScene:
    """Log nuScenes scene."""
    print(f"Logging scene {scene_name}")

    result = CompositeScene(streams={})

    scene = next(s for s in nusc.scene if s["name"] == scene_name)

    location = nusc.get("log", scene["log_token"])["location"]

    # Get the first sample
    first_sample_token = scene["first_sample_token"]
    first_sample = nusc.get("sample", scene["first_sample_token"])

    # Get the timestamp (in seconds)
    global first_timestamp
    first_timestamp = first_sample["timestamp"] / 1e6
    global first_position
    first_position = (0, 0, 0)
    global hz
    hz = sample_hz

    first_lidar_tokens = []
    first_radar_tokens = []
    first_camera_tokens = []
    for sample_data_token in first_sample["data"].values():
        sample_data = nusc.get("sample_data", sample_data_token)
        log_sensor_calibration(result, sample_data, nusc)

        if sample_data["sensor_modality"] == "lidar":
            first_lidar_tokens.append(sample_data_token)
        elif sample_data["sensor_modality"] == "radar":
            first_radar_tokens.append(sample_data_token)
        elif sample_data["sensor_modality"] == "camera":
            first_camera_tokens.append(sample_data_token)

    first_timestamp_us = nusc.get("sample_data", first_lidar_tokens[0])["timestamp"]
    max_timestamp_us = first_timestamp_us + 1e6 * max_time_sec

    log_lidar_and_ego_pose(result, location, first_lidar_tokens, nusc, max_timestamp_us)
    log_cameras(result, first_camera_tokens, nusc, max_timestamp_us)
    log_radars(result, first_radar_tokens, nusc, max_timestamp_us)
    log_annotations(result, location, first_sample_token, nusc, max_timestamp_us)

    return result


def log_cameras(
    scene: CompositeScene,
    first_camera_tokens: list[str],
    nusc: nuscenes.NuScenes,
    max_timestamp_us: float,
) -> None:
    """Log camera data."""
    for first_camera_token in first_camera_tokens:
        current_camera_token = first_camera_token
        last_logged_timestamp = -10000
        while current_camera_token != "":
            sample_data = nusc.get("sample_data", current_camera_token)
            if max_timestamp_us < sample_data["timestamp"]:
                break
            sensor_name = sample_data["channel"]

            if sensor_name not in scene.streams:
                scene.streams[sensor_name] = EventStream(
                    id=sensor_name,
                    stream=ImageStream(
                        events=[],
                        camera_id=sensor_name + "-camera",
                        frame_of_reference_id=sensor_name + "-calibration",
                    ),
                )

            timestamp = sample_data["timestamp"] * 1e-6 - first_timestamp
            if hz > 0:
                timestamp *= hz
                timestamp = floor(timestamp)
            if hz > 0 and timestamp - last_logged_timestamp < 1.0:
                current_camera_token = sample_data["next"]
                continue
            last_logged_timestamp = timestamp

            data_file_path = nusc.dataroot / sample_data["filename"]

            # write_asset(data_file_path)
            event = URIEvent(
                uri=config.base_url + "/" + str(data_file_path),
                timestamp=timestamp,
            )
            scene.streams[sensor_name].stream.events.append(event)

            current_camera_token = sample_data["next"]


def log_lidar_and_ego_pose(
    scene: CompositeScene,
    location: str,
    first_lidar_token: list[str],
    nusc: nuscenes.NuScenes,
    max_timestamp_us: float,
) -> None:
    """Log lidar data and vehicle pose."""

    scene.streams["ego_vehicle"] = EventStream(
        id="ego_vehicle",
        stream=FORStream(events=[]),
    )

    last_logged_timestamp = -10000

    for current_lidar_token in first_lidar_token:
        while current_lidar_token != "":
            sample_data = nusc.get("sample_data", current_lidar_token)
            sensor_name = sample_data["channel"]

            if max_timestamp_us < sample_data["timestamp"]:
                break

            timestamp = sample_data["timestamp"] * 1e-6 - first_timestamp
            if hz > 0:
                timestamp *= hz
                timestamp = floor(timestamp)
            if hz > 0 and timestamp - last_logged_timestamp < 1.0:
                current_lidar_token = sample_data["next"]
                continue
            last_logged_timestamp = timestamp

            ego_pose = nusc.get("ego_pose", sample_data["ego_pose_token"])
            rotation = (
                Rotation.from_quat(ego_pose["rotation"], scalar_first=True)
                .as_matrix()
                .transpose()
                .flatten()
            )
            position = ego_pose["translation"]
            if timestamp == 0:
                global first_position
                first_position = position

            event = FOREvent(
                id="ego_vehicle",
                parent_FOR="root",
                position=sub(position, first_position),
                rotation=rotation,
                timestamp=timestamp,
            )
            scene.streams["ego_vehicle"].stream.events.append(event)

            current_lidar_token = sample_data["next"]

            data_file_path = nusc.dataroot / sample_data["filename"]

            if sensor_name not in scene.streams:
                scene.streams[sensor_name] = EventStream(
                    id=sensor_name,
                    stream=PCDStream(
                        events=[], frame_of_reference_id=sensor_name + "-calibration"
                    ),
                )

            data_file_path = nusc.dataroot / sample_data["filename"]
            pointcloud = nuscenes.LidarPointCloud.from_file(str(data_file_path))
            points = pointcloud.points[:3].T

            fields = ("x", "y", "z")
            types = (
                np.float32,
                np.float32,
                np.float32,
            )

            pc = pypcd4.PointCloud.from_points(points, fields, types)

            # strip .bin extension from filename
            new_path = str(data_file_path.parent / data_file_path.stem)
            pc.save(new_path)

            event = URIEvent(
                uri=config.base_url + "/" + new_path,
                timestamp=timestamp,
            )
            scene.streams[sensor_name].stream.events.append(event)


def log_radars(
    scene: CompositeScene,
    first_radar_tokens: list[str],
    nusc: nuscenes.NuScenes,
    max_timestamp_us: float,
) -> None:
    """Log radar data to the scene"""
    for first_radar_token in first_radar_tokens:
        current_camera_token = first_radar_token
        last_logged_timestamp = -10000
        while current_camera_token != "":
            sample_data = nusc.get("sample_data", current_camera_token)
            if max_timestamp_us < sample_data["timestamp"]:
                break
            sensor_name = sample_data["channel"]

            if sensor_name not in scene.streams:
                scene.streams[sensor_name] = EventStream(
                    id=sensor_name,
                    stream=PCDStream(
                        events=[], frame_of_reference_id=sensor_name + "-calibration"
                    ),
                )

            timestamp = sample_data["timestamp"] * 1e-6 - first_timestamp
            if hz > 0:
                timestamp *= hz
                timestamp = floor(timestamp)
            if hz > 0 and timestamp - last_logged_timestamp < 1.0:
                current_camera_token = sample_data["next"]
                continue
            last_logged_timestamp = timestamp

            data_file_path = nusc.dataroot / sample_data["filename"]
            current_camera_token = sample_data["next"]
            # write_asset(data_file_path)
            event = URIEvent(
                uri=config.base_url + "/" + str(data_file_path),
                timestamp=timestamp,
            )
            scene.streams[sensor_name].stream.events.append(event)


def log_sensor_calibration(
    scene: CompositeScene, sample_data: dict[str, Any], nusc: nuscenes.NuScenes
) -> None:
    """Log sensor calibration (pinhole camera, sensor poses, etc.) to the scene"""
    sensor_name = sample_data["channel"]
    calibrated_sensor_token = sample_data["calibrated_sensor_token"]
    calibrated_sensor = nusc.get("calibrated_sensor", calibrated_sensor_token)
    rotation = (
        Rotation.from_quat(calibrated_sensor["rotation"], scalar_first=True)
        .as_matrix()
        .transpose()
        .flatten()
        .tolist()
    )

    id = sensor_name + "-calibration"
    scene.streams[id] = EventStream(
        id=id,
        stream=FORStream(events=[]),
    )
    position = sub(calibrated_sensor["translation"], first_position)
    event = FOREvent(
        id=id,
        parent_FOR="ego_vehicle",  # "ego_vehicle",
        position=position,
        rotation=rotation,
    )
    scene.streams[id].stream.events.append(event)

    if len(calibrated_sensor["camera_intrinsic"]) != 0:
        intrinsic = calibrated_sensor["camera_intrinsic"]
        camera_id = sensor_name + "-camera"
        scene.streams[camera_id] = EventStream(
            id=camera_id,
            stream=CameraStream(
                events=[],
                frame_of_reference_id=id,  # might be "root"
            ),
        )

        event = CameraParamsEvent(
            timestamp=0,
            width_px=1600,
            height_px=900,
            intrinsics=CameraIntrinsics(
                fx=intrinsic[0][0],
                fy=intrinsic[1][1],
                ox=intrinsic[0][2],
                oy=intrinsic[1][2],
                s=intrinsic[0][1],
            ),
            extrinsics=CameraExtrinsics(
                position=(0, 0, 0),
                rotation=(0, 0, 1, -1, 0, 0, 0, -1, 0),
            ),
        )
        scene.streams[camera_id].stream.events.append(event)


def log_annotations(
    scene: CompositeScene,
    location: str,
    first_sample_token: str,
    nusc: nuscenes.NuScenes,
    max_timestamp_us: float,
) -> None:
    """Log 3D cuboids to the scene"""

    scene.streams["anns"] = EventStream(
        id="anns",
        stream=ModelStream(events=[], camera_id=None),
    )

    current_sample_token = first_sample_token
    last_logged_timestamp = -10000
    while current_sample_token != "":
        sample_data = nusc.get("sample", current_sample_token)
        if max_timestamp_us < sample_data["timestamp"]:
            break

        timestamp = sample_data["timestamp"] * 1e-6 - first_timestamp
        if hz > 0:
            timestamp *= hz
            timestamp = floor(timestamp)
        if hz > 0 and timestamp - last_logged_timestamp < 1.0:
            current_sample_token = sample_data["next"]
            continue
        last_logged_timestamp = timestamp

        ann_tokens = sample_data["anns"]
        geometries = []
        for ann_token in ann_tokens:
            ann = nusc.get("sample_annotation", ann_token)

            width, length, height = ann["size"]

            # Convert rotation to euler angles
            rotation = Rotation.from_quat(ann["rotation"], scalar_first=True).as_euler(
                "XYZ"
            )

            geometries.append(
                CuboidGeometry(
                    pose=Pose(
                        position=sub(ann["translation"], first_position),
                        orientation=rotation,
                    ),
                    size=(length, width, height),
                )
            )

        event = ModelEvent(
            timestamp=timestamp,
            geometries=geometries,
        )
        scene.streams["anns"].stream.events.append(event)

        current_sample_token = sample_data["next"]


def download_file(url: str, dst_file_path: pathlib.Path) -> None:
    """Download file from url to dst_fpath."""
    dst_file_path.parent.mkdir(parents=True, exist_ok=True)
    print(f"Downloading {url} to {dst_file_path}")
    response = requests.get(url, stream=True)
    with tqdm.tqdm.wrapattr(
        open(dst_file_path, "wb"),
        "write",
        miniters=1,
        total=int(response.headers.get("content-length", 0)),
        desc=f"Downloading {dst_file_path.name}",
    ) as f:
        for chunk in response.iter_content(chunk_size=4096):
            f.write(chunk)


def untar_file(
    tar_file_path: pathlib.Path, dst_path: pathlib.Path, keep_tar: bool = True
) -> bool:
    """Untar tar file at tar_file_path to dst."""
    print(f"Untar file {tar_file_path}")
    try:
        with tarfile.open(tar_file_path, "r") as tf:
            tf.extractall(dst_path)
    except Exception as error:
        print(f"Error unzipping {tar_file_path}, error: {error}")
        return False
    if not keep_tar:
        os.remove(tar_file_path)
    return True


def download_minisplit(root_dir: pathlib.Path) -> None:
    """
    Download nuScenes minisplit.

    Adopted from <https://colab.research.google.com/github/nutonomy/nuscenes-devkit/blob/master/python-sdk/tutorials/nuscenes_tutorial.ipynb>
    """
    MINISPLIT_URL = "https://www.nuscenes.org/data/v1.0-mini.tgz"

    zip_file_path = pathlib.Path("./v1.0-mini.tgz")
    if not zip_file_path.is_file():
        download_file(MINISPLIT_URL, zip_file_path)
    untar_file(zip_file_path, root_dir, keep_tar=True)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Visualizes the nuScenes dataset ")
    parser.add_argument(
        "--root-dir",
        type=pathlib.Path,
        default=DATASET_DIR,
        help="Root directory of nuScenes dataset",
    )
    parser.add_argument(
        "--scene-name",
        type=str,
        default="scene-0061",
        help="Scene name to visualize (typically of form 'scene-xxxx')",
    )
    parser.add_argument(
        "--dataset-version", type=str, default="v1.0-mini", help="Scene id to visualize"
    )
    parser.add_argument(
        "--seconds",
        type=float,
        default=float("inf"),
        help="If specified, limits the number of seconds logged",
    )
    parser.add_argument(
        "--all",
        "-A",
        action="store_true",
        help="If specified, logs all scenes",
    )
    parser.add_argument(
        "--hz",
        type=float,
        default=0.0,
        help="Limit the sample rate",
    )
    args = parser.parse_args()

    # ensure_scene_available(
    #     root_dir=args.root_dir,
    #     dataset_version=args.dataset_version,
    #     scene_name=args.scene_name,
    # )

    nusc = nuscenes.NuScenes(
        version=args.dataset_version, dataroot=args.root_dir, verbose=False
    )

    scene_names: list[str] = [args.scene_name]

    if args.all:
        scene_names = [s["name"] for s in nusc.scene]

    scenes = [
        (
            log_nuscenes(
                nusc, scene_name, max_time_sec=args.seconds, sample_hz=args.hz
            ),
            scene_name,
        )
        for scene_name in scene_names
    ]
    write_upload_json(scenes)


PCD Ontologies

PCD Projects support the following object label types:
  • Cuboids
  • Segmentation
  • Polylines
  • Keypoints

Project Settings

Configure Label Editor templates to streamline the annotation and review experience for your Taskers.

Label and Review PCD Data

We strongly recommend that Taskers use a mouse when annotating or reviewing Scenes. Using a mouse makes annotating or reviewing Scenes significantly easier.