Source code for genesis.vis.visualizer

import genesis as gs
from genesis.repr_base import RBC

from .camera import Camera
from .rasterizer import Rasterizer

VIEWER_DEFAULT_HEIGHT_RATIO = 0.5
VIEWER_DEFAULT_ASPECT_RATIO = 0.75


class DummyViewerLock:
    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        pass


[docs]class Visualizer(RBC): """ This abstraction layer manages viewer and renderers. """ def __init__(self, scene, show_viewer, vis_options, viewer_options, renderer_options): self._t = -1 self._scene = scene self._is_built = False self._context = None self._viewer = None self._rasterizer = None self._raytracer = None self._batch_renderer = None self.viewer_lock = DummyViewerLock() # Rasterizer context is shared by viewer and rasterizer try: from .rasterizer_context import RasterizerContext from .viewer import Viewer except Exception as e: gs.raise_exception_from("Rendering not working on this machine.", e) self._context = RasterizerContext(vis_options) try: screen_height, _screen_width, screen_scale = gs.utils.try_get_display_size() self._has_display = True except Exception as e: if show_viewer: gs.raise_exception_from("No display detected. Use `show_viewer=False` for headless mode.", e) self._has_display = False if show_viewer: if gs._scene_registry: gs.raise_exception( "Interactive viewer not supported when managing multiple scenes. Please set `show_viewer=False` " "or call `del scene`." ) if viewer_options.res is None: viewer_height = (screen_height * screen_scale) * VIEWER_DEFAULT_HEIGHT_RATIO viewer_width = viewer_height / VIEWER_DEFAULT_ASPECT_RATIO viewer_options.res = (int(viewer_width), int(viewer_height)) if viewer_options.run_in_thread is None: if gs.platform == "Linux": viewer_options.run_in_thread = True elif gs.platform == "macOS": viewer_options.run_in_thread = False elif gs.platform == "Windows": viewer_options.run_in_thread = True if gs.platform == "macOS" and viewer_options.run_in_thread: gs.raise_exception("Running viewer in background thread is not supported on MacOS.") self._viewer = Viewer(viewer_options, self._context) if not viewer_options.run_in_thread: gs.logger.warning( "Interactive viewer running in main thread. It will only be responsive if a simulation is running." ) # Rasterizer is always needed for depth and segmentation mask rendering. self._rasterizer = Rasterizer(self._viewer, self._context) if isinstance(renderer_options, gs.renderers.BatchRenderer): from .batch_renderer import BatchRenderer self._renderer = self._batch_renderer = BatchRenderer(self, renderer_options, vis_options) elif isinstance(renderer_options, gs.renderers.RayTracer): from .raytracer import Raytracer self._renderer = self._raytracer = Raytracer(renderer_options, vis_options) elif isinstance(renderer_options, gs.renderers.Rasterizer): self._renderer = self._rasterizer self._cameras = gs.List() def __del__(self): self.destroy()
[docs] def destroy(self): if self._rasterizer is not None: self._rasterizer.destroy() self._rasterizer = None if self._viewer is not None: self._viewer.stop() self._viewer = None if self._batch_renderer is not None: self._batch_renderer.destroy() self._batch_renderer = None if self._raytracer is not None: self._raytracer.destroy() self._raytracer = None if self._context is not None: self._context.destroy() del self._context self._context = None self.viewer_lock = None self._renderer = None
[docs] def add_camera( self, res, pos, lookat, up, model, fov, aperture, focus_dist, GUI, spp, denoise, near, far, env_idx, debug ): cam_idx = len([camera for camera in self._cameras if camera.debug == debug]) camera = Camera( self, cam_idx, model, res, pos, lookat, up, fov, aperture, focus_dist, GUI, spp, denoise, near, far, env_idx=env_idx, debug=debug, ) self._cameras.append(camera) return camera
[docs] def add_mesh_light(self, mesh, color, intensity, pos, quat, revert_dir, double_sided, cutoff): if self._raytracer is not None: self._raytracer.add_mesh_light(mesh, color, intensity, pos, quat, revert_dir, double_sided, cutoff) else: gs.raise_exception("`add_mesh_light` is specific to raytracer renderer.")
[docs] def add_light(self, pos, dir, color, intensity, directional, castshadow, cutoff, attenuation): if self._batch_renderer is not None: self._batch_renderer.add_light(pos, dir, color, intensity, directional, castshadow, cutoff, attenuation) else: gs.raise_exception("`add_light` is specific to batch renderer.")
[docs] @gs.assert_built def reset(self): self._t = -1 self._context.reset() if self._raytracer is not None: self._raytracer.reset() if self._batch_renderer is not None: self._batch_renderer.reset() if self._viewer is not None: self._viewer.update(auto_refresh=True)
[docs] def build(self): self._context.build(self._scene) if self._viewer is not None: self._viewer.build(self._scene) self.viewer_lock = self._viewer.lock self._rasterizer.build() if self._raytracer is not None: self._raytracer.build(self._scene) for camera in self._cameras: camera.build() # Batch renderer needs to be built after cameras are built if self._batch_renderer is not None: self._batch_renderer.build() # Fully initialized at this point self._is_built = True # Make sure that the viewer is fully compiled and in a clean state self.reset()
[docs] def update(self, force=True, auto=None): if force: # force update self.reset() elif self._viewer is not None: if self._viewer.is_alive(): self._viewer.update(auto_refresh=auto, force=force) else: gs.raise_exception("Viewer closed.")
[docs] def update_visual_states(self, force_render: bool = False): """ Update all visualization-only variables here. """ # Early return if already updated previously if not force_render and self._t >= self.scene._t: return for camera in self._cameras: if camera.is_built: if camera._attached_link is not None: camera.move_to_attach() elif camera._followed_entity is not None: camera.update_following() if self._scene.rigid_solver.is_active: self._scene.rigid_solver.update_geoms_render_T() self._scene.rigid_solver.update_vgeoms() # drone propellers for entity in self._scene.rigid_solver.entities: if isinstance(entity, gs.engine.entities.DroneEntity): entity.update_propeller_vgeoms() self._scene.rigid_solver.update_vgeoms_render_T() if self._scene.mpm_solver.is_active: self._scene.mpm_solver.update_render_fields() if self._scene.sph_solver.is_active: self._scene.sph_solver.update_render_fields() if self._scene.pbd_solver.is_active: self._scene.pbd_solver.update_render_fields() self._t = self._scene._t
[docs] def colorize_seg_idxc_arr(self, seg_idxc_arr): if self._batch_renderer is not None: return self._batch_renderer.colorize_seg_idxc_arr(seg_idxc_arr) else: return self._context.colorize_seg_idxc_arr(seg_idxc_arr)
# ------------------------------------------------------------------------------------ # ----------------------------------- properties ------------------------------------- # ------------------------------------------------------------------------------------ @property def is_built(self) -> bool: return self._is_built @property def viewer(self): return self._viewer @property def rasterizer(self): return self._rasterizer @property def batch_renderer(self): return self._batch_renderer @property def context(self): return self._context @property def raytracer(self): return self._raytracer @property def renderer(self): return self._renderer @property def scene(self): return self._scene @property def has_display(self): return self._has_display @property def cameras(self): return self._cameras @property def segmentation_idx_dict(self): if self._batch_renderer is not None: return self._batch_renderer.seg_idxc_map else: return self._context.seg_idxc_map