Source code for genesis.recorders.recorder_manager

from typing import TYPE_CHECKING, Any, Callable, Type

import genesis as gs

if TYPE_CHECKING:
    from .base_recorder import Recorder, RecorderOptions


[docs]class RecorderManager: """ Manage the creation, processing, and cleanup of all data recorders. Parameters ---------- step_dt: float The simulation time step. """ RECORDER_TYPES_MAP = {} def __init__(self, step_dt: float): self._step_dt = step_dt self._recorders: list["Recorder"] = [] self._is_recording = False self._is_built = False
[docs] @gs.assert_unbuilt def add_recorder(self, data_func: Callable[[], Any], rec_options: "RecorderOptions") -> "Recorder": """ Automatically read and process data. See RecorderOptions for more details. Parameters ---------- data_func: Callable[[], Any] A function with no arguments that returns the data to be recorded. rec_options: RecorderOptions The options for the recorder which determines how the data is recorded and processed. Returns ------- recorder : Recorder The created recorder object. """ recorder_cls = RecorderManager.RECORDER_TYPES_MAP[type(rec_options)] recorder = recorder_cls(self, rec_options, data_func) self._recorders.append(recorder) return recorder
[docs] @gs.assert_unbuilt def build(self): """Start data recording.""" for recorder in self._recorders: recorder.build() recorder.start() self._is_recording = True self._is_built = True
[docs] @gs.assert_built def stop(self): """Stop and complete data recording.""" if not self._is_recording: gs.logger.warning("[DataRecorder] Ignoring stop(): data recording is not active.") else: self._is_recording = False for recorder in self._recorders: recorder.stop() self._recorders.clear()
[docs] @gs.assert_built def reset(self, envs_idx=None): for recorder in self._recorders: recorder.reset(envs_idx) recorder.start()
[docs] @gs.assert_built def step(self, global_step: int): """ Increment the step count and process data from each recording configuration. In threaded mode, data is put in queues. In non-threaded mode, data is processed synchronously. """ if not self._is_recording: return for recorder in self._recorders: recorder.step(global_step)
@property def is_recording(self) -> bool: return self._is_recording @property def is_built(self) -> bool: return self._is_built
def register_recording(options_cls: Type["RecorderOptions"]): def _impl(recorder_cls: Type["Recorder"]): RecorderManager.RECORDER_TYPES_MAP[options_cls] = recorder_cls return recorder_cls return _impl