# /// script
# requires-python = ">=3.12"
# dependencies = [
# "matplotlib>=3.10.3",
# "np>=1.0.2",
# "nuscenes-devkit>=1.1.9",
# "pillow>=11.3.0",
# "pydantic>=2.11.5",
# "pypcd4>=1.2.1",
# "requests>=2.32.3",
# "scipy>=1.15.3",
# "tqdm>=4.67.1",
# ]
# ///
from __future__ import annotations
import argparse
import json
import os
import pathlib
import re
import shutil
import tarfile
from dataclasses import dataclass
from enum import StrEnum, auto
from math import floor
from typing import Annotated, Any, Literal
import numpy as np
import pypcd4
import requests
import tqdm
from nuscenes import nuscenes
from pydantic import BaseModel, ConfigDict, Field
from scipy.spatial.transform import Rotation
"""
This script processes scenes from the nuScenes (https://www.nuscenes.org/) dataset and converts them into a
the Encord upload JSON format for visualization and annotation. It can handle
lidar, radar, and camera data, as well as 3D annotations and ego-vehicle poses.
The script downloads the nuScenes minisplit if not found locally, and processes it, including:
- Converting the point cloud data from .bin to .pcd
- Timestamps are normalized to start from 0 at the beginning of the scene
- Converting positions so that the vehicle's starting position is treated as the origin (0, 0, 0)
"""
def snake2camel(snake: str, start_lower: bool = True) -> str:
"""
Converts a snake_case string to camelCase.
The `start_lower` argument determines whether the first letter in the generated camelcase should
be lowercase (if `start_lower` is True), or capitalized (if `start_lower` is False).
"""
camel = snake.title()
camel = re.sub("([0-9A-Za-z])_(?=[0-9A-Z])", lambda m: m.group(1), camel)
if start_lower:
camel = re.sub("(^_*[A-Z])", lambda m: m.group(1).lower(), camel)
return camel
class CamelModel(BaseModel):
model_config = ConfigDict(alias_generator=snake2camel, populate_by_name=True)
@dataclass
class CameraIntrinsics:
fx: Annotated[float, Field(description="Focal length x")]
fy: Annotated[float, Field(description="Focal length y")]
ox: Annotated[float, Field(description="Principal point offset x")]
oy: Annotated[float, Field(description="Principal point offset y")]
s: Annotated[float, Field(description="Axis skew")]
@dataclass
class CameraExtrinsics:
rotation: Annotated[
tuple[float, float, float, float, float, float, float, float, float],
Field(description="Rotation matrix R"),
]
position: Annotated[
tuple[float, float, float], Field(description="Translation vector T")
]
@dataclass
class CameraParams:
width_px: int
height_px: int
intrinsics: Annotated[CameraIntrinsics, Field(description="The intrinsic matrix K")]
extrinsics: Annotated[
CameraExtrinsics, Field(description="The extrinsic 4x4 matrix R|T")
]
@dataclass
class FrameOfReference:
id: Annotated[str, Field(description="ID of this frame of reference")]
parent_FOR: Annotated[
str | None, Field(description="ID of a parent frame of reference")
]
rotation: tuple[float, float, float, float, float, float, float, float, float]
position: tuple[float, float, float]
Position = tuple[float, float, float]
EulerOrientation = tuple[float, float, float]
Size = tuple[float, float, float]
class Pose(CamelModel):
position: Position
orientation: EulerOrientation
class CuboidGeometry(CamelModel):
type: Literal["cuboid"] = "cuboid"
pose: Pose
size: Size
@dataclass
class _FORIdMixin:
frame_of_reference_id: Annotated[
str | None, Field(description="ID of the frame of reference the entity is in")
] = None
@dataclass
class _URIMixin:
uri: str
@dataclass
class _EventMixin:
timestamp: float | None = None
class URIEvent(CamelModel, _EventMixin, _URIMixin):
pass
class CameraParamsEvent(CamelModel, _EventMixin, CameraParams):
pass
class FOREvent(CamelModel, _EventMixin, FrameOfReference):
pass
class ModelEvent(CamelModel, _EventMixin):
geometries: list[CuboidGeometry]
class CompositeScene(CamelModel):
type: Literal["composite"] = "composite"
streams: dict[str, EventStream]
class EntityType(StrEnum):
POINT_CLOUD = auto()
FRAME_OF_REFERENCE = auto()
IMAGE = auto()
MODEL = auto()
CAMERA_PARAMETERS = auto()
class PCDStream(CamelModel, _FORIdMixin):
entity_type: Literal[EntityType.POINT_CLOUD] = EntityType.POINT_CLOUD
events: Annotated[list[URIEvent], Field(description="List of point cloud events")]
class CameraStream(CamelModel, _FORIdMixin):
entity_type: Literal[EntityType.CAMERA_PARAMETERS] = EntityType.CAMERA_PARAMETERS
events: list[CameraParamsEvent]
class ImageStream(CamelModel):
entity_type: Literal[EntityType.IMAGE] = EntityType.IMAGE
events: list[URIEvent]
camera_id: Annotated[
str | None,
Field(
description="ID of the camera associated with the image. Used to position the image in-scene"
),
]
class ModelStream(CamelModel):
entity_type: Literal[EntityType.MODEL] = EntityType.MODEL
events: list[URIEvent | ModelEvent]
camera_id: str | None
class FORStream(CamelModel):
entity_type: Literal[EntityType.FRAME_OF_REFERENCE] = EntityType.FRAME_OF_REFERENCE
events: Annotated[
list[FOREvent], Field(description="List of frame of reference events")
]
class EventStream(CamelModel):
type: Literal["event"] = "event"
id: str
stream: Annotated[
PCDStream | CameraStream | FORStream | ImageStream | ModelStream,
Field(discriminator="entity_type"),
]
DATASET_DIR = pathlib.Path("./dataset")
class Config:
env: str
output_dir: pathlib.Path
base_url: str
def __init__(self):
self.env = "remote"
self.output_dir = pathlib.Path("./scenes")
self.base_url = (
"https://storage.cloud.google.com/my-bucket-name/scenes/nuscenes" # Replace this with the file path in your bucket to the dataset
)
config = Config()
def ensure_scene_available(
root_dir: pathlib.Path, dataset_version: str, scene_name: str
) -> None:
"""
Ensure that the specified scene is available.
Downloads minisplit into root_dir if scene_name is part of it and root_dir is empty.
Raises ValueError if scene is not available and cannot be downloaded.
"""
try:
nusc = nuscenes.NuScenes(
version=dataset_version, dataroot=str(root_dir), verbose=False
)
except AssertionError: # dataset initialization failed
if dataset_version == "v1.0-mini":
download_minisplit(root_dir)
nusc = nuscenes.NuScenes(
version=dataset_version, dataroot=str(root_dir), verbose=False
)
else:
print(
f"Could not find dataset at {root_dir} and could not automatically download specified scene."
)
exit()
scene_names = [s["name"] for s in nusc.scene]
if scene_name not in scene_names:
raise ValueError(f"{scene_name=} not found in dataset")
def nuscene_sensor_names(nusc: nuscenes.NuScenes, scene_name: str) -> list[str]:
"""Return all sensor names in the scene."""
sensor_names = set()
scene = next(s for s in nusc.scene if s["name"] == scene_name)
first_sample = nusc.get("sample", scene["first_sample_token"])
for sample_data_token in first_sample["data"].values():
sample_data = nusc.get("sample_data", sample_data_token)
if sample_data["sensor_modality"] == "camera":
current_camera_token = sample_data_token
while current_camera_token != "":
sample_data = nusc.get("sample_data", current_camera_token)
sensor_name = sample_data["channel"]
sensor_names.add(sensor_name)
current_camera_token = sample_data["next"]
# For a known set of cameras, order the sensors in a circle.
ordering = {
"CAM_FRONT_LEFT": 0,
"CAM_FRONT": 1,
"CAM_FRONT_RIGHT": 2,
"CAM_BACK_RIGHT": 3,
"CAM_BACK": 4,
"CAM_BACK_LEFT": 5,
}
return sorted(
sensor_names, key=lambda sensor_name: ordering.get(sensor_name, float("inf"))
)
# Write all uri assets required for the scene to a separate output directory
def write_asset(path: pathlib.Path):
shutil.copyfile(path, pathlib.Path("./output") / path.name)
def write_nuscenes_json(scene: CompositeScene, name: str):
OUTPUT_FILE = config.output_dir / "nuscenes.json"
with open(OUTPUT_FILE, "w") as f:
dummy_json = scene.model_dump_json(by_alias=True, indent=2)
f.write(dummy_json)
print("Wrote to", OUTPUT_FILE)
def write_upload_json(scenes: list[tuple[CompositeScene, str]]):
scenes_final = []
for scene, name in scenes:
streams = list(scene.model_dump(by_alias=True)["streams"].values())
scenes_final.append(
{
"title": name,
"streams": streams,
}
)
final = {"scenes": scenes_final}
OUTPUT_FILE = config.output_dir / "upload.json"
with open(OUTPUT_FILE, "w") as f:
json.dump(final, f, indent=2)
print("Wrote to", OUTPUT_FILE)
first_timestamp = 0
first_position = [0, 0, 0]
hz = 0
def sub(a, b) -> tuple[float, float, float]:
return [a[i] - b[i] for i in range(len(a))]
def log_nuscenes(
nusc: nuscenes.NuScenes, scene_name: str, max_time_sec: float, sample_hz: float
) -> CompositeScene:
"""Log nuScenes scene."""
print(f"Logging scene {scene_name}")
result = CompositeScene(streams={})
scene = next(s for s in nusc.scene if s["name"] == scene_name)
location = nusc.get("log", scene["log_token"])["location"]
# Get the first sample
first_sample_token = scene["first_sample_token"]
first_sample = nusc.get("sample", scene["first_sample_token"])
# Get the timestamp (in seconds)
global first_timestamp
first_timestamp = first_sample["timestamp"] / 1e6
global first_position
first_position = (0, 0, 0)
global hz
hz = sample_hz
first_lidar_tokens = []
first_radar_tokens = []
first_camera_tokens = []
for sample_data_token in first_sample["data"].values():
sample_data = nusc.get("sample_data", sample_data_token)
log_sensor_calibration(result, sample_data, nusc)
if sample_data["sensor_modality"] == "lidar":
first_lidar_tokens.append(sample_data_token)
elif sample_data["sensor_modality"] == "radar":
first_radar_tokens.append(sample_data_token)
elif sample_data["sensor_modality"] == "camera":
first_camera_tokens.append(sample_data_token)
first_timestamp_us = nusc.get("sample_data", first_lidar_tokens[0])["timestamp"]
max_timestamp_us = first_timestamp_us + 1e6 * max_time_sec
log_lidar_and_ego_pose(result, location, first_lidar_tokens, nusc, max_timestamp_us)
log_cameras(result, first_camera_tokens, nusc, max_timestamp_us)
log_radars(result, first_radar_tokens, nusc, max_timestamp_us)
log_annotations(result, location, first_sample_token, nusc, max_timestamp_us)
return result
def log_cameras(
scene: CompositeScene,
first_camera_tokens: list[str],
nusc: nuscenes.NuScenes,
max_timestamp_us: float,
) -> None:
"""Log camera data."""
for first_camera_token in first_camera_tokens:
current_camera_token = first_camera_token
last_logged_timestamp = -10000
while current_camera_token != "":
sample_data = nusc.get("sample_data", current_camera_token)
if max_timestamp_us < sample_data["timestamp"]:
break
sensor_name = sample_data["channel"]
if sensor_name not in scene.streams:
scene.streams[sensor_name] = EventStream(
id=sensor_name,
stream=ImageStream(
events=[],
camera_id=sensor_name + "-camera",
frame_of_reference_id=sensor_name + "-calibration",
),
)
timestamp = sample_data["timestamp"] * 1e-6 - first_timestamp
if hz > 0:
timestamp *= hz
timestamp = floor(timestamp)
if hz > 0 and timestamp - last_logged_timestamp < 1.0:
current_camera_token = sample_data["next"]
continue
last_logged_timestamp = timestamp
data_file_path = nusc.dataroot / sample_data["filename"]
# write_asset(data_file_path)
event = URIEvent(
uri=config.base_url + "/" + str(data_file_path),
timestamp=timestamp,
)
scene.streams[sensor_name].stream.events.append(event)
current_camera_token = sample_data["next"]
def log_lidar_and_ego_pose(
scene: CompositeScene,
location: str,
first_lidar_token: list[str],
nusc: nuscenes.NuScenes,
max_timestamp_us: float,
) -> None:
"""Log lidar data and vehicle pose."""
scene.streams["ego_vehicle"] = EventStream(
id="ego_vehicle",
stream=FORStream(events=[]),
)
last_logged_timestamp = -10000
for current_lidar_token in first_lidar_token:
while current_lidar_token != "":
sample_data = nusc.get("sample_data", current_lidar_token)
sensor_name = sample_data["channel"]
if max_timestamp_us < sample_data["timestamp"]:
break
timestamp = sample_data["timestamp"] * 1e-6 - first_timestamp
if hz > 0:
timestamp *= hz
timestamp = floor(timestamp)
if hz > 0 and timestamp - last_logged_timestamp < 1.0:
current_lidar_token = sample_data["next"]
continue
last_logged_timestamp = timestamp
ego_pose = nusc.get("ego_pose", sample_data["ego_pose_token"])
rotation = (
Rotation.from_quat(ego_pose["rotation"], scalar_first=True)
.as_matrix()
.transpose()
.flatten()
)
position = ego_pose["translation"]
if timestamp == 0:
global first_position
first_position = position
event = FOREvent(
id="ego_vehicle",
parent_FOR="root",
position=sub(position, first_position),
rotation=rotation,
timestamp=timestamp,
)
scene.streams["ego_vehicle"].stream.events.append(event)
current_lidar_token = sample_data["next"]
data_file_path = nusc.dataroot / sample_data["filename"]
if sensor_name not in scene.streams:
scene.streams[sensor_name] = EventStream(
id=sensor_name,
stream=PCDStream(
events=[], frame_of_reference_id=sensor_name + "-calibration"
),
)
data_file_path = nusc.dataroot / sample_data["filename"]
pointcloud = nuscenes.LidarPointCloud.from_file(str(data_file_path))
points = pointcloud.points[:3].T
fields = ("x", "y", "z")
types = (
np.float32,
np.float32,
np.float32,
)
pc = pypcd4.PointCloud.from_points(points, fields, types)
# strip .bin extension from filename
new_path = str(data_file_path.parent / data_file_path.stem)
pc.save(new_path)
event = URIEvent(
uri=config.base_url + "/" + new_path,
timestamp=timestamp,
)
scene.streams[sensor_name].stream.events.append(event)
def log_radars(
scene: CompositeScene,
first_radar_tokens: list[str],
nusc: nuscenes.NuScenes,
max_timestamp_us: float,
) -> None:
"""Log radar data to the scene"""
for first_radar_token in first_radar_tokens:
current_camera_token = first_radar_token
last_logged_timestamp = -10000
while current_camera_token != "":
sample_data = nusc.get("sample_data", current_camera_token)
if max_timestamp_us < sample_data["timestamp"]:
break
sensor_name = sample_data["channel"]
if sensor_name not in scene.streams:
scene.streams[sensor_name] = EventStream(
id=sensor_name,
stream=PCDStream(
events=[], frame_of_reference_id=sensor_name + "-calibration"
),
)
timestamp = sample_data["timestamp"] * 1e-6 - first_timestamp
if hz > 0:
timestamp *= hz
timestamp = floor(timestamp)
if hz > 0 and timestamp - last_logged_timestamp < 1.0:
current_camera_token = sample_data["next"]
continue
last_logged_timestamp = timestamp
data_file_path = nusc.dataroot / sample_data["filename"]
current_camera_token = sample_data["next"]
# write_asset(data_file_path)
event = URIEvent(
uri=config.base_url + "/" + str(data_file_path),
timestamp=timestamp,
)
scene.streams[sensor_name].stream.events.append(event)
def log_sensor_calibration(
scene: CompositeScene, sample_data: dict[str, Any], nusc: nuscenes.NuScenes
) -> None:
"""Log sensor calibration (pinhole camera, sensor poses, etc.) to the scene"""
sensor_name = sample_data["channel"]
calibrated_sensor_token = sample_data["calibrated_sensor_token"]
calibrated_sensor = nusc.get("calibrated_sensor", calibrated_sensor_token)
rotation = (
Rotation.from_quat(calibrated_sensor["rotation"], scalar_first=True)
.as_matrix()
.transpose()
.flatten()
.tolist()
)
id = sensor_name + "-calibration"
scene.streams[id] = EventStream(
id=id,
stream=FORStream(events=[]),
)
position = sub(calibrated_sensor["translation"], first_position)
event = FOREvent(
id=id,
parent_FOR="ego_vehicle", # "ego_vehicle",
position=position,
rotation=rotation,
)
scene.streams[id].stream.events.append(event)
if len(calibrated_sensor["camera_intrinsic"]) != 0:
intrinsic = calibrated_sensor["camera_intrinsic"]
camera_id = sensor_name + "-camera"
scene.streams[camera_id] = EventStream(
id=camera_id,
stream=CameraStream(
events=[],
frame_of_reference_id=id, # might be "root"
),
)
event = CameraParamsEvent(
timestamp=0,
width_px=1600,
height_px=900,
intrinsics=CameraIntrinsics(
fx=intrinsic[0][0],
fy=intrinsic[1][1],
ox=intrinsic[0][2],
oy=intrinsic[1][2],
s=intrinsic[0][1],
),
extrinsics=CameraExtrinsics(
position=(0, 0, 0),
rotation=(0, 0, 1, -1, 0, 0, 0, -1, 0),
),
)
scene.streams[camera_id].stream.events.append(event)
def log_annotations(
scene: CompositeScene,
location: str,
first_sample_token: str,
nusc: nuscenes.NuScenes,
max_timestamp_us: float,
) -> None:
"""Log 3D cuboids to the scene"""
scene.streams["anns"] = EventStream(
id="anns",
stream=ModelStream(events=[], camera_id=None),
)
current_sample_token = first_sample_token
last_logged_timestamp = -10000
while current_sample_token != "":
sample_data = nusc.get("sample", current_sample_token)
if max_timestamp_us < sample_data["timestamp"]:
break
timestamp = sample_data["timestamp"] * 1e-6 - first_timestamp
if hz > 0:
timestamp *= hz
timestamp = floor(timestamp)
if hz > 0 and timestamp - last_logged_timestamp < 1.0:
current_sample_token = sample_data["next"]
continue
last_logged_timestamp = timestamp
ann_tokens = sample_data["anns"]
geometries = []
for ann_token in ann_tokens:
ann = nusc.get("sample_annotation", ann_token)
width, length, height = ann["size"]
# Convert rotation to euler angles
rotation = Rotation.from_quat(ann["rotation"], scalar_first=True).as_euler(
"XYZ"
)
geometries.append(
CuboidGeometry(
pose=Pose(
position=sub(ann["translation"], first_position),
orientation=rotation,
),
size=(length, width, height),
)
)
event = ModelEvent(
timestamp=timestamp,
geometries=geometries,
)
scene.streams["anns"].stream.events.append(event)
current_sample_token = sample_data["next"]
def download_file(url: str, dst_file_path: pathlib.Path) -> None:
"""Download file from url to dst_fpath."""
dst_file_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Downloading {url} to {dst_file_path}")
response = requests.get(url, stream=True)
with tqdm.tqdm.wrapattr(
open(dst_file_path, "wb"),
"write",
miniters=1,
total=int(response.headers.get("content-length", 0)),
desc=f"Downloading {dst_file_path.name}",
) as f:
for chunk in response.iter_content(chunk_size=4096):
f.write(chunk)
def untar_file(
tar_file_path: pathlib.Path, dst_path: pathlib.Path, keep_tar: bool = True
) -> bool:
"""Untar tar file at tar_file_path to dst."""
print(f"Untar file {tar_file_path}")
try:
with tarfile.open(tar_file_path, "r") as tf:
tf.extractall(dst_path)
except Exception as error:
print(f"Error unzipping {tar_file_path}, error: {error}")
return False
if not keep_tar:
os.remove(tar_file_path)
return True
def download_minisplit(root_dir: pathlib.Path) -> None:
"""
Download nuScenes minisplit.
Adopted from <https://colab.research.google.com/github/nutonomy/nuscenes-devkit/blob/master/python-sdk/tutorials/nuscenes_tutorial.ipynb>
"""
MINISPLIT_URL = "https://www.nuscenes.org/data/v1.0-mini.tgz"
zip_file_path = pathlib.Path("./v1.0-mini.tgz")
if not zip_file_path.is_file():
download_file(MINISPLIT_URL, zip_file_path)
untar_file(zip_file_path, root_dir, keep_tar=True)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Visualizes the nuScenes dataset ")
parser.add_argument(
"--root-dir",
type=pathlib.Path,
default=DATASET_DIR,
help="Root directory of nuScenes dataset",
)
parser.add_argument(
"--scene-name",
type=str,
default="scene-0061",
help="Scene name to visualize (typically of form 'scene-xxxx')",
)
parser.add_argument(
"--dataset-version", type=str, default="v1.0-mini", help="Scene id to visualize"
)
parser.add_argument(
"--seconds",
type=float,
default=float("inf"),
help="If specified, limits the number of seconds logged",
)
parser.add_argument(
"--all",
"-A",
action="store_true",
help="If specified, logs all scenes",
)
parser.add_argument(
"--hz",
type=float,
default=0.0,
help="Limit the sample rate",
)
args = parser.parse_args()
# ensure_scene_available(
# root_dir=args.root_dir,
# dataset_version=args.dataset_version,
# scene_name=args.scene_name,
# )
nusc = nuscenes.NuScenes(
version=args.dataset_version, dataroot=args.root_dir, verbose=False
)
scene_names: list[str] = [args.scene_name]
if args.all:
scene_names = [s["name"] for s in nusc.scene]
scenes = [
(
log_nuscenes(
nusc, scene_name, max_time_sec=args.seconds, sample_hz=args.hz
),
scene_name,
)
for scene_name in scene_names
]
write_upload_json(scenes)