Source code for genesis.engine.entities.emitter

import gstaichi as ti
import numpy as np
import torch

import genesis as gs
import genesis.utils.geom as gu
import genesis.utils.particle as pu
from genesis.repr_base import RBC


[docs]@ti.data_oriented class Emitter(RBC): """ A particle emitter for fluid or material simulation. The Emitter manages the generation of particles into the simulation domain, allowing directional or omnidirectional emissions with various droplet shapes. It supports resetting, shape-based emission, and spherical omni-emission. Parameters ---------- max_particles : int The maximum number of particles that this emitter can handle. """ def __init__(self, max_particles): self._uid = gs.UID() self._entity = None self._max_particles = max_particles self._acc_droplet_len = 0.0 # accumulated droplet length to be emitted gs.logger.info( f"Creating ~<{self._repr_type()}>~. id: ~~~<{self._uid}>~~~, max_particles: ~<{max_particles}>~." )
[docs] def set_entity(self, entity): """ Assign an entity to the emitter and initialize relevant simulation and solver references. Parameters ---------- entity : Entity The entity to associate with the emitter. This entity should contain the solver, simulation context, and particle sampler. """ self._entity = entity self._sim = entity.sim self._solver = entity.solver self._next_particle = 0 gs.logger.info(f"~<{self._repr_briefer()}>~ created using ~<{entity._repr_briefer()}.")
[docs] def reset(self): """ Reset the emitter's internal particle index to start emitting from the beginning. """ self._next_particle = 0
[docs] def emit( self, droplet_shape, droplet_size, droplet_length=None, pos=(0.5, 0.5, 1.0), direction=(0, 0, -1), theta=0.0, speed=1.0, p_size=None, ): """ Emit particles in a specified shape and direction from a nozzle. Parameters ---------- droplet_shape : str The shape of the emitted droplet. Options: "circle", "sphere", "square", "rectangle". droplet_size : float or tuple Size of the droplet. A single float for symmetric shapes, or a tuple of (width, height) for rectangles. droplet_length : float, optional Length of the droplet in the emitting direction. If None, calculated from speed and simulation timing. pos : tuple of float World position of the nozzle from which the droplet is emitted. direction : tuple of float Direction vector of the emitted droplet. theta : float Rotation angle (in radians) around the droplet axis. speed : float Emission speed of the particles. p_size : float, optional Particle size used for filling the droplet. Defaults to the solver's particle size. Raises ------ Exception If the shape is unsupported or the emission would place particles outside the simulation boundary. """ assert self._entity is not None if droplet_shape in ["circle", "sphere", "square"]: assert isinstance(droplet_size, (int, float)) elif droplet_shape == "rectangle": assert isinstance(droplet_size, (tuple, list)) and len(droplet_size) == 2 else: gs.raise_exception(f"Unsupported nozzle shape: {droplet_shape}.") direction = np.asarray(direction, dtype=gs.np_float) if np.linalg.norm(direction) < gs.EPS: gs.raise_exception("Zero-length direction.") else: direction = gu.normalize(direction) p_size = self._entity.particle_size if p_size is None else p_size if droplet_length is None: # Use the speed to determine the length of the droplet in the emitting direction droplet_length = speed * self._solver.substep_dt * self._sim.substeps + self._acc_droplet_len if droplet_length < p_size: # too short, so we should not emit self._acc_droplet_len = droplet_length droplet_length = 0.0 else: self._acc_droplet_len = 0.0 if droplet_length > 0.0: if droplet_shape == "circle": positions = pu.cylinder_to_particles( p_size=p_size, radius=droplet_size / 2, height=droplet_length, sampler=self._entity.sampler, ) elif droplet_shape == "sphere": # sphere droplet ignores droplet_length positions = pu.sphere_to_particles( p_size=p_size, radius=droplet_size / 2, sampler=self._entity.sampler, ) elif droplet_shape == "square": positions = pu.box_to_particles( p_size=p_size, size=np.array([droplet_size, droplet_size, droplet_length]), sampler=self._entity.sampler, ) elif droplet_shape == "rectangle": positions = pu.box_to_particles( p_size=p_size, size=np.array([droplet_size[0], droplet_size[1], droplet_length]), sampler=self._entity.sampler, ) else: gs.raise_exception(f"Unsupported droplet shape '{droplet_shape}'") positions = gu.transform_by_trans_R( positions.astype(gs.np_float, copy=False), np.asarray(pos, dtype=gs.np_float), gu.z_up_to_R(direction) @ gu.axis_angle_to_R(np.array([0.0, 0.0, 1.0], dtype=gs.np_float), theta), ) if not self._solver.boundary.is_inside(positions): gs.raise_exception("Emitted particles are outside the boundary.") n_particles = len(positions) # Expand vels with batch dimension vels = speed * direction if n_particles > self._entity.n_particles: gs.raise_exception( f"Number of particles to emit ({n_particles}) at the current step is larger than the maximum " f"number of particles ({self._entity.n_particles})." ) particles_idx = torch.arange( self._next_particle, self._next_particle + n_particles, dtype=gs.tc_int, device=gs.device ) self._entity.set_particles_pos(positions, particles_idx) self._entity.set_particles_vel(vels, particles_idx) self._entity.set_particles_active(gs.ACTIVE, particles_idx) self._next_particle += n_particles # recycle particles if self._next_particle + n_particles > self._entity.n_particles: self._next_particle = 0 gs.logger.debug(f"Emitted {n_particles} particles. Next particle index: {self._next_particle}.") else: gs.logger.debug("Droplet length is too short for current step. Skipping to next step.")
[docs] def emit_omni(self, source_radius=0.1, pos=(0.5, 0.5, 1.0), speed=1.0, particle_size=None): """ Use a sphere-shaped source to emit particles in all directions. Parameters: ---------- source_radius: float, optional The radius of the sphere source. Particles will be emitted from a shell with inner radius using '0.8 * source_radius' and outer radius using source_radius. pos: array_like, shape=(3,) The center of the sphere source. speed: float The speed of the emitted particles. particle_size: float | None The size (diameter) of the emitted particles. The actual number of particles emitted is determined by the volume of the sphere source and the size of the particles. If None, the solver's particle size is used. Note that this particle size only affects computation for number of particles emitted, not the actual size of the particles in simulation and rendering. """ assert self._entity is not None pos = np.asarray(pos, dtype=gs.np_float) if particle_size is None: particle_size = self._entity.particle_size positions_ = pu.shell_to_particles( p_size=particle_size, outer_radius=source_radius, inner_radius=source_radius * 0.4, sampler=self._entity.sampler, ) positions = pos + positions_ if not self._solver.boundary.is_inside(positions): gs.raise_exception("Emitted particles are outside the boundary.") dists = np.linalg.norm(positions_, axis=1) positions[dists < gs.EPS] = gs.EPS vels = (speed / (dists[:, None] + gs.EPS)) * positions_ n_particles = len(positions) if n_particles > self._entity.n_particles: gs.raise_exception( f"Number of particles to emit ({n_particles}) at the current step is larger than the maximum number " f"of particles ({self._entity.n_particles})." ) particles_idx = torch.arange( self._next_particle, self._next_particle + n_particles, dtype=gs.tc_int, device=gs.device ) self._entity.set_particles_pos(positions, particles_idx) self._entity.set_particles_vel(vels, particles_idx) self._entity.set_particles_active(gs.ACTIVE, particles_idx) self._next_particle += n_particles # recycle particles if self._next_particle + n_particles > self._entity.n_particles: self._next_particle = 0 gs.logger.debug(f"Emitted {n_particles} particles. Next particle index: {self._next_particle}.")
@property def uid(self): """The unique identifier of the emitter.""" return self._uid @property def entity(self): """The entity associated with the emitter.""" return self._entity @property def max_particles(self): """The maximum number of particles this emitter can emit.""" return self._max_particles @property def solver(self): """The solver used by the emitter's associated entity.""" return self._solver @property def next_particle(self): """The index of the next particle to be emitted.""" return self._next_particle