Data Generation for Custom Environments
In this section, we provide guidance on using MimicGen to generate data for custom tasks and simulation frameworks.
Note
We recommend going through the Getting Started tutorial first, so that you are familiar with the typical data generation workflow. We will refer back to the steps in the Data Generation Workflow Overview in this section.
Generating Data for New Tasks
In this section, we will assume we are trying to generate data for a new task implemented in robosuite, and we will use the robosuite Stack Three task as a running example. The same instructions can be used for any task in any simulation framework, as long as an Environment Interface base class already exists for the simulation framework. See Generating Data for New Simulation Frameworks below for guidance on setting up a new simulation framework if one does not exist yet.
Step 1: Implement Task-Specific Environment Interface
The first step is to subclass the appropriate base Environment Interface class – for robosuite, this is the RobosuiteInterface class at the top of env_interfaces/robosuite.py. We create a new class as below:
class MG_StackThree(RobosuiteInterface):
    """
    Corresponds to robosuite StackThree task and variants.
    """
    pass
The MG_EnvInterface abstract base class in env_interfaces/base.py (which RobosuiteInterface inherits from) describes the methods that task-specific subclasses must implement. There are two important methods:
"""
These should be filled out by each simulation domain (e.g. nut assembly, coffee).
"""
@abc.abstractmethod
def get_object_poses(self):
"""
Gets the pose of each object relevant to MimicGen data generation in the current scene.
Returns:
object_poses (dict): dictionary that maps object name (str) to object pose matrix (4x4 np.array)
"""
raise NotImplementedError
@abc.abstractmethod
def get_subtask_term_signals(self):
"""
Gets a dictionary of binary flags for each subtask in a task. The flag is 1
when the subtask has been completed and 0 otherwise. MimicGen only uses this
when parsing source demonstrations at the start of data generation, and it only
uses the first 0 -> 1 transition in this signal to detect the end of a subtask.
Returns:
subtask_term_signals (dict): dictionary that maps subtask name to termination flag (0 or 1)
"""
raise NotImplementedError
Recall that MimicGen generates data by composing object-centric subtask segments together (see the paper for more information). During data generation, MimicGen requires a way to observe the pose of the relevant object at the start of each subtask. The get_object_poses method will be used for this purpose - it should return a dictionary mapping object name to a pose matrix.
The RobosuiteInterface base class offers a helper method get_object_pose(self, obj_name, obj_type) that makes it easy to retrieve object poses from robosuite - we use it below to get the poses of each cube in the StackThree task.
def get_object_poses(self):
    """
    Gets the pose of each object relevant to MimicGen data generation in the current scene.

    Returns:
        object_poses (dict): dictionary that maps object name (str) to object pose matrix (4x4 np.array)
    """
    # three relevant objects - three cubes
    return dict(
        cubeA=self.get_object_pose(obj_name=self.env.cubeA.root_body, obj_type="body"),
        cubeB=self.get_object_pose(obj_name=self.env.cubeB.root_body, obj_type="body"),
        cubeC=self.get_object_pose(obj_name=self.env.cubeC.root_body, obj_type="body"),
    )
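For simulation frameworks without such a helper, note that the expected format is just a standard 4x4 homogeneous transformation matrix. Below is a minimal sketch (our own illustration, not part of the MimicGen API) of assembling one from a position vector and rotation matrix - MimicGen's PoseUtils.make_pose helper, which appears later in this section, serves the same purpose:

import numpy as np

def make_pose_matrix(pos, rot):
    """
    Assemble a 4x4 homogeneous pose matrix from a 3-dim position
    vector and a 3x3 rotation matrix.
    """
    pose = np.eye(4)
    pose[:3, :3] = rot  # rotation block
    pose[:3, 3] = pos   # translation column
    return pose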
Next, we need to implement get_subtask_term_signals. This function has only one purpose - it provides Subtask Termination Signals for each timestep in the source demonstrations (this is part of what happens in scripts/prepare_src_dataset.py). These signals are used to determine where each subtask ends and the next one starts – the first 0 to 1 transition in a signal during a source demonstration marks the end of that subtask.
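As a concrete illustration (our own sketch, not MimicGen code), the subtask boundary is simply the first index at which the recorded signal flips from 0 to 1:

import numpy as np

# hypothetical per-timestep signal recorded over one source demonstration
grasp_1 = np.array([0, 0, 0, 1, 1, 0, 1, 1])

# only the first 0 -> 1 transition matters - any later flips are ignored
subtask_end = int(np.argmax(grasp_1 == 1))  # -> 3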
The StackThree task consists of 4 object-centric subtasks:
1. grasping cubeA (motion relative to cubeA)
2. placing cubeA on cubeB (motion relative to cubeB)
3. grasping cubeC (motion relative to cubeC)
4. placing cubeC on cubeA (motion relative to cubeA)
To define the end of subtasks 1 and 3, we can just check for a successful grasp, and for the end of subtask 2, we can check for a placement (re-using part of the success check for the robosuite StackThree task):
def get_subtask_term_signals(self):
    """
    Gets a dictionary of binary flags for each subtask in a task. The flag is 1
    when the subtask has been completed and 0 otherwise. MimicGen only uses this
    when parsing source demonstrations at the start of data generation, and it only
    uses the first 0 -> 1 transition in this signal to detect the end of a subtask.

    Returns:
        subtask_term_signals (dict): dictionary that maps subtask name to termination flag (0 or 1)
    """
    signals = dict()

    # first subtask is grasping cubeA (motion relative to cubeA)
    signals["grasp_1"] = int(self.env._check_grasp(gripper=self.env.robots[0].gripper, object_geoms=self.env.cubeA))

    # second subtask is placing cubeA on cubeB (motion relative to cubeB)
    signals["stack_1"] = int(self.env._check_cubeA_stacked())

    # third subtask is grasping cubeC (motion relative to cubeC)
    signals["grasp_2"] = int(self.env._check_grasp(gripper=self.env.robots[0].gripper, object_geoms=self.env.cubeC))

    # final subtask is placing cubeC on cubeA (motion relative to cubeA) - but final subtask signal is not needed
    return signals
Warning
The final subtask in a task never requires a subtask termination signal, since its end is determined by the end of the source demonstration.
Note
Providing a proper implementation for the get_subtask_term_signals function is entirely optional. In most cases it is easy to specify heuristics that define these subtask boundaries, as we did above, but sometimes you may want to directly annotate the boundaries between subtasks instead. We provide an annotation script (scripts/annotate_subtasks.py) for this purpose. If you plan to do this, you can simply return an empty dict from the get_subtask_term_signals function.
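For example, a minimal implementation for the manual-annotation workflow could look like this:

def get_subtask_term_signals(self):
    # subtask boundaries will be annotated manually with
    # scripts/annotate_subtasks.py, so no heuristics are needed here
    return dict()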
Step 2: Implement Task-Specific Config
The next step is to implement a task-specific Config object that inherits from the MG_Config base class (configs/config.py). There are three things for the subclass to implement:
@property
@classmethod
def NAME(cls):
    # must be specified by subclasses
    raise NotImplementedError

@property
@classmethod
def TYPE(cls):
    # must be specified by subclasses
    raise NotImplementedError

def task_config(self):
    """
    This function populates the `config.task` attribute of the config,
    which has settings for each object-centric subtask in a task.
    """
    raise NotImplementedError
The NAME and TYPE are used to store the new subclass in the config registry, and chiefly determine where the auto-generated config template is stored in the repository (e.g. mimicgen/exps/templates/<TYPE>/<NAME>.json).

The task_config is consistent with TaskSpec objects – there is an entry for each subtask, consisting of the arguments to the add_subtask function in the TaskSpec object (configs/task_spec.py):
def add_subtask(
    self,
    object_ref,
    subtask_term_signal,
    subtask_term_offset_range=None,
    selection_strategy="random",
    selection_strategy_kwargs=None,
    action_noise=0.,
    num_interpolation_steps=5,
    num_fixed_steps=0,
    apply_noise_during_interpolation=False,
):
We show the implementation for the StackThree config below:
class StackThree_Config(MG_Config):
    """
    Corresponds to robosuite StackThree task and variants.
    """
    NAME = "stack_three"
    TYPE = "robosuite"

    def task_config(self):
        """
        This function populates the `config.task` attribute of the config,
        which has settings for each object-centric subtask in a task. Each
        dictionary should have kwargs for the @add_subtask method in the
        @MG_TaskSpec object.
        """
        self.task.task_spec.subtask_1 = dict(
            object_ref="cubeA",
            subtask_term_signal="grasp_1",
            subtask_term_offset_range=(10, 20),
            selection_strategy="nearest_neighbor_object",
            selection_strategy_kwargs=dict(nn_k=3),
            action_noise=0.05,
            num_interpolation_steps=5,
            num_fixed_steps=0,
            apply_noise_during_interpolation=False,
        )
        self.task.task_spec.subtask_2 = dict(
            object_ref="cubeB",
            subtask_term_signal="stack_1",
            subtask_term_offset_range=(10, 20),
            selection_strategy="nearest_neighbor_object",
            selection_strategy_kwargs=dict(nn_k=3),
            action_noise=0.05,
            num_interpolation_steps=5,
            num_fixed_steps=0,
            apply_noise_during_interpolation=False,
        )
        self.task.task_spec.subtask_3 = dict(
            object_ref="cubeC",
            subtask_term_signal="grasp_2",
            subtask_term_offset_range=(10, 20),
            selection_strategy="nearest_neighbor_object",
            selection_strategy_kwargs=dict(nn_k=3),
            action_noise=0.05,
            num_interpolation_steps=5,
            num_fixed_steps=0,
            apply_noise_during_interpolation=False,
        )
        self.task.task_spec.subtask_4 = dict(
            object_ref="cubeA",
            subtask_term_signal=None,
            subtask_term_offset_range=None,
            selection_strategy="nearest_neighbor_object",
            selection_strategy_kwargs=dict(nn_k=3),
            action_noise=0.05,
            num_interpolation_steps=5,
            num_fixed_steps=0,
            apply_noise_during_interpolation=False,
        )
        self.task.task_spec.do_not_lock_keys()
Notice that we set the object_ref for each subtask to be consistent with the object names in the get_object_poses method of the MG_StackThree environment interface we implemented. We also set the subtask_term_signal for each subtask to be consistent with the subtask signals in the get_subtask_term_signals method of the MG_StackThree class. Please see the TaskSpec page for more information on the other settings.
Note
If you used or plan to use scripts/annotate_subtasks.py to manually annotate the end of each subtask in the source demos, you should use signal names that are consistent with the --signals argument you will pass to that script, which gives a name to each subtask. Internally, the annotations are stored as subtask termination signals with those names.
Note
You should make sure that the config class you implemented is imported somewhere in your codebase so that it gets registered in the config registry. In the MimicGen codebase, we do this in mimicgen/configs/__init__.py.
Finally, run scripts/generate_config_templates.py to generate a config template for this new task. It should appear under mimicgen/exps/templates/<TYPE>/<NAME>.json. Ensure that the default settings look correct. These settings can be overridden using config generators (see this tutorial).
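As a quick sanity check, you can load the generated template and inspect a few defaults. A minimal sketch, assuming the stack_three config above and that the template JSON mirrors the nested config attributes:

import json

# path follows from TYPE="robosuite" and NAME="stack_three" above
template_path = "mimicgen/exps/templates/robosuite/stack_three.json"
with open(template_path, "r") as f:
    template = json.load(f)

# spot-check one of the subtask defaults before generating data
print(template["task"]["task_spec"]["subtask_1"]["subtask_term_offset_range"])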
Step 3: Execute Data Generation Workflow
You are now all set to try data generation. You should be able to follow the steps documented in the Data Generation Workflow Overview with some minor changes.
Note
Now that you have followed these steps, you can generate datasets for any other variants of this task, as long as the TaskSpec does not change – e.g. different object placements, different robot arms, and different object instances. The Reproducing Experiments tutorial provides examples of all three variations.
Generating Data for New Simulation Frameworks
Note
Before starting, you should ensure that a robomimic environment wrapper exists for the simulation framework you are using. See this link for guidance on how to create one. The environment metadata in the source hdf5 should point to this environment wrapper.
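As a rough sketch of what such a wrapper involves (the class and framework names below are hypothetical - consult the robomimic documentation for the authoritative interface):

from robomimic.envs.env_base import EnvBase

class EnvMyFramework(EnvBase):
    """
    Hypothetical robomimic wrapper for a new simulator. Subclasses
    implement the EnvBase interface (e.g. step, reset, reset_to,
    get_observation, is_success, serialize) so that MimicGen and
    robomimic can create, reset, and step environments through a
    common API.
    """
    pass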
In this section, we will show how to apply MimicGen to new simulation frameworks. The key step is to implement an Environment Interface base class for the simulation framework. We will use robosuite as a running example.
The MG_EnvInterface abstract base class in env_interfaces/base.py describes the methods that base subclasses for new simulators must implement. There are five important methods:
"""
These should be filled out by simulator subclasses (e.g. robosuite).
"""
@property
@classmethod
def INTERFACE_TYPE(self):
"""
Returns string corresponding to interface type. This is used to group
all subclasses together in the interface registry (for example, all robosuite
interfaces) and helps avoid name conflicts.
"""
raise NotImplementedError
@abc.abstractmethod
def get_robot_eef_pose(self):
"""
Get current robot end effector pose. Should be the same frame as used by the robot end-effector controller.
Returns:
pose (np.array): 4x4 eef pose matrix
"""
raise NotImplementedError
@abc.abstractmethod
def target_pose_to_action(self, target_pose, relative=True):
"""
Takes a target pose for the end effector controller and returns an action
(usually a normalized delta pose action) to try and achieve that target pose.
Args:
target_pose (np.array): 4x4 target eef pose
relative (bool): if True, use relative pose actions, else absolute pose actions
Returns:
action (np.array): action compatible with env.step (minus gripper actuation)
"""
raise NotImplementedError
@abc.abstractmethod
def action_to_target_pose(self, action, relative=True):
"""
Converts action (compatible with env.step) to a target pose for the end effector controller.
Inverse of @target_pose_to_action. Usually used to infer a sequence of target controller poses
from a demonstration trajectory using the recorded actions.
Args:
action (np.array): environment action
relative (bool): if True, use relative pose actions, else absolute pose actions
Returns:
target_pose (np.array): 4x4 target eef pose that @action corresponds to
"""
raise NotImplementedError
@abc.abstractmethod
def action_to_gripper_action(self, action):
"""
Extracts the gripper actuation part of an action (compatible with env.step).
Args:
action (np.array): environment action
Returns:
gripper_action (np.array): subset of environment action for gripper actuation
"""
raise NotImplementedError
The INTERFACE_TYPE method is used to make sure there are no class name conflicts in the environment interface registry - just choose a name unique to your simulation framework. For robosuite, we simply used "robosuite".
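For a new simulator, this could look like the following sketch (the framework name is hypothetical, and the remaining abstract methods are omitted for brevity):

class MyFrameworkInterface(MG_EnvInterface):
    """
    Hypothetical base environment interface for a new simulation framework.
    """
    @property
    def INTERFACE_TYPE(self):
        # unique name that groups this simulator's interfaces in the registry
        return "my_framework"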
The get_robot_eef_pose method returns the current pose of the robot end effector and corresponds to the same frame used by the end effector controller in the simulation environment. In robosuite, the Operational Space Controller uses a specific MuJoCo site, so we return its pose:
def get_robot_eef_pose(self):
    """
    Get current robot end effector pose. Should be the same frame as used by the robot end-effector controller.

    Returns:
        pose (np.array): 4x4 eef pose matrix
    """
    # OSC control frame is a MuJoCo site - just retrieve its current pose
    return self.get_object_pose(
        obj_name=self.env.robots[0].controller.eef_name,
        obj_type="site",
    )
The next two methods (target_pose_to_action, action_to_target_pose) are used to translate between simulator actions (e.g. those given to env.step) and absolute target poses for the end effector controller. This typically consists of simple scaling factors and transformations between different rotation conventions (as described in Appendix N.1 of the MimicGen paper):
def target_pose_to_action(self, target_pose, relative=True):
    """
    Takes a target pose for the end effector controller and returns an action
    (usually a normalized delta pose action) to try and achieve that target pose.

    Args:
        target_pose (np.array): 4x4 target eef pose
        relative (bool): if True, use relative pose actions, else absolute pose actions

    Returns:
        action (np.array): action compatible with env.step (minus gripper actuation)
    """
    # version check for robosuite - must be v1.2+, so that we're using the correct controller convention
    assert (robosuite.__version__.split(".")[0] == "1")
    assert (robosuite.__version__.split(".")[1] >= "2")

    # target position and rotation
    target_pos, target_rot = PoseUtils.unmake_pose(target_pose)

    # current position and rotation
    curr_pose = self.get_robot_eef_pose()
    curr_pos, curr_rot = PoseUtils.unmake_pose(curr_pose)

    # get maximum position and rotation action bounds
    max_dpos = self.env.robots[0].controller.output_max[0]
    max_drot = self.env.robots[0].controller.output_max[3]

    if relative:
        # normalized delta position action
        delta_position = target_pos - curr_pos
        delta_position = np.clip(delta_position / max_dpos, -1., 1.)

        # normalized delta rotation action
        delta_rot_mat = target_rot.dot(curr_rot.T)
        delta_quat = T.mat2quat(delta_rot_mat)
        delta_rotation = T.quat2axisangle(delta_quat)
        delta_rotation = np.clip(delta_rotation / max_drot, -1., 1.)
        return np.concatenate([delta_position, delta_rotation])

    # absolute position and rotation action
    target_quat = T.mat2quat(target_rot)
    abs_rotation = T.quat2axisangle(target_quat)
    return np.concatenate([target_pos, abs_rotation])
def action_to_target_pose(self, action, relative=True):
    """
    Converts action (compatible with env.step) to a target pose for the end effector controller.
    Inverse of @target_pose_to_action. Usually used to infer a sequence of target controller poses
    from a demonstration trajectory using the recorded actions.

    Args:
        action (np.array): environment action
        relative (bool): if True, use relative pose actions, else absolute pose actions

    Returns:
        target_pose (np.array): 4x4 target eef pose that @action corresponds to
    """
    # version check for robosuite - must be v1.2+, so that we're using the correct controller convention
    assert (robosuite.__version__.split(".")[0] == "1")
    assert (robosuite.__version__.split(".")[1] >= "2")

    if (not relative):
        # convert absolute action to absolute pose
        target_pos = action[:3]
        target_quat = T.axisangle2quat(action[3:6])
        target_rot = T.quat2mat(target_quat)
    else:
        # get maximum position and rotation action bounds
        max_dpos = self.env.robots[0].controller.output_max[0]
        max_drot = self.env.robots[0].controller.output_max[3]

        # unscale actions
        delta_position = action[:3] * max_dpos
        delta_rotation = action[3:6] * max_drot

        # current position and rotation
        curr_pose = self.get_robot_eef_pose()
        curr_pos, curr_rot = PoseUtils.unmake_pose(curr_pose)

        # get pose target
        target_pos = curr_pos + delta_position
        delta_quat = T.axisangle2quat(delta_rotation)
        delta_rot_mat = T.quat2mat(delta_quat)
        target_rot = delta_rot_mat.dot(curr_rot)

    target_pose = PoseUtils.make_pose(target_pos, target_rot)
    return target_pose
Finally, the action_to_gripper_action method extracts the part of the simulator action that corresponds to gripper actuation:
def action_to_gripper_action(self, action):
    """
    Extracts the gripper actuation part of an action (compatible with env.step).

    Args:
        action (np.array): environment action

    Returns:
        gripper_action (np.array): subset of environment action for gripper actuation
    """
    # last dimension is gripper action
    return action[-1:]
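To see how these pieces fit together, here is an illustrative composition (our own sketch, not actual MimicGen source) of a full environment action during data generation, assuming a 7-dimensional robosuite OSC action (6 pose dimensions plus 1 gripper dimension):

import numpy as np

# hypothetical variables: env_interface is an environment interface instance,
# target_pose is a 4x4 matrix from the transformed source segment, and
# src_action is the corresponding action from the source demonstration
pose_action = env_interface.target_pose_to_action(target_pose)        # 6 dims
gripper_action = env_interface.action_to_gripper_action(src_action)   # 1 dim
full_action = np.concatenate([pose_action, gripper_action])
obs, reward, done, info = env_interface.env.step(full_action)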
You can now follow the instructions in the Generating Data for New Tasks section above to set up data generation for specific tasks in this simulation framework.