# Data Generation Classes
This section discusses the key classes related to data generation.
## Data Generator
The `DataGenerator` class (`datagen/data_generator.py`) is responsible for generating new demonstration trajectories. First, the internal `_load_dataset` method is used to parse the source dataset (using [Subtask Termination Signals](https://mimicgen.github.io/docs/tutorials/subtask_termination_signals.html)) into source subtask segments. Each segment is a sequence of [DatagenInfo](https://mimicgen.github.io/docs/modules/datagen.html#datagen-info) objects. Then, the `generate` method is called repeatedly (by the main script `scripts/generate_dataset.py`) to keep attempting to generate new trajectories using the source subtask segments. During each new attempt, the `select_source_demo` method is used to employ a [SelectionStrategy](https://mimicgen.github.io/docs/modules/datagen.html#selection-strategy) to pick a reference source subtask segment to transform. [WaypointTrajectory](https://mimicgen.github.io/docs/modules/datagen.html#waypoint) objects are used to transform and compose subtask segments together.
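The retry behavior of the main script can be sketched as follows. This is a minimal, self-contained illustration: the stub class below stands in for `DataGenerator` (whose real `generate` method takes environment and selection arguments), and the function names are simplified versions of the loop in `scripts/generate_dataset.py`.

```python
import random

class StubGenerator:
    """Stand-in for DataGenerator: each generate() call is one data
    generation attempt that may fail (e.g. the transformed trajectory
    does not solve the task)."""
    def generate(self):
        return {"success": random.random() < 0.5, "actions": []}

def collect_demos(generator, num_demos, max_attempts):
    """Keep attempting generation until enough successful demonstrations
    are collected, or the attempt budget runs out."""
    demos = []
    for _ in range(max_attempts):
        if len(demos) >= num_demos:
            break
        result = generator.generate()
        if result["success"]:
            demos.append(result)
    return demos

random.seed(0)
demos = collect_demos(StubGenerator(), num_demos=3, max_attempts=100)
```

Only the successful attempts are kept; failed attempts are simply discarded and retried.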
## Datagen Info
`DatagenInfo` objects keep track of important information used during data generation. These objects are added to source demonstrations with the `prepare_src_dataset.py` script, or provided directly by an [Environment Interface](https://mimicgen.github.io/docs/modules/env_interfaces.html) object.
The structure of the object is below:
```python
class DatagenInfo(object):
    """
    Structure of information needed from an environment for data generation. To allow for
    flexibility, not all information must be present.
    """
    def __init__(
        self,
        eef_pose=None,
        object_poses=None,
        subtask_term_signals=None,
        target_pose=None,
        gripper_action=None,
    ):
        """
        Args:
            eef_pose (np.array or None): robot end effector poses of shape [..., 4, 4]
            object_poses (dict or None): dictionary mapping object name to object poses
                of shape [..., 4, 4]
            subtask_term_signals (dict or None): dictionary mapping subtask name to a binary
                indicator (0 or 1) on whether subtask has been completed. Each value in the
                dictionary could be an int, float, or np.array of shape [..., 1].
            target_pose (np.array or None): target end effector poses of shape [..., 4, 4]
            gripper_action (np.array or None): gripper actions of shape [..., D] where D
                is the dimension of the gripper actuation action for the robot arm
        """
```
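For intuition, the arrays stored in these fields might look like the following (illustrative shapes only, built with plain NumPy; the object name, subtask name, and gripper dimension are made up for this sketch):

```python
import numpy as np

T = 10  # number of timesteps in a source subtask segment

# Field contents with the shapes DatagenInfo documents (leading dim = time):
eef_pose = np.tile(np.eye(4), (T, 1, 1))                  # [T, 4, 4] eef poses
object_poses = {"cube": np.tile(np.eye(4), (T, 1, 1))}    # name -> [T, 4, 4]
subtask_term_signals = {"grasp": np.zeros((T, 1))}        # name -> [T, 1] binary
target_pose = np.tile(np.eye(4), (T, 1, 1))               # [T, 4, 4] target poses
gripper_action = np.zeros((T, 1))                         # [T, D], here D = 1
```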
## Selection Strategy
> **Note:** See Appendix N.3 in the MimicGen paper for a more thorough explanation of source subtask segment selection and some further intuition on when to use different settings.
Each data generation attempt requires choosing one or more subtask segments from the source demonstrations to transform -- this selection is carried out by a `SelectionStrategy` instance:
```python
@six.add_metaclass(MG_SelectionStrategyMeta)
class MG_SelectionStrategy(object):
    """
    Defines methods and functions for selection strategies to implement.
    """
    def __init__(self):
        pass

    @property
    @classmethod
    def NAME(self):
        """
        This name (str) will be used to register the selection strategy class in the global
        registry.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def select_source_demo(
        self,
        eef_pose,
        object_pose,
        src_subtask_datagen_infos,
    ):
        """
        Selects source demonstration index using the current robot pose, relevant object pose
        for the current subtask, and relevant information from the source demonstrations for the
        current subtask.

        Args:
            eef_pose (np.array): current 4x4 eef pose
            object_pose (np.array): current 4x4 object pose, for the object in this subtask
            src_subtask_datagen_infos (list): DatagenInfo instance for the relevant subtask segment
                in the source demonstrations

        Returns:
            source_demo_ind (int): index of source demonstration - indicates which source subtask segment to use
        """
        raise NotImplementedError
```
Every `SelectionStrategy` class must subclass this base class and implement the `NAME` attribute and the `select_source_demo` method. `NAME` is used to register the `SelectionStrategy` class in the global registry, and `select_source_demo` implements the heuristic for selecting a source demonstration index.
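As an illustration, a custom strategy that picks a source demonstration uniformly at random might look like the sketch below. Note that this is self-contained rather than drop-in mimicgen code: the base class and metaclass registration are replaced by a plain `object` base here, and the class name is hypothetical.

```python
import random

class RandomSelectionStrategy(object):
    """Sketch of a custom strategy; in mimicgen this would subclass
    MG_SelectionStrategy, and the metaclass would register it under NAME."""

    NAME = "random"  # registry key, analogous to the real NAME property

    def select_source_demo(self, eef_pose, object_pose, src_subtask_datagen_infos):
        # Ignore the poses entirely and pick any source segment index.
        return random.randrange(len(src_subtask_datagen_infos))
```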
Each data generation config json specifies how source segment selection should be done during data generation. First, `config.experiment.generation.select_src_per_subtask` determines whether to select a different source demonstration for each subtask during data generation, or keep the same source demonstration as the one used for the first subtask. This corresponds to the `per-subtask` parameter described in the "Selection Frequency" paragraph of Appendix N.3 in the paper.
The specific task config (`config.task.task_spec`), which corresponds to the [Task Spec](https://mimicgen.github.io/docs/modules/task_spec.html) object used in data generation, also specifies the selection strategy to use for each subtask via the `selection_strategy` parameter and the `selection_strategy_kwargs` parameter. The `selection_strategy` parameter corresponds to the `NAME` for the SelectionStrategy class, and the `selection_strategy_kwargs` correspond to any additional kwargs to specify when invoking the `select_source_demo` method.
> **Note:** If `config.experiment.generation.select_src_per_subtask` is False, only the first subtask's selection strategy matters, since the selected source demonstration will be used for the remainder of the data generation attempt.
As an example, the `NearestNeighborObjectStrategy` (see implementation below) can be selected by passing `nearest_neighbor_object` for the `selection_strategy` parameter, and its `pos_weight`, `rot_weight`, and `nn_k` parameters can be set by passing a dictionary of values for the `selection_strategy_kwargs` parameter.
```python
class NearestNeighborObjectStrategy(MG_SelectionStrategy):
    """
    Pick source demonstration to be the one with the closest object pose to the object
    in the current scene.
    """

    # name for registering this class into registry
    NAME = "nearest_neighbor_object"

    def select_source_demo(
        self,
        eef_pose,
        object_pose,
        src_subtask_datagen_infos,
        pos_weight=1.,
        rot_weight=1.,
        nn_k=3,
    ):
        """
        Selects source demonstration index using the current robot pose, relevant object pose
        for the current subtask, and relevant information from the source demonstrations for the
        current subtask.

        Args:
            eef_pose (np.array): current 4x4 eef pose
            object_pose (np.array): current 4x4 object pose, for the object in this subtask
            src_subtask_datagen_infos (list): DatagenInfo instance for the relevant subtask segment
                in the source demonstrations
            pos_weight (float): weight on position for minimizing pose distance
            rot_weight (float): weight on rotation for minimizing pose distance
            nn_k (int): pick source demo index uniformly at random from the top @nn_k nearest neighbors

        Returns:
            source_demo_ind (int): index of source demonstration - indicates which source subtask segment to use
        """
```
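Putting these settings together, the relevant pieces of a data generation config might look like the fragment below. This is illustrative only: the field names follow the description above, but surrounding keys are elided and the values are made up.

```json
{
    "experiment": {
        "generation": {
            "select_src_per_subtask": true
        }
    },
    "task": {
        "task_spec": [
            {
                "selection_strategy": "nearest_neighbor_object",
                "selection_strategy_kwargs": {
                    "pos_weight": 1.0,
                    "rot_weight": 1.0,
                    "nn_k": 3
                }
            }
        ]
    }
}
```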
## Waypoint
### Waypoint Class Variants
MimicGen uses a collection of convenience classes to represent waypoints and trajectories (`datagen/waypoint.py`).
The `Waypoint` class represents a single 6-DoF target pose and the gripper action for that timestep:
```python
class Waypoint(object):
    """
    Represents a single desired 6-DoF waypoint, along with corresponding gripper actuation for this point.
    """
    def __init__(self, pose, gripper_action, noise=None):
```
The `WaypointSequence` class represents a sequence of these `Waypoint` objects:
```python
class WaypointSequence(object):
    """
    Represents a sequence of Waypoint objects.
    """
    def __init__(self, sequence=None):
```
It can easily be instantiated from a collection of poses (e.g. `WaypointSequence.from_poses`):
```python
@classmethod
def from_poses(cls, poses, gripper_actions, action_noise):
    """
    Instantiate a WaypointSequence object given a sequence of poses,
    gripper actions, and action noise.

    Args:
        poses (np.array): sequence of pose matrices of shape (T, 4, 4)
        gripper_actions (np.array): sequence of gripper actions
            that should be applied at each timestep of shape (T, D).
        action_noise (float or np.array): sequence of action noise
            magnitudes that should be applied at each timestep. If a
            single float is provided, the noise magnitude will be
            constant over the trajectory.
    """
```
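Conceptually, `from_poses` pairs each pose with its gripper action and a (possibly broadcast) noise magnitude. The self-contained sketch below mirrors that behavior; the `Waypoint` dataclass here is a stand-in, not mimicgen's actual class.

```python
from dataclasses import dataclass
import numpy as np

@dataclass
class Waypoint:                   # stand-in for mimicgen's Waypoint class
    pose: np.ndarray              # 4x4 target pose
    gripper_action: np.ndarray    # (D,) gripper command
    noise: float                  # action noise magnitude

def from_poses(poses, gripper_actions, action_noise):
    """Pair each pose with its gripper action and noise magnitude,
    broadcasting a scalar noise value across the whole trajectory."""
    T = poses.shape[0]
    if np.isscalar(action_noise):
        action_noise = np.full(T, float(action_noise))
    return [
        Waypoint(poses[t], gripper_actions[t], float(action_noise[t]))
        for t in range(T)
    ]
```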
Finally, the `WaypointTrajectory` class is a sequence of `WaypointSequence` objects, and is a convenient way to represent and execute 6-DoF trajectories:
```python
class WaypointTrajectory(object):
    """
    A sequence of WaypointSequence objects that corresponds to a full 6-DoF trajectory.
    """
```
`WaypointSequence` objects can be added directly to a `WaypointTrajectory` object:
```python
def add_waypoint_sequence(self, sequence):
    """
    Directly append sequence to list (no interpolation).

    Args:
        sequence (WaypointSequence instance): sequence to add
    """
```
Interpolation segments can also be added easily using this helper method:
```python
def add_waypoint_sequence_for_target_pose(
    self,
    pose,
    gripper_action,
    num_steps,
    skip_interpolation=False,
    action_noise=0.,
):
    """
    Adds a new waypoint sequence corresponding to a desired target pose. A new WaypointSequence
    will be constructed consisting of @num_steps intermediate Waypoint objects. These can either
    be constructed with linear interpolation from the last waypoint (default) or be a
    constant set of target poses (set @skip_interpolation to True).

    Args:
        pose (np.array): 4x4 target pose
        gripper_action (np.array): value for gripper action
        num_steps (int): number of action steps when trying to reach this waypoint. Will
            add intermediate linearly interpolated points between the last pose on this trajectory
            and the target pose, so that the total number of steps is @num_steps.
        skip_interpolation (bool): if True, keep the target pose fixed and repeat it @num_steps
            times instead of using linearly interpolated targets.
        action_noise (float): scale of random gaussian noise to add during action execution (e.g.
            when @execute is called)
    """
```
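The interpolation step can be sketched as follows. This simplified, self-contained version interpolates positions only and holds the rotation at the target's rotation; the real implementation also interpolates rotations, and the function name here is hypothetical.

```python
import numpy as np

def interpolate_positions(last_pose, target_pose, num_steps):
    """Return num_steps intermediate 4x4 poses whose positions move linearly
    from last_pose to target_pose (rotation fixed to the target's rotation
    for simplicity), ending exactly at target_pose's position."""
    steps = []
    for i in range(1, num_steps + 1):
        frac = i / num_steps
        pose = np.array(target_pose, dtype=float)     # copy target rotation
        pose[:3, 3] = (1.0 - frac) * last_pose[:3, 3] + frac * target_pose[:3, 3]
        steps.append(pose)
    return np.stack(steps)
```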
The `merge` method is a thin wrapper around the above method, easily allowing for linear interpolation between two `WaypointTrajectory` objects:
```python
def merge(
    self,
    other,
    num_steps_interp=None,
    num_steps_fixed=None,
    action_noise=0.,
):
    """
    Merge this trajectory with another (@other).

    Args:
        other (WaypointTrajectory object): the other trajectory to merge into this one
        num_steps_interp (int or None): if not None, add a waypoint sequence that interpolates
            between the end of the current trajectory and the start of @other
        num_steps_fixed (int or None): if not None, add a waypoint sequence that has constant
            target poses corresponding to the first target pose in @other
        action_noise (float): noise to use during the interpolation segment
    """
```
Finally, the `execute` method makes it easy to execute the waypoint sequences in the simulation environment:
```python
def execute(
    self,
    env,
    env_interface,
    render=False,
    video_writer=None,
    video_skip=5,
    camera_names=None,
):
    """
    Main function to execute the trajectory. Will use env_interface.target_pose_to_action to
    convert each target pose at each waypoint to an action command, and pass that along to
    env.step.

    Args:
        env (robomimic EnvBase instance): environment to use for executing trajectory
        env_interface (MG_EnvInterface instance): environment interface for executing trajectory
        render (bool): if True, render on-screen
        video_writer (imageio writer): video writer
        video_skip (int): determines rate at which environment frames are written to video
        camera_names (list): determines which camera(s) are used for rendering. Pass more than
            one to output a video with multiple camera views concatenated horizontally.

    Returns:
        results (dict): dictionary with the following items for the executed trajectory:
            states (list): simulator state at each timestep
            observations (list): observation dictionary at each timestep
            datagen_infos (list): datagen_info at each timestep
            actions (list): action executed at each timestep
            success (bool): whether the trajectory successfully solved the task or not
    """
```
### Waypoint Class Usage during Data Generation
Each data generation attempt consists of executing a particular sequence of waypoints for each subtask. A `WaypointTrajectory` object is constructed and executed for each subtask in the `generate` method of the `DataGenerator`.
For a given subtask, a `WaypointTrajectory` object is initialized with a single pose (usually the current robot end effector pose, or the last target pose from the previous subtask execution attempt). Next, a reference source subtask segment is selected and then transformed using the `transform_source_data_segment_using_object_pose` method from `utils/pose_utils.py`. The transformed segment is merged into the trajectory object with linear interpolation using the `merge` method. Finally, the `execute` method is used to carry out the subtask. This process repeats for each subtask.