-
Notifications
You must be signed in to change notification settings - Fork 326
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Example] Efficient Trajectory Sampling with CompletedTrajRepertoire
ghstack-source-id: 4d5c587c69230aa8f3a1b9b6fe19f52fa683d703 Pull Request resolved: #2642
- Loading branch information
Showing
1 changed file
with
89 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
# Copyright (c) Meta Platforms, Inc. and affiliates. | ||
# | ||
# This source code is licensed under the MIT license found in the | ||
# LICENSE file in the root directory of this source tree. | ||
|
||
"""Efficient Trajectory Sampling with CompletedTrajRepertoire | ||
This example demonstrates how to design a custom transform that filters trajectories during sampling, | ||
ensuring that only completed trajectories are present in sampled batches. This can be particularly useful | ||
when dealing with environments where some trajectories might be corrupted or never reach a done state, | ||
which could skew the learning process or lead to biased models. For instance, in robotics or autonomous | ||
driving, a trajectory might be interrupted due to external factors such as hardware failures or human | ||
intervention, resulting in incomplete or inconsistent data. By filtering out these incomplete trajectories, | ||
we can improve the quality of the training data and increase the robustness of our models. | ||
""" | ||
|
||
import torch | ||
from tensordict import TensorDictBase | ||
from torchrl.data import LazyTensorStorage, ReplayBuffer | ||
from torchrl.envs import GymEnv, TrajCounter, Transform | ||
|
||
|
||
class CompletedTrajectoryRepertoire(Transform): | ||
""" | ||
A transform that keeps track of completed trajectories and filters them out during sampling. | ||
""" | ||
|
||
def __init__(self): | ||
super().__init__() | ||
self.completed_trajectories = set() | ||
self.repertoire_tensor = torch.zeros((), dtype=torch.int64) | ||
|
||
def _update_repertoire(self, tensordict: TensorDictBase) -> None: | ||
"""Updates the repertoire of completed trajectories.""" | ||
done = tensordict["next", "terminated"].squeeze(-1) | ||
traj = tensordict["next", "traj_count"][done].view(-1) | ||
if traj.numel(): | ||
self.completed_trajectories = self.completed_trajectories.union( | ||
traj.tolist() | ||
) | ||
self.repertoire_tensor = torch.tensor( | ||
list(self.completed_trajectories), dtype=torch.int64 | ||
) | ||
|
||
def _inv_call(self, tensordict: TensorDictBase) -> TensorDictBase: | ||
"""Updates the repertoire of completed trajectories during insertion.""" | ||
self._update_repertoire(tensordict) | ||
return tensordict | ||
|
||
def forward(self, tensordict: TensorDictBase) -> TensorDictBase: | ||
"""Filters out incomplete trajectories during sampling.""" | ||
traj = tensordict["next", "traj_count"] | ||
traj = traj.unsqueeze(-1) | ||
has_traj = (traj == self.repertoire_tensor).any(-1) | ||
has_traj = has_traj.view(tensordict.shape) | ||
return tensordict[has_traj] | ||
|
||
|
||
def main(): | ||
# Create a CartPole environment with trajectory counting | ||
env = GymEnv("CartPole-v1").append_transform(TrajCounter()) | ||
|
||
# Create a replay buffer with the completed trajectory repertoire transform | ||
buffer = ReplayBuffer( | ||
storage=LazyTensorStorage(1_000_000), transform=CompletedTrajectoryRepertoire() | ||
) | ||
|
||
# Roll out the environment for 1000 steps | ||
while True: | ||
rollout = env.rollout(1000, break_when_any_done=False) | ||
if not rollout["next", "done"][-1].item(): | ||
break | ||
|
||
# Extend the replay buffer with the rollout | ||
buffer.extend(rollout) | ||
|
||
# Get the last trajectory count | ||
last_traj_count = rollout[-1]["next", "traj_count"].item() | ||
print(f"Incomplete trajectory: {last_traj_count}") | ||
|
||
# Sample from the replay buffer 10 times | ||
for _ in range(10): | ||
sample_traj_counts = buffer.sample(32)["next", "traj_count"].unique() | ||
print(f"Sampled trajectories: {sample_traj_counts}") | ||
assert last_traj_count not in sample_traj_counts | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
b840a77
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possible performance regression was detected for benchmark 'CPU Benchmark Results'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold
2
.benchmarks/test_replaybuffer_benchmark.py::test_rb_populate[TensorDictReplayBuffer-ListStorage-RandomSampler-400]
38.529810321622904
iter/sec (stddev: 0.15315565125592198
)221.28009810919153
iter/sec (stddev: 0.0007415159877604631
)5.74
This comment was automatically generated by workflow using github-action-benchmark.
CC: @vmoens