Skip to content

viewer

PointCloudColorMode

Bases: str, Enum

Color mode of point cloud.

Source code in t4_devkit/viewer/color.py
@unique
class PointCloudColorMode(str, Enum):
    """Color mode of point cloud."""

    DISTANCE = "distance"
    INTENSITY = "intensity"

RerunViewer

A viewer class that renders some components powered by rerun.

Source code in t4_devkit/viewer/viewer.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
class RerunViewer:
    """A viewer class that renders some components powered by rerun."""

    def __init__(
        self,
        app_id: str,
        config: ViewerConfig = ViewerConfig(),
        save_dir: str | None = None,
    ) -> None:
        """Construct a new object.

        Args:
            app_id (str): Application ID.
            config (ViewerConfig): Configuration of the viewer.
            save_dir (str | None, optional): Directory path to save the recording.
                Viewer will be spawned if it is None, otherwise not.

        Examples:
            >>> from t4_devkit.viewer import ViewerBuilder
            >>> viewer = (
                    ViewerBuilder()
                    .with_spatial3d()
                    .with_spatial2d(cameras=["CAM_FRONT", "CAM_BACK"])
                    .with_labels(label2id={"car": 1, "pedestrian": 2})
                    .with_streetmap(latlon=[48.8566, 2.3522])
                    .build(app_id="my_viewer")
                )
        """
        self.app_id = app_id
        self.config = config
        self.blueprint = self.config.to_blueprint()

        rr.init(
            application_id=self.app_id,
            recording_id=None,
            spawn=save_dir is None,
            default_enabled=True,
            strict=True,
            default_blueprint=self.blueprint,
        )

        # NOTE: rr.save() must be invoked before logging
        if save_dir is not None:
            self._start_saving(save_dir=save_dir)

        rr.log(self.config.map_entity, rr.ViewCoordinates.RIGHT_HAND_Z_UP, static=True)

        rr.log(
            self.config.map_entity,
            rr.AnnotationContext(
                [
                    rr.AnnotationInfo(id=label_id, label=label)
                    for label, label_id in self.label2id.items()
                ]
            ),
            static=True,
        )

    @property
    def label2id(self) -> dict[str, int]:
        return self.config.label2id

    @property
    def latlon(self) -> Vector2Like | None:
        return self.config.latlon

    def _start_saving(self, save_dir: str) -> None:
        """Save recording result as `save_dir/{app_id}.rrd`.

        Note:
            This method must be called before any logging started.

        Args:
            save_dir (str): Directory path to save the result.
        """
        filepath = osp.join(save_dir, f"{self.app_id}.rrd")
        rr.save(filepath, default_blueprint=self.blueprint)

    @overload
    def render_box3ds(self, seconds: float, boxes: Sequence[Box3D]) -> None:
        """Render 3D boxes.

        Args:
            seconds (float): Timestamp in [sec].
            boxes (Sequence[Box3D]): Sequence of `Box3D`s.
        """
        pass

    @overload
    def render_box3ds(
        self,
        seconds: float,
        frame_id: str,
        centers: Sequence[Vector3Like],
        rotations: Sequence[RotationLike],
        sizes: Sequence[Vector3Like],
        class_ids: Sequence[int],
        velocities: Sequence[Vector3Like] | None = None,
        uuids: Sequence[str] | None = None,
        futures: Sequence[Future] | None = None,
    ) -> None:
        """Render 3D boxes with its elements.

        Args:
            seconds (float): Timestamp in [sec].
            frame_id (str): Frame ID.
            centers (Sequence[Vector3Like]): Sequence of 3D positions in the order of (x, y, z).
            rotations (Sequence[RotationLike]): Sequence of rotations.
            sizes (Sequence[Vector3Like]): Sequence of box sizes in the order of (width, length, height).
            class_ids (Sequence[int]): Sequence of class IDs.
            velocities (Sequence[Vector3Like] | None, optional): Sequence of velocities.
            uuids (Sequence[str] | None, optional): Sequence of unique identifiers.
            futures (Sequence[Future] | None, optional): Sequence future trajectories.
        """
        pass

    @_check_spatial3d
    def render_box3ds(self, *args, **kwargs) -> None:
        """Render 3D boxes."""
        if len(args) + len(kwargs) == 2:
            self._render_box3ds_with_boxes(*args, **kwargs)
        else:
            self._render_box3ds_with_elements(*args, **kwargs)

    def _render_box3ds_with_boxes(self, seconds: float, boxes: Sequence[Box3D]) -> None:
        rr.set_time_seconds(self.config.timeline, seconds)

        batches: dict[str, BatchBox3D] = {}
        for box in boxes:
            if box.frame_id not in batches:
                batches[box.frame_id] = BatchBox3D(label2id=self.label2id)
            batches[box.frame_id].append(box)

        for frame_id, batch in batches.items():
            # record boxes 3d
            rr.log(
                format_entity(self.config.map_entity, frame_id, "box"),
                batch.as_boxes3d(),
            )
            # record velocities
            rr.log(
                format_entity(self.config.map_entity, frame_id, "velocity"),
                batch.as_arrows3d(),
            )
            # record futures
            rr.log(
                format_entity(self.config.map_entity, frame_id, "future"),
                batch.as_linestrips3d(),
            )

    def _render_box3ds_with_elements(
        self,
        seconds: float,
        frame_id: str,
        centers: Sequence[Vector3Like],
        rotations: Sequence[RotationLike],
        sizes: Sequence[Vector3Like],
        class_ids: Sequence[int],
        velocities: Sequence[Vector3Like] | None = None,
        uuids: Sequence[str] | None | None = None,
        futures: Sequence[Future] | None = None,
    ) -> None:
        if uuids is None:
            uuids = [None] * len(centers)

        if velocities is None:
            velocities = [None] * len(centers)
            show_arrows = False
        else:
            show_arrows = True

        if futures is None:
            futures = [None] * len(centers)
            show_futures = False
        else:
            show_futures = True

        batch = BatchBox3D(label2id=self.label2id)
        for center, rotation, size, class_id, velocity, uuid, future in zip(
            centers,
            rotations,
            sizes,
            class_ids,
            velocities,
            uuids,
            futures,
            strict=True,
        ):
            batch.append(
                center=center,
                rotation=rotation,
                size=size,
                class_id=class_id,
                velocity=velocity,
                uuid=uuid,
                future=future,
            )

        rr.set_time_seconds(self.config.timeline, seconds)

        rr.log(format_entity(self.config.map_entity, frame_id, "box"), batch.as_boxes3d())

        if show_arrows:
            rr.log(
                format_entity(self.config.map_entity, frame_id, "velocity"),
                batch.as_arrows3d(),
            )

        if show_futures:
            rr.log(
                format_entity(self.config.map_entity, frame_id, "future"),
                batch.as_linestrips3d(),
            )

    @overload
    def render_box2ds(self, seconds: float, boxes: Sequence[Box2D]) -> None:
        """Render 2D boxes. Note that if the viewer initialized without `cameras=None`,
        no 2D box will be rendered.

        Args:
            seconds (float): Timestamp in [sec].
            boxes (Sequence[Box2D]): Sequence of `Box2D`s.
        """
        pass

    @overload
    def render_box2ds(
        self,
        seconds: float,
        camera: str,
        rois: Sequence[RoiLike],
        class_ids: Sequence[int],
        uuids: Sequence[str] | None = None,
    ) -> None:
        """Render 2D boxes with its elements.

        Args:
            seconds (float): Timestamp in [sec].
            camera (str): Camera name.
            rois (Sequence[RoiLike]): Sequence of ROIs in the order of (xmin, ymin, xmax, ymax).
            class_ids (Sequence[int]): Sequence of class IDs.
            uuids (Sequence[str] | None, optional): Sequence of unique identifiers.
        """
        pass

    @_check_spatial2d
    def render_box2ds(self, *args, **kwargs) -> None:
        """Render 2D boxes."""
        if len(args) + len(kwargs) == 2:
            self._render_box2ds_with_boxes(*args, **kwargs)
        else:
            self._render_box2ds_with_elements(*args, **kwargs)

    def _render_box2ds_with_boxes(self, seconds: float, boxes: Sequence[Box2D]) -> None:
        rr.set_time_seconds(self.config.timeline, seconds)

        batches: dict[str, BatchBox2D] = {}
        for box in boxes:
            if box.frame_id not in batches:
                batches[box.frame_id] = BatchBox2D(label2id=self.label2id)
            batches[box.frame_id].append(box)

        for frame_id, batch in batches.items():
            rr.log(
                format_entity(self.config.ego_entity, frame_id, "box"),
                batch.as_boxes2d(),
            )

    def _render_box2ds_with_elements(
        self,
        seconds: float,
        camera: str,
        rois: Sequence[RoiLike],
        class_ids: Sequence[int],
        uuids: Sequence[str] | None = None,
    ) -> None:
        if uuids is None:
            uuids = [None] * len(rois)

        batch = BatchBox2D(label2id=self.label2id)
        for roi, class_id, uuid in zip(rois, class_ids, uuids, strict=True):
            batch.append(roi=roi, class_id=class_id, uuid=uuid)

        rr.set_time_seconds(self.config.timeline, seconds)
        rr.log(format_entity(self.config.ego_entity, camera, "box"), batch.as_boxes2d())

    @_check_spatial2d
    def render_segmentation2d(
        self,
        seconds: float,
        camera: str,
        masks: Sequence[NDArrayU8],
        class_ids: Sequence[int],
        uuids: Sequence[str | None] | None = None,
    ) -> None:
        """Render 2D segmentation image.

        Args:
            seconds (float): Timestamp in [sec].
            camera (str): Name of camera channel.
            masks (Sequence[NDArrayU8]): Sequence of segmentation mask of each instance,
                each mask is the shape of (W, H).
            class_ids (Sequence[int]): Sequence of label ids.
            uuids (Sequence[str | None] | None, optional): Sequence of each instance ID.
        """
        rr.set_time_seconds(self.config.timeline, seconds)

        batch = BatchSegmentation2D()
        if uuids is None:
            uuids = [None] * len(masks)
        for mask, class_id, uuid in zip(masks, class_ids, uuids, strict=True):
            batch.append(mask, class_id, uuid)

        rr.log(
            format_entity(self.config.ego_entity, camera, "segmentation"),
            batch.as_segmentation_image(),
        )

    @_check_spatial3d
    def render_pointcloud(
        self,
        seconds: float,
        channel: str,
        pointcloud: PointCloudLike,
        color_mode: PointCloudColorMode = PointCloudColorMode.DISTANCE,
    ) -> None:
        """Render pointcloud.

        Args:
            seconds (float): Timestamp in [sec].
            channel (str): Name of the pointcloud sensor channel.
            pointcloud (PointCloudLike): Inherence object of `PointCloud`.
            color_mode (PointCloudColorMode, optional): Color mode for pointcloud.
        """
        # TODO(ktro2828): add support of rendering pointcloud on images
        rr.set_time_seconds(self.config.timeline, seconds)

        colors = pointcloud_color(pointcloud, color_mode=color_mode)
        rr.log(
            format_entity(self.config.ego_entity, channel),
            rr.Points3D(pointcloud.points[:3].T, colors=colors),
        )

    @_check_spatial2d
    def render_image(self, seconds: float, camera: str, image: str | NDArrayU8) -> None:
        """Render an image.

        Args:
            seconds (float): Timestamp in [sec].
            camera (str): Name of the camera channel.
            image (str | NDArrayU8): Image tensor or path of the image file.
        """
        rr.set_time_seconds(self.config.timeline, seconds)

        if isinstance(image, str):
            rr.log(format_entity(self.config.ego_entity, camera), rr.ImageEncoded(path=image))
        else:
            rr.log(format_entity(self.config.ego_entity, camera), rr.Image(image))

    @overload
    def render_ego(self, ego_pose: EgoPose) -> None:
        """Render an ego pose.

        Args:
            ego_pose (EgoPose): `EgoPose` object.
        """
        pass

    @overload
    def render_ego(
        self,
        seconds: float,
        translation: Vector3Like,
        rotation: RotationLike,
        geocoordinate: Vector3Like | None = None,
    ) -> None:
        """Render an ego pose.

        Args:
            seconds (float): Timestamp in [sec].
            translation (Vector3Like): 3D position in the map coordinate system
              , in the order of (x, y, z) in [m].
            rotation (RotationLike): Rotation in the map coordinate system.
            geocoordinate (Vector3Like | None, optional): Coordinates in the WGS 84
                reference ellipsoid (latitude, longitude, altitude) in degrees and meters.
        """
        pass

    @_check_spatial3d
    def render_ego(self, *args, **kwargs) -> None:
        """Render an ego pose."""
        if len(args) + len(kwargs) == 1:
            self._render_ego_with_schema(*args, **kwargs)
        else:
            self._render_ego_without_schema(*args, **kwargs)

    def _render_ego_with_schema(self, ego_pose: EgoPose) -> None:
        self._render_ego_without_schema(
            seconds=microseconds2seconds(ego_pose.timestamp),
            translation=ego_pose.translation,
            rotation=ego_pose.rotation,
            geocoordinate=ego_pose.geocoordinate,
        )

    def _render_ego_without_schema(
        self,
        seconds: float,
        translation: Vector3Like,
        rotation: RotationLike,
        geocoordinate: Vector3Like | None = None,
    ) -> None:
        rr.set_time_seconds(self.config.timeline, seconds)

        rr.log(
            self.config.ego_entity,
            rr.Transform3D(
                translation=translation,
                rotation=_to_rerun_quaternion(rotation),
                relation=rr.TransformRelation.ParentFromChild,
            ),
        )

        if geocoordinate is not None:
            latitude, longitude, _ = geocoordinate
            rr.log(
                self.config.geocoordinate_entity,
                rr.GeoPoints(lat_lon=(latitude, longitude)),
            )
        elif self.latlon is not None:
            latitude, longitude = calculate_geodetic_point(translation, self.latlon)
            rr.log(
                self.config.geocoordinate_entity,
                rr.GeoPoints(lat_lon=(latitude, longitude)),
            )

    @overload
    def render_calibration(
        self,
        sensor: Sensor,
        calibration: CalibratedSensor,
        resolution: Vector2Like | None = None,
    ) -> None:
        """Render a sensor calibration.

        Args:
            sensor (Sensor): `Sensor` object.
            calibration (CalibratedSensor): `CalibratedSensor` object.
            resolution (Vector2Like | None, optional): Camera resolution (width, height).
        """
        pass

    @overload
    def render_calibration(
        self,
        channel: str,
        modality: str | SensorModality,
        translation: Vector3Like,
        rotation: RotationLike,
        camera_intrinsic: CameraIntrinsicLike | None = None,
        resolution: Vector2Like | None = None,
    ) -> None:
        """Render a sensor calibration.

        Args:
            channel (str): Name of the sensor channel.
            modality (str | SensorModality): Sensor modality.
            translation (Vector3Like): Sensor translation in ego centric coords.
            rotation (RotationLike): Sensor rotation in ego centric coords.
            camera_intrinsic (CameraIntrinsicLike | None, optional): Camera intrinsic matrix.
            resolution (Vector2Like | None, optional): Camera resolution (width, height).
        """
        pass

    @_check_spatial3d
    def render_calibration(self, *args, **kwargs) -> None:
        """Render a sensor calibration."""
        if len(args) + len(kwargs) <= 3:
            self._render_calibration_with_schema(*args, **kwargs)
        else:
            self._render_calibration_without_schema(*args, **kwargs)

    def _render_calibration_with_schema(
        self,
        sensor: Sensor,
        calibration: CalibratedSensor,
        resolution: Vector2Like | None = None,
    ) -> None:
        self._render_calibration_without_schema(
            channel=sensor.channel,
            modality=sensor.modality,
            translation=calibration.translation,
            rotation=calibration.rotation,
            camera_intrinsic=calibration.camera_intrinsic,
            resolution=resolution,
        )

    def _render_calibration_without_schema(
        self,
        channel: str,
        modality: str | SensorModality,
        translation: Vector3Like,
        rotation: RotationLike,
        camera_intrinsic: CameraIntrinsicLike | None = None,
        resolution: Vector2Like | None = None,
    ) -> None:
        """Render a sensor calibration.

        Args:
            channel (str): Name of the sensor channel.
            modality (str | SensorModality): Sensor modality.
            translation (Vector3Like): Sensor translation in ego centric coords.
            rotation (RotationLike): Sensor rotation in ego centric coords.
            camera_intrinsic (CameraIntrinsicLike | None, optional): Camera intrinsic matrix.
            resolution (Vector2Like | None, optional): Camera resolution (width, height).
        """
        rr.log(
            format_entity(self.config.ego_entity, channel),
            rr.Transform3D(translation=translation, rotation=_to_rerun_quaternion(rotation)),
            static=True,
        )

        if modality == SensorModality.CAMERA:
            rr.log(
                format_entity(self.config.ego_entity, channel),
                rr.Pinhole(image_from_camera=camera_intrinsic, resolution=resolution),
                static=True,
            )

    @_check_filepath
    @_check_spatial3d
    def render_map(self, filepath: str) -> None:
        """Render vector map.

        Args:
            filepath (str): Path to OSM file.
        """
        parser = LaneletParser(filepath, verbose=False)

        root_entity = format_entity(self.config.map_entity, "vector_map")
        render_lanelets(parser, root_entity)
        render_traffic_elements(parser, root_entity)
        render_ways(parser, root_entity)

        render_geographic_borders(parser, f"{self.config.geocoordinate_entity}/vector_map")

__init__(app_id, config=ViewerConfig(), save_dir=None)

Construct a new object.

Parameters:

Name Type Description Default
app_id str

Application ID.

required
config ViewerConfig

Configuration of the viewer.

ViewerConfig()
save_dir str | None

Directory path to save the recording. Viewer will be spawned if it is None, otherwise not.

None

Examples:

>>> from t4_devkit.viewer import ViewerBuilder
>>> viewer = (
        ViewerBuilder()
        .with_spatial3d()
        .with_spatial2d(cameras=["CAM_FRONT", "CAM_BACK"])
        .with_labels(label2id={"car": 1, "pedestrian": 2})
        .with_streetmap(latlon=[48.8566, 2.3522])
        .build(app_id="my_viewer")
    )
Source code in t4_devkit/viewer/viewer.py
def __init__(
    self,
    app_id: str,
    config: ViewerConfig = ViewerConfig(),
    save_dir: str | None = None,
) -> None:
    """Construct a new object.

    Args:
        app_id (str): Application ID.
        config (ViewerConfig): Configuration of the viewer.
        save_dir (str | None, optional): Directory path to save the recording.
            Viewer will be spawned if it is None, otherwise not.

    Examples:
        >>> from t4_devkit.viewer import ViewerBuilder
        >>> viewer = (
                ViewerBuilder()
                .with_spatial3d()
                .with_spatial2d(cameras=["CAM_FRONT", "CAM_BACK"])
                .with_labels(label2id={"car": 1, "pedestrian": 2})
                .with_streetmap(latlon=[48.8566, 2.3522])
                .build(app_id="my_viewer")
            )
    """
    self.app_id = app_id
    self.config = config
    self.blueprint = self.config.to_blueprint()

    rr.init(
        application_id=self.app_id,
        recording_id=None,
        spawn=save_dir is None,
        default_enabled=True,
        strict=True,
        default_blueprint=self.blueprint,
    )

    # NOTE: rr.save() must be invoked before logging
    if save_dir is not None:
        self._start_saving(save_dir=save_dir)

    rr.log(self.config.map_entity, rr.ViewCoordinates.RIGHT_HAND_Z_UP, static=True)

    rr.log(
        self.config.map_entity,
        rr.AnnotationContext(
            [
                rr.AnnotationInfo(id=label_id, label=label)
                for label, label_id in self.label2id.items()
            ]
        ),
        static=True,
    )

render_box2ds(*args, **kwargs)

render_box2ds(
    seconds: float, boxes: Sequence[Box2D]
) -> None
render_box2ds(
    seconds: float,
    camera: str,
    rois: Sequence[RoiLike],
    class_ids: Sequence[int],
    uuids: Sequence[str] | None = None,
) -> None

Render 2D boxes.

Source code in t4_devkit/viewer/viewer.py
@_check_spatial2d
def render_box2ds(self, *args, **kwargs) -> None:
    """Render 2D boxes."""
    if len(args) + len(kwargs) == 2:
        self._render_box2ds_with_boxes(*args, **kwargs)
    else:
        self._render_box2ds_with_elements(*args, **kwargs)

render_box3ds(*args, **kwargs)

render_box3ds(
    seconds: float, boxes: Sequence[Box3D]
) -> None
render_box3ds(
    seconds: float,
    frame_id: str,
    centers: Sequence[Vector3Like],
    rotations: Sequence[RotationLike],
    sizes: Sequence[Vector3Like],
    class_ids: Sequence[int],
    velocities: Sequence[Vector3Like] | None = None,
    uuids: Sequence[str] | None = None,
    futures: Sequence[Future] | None = None,
) -> None

Render 3D boxes.

Source code in t4_devkit/viewer/viewer.py
@_check_spatial3d
def render_box3ds(self, *args, **kwargs) -> None:
    """Render 3D boxes."""
    if len(args) + len(kwargs) == 2:
        self._render_box3ds_with_boxes(*args, **kwargs)
    else:
        self._render_box3ds_with_elements(*args, **kwargs)

render_calibration(*args, **kwargs)

render_calibration(
    sensor: Sensor,
    calibration: CalibratedSensor,
    resolution: Vector2Like | None = None,
) -> None
render_calibration(
    channel: str,
    modality: str | SensorModality,
    translation: Vector3Like,
    rotation: RotationLike,
    camera_intrinsic: CameraIntrinsicLike | None = None,
    resolution: Vector2Like | None = None,
) -> None

Render a sensor calibration.

Source code in t4_devkit/viewer/viewer.py
@_check_spatial3d
def render_calibration(self, *args, **kwargs) -> None:
    """Render a sensor calibration."""
    if len(args) + len(kwargs) <= 3:
        self._render_calibration_with_schema(*args, **kwargs)
    else:
        self._render_calibration_without_schema(*args, **kwargs)

render_ego(*args, **kwargs)

render_ego(ego_pose: EgoPose) -> None
render_ego(
    seconds: float,
    translation: Vector3Like,
    rotation: RotationLike,
    geocoordinate: Vector3Like | None = None,
) -> None

Render an ego pose.

Source code in t4_devkit/viewer/viewer.py
@_check_spatial3d
def render_ego(self, *args, **kwargs) -> None:
    """Render an ego pose."""
    if len(args) + len(kwargs) == 1:
        self._render_ego_with_schema(*args, **kwargs)
    else:
        self._render_ego_without_schema(*args, **kwargs)

render_image(seconds, camera, image)

Render an image.

Parameters:

Name Type Description Default
seconds float

Timestamp in [sec].

required
camera str

Name of the camera channel.

required
image str | NDArrayU8

Image tensor or path of the image file.

required
Source code in t4_devkit/viewer/viewer.py
@_check_spatial2d
def render_image(self, seconds: float, camera: str, image: str | NDArrayU8) -> None:
    """Render an image.

    Args:
        seconds (float): Timestamp in [sec].
        camera (str): Name of the camera channel.
        image (str | NDArrayU8): Image tensor or path of the image file.
    """
    rr.set_time_seconds(self.config.timeline, seconds)

    if isinstance(image, str):
        rr.log(format_entity(self.config.ego_entity, camera), rr.ImageEncoded(path=image))
    else:
        rr.log(format_entity(self.config.ego_entity, camera), rr.Image(image))

render_map(filepath)

Render vector map.

Parameters:

Name Type Description Default
filepath str

Path to OSM file.

required
Source code in t4_devkit/viewer/viewer.py
@_check_filepath
@_check_spatial3d
def render_map(self, filepath: str) -> None:
    """Render vector map.

    Args:
        filepath (str): Path to OSM file.
    """
    parser = LaneletParser(filepath, verbose=False)

    root_entity = format_entity(self.config.map_entity, "vector_map")
    render_lanelets(parser, root_entity)
    render_traffic_elements(parser, root_entity)
    render_ways(parser, root_entity)

    render_geographic_borders(parser, f"{self.config.geocoordinate_entity}/vector_map")

render_pointcloud(seconds, channel, pointcloud, color_mode=PointCloudColorMode.DISTANCE)

Render pointcloud.

Parameters:

Name Type Description Default
seconds float

Timestamp in [sec].

required
channel str

Name of the pointcloud sensor channel.

required
pointcloud PointCloudLike

Inherence object of PointCloud.

required
color_mode PointCloudColorMode

Color mode for pointcloud.

DISTANCE
Source code in t4_devkit/viewer/viewer.py
@_check_spatial3d
def render_pointcloud(
    self,
    seconds: float,
    channel: str,
    pointcloud: PointCloudLike,
    color_mode: PointCloudColorMode = PointCloudColorMode.DISTANCE,
) -> None:
    """Render pointcloud.

    Args:
        seconds (float): Timestamp in [sec].
        channel (str): Name of the pointcloud sensor channel.
        pointcloud (PointCloudLike): Inherence object of `PointCloud`.
        color_mode (PointCloudColorMode, optional): Color mode for pointcloud.
    """
    # TODO(ktro2828): add support of rendering pointcloud on images
    rr.set_time_seconds(self.config.timeline, seconds)

    colors = pointcloud_color(pointcloud, color_mode=color_mode)
    rr.log(
        format_entity(self.config.ego_entity, channel),
        rr.Points3D(pointcloud.points[:3].T, colors=colors),
    )

render_segmentation2d(seconds, camera, masks, class_ids, uuids=None)

Render 2D segmentation image.

Parameters:

Name Type Description Default
seconds float

Timestamp in [sec].

required
camera str

Name of camera channel.

required
masks Sequence[NDArrayU8]

Sequence of segmentation mask of each instance, each mask is the shape of (W, H).

required
class_ids Sequence[int]

Sequence of label ids.

required
uuids Sequence[str | None] | None

Sequence of each instance ID.

None
Source code in t4_devkit/viewer/viewer.py
@_check_spatial2d
def render_segmentation2d(
    self,
    seconds: float,
    camera: str,
    masks: Sequence[NDArrayU8],
    class_ids: Sequence[int],
    uuids: Sequence[str | None] | None = None,
) -> None:
    """Render 2D segmentation image.

    Args:
        seconds (float): Timestamp in [sec].
        camera (str): Name of camera channel.
        masks (Sequence[NDArrayU8]): Sequence of segmentation mask of each instance,
            each mask is the shape of (W, H).
        class_ids (Sequence[int]): Sequence of label ids.
        uuids (Sequence[str | None] | None, optional): Sequence of each instance ID.
    """
    rr.set_time_seconds(self.config.timeline, seconds)

    batch = BatchSegmentation2D()
    if uuids is None:
        uuids = [None] * len(masks)
    for mask, class_id, uuid in zip(masks, class_ids, uuids, strict=True):
        batch.append(mask, class_id, uuid)

    rr.log(
        format_entity(self.config.ego_entity, camera, "segmentation"),
        batch.as_segmentation_image(),
    )

ViewerBuilder

Builder for creating a RerunViewer instance.

Examples:

>>> from t4_devkit.viewer import ViewerBuilder
>>> viewer = (
        ViewerBuilder()
        .with_spatial3d()
        .with_spatial2d(cameras=["CAM_FRONT", "CAM_BACK"])
        .with_labels(label2id={"car": 1, "pedestrian": 2})
        .with_streetmap(latlon=[48.8566, 2.3522])
        .build(app_id="my_viewer")
    )
Source code in t4_devkit/viewer/builder.py
class ViewerBuilder:
    """Builder for creating a RerunViewer instance.

    Examples:
        >>> from t4_devkit.viewer import ViewerBuilder
        >>> viewer = (
                ViewerBuilder()
                .with_spatial3d()
                .with_spatial2d(cameras=["CAM_FRONT", "CAM_BACK"])
                .with_labels(label2id={"car": 1, "pedestrian": 2})
                .with_streetmap(latlon=[48.8566, 2.3522])
                .build(app_id="my_viewer")
            )
    """

    def __init__(self) -> None:
        self._config = ViewerConfig()

    def with_spatial3d(self) -> Self:
        self._config.spatial3ds.append(rrb.Spatial3DView(name="3D", origin=ViewerConfig.map_entity))
        return self

    def with_spatial2d(self, cameras: Sequence[str]) -> Self:
        self._config.spatial2ds.extend(
            [
                rrb.Spatial2DView(name=name, origin=format_entity(ViewerConfig.ego_entity, name))
                for name in cameras
            ]
        )
        return self

    def with_labels(self, label2id: dict[str, int]) -> Self:
        self._config.label2id = label2id
        return self

    def with_streetmap(self, latlon: Vector2Like | None = None) -> Self:
        self._config.spatial3ds.append(
            rrb.MapView(name="Map", origin=self._config.geocoordinate_entity)
        )
        if latlon is not None:
            self._config.latlon = latlon
        return self

    def build(self, app_id: str, save_dir: str | None = None) -> RerunViewer:
        return RerunViewer(app_id=app_id, config=self._config, save_dir=save_dir)

ViewerConfig

Source code in t4_devkit/viewer/config.py
@define
class ViewerConfig:
    map_entity: ClassVar[str] = "map"
    ego_entity: ClassVar[str] = "map/base_link"
    geocoordinate_entity: ClassVar[str] = "geocoordinate"
    timeline: ClassVar[str] = "timeline"

    spatial3ds: list[rrb.SpaceView] = field(factory=list)
    spatial2ds: list[rrb.SpaceView] = field(factory=list)
    label2id: dict[str, int] = field(factory=dict)
    latlon: Vector2Like | None = field(default=None)

    def to_blueprint(self) -> rrb.BlueprintLike:
        """Return the recording blueprint."""
        views = []
        if self.spatial3ds:
            views.append(rrb.Horizontal(*self.spatial3ds, column_shares=[3, 1]))
        if self.spatial2ds:
            views.append(rrb.Grid(*self.spatial2ds))

        return rrb.Vertical(*views, row_shares=[4, 2])

    def has_spatial3d(self) -> bool:
        """Return `True` if the configuration contains 3D view space."""
        return len(self.spatial3ds) > 0

    def has_spatial2d(self) -> bool:
        """Return `True` if the configuration contains 2D view space."""
        return len(self.spatial2ds) > 0

has_spatial2d()

Return True if the configuration contains 2D view space.

Source code in t4_devkit/viewer/config.py
def has_spatial2d(self) -> bool:
    """Return `True` if the configuration contains 2D view space."""
    return len(self.spatial2ds) > 0

has_spatial3d()

Return True if the configuration contains 3D view space.

Source code in t4_devkit/viewer/config.py
def has_spatial3d(self) -> bool:
    """Return `True` if the configuration contains 3D view space."""
    return len(self.spatial3ds) > 0

to_blueprint()

Return the recording blueprint.

Source code in t4_devkit/viewer/config.py
def to_blueprint(self) -> rrb.BlueprintLike:
    """Return the recording blueprint."""
    views = []
    if self.spatial3ds:
        views.append(rrb.Horizontal(*self.spatial3ds, column_shares=[3, 1]))
    if self.spatial2ds:
        views.append(rrb.Grid(*self.spatial2ds))

    return rrb.Vertical(*views, row_shares=[4, 2])

calculate_geodetic_point(position, origin)

Transform a position in a map coordinate system to a position in a geodetic coordinate system.

Parameters:

Name Type Description Default
position Vector3Like

3D position in a map coordinate system.

required
origin Vector2Like

Map origin position in a geodetic coordinate system, which is (latitude, longitude).

required

Returns:

Type Description
Vector2

Transformed position in a geodetic coordinate system, which is (latitude, longitude).

Source code in t4_devkit/viewer/geography.py
def calculate_geodetic_point(position: Vector3Like, origin: Vector2Like) -> Vector2:
    """Transform a position in a map coordinate system to a position in a geodetic coordinate system.

    Args:
        position (Vector3Like): 3D position in a map coordinate system.
        origin (Vector2Like): Map origin position in a geodetic coordinate system,
            which is (latitude, longitude).

    Returns:
        Transformed position in a geodetic coordinate system, which is (latitude, longitude).
    """
    x, y, _ = Vector3(position)
    bearing = math.atan2(x, y)
    distance = math.hypot(x, y)

    latitude, longitude = np.radians(Vector2(origin))
    angular_distance = distance / EARTH_RADIUS_METERS

    target_latitude = math.asin(
        math.sin(latitude) * math.cos(angular_distance)
        + math.cos(latitude) * math.sin(angular_distance) * math.cos(bearing)
    )
    target_longitude = longitude + math.atan2(
        math.sin(bearing) * math.sin(angular_distance) * math.cos(latitude),
        math.cos(angular_distance) - math.sin(latitude) * math.sin(target_latitude),
    )

    return Vector2(math.degrees(target_latitude), math.degrees(target_longitude))

format_entity(*entities)

Format entity path.

Parameters:

Name Type Description Default
*entities Sequence[str]

Entity path(s).

()

Returns:

Type Description
str

Formatted entity path.

Examples:

>>> format_entity("map")
"map"
>>> format_entity("map", "map/base_link")
"map/base_link"
>>> format_entity("map", "map/base_link", "camera")
"map/base_link/camera"
Source code in t4_devkit/viewer/config.py
def format_entity(*entities: Sequence[str]) -> str:
    """Format entity path.

    Args:
        *entities: Entity path(s).

    Returns:
        Formatted entity path.

    Examples:
        >>> format_entity("map")
        "map"
        >>> format_entity("map", "map/base_link")
        "map/base_link"
        >>> format_entity("map", "map/base_link", "camera")
        "map/base_link/camera"
    """
    if not entities:
        return ""

    flattened = []
    for entity in entities:
        for part in entity.split("/"):
            if part and flattened and flattened[-1] == part:
                continue
            flattened.append(part)
    return "/".join(flattened)

pointcloud_color(pointcloud, color_mode=PointCloudColorMode.DISTANCE)

Return color map depending on the specified color mode.

Parameters:

Name Type Description Default
pointcloud PointCloudLike

Any inheritance of PointCloud class.

required
color_mode PointCloudColorMode

Color mode for pointcloud.

DISTANCE
Source code in t4_devkit/viewer/color.py
def pointcloud_color(
    pointcloud: PointCloudLike,
    color_mode: PointCloudColorMode = PointCloudColorMode.DISTANCE,
) -> NDArrayF64:
    """Return color map depending on the specified color mode.

    Args:
        pointcloud (PointCloudLike): Any inheritance of `PointCloud` class.
        color_mode (PointCloudColorMode, optional): Color mode for pointcloud.
    """
    match color_mode:
        case PointCloudColorMode.DISTANCE:
            values = np.linalg.norm(pointcloud.points[:3].T, axis=1)
        case PointCloudColorMode.INTENSITY:
            values = pointcloud.points[3]
        case _:
            raise ValueError(f"Unsupported color mode: {color_mode}")

    return _normalize_color(values)