Skip to content

Commit

Permalink
[Example] Efficient Trajectory Sampling with CompletedTrajRepertoire
Browse files Browse the repository at this point in the history
ghstack-source-id: 4d5c587c69230aa8f3a1b9b6fe19f52fa683d703
Pull Request resolved: #2642
  • Loading branch information
vmoens committed Dec 10, 2024
1 parent 2511c04 commit b840a77
Showing 1 changed file with 89 additions and 0 deletions.
89 changes: 89 additions & 0 deletions examples/replay-buffers/filter-imcomplete-trajs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""Efficient Trajectory Sampling with CompletedTrajectoryRepertoire.
This example demonstrates how to design a custom transform that filters trajectories during sampling,
ensuring that only completed trajectories are present in sampled batches. This can be particularly useful
when dealing with environments where some trajectories might be corrupted or never reach a done state,
which could skew the learning process or lead to biased models. For instance, in robotics or autonomous
driving, a trajectory might be interrupted due to external factors such as hardware failures or human
intervention, resulting in incomplete or inconsistent data. By filtering out these incomplete trajectories,
we can improve the quality of the training data and increase the robustness of our models.
"""

import torch
from tensordict import TensorDictBase
from torchrl.data import LazyTensorStorage, ReplayBuffer
from torchrl.envs import GymEnv, TrajCounter, Transform


class CompletedTrajectoryRepertoire(Transform):
    """Replay-buffer transform that only lets completed trajectories through.

    On insertion (``_inv_call``) the transform records the
    ``("next", "traj_count")`` id of every step whose ``("next", "terminated")``
    flag is set. On sampling (``forward``) it drops every element whose
    trajectory id has not been recorded, so a sampled batch can contain fewer
    elements than were requested.
    """

    def __init__(self):
        super().__init__()
        # Ids of all trajectories that have been seen to terminate so far.
        self.completed_trajectories = set()
        # Tensor mirror of the set above, used for vectorized membership tests.
        # It must start out empty (shape ``(0,)``): a 0-dim zero tensor would
        # broadcast against every id and wrongly flag trajectory 0 as completed
        # before anything has terminated.
        self.repertoire_tensor = torch.zeros((0,), dtype=torch.int64)

    def _update_repertoire(self, tensordict: TensorDictBase) -> None:
        """Record the ids of the trajectories that terminate in ``tensordict``.

        Args:
            tensordict: data being written to the buffer. It must contain the
                ``("next", "terminated")`` and ``("next", "traj_count")`` entries.
        """
        terminated = tensordict["next", "terminated"].squeeze(-1)
        finished_ids = tensordict["next", "traj_count"][terminated].view(-1)
        if not finished_ids.numel():
            # Nothing terminated in this batch: the repertoire is unchanged.
            return
        # Grow the set in place rather than rebuilding it with ``union``.
        self.completed_trajectories.update(finished_ids.tolist())
        self.repertoire_tensor = torch.tensor(
            list(self.completed_trajectories), dtype=torch.int64
        )

    def _inv_call(self, tensordict: TensorDictBase) -> TensorDictBase:
        """Update the repertoire during insertion and return the data unchanged."""
        self._update_repertoire(tensordict)
        return tensordict

    def forward(self, tensordict: TensorDictBase) -> TensorDictBase:
        """Keep only the sampled elements that belong to a completed trajectory.

        Args:
            tensordict: sampled data containing ``("next", "traj_count")``.

        Returns:
            The sub-tensordict selected by a boolean mask over the batch
            dimensions; elements from unrecorded trajectories are removed.
        """
        # The extra trailing dim lets every id broadcast against the 1-D
        # repertoire, giving one boolean per (element, known id) pair.
        traj = tensordict["next", "traj_count"].unsqueeze(-1)
        repertoire = self.repertoire_tensor.to(traj.device)
        has_traj = (traj == repertoire).any(-1)
        # Assumes one trajectory id per batch element (e.g. a trailing
        # singleton dim on "traj_count"), so the mask reshapes to the batch.
        has_traj = has_traj.view(tensordict.shape)
        return tensordict[has_traj]


def main():
    """Fill a filtered replay buffer from one rollout and check its samples."""
    # CartPole wrapped with a TrajCounter, so every step carries a trajectory id
    env = GymEnv("CartPole-v1").append_transform(TrajCounter())

    # Replay buffer that applies the completed-trajectory filter
    storage = LazyTensorStorage(1_000_000)
    buffer = ReplayBuffer(
        storage=storage, transform=CompletedTrajectoryRepertoire()
    )

    # Collect 1000-step rollouts until one ends in the middle of a trajectory,
    # i.e. its final step is not flagged as done
    rollout = env.rollout(1000, break_when_any_done=False)
    while rollout["next", "done"][-1].item():
        rollout = env.rollout(1000, break_when_any_done=False)

    # Store that rollout in the replay buffer
    buffer.extend(rollout)

    # The id of the final step identifies the unfinished trajectory
    final_step = rollout[-1]
    last_traj_count = final_step["next", "traj_count"].item()
    print(f"Incomplete trajectory: {last_traj_count}")

    # Draw 10 batches; the unfinished trajectory must never show up
    for _ in range(10):
        batch = buffer.sample(32)
        sample_traj_counts = batch["next", "traj_count"].unique()
        print(f"Sampled trajectories: {sample_traj_counts}")
        assert last_traj_count not in sample_traj_counts

# Run the example only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()

1 comment on commit b840a77

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'CPU Benchmark Results'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: b840a77 Previous: 2511c04 Ratio
benchmarks/test_replaybuffer_benchmark.py::test_rb_populate[TensorDictReplayBuffer-ListStorage-RandomSampler-400] 38.529810321622904 iter/sec (stddev: 0.15315565125592198) 221.28009810919153 iter/sec (stddev: 0.0007415159877604631) 5.74

This comment was automatically generated by workflow using github-action-benchmark.

CC: @vmoens

Please sign in to comment.