Data Generation Classes
Contents
Data Generation Classes#
This section discusses the key classes related to data generation.
Data Generator#
The DataGenerator
class (datagen/data_generator.py
) is responsible for generating new demonstration trajectories. First, the internal _load_dataset
method is used to parse the source dataset (using Subtask Termination Signals) into source subtask segments. Each segment is a sequence of DatagenInfo objects. Then, the generate
method is called repeatedly (by the main script scripts/generate_dataset.py
) to keep attempting to generate new trajectories using the source subtask segments. During each new attempt, the select_source_demo
method is used to employ a SelectionStrategy to pick a reference source subtask segment to transform. WaypointTrajectory objects are used to transform and compose subtask segments together.
Datagen Info#
DatagenInfo objects keep track of important information used during data generation. These objects are added to source demonstrations with the prepare_src_dataset.py
script, or provided directly by an Environment Interface object.
The structure of the object is below:
class DatagenInfo(object):
"""
Structure of information needed from an environment for data generation. To allow for
flexibility, not all information must be present.
"""
def __init__(
self,
eef_pose=None,
object_poses=None,
subtask_term_signals=None,
target_pose=None,
gripper_action=None,
):
"""
Args:
eef_pose (np.array or None): robot end effector poses of shape [..., 4, 4]
object_poses (dict or None): dictionary mapping object name to object poses
of shape [..., 4, 4]
subtask_term_signals (dict or None): dictionary mapping subtask name to a binary
indicator (0 or 1) on whether subtask has been completed. Each value in the
dictionary could be an int, float, or np.array of shape [..., 1].
target_pose (np.array or None): target end effector poses of shape [..., 4, 4]
gripper_action (np.array or None): gripper actions of shape [..., D] where D
is the dimension of the gripper actuation action for the robot arm
"""
Selection Strategy#
Note
See Appendix N.3 in the MimicGen paper for a more thorough explanation of source subtask segment selection and some further intuition on when to use different settings.
Each data generation attempt requires choosing one or more subtask segments from the source demonstrations to transform – this is carried out by a SelectionStrategy instance:
@six.add_metaclass(MG_SelectionStrategyMeta)
class MG_SelectionStrategy(object):
"""
Defines methods and functions for selection strategies to implement.
"""
def __init__(self):
pass
@property
@classmethod
def NAME(self):
"""
This name (str) will be used to register the selection strategy class in the global
registry.
"""
raise NotImplementedError
@abc.abstractmethod
def select_source_demo(
self,
eef_pose,
object_pose,
src_subtask_datagen_infos,
):
"""
Selects source demonstration index using the current robot pose, relevant object pose
for the current subtask, and relevant information from the source demonstrations for the
current subtask.
Args:
eef_pose (np.array): current 4x4 eef pose
object_pose (np.array): current 4x4 object pose, for the object in this subtask
src_subtask_datagen_infos (list): DatagenInfo instance for the relevant subtask segment
in the source demonstrations
Returns:
source_demo_ind (int): index of source demonstration - indicates which source subtask segment to use
"""
raise NotImplementedError
Every SelectionStrategy class must subclass this base class and implement the NAME
and select_source_demo
methods. The NAME
field is used to register the SelectionStrategy class into the global registry, and select_source_demo
implements the heuristic for selecting a source demonstration index.
Each data generation config json specifies how source segment selection should be done during data generation. First, config.experiment.generation.select_src_per_subtask
determines whether to select a different source demonstration for each subtask during data generation, or keep the same source demonstration as the one used for the first subtask. This corresponds to the per-subtask
parameter described in the “Selection Frequency” paragraph of Appendix N.3 in the paper.
The specific task config (config.task.task_spec
), which corresponds to the Task Spec object used in data generation, also specifies the selection strategy to use for each subtask via the selection_strategy
parameter and the selection_strategy_kwargs
parameter. The selection_strategy
parameter corresponds to the NAME
for the SelectionStrategy class, and the selection_strategy_kwargs
correspond to any additional kwargs to specify when invoking the select_source_demo
method.
Note
Note that if config.experiment.generation.select_src_per_subtask
is False, only the first subtask’s selection strategy matters, since the selected source demonstration will be used for the remainder of the data generation attempt.
As an example, the NearestNeighborObjectStrategy
(see implementation below) can be specified by passing nearest_neighbor_object
for the selection_strategy
parameter and you can use the selection_strategy_kwargs
parameter to specify a dictionary containing values for the pos_weight
, rot_weight
, and nn_k
parameters.
class NearestNeighborObjectStrategy(MG_SelectionStrategy):
"""
Pick source demonstration to be the one with the closest object pose to the object
in the current scene.
"""
# name for registering this class into registry
NAME = "nearest_neighbor_object"
def select_source_demo(
self,
eef_pose,
object_pose,
src_subtask_datagen_infos,
pos_weight=1.,
rot_weight=1.,
nn_k=3,
):
"""
Selects source demonstration index using the current robot pose, relevant object pose
for the current subtask, and relevant information from the source demonstrations for the
current subtask.
Args:
eef_pose (np.array): current 4x4 eef pose
object_pose (np.array): current 4x4 object pose, for the object in this subtask
src_subtask_datagen_infos (list): DatagenInfo instance for the relevant subtask segment
in the source demonstrations
pos_weight (float): weight on position for minimizing pose distance
rot_weight (float): weight on rotation for minimizing pose distance
nn_k (int): pick source demo index uniformly at randomly from the top @nn_k nearest neighbors
Returns:
source_demo_ind (int): index of source demonstration - indicates which source subtask segment to use
"""
Waypoint#
Waypoint Class Variants#
MimicGen uses a collection of convenience classes to represent waypoints and trajectories (datagen/waypoint.py
).
The Waypoint
class represents a single 6-DoF target pose and the gripper action for that timestep:
class Waypoint(object):
"""
Represents a single desired 6-DoF waypoint, along with corresponding gripper actuation for this point.
"""
def __init__(self, pose, gripper_action, noise=None):
The WaypointSequence
class represents a sequence of these Waypoint
objects:
class WaypointSequence(object):
"""
Represents a sequence of Waypoint objects.
"""
def __init__(self, sequence=None):
It can easily be instantiated from a collection of poses (e.g. WaypointSequence.from_poses
):
@classmethod
def from_poses(cls, poses, gripper_actions, action_noise):
"""
Instantiate a WaypointSequence object given a sequence of poses,
gripper actions, and action noise.
Args:
poses (np.array): sequence of pose matrices of shape (T, 4, 4)
gripper_actions (np.array): sequence of gripper actions
that should be applied at each timestep of shape (T, D).
action_noise (float or np.array): sequence of action noise
magnitudes that should be applied at each timestep. If a
single float is provided, the noise magnitude will be
constant over the trajectory.
"""
Finally, the WaypointTrajectory
class is a sequence of the WaypointSequence
objects, and is a convenient way to represent 6-DoF trajectories and execute them:
class WaypointTrajectory(object):
"""
A sequence of WaypointSequence objects that corresponds to a full 6-DoF trajectory.
"""
WaypointSequence
objects can be added directly to a WaypointTrajectory
object:
def add_waypoint_sequence(self, sequence):
"""
Directly append sequence to list (no interpolation).
Args:
sequence (WaypointSequence instance): sequence to add
"""
Interpolation segments can also be added easily using this helper method:
def add_waypoint_sequence_for_target_pose(
self,
pose,
gripper_action,
num_steps,
skip_interpolation=False,
action_noise=0.,
):
"""
Adds a new waypoint sequence corresponding to a desired target pose. A new WaypointSequence
will be constructed consisting of @num_steps intermediate Waypoint objects. These can either
be constructed with linear interpolation from the last waypoint (default) or be a
constant set of target poses (set @skip_interpolation to True).
Args:
pose (np.array): 4x4 target pose
gripper_action (np.array): value for gripper action
num_steps (int): number of action steps when trying to reach this waypoint. Will
add intermediate linearly interpolated points between the last pose on this trajectory
and the target pose, so that the total number of steps is @num_steps.
skip_interpolation (bool): if True, keep the target pose fixed and repeat it @num_steps
times instead of using linearly interpolated targets.
action_noise (float): scale of random gaussian noise to add during action execution (e.g.
when @execute is called)
"""
The merge
method is a thin wrapper around the above method, easily allowing for linear interpolation between two WaypointTrajectory
objects:
def merge(
self,
other,
num_steps_interp=None,
num_steps_fixed=None,
action_noise=0.,
):
"""
Merge this trajectory with another (@other).
Args:
other (WaypointTrajectory object): the other trajectory to merge into this one
num_steps_interp (int or None): if not None, add a waypoint sequence that interpolates
between the end of the current trajectory and the start of @other
num_steps_fixed (int or None): if not None, add a waypoint sequence that has constant
target poses corresponding to the first target pose in @other
action_noise (float): noise to use during the interpolation segment
"""
Finally, the execute
method makes it easy to execute the waypoint sequences in the simulation environment:
def execute(
self,
env,
env_interface,
render=False,
video_writer=None,
video_skip=5,
camera_names=None,
):
"""
Main function to execute the trajectory. Will use env_interface.target_pose_to_action to
convert each target pose at each waypoint to an action command, and pass that along to
env.step.
Args:
env (robomimic EnvBase instance): environment to use for executing trajectory
env_interface (MG_EnvInterface instance): environment interface for executing trajectory
render (bool): if True, render on-screen
video_writer (imageio writer): video writer
video_skip (int): determines rate at which environment frames are written to video
camera_names (list): determines which camera(s) are used for rendering. Pass more than
one to output a video with multiple camera views concatenated horizontally.
Returns:
results (dict): dictionary with the following items for the executed trajectory:
states (list): simulator state at each timestep
observations (list): observation dictionary at each timestep
datagen_infos (list): datagen_info at each timestep
actions (list): action executed at each timestep
success (bool): whether the trajectory successfully solved the task or not
"""
Waypoint Class Usage during Data Generation#
Each data generation attempt consists of executing a particular sequence of waypoints for each subtask. A WaypointTrajectory
object is constructed and executed for each subtask in the generate
method of the DataGenerator
.
For a given subtask, a WaypointTrajectory
object is initialized with a single pose (usually the current robot end effector pose, or the last target pose from the previous subtask execution attempt). Next, a reference source subtask segment is selected, and then transformed using the transform_source_data_segment_using_object_pose
method from utils/pose_utils.py
. It is then merged into the trajectory object with linear interpolation using the merge
method. Finally, the execute
method is used to carry out the subtask. This process repeats for each subtask