Data Generation Classes#

This section discusses the key classes related to data generation.

Data Generator#

The DataGenerator class (datagen/data_generator.py) is responsible for generating new demonstration trajectories. First, the internal _load_dataset method parses the source dataset (using Subtask Termination Signals) into source subtask segments, where each segment is a sequence of DatagenInfo objects. Then, the generate method is called repeatedly (by the main script scripts/generate_dataset.py) to attempt to generate new trajectories from the source subtask segments. During each attempt, the select_source_demo method invokes a SelectionStrategy to pick a reference source subtask segment to transform, and WaypointTrajectory objects are used to transform and compose the subtask segments together.
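
As a rough illustration, the outer loop looks something like the following (a minimal sketch; the constructor arguments and the names num_target_demos, env, and env_interface are simplified or hypothetical here):

# Minimal sketch of the outer generation loop, mirroring what
# scripts/generate_dataset.py does (arguments simplified / hypothetical).
generator = DataGenerator(...)  # parses the source dataset internally via _load_dataset
num_success = 0
while num_success < num_target_demos:
    results = generator.generate(env=env, env_interface=env_interface)
    if results["success"]:
        num_success += 1  # keep only successful generation attempts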

Datagen Info#

DatagenInfo objects keep track of important information used during data generation. These objects are added to source demonstrations with the prepare_src_dataset.py script, or provided directly by an Environment Interface object.

The structure of the object is below:

class DatagenInfo(object):
    """
    Structure of information needed from an environment for data generation. To allow for
    flexibility, not all information must be present.
    """
    def __init__(
        self,
        eef_pose=None,
        object_poses=None,
        subtask_term_signals=None,
        target_pose=None,
        gripper_action=None,
    ):
        """
        Args:
            eef_pose (np.array or None): robot end effector poses of shape [..., 4, 4]
            object_poses (dict or None): dictionary mapping object name to object poses
                of shape [..., 4, 4]
            subtask_term_signals (dict or None): dictionary mapping subtask name to a binary 
                indicator (0 or 1) on whether subtask has been completed. Each value in the
                dictionary could be an int, float, or np.array of shape [..., 1].
            target_pose (np.array or None): target end effector poses of shape [..., 4, 4]
            gripper_action (np.array or None): gripper actions of shape [..., D] where D
                is the dimension of the gripper actuation action for the robot arm
        """

Selection Strategy#

Note

See Appendix N.3 in the MimicGen paper for a more thorough explanation of source subtask segment selection and some further intuition on when to use different settings.

Each data generation attempt requires choosing one or more subtask segments from the source demonstrations to transform – this is carried out by a SelectionStrategy instance:

@six.add_metaclass(MG_SelectionStrategyMeta)
class MG_SelectionStrategy(object):
    """
    Defines methods and functions for selection strategies to implement.
    """
    def __init__(self):
        pass

    @property
    @classmethod
    def NAME(self):
        """
        This name (str) will be used to register the selection strategy class in the global
        registry.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def select_source_demo(
        self,
        eef_pose,
        object_pose,
        src_subtask_datagen_infos,
    ):
        """
        Selects source demonstration index using the current robot pose, relevant object pose
        for the current subtask, and relevant information from the source demonstrations for the
        current subtask.

        Args:
            eef_pose (np.array): current 4x4 eef pose
            object_pose (np.array): current 4x4 object pose, for the object in this subtask
            src_subtask_datagen_infos (list): list of DatagenInfo instances corresponding to the
                relevant subtask segment in each source demonstration

        Returns:
            source_demo_ind (int): index of source demonstration - indicates which source subtask segment to use
        """
        raise NotImplementedError

Every SelectionStrategy class must subclass this base class and implement the NAME property and the select_source_demo method. The NAME field is used to register the SelectionStrategy class into the global registry, and select_source_demo implements the heuristic for selecting a source demonstration index.
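
For instance, a minimal custom strategy that picks a source demonstration uniformly at random might look like the following (a hypothetical sketch that follows the pattern above; the class and its registered name are illustrative):

import numpy as np

class UniformRandomStrategy(MG_SelectionStrategy):
    """
    Hypothetical example: select a source demonstration uniformly at random.
    """

    # name used to register this class into the global registry
    NAME = "uniform_random_example"

    def select_source_demo(self, eef_pose, object_pose, src_subtask_datagen_infos):
        # ignore the poses and pick any source demonstration index
        return np.random.randint(len(src_subtask_datagen_infos))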

Each data generation config json specifies how source segment selection should be done during data generation. First, config.experiment.generation.select_src_per_subtask determines whether to select a different source demonstration for each subtask during data generation, or keep the same source demonstration as the one used for the first subtask. This corresponds to the per-subtask parameter described in the “Selection Frequency” paragraph of Appendix N.3 in the paper.

The specific task config (config.task.task_spec), which corresponds to the Task Spec object used in data generation, also specifies the selection strategy to use for each subtask via the selection_strategy parameter and the selection_strategy_kwargs parameter. The selection_strategy parameter corresponds to the NAME of the SelectionStrategy class, and selection_strategy_kwargs corresponds to any additional kwargs passed when invoking the select_source_demo method.

Note

If config.experiment.generation.select_src_per_subtask is False, only the first subtask’s selection strategy matters, since the source demonstration selected for the first subtask is used for the remainder of the data generation attempt.

As an example, the NearestNeighborObjectStrategy (see implementation below) can be specified by passing nearest_neighbor_object for the selection_strategy parameter, and the selection_strategy_kwargs parameter can be used to provide a dictionary containing values for the pos_weight, rot_weight, and nn_k parameters.
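
For instance, the relevant portion of a config might look like the following (a hypothetical excerpt, shown as a Python dict for illustration; the actual config json may nest these entries differently):

# Hypothetical config excerpt (key paths follow the parameters described
# above; the surrounding structure is simplified).
config = {
    "experiment": {
        "generation": {
            "select_src_per_subtask": False,
        },
    },
    "task": {
        "task_spec": [
            {
                "selection_strategy": "nearest_neighbor_object",
                "selection_strategy_kwargs": {
                    "pos_weight": 1.0,
                    "rot_weight": 1.0,
                    "nn_k": 3,
                },
            },
            # ... one entry per subtask ...
        ],
    },
}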

class NearestNeighborObjectStrategy(MG_SelectionStrategy):
    """
    Pick source demonstration to be the one with the closest object pose to the object 
    in the current scene.
    """

    # name for registering this class into registry
    NAME = "nearest_neighbor_object"

    def select_source_demo(
        self,
        eef_pose,
        object_pose,
        src_subtask_datagen_infos,
        pos_weight=1.,
        rot_weight=1.,
        nn_k=3,
    ):
        """
        Selects source demonstration index using the current robot pose, relevant object pose
        for the current subtask, and relevant information from the source demonstrations for the
        current subtask.

        Args:
            eef_pose (np.array): current 4x4 eef pose
            object_pose (np.array): current 4x4 object pose, for the object in this subtask
            src_subtask_datagen_infos (list): list of DatagenInfo instances corresponding to the
                relevant subtask segment in each source demonstration
            pos_weight (float): weight on position for minimizing pose distance
            rot_weight (float): weight on rotation for minimizing pose distance
            nn_k (int): pick source demo index uniformly at random from the top @nn_k nearest neighbors

        Returns:
            source_demo_ind (int): index of source demonstration - indicates which source subtask segment to use
        """

Waypoint#

Waypoint Class Variants#

MimicGen uses a collection of convenience classes to represent waypoints and trajectories (datagen/waypoint.py).

The Waypoint class represents a single 6-DoF target pose and the gripper action for that timestep:

class Waypoint(object):
    """
    Represents a single desired 6-DoF waypoint, along with corresponding gripper actuation for this point.
    """
    def __init__(self, pose, gripper_action, noise=None):

The WaypointSequence class represents a sequence of these Waypoint objects:

class WaypointSequence(object):
    """
    Represents a sequence of Waypoint objects.
    """
    def __init__(self, sequence=None):

A WaypointSequence can easily be instantiated from a collection of poses (e.g. via the WaypointSequence.from_poses class method):

    @classmethod
    def from_poses(cls, poses, gripper_actions, action_noise):
        """
        Instantiate a WaypointSequence object given a sequence of poses, 
        gripper actions, and action noise.

        Args:
            poses (np.array): sequence of pose matrices of shape (T, 4, 4)
            gripper_actions (np.array): sequence of gripper actions
                that should be applied at each timestep of shape (T, D).
            action_noise (float or np.array): sequence of action noise
                magnitudes that should be applied at each timestep. If a 
                single float is provided, the noise magnitude will be
                constant over the trajectory.
        """

Finally, the WaypointTrajectory class is a sequence of WaypointSequence objects, and provides a convenient way to represent and execute full 6-DoF trajectories:

class WaypointTrajectory(object):
    """
    A sequence of WaypointSequence objects that corresponds to a full 6-DoF trajectory.
    """

WaypointSequence objects can be added directly to a WaypointTrajectory object:

    def add_waypoint_sequence(self, sequence):
        """
        Directly append sequence to list (no interpolation).

        Args:
            sequence (WaypointSequence instance): sequence to add
        """

Interpolation segments can also be added easily using this helper method:

    def add_waypoint_sequence_for_target_pose(
        self,
        pose,
        gripper_action,
        num_steps,
        skip_interpolation=False,
        action_noise=0.,
    ):
        """
        Adds a new waypoint sequence corresponding to a desired target pose. A new WaypointSequence
        will be constructed consisting of @num_steps intermediate Waypoint objects. These can either
        be constructed with linear interpolation from the last waypoint (default) or be a
        constant set of target poses (set @skip_interpolation to True).

        Args:
            pose (np.array): 4x4 target pose

            gripper_action (np.array): value for gripper action

            num_steps (int): number of action steps when trying to reach this waypoint. Will
                add intermediate linearly interpolated points between the last pose on this trajectory
                and the target pose, so that the total number of steps is @num_steps.

            skip_interpolation (bool): if True, keep the target pose fixed and repeat it @num_steps
                times instead of using linearly interpolated targets.

            action_noise (float): scale of random gaussian noise to add during action execution (e.g.
                when @execute is called)
        """

The merge method is a thin wrapper around the above method that makes it easy to linearly interpolate between two WaypointTrajectory objects:

    def merge(
        self,
        other,
        num_steps_interp=None,
        num_steps_fixed=None,
        action_noise=0.,
    ):
        """
        Merge this trajectory with another (@other).

        Args:
            other (WaypointTrajectory object): the other trajectory to merge into this one

            num_steps_interp (int or None): if not None, add a waypoint sequence that interpolates
                between the end of the current trajectory and the start of @other

            num_steps_fixed (int or None): if not None, add a waypoint sequence that has constant 
                target poses corresponding to the first target pose in @other

            action_noise (float): noise to use during the interpolation segment
        """

Finally, the execute method makes it easy to execute the waypoint sequences in the simulation environment:

    def execute(
        self, 
        env,
        env_interface, 
        render=False, 
        video_writer=None, 
        video_skip=5, 
        camera_names=None,
    ):
        """
        Main function to execute the trajectory. Will use env_interface.target_pose_to_action to
        convert each target pose at each waypoint to an action command, and pass that along to
        env.step.

        Args:
            env (robomimic EnvBase instance): environment to use for executing trajectory
            env_interface (MG_EnvInterface instance): environment interface for executing trajectory
            render (bool): if True, render on-screen
            video_writer (imageio writer): video writer
            video_skip (int): determines rate at which environment frames are written to video
            camera_names (list): determines which camera(s) are used for rendering. Pass more than
                one to output a video with multiple camera views concatenated horizontally.

        Returns:
            results (dict): dictionary with the following items for the executed trajectory:
                states (list): simulator state at each timestep
                observations (list): observation dictionary at each timestep
                datagen_infos (list): datagen_info at each timestep
                actions (list): action executed at each timestep
                success (bool): whether the trajectory successfully solved the task or not
        """

Waypoint Class Usage during Data Generation#

Each data generation attempt consists of executing a particular sequence of waypoints for each subtask. A WaypointTrajectory object is constructed and executed for each subtask in the generate method of the DataGenerator.

For a given subtask, a WaypointTrajectory object is initialized with a single pose (usually the current robot end effector pose, or the last target pose from the previous subtask execution attempt). Next, a reference source subtask segment is selected and then transformed using the transform_source_data_segment_using_object_pose method from utils/pose_utils.py. The transformed segment is merged into the trajectory object with linear interpolation using the merge method. Finally, the execute method is used to carry out the subtask. This process repeats for each subtask.
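
Putting it together, the per-subtask logic looks roughly like the following (a heavily simplified conceptual sketch; helper names such as select_and_transform_segment, init_sequence, and num_interp_steps are hypothetical stand-ins for logic inside DataGenerator.generate):

# Conceptual sketch of the per-subtask loop (simplified; noise settings,
# retries, and bookkeeping are omitted).
for subtask in task_spec:
    # start from the current eef pose (or last target pose of the previous subtask)
    traj = WaypointTrajectory()
    traj.add_waypoint_sequence(init_sequence)

    # select a reference source segment (via the configured SelectionStrategy) and
    # transform it to the new scene with transform_source_data_segment_using_object_pose
    transformed_segment = select_and_transform_segment(subtask)

    # linearly interpolate from the current pose into the transformed segment
    traj.merge(transformed_segment, num_steps_interp=num_interp_steps)

    # carry out the subtask in simulation
    results = traj.execute(env=env, env_interface=env_interface)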