add navigation 2d app

kohonda committed Jan 10, 2024
1 parent b7507d2 commit 9eac0bc
Showing 6 changed files with 712 additions and 1 deletion.
10 changes: 9 additions & 1 deletion README.md
@@ -69,7 +69,15 @@ source .venv/bin/activate
pip3 install -e .[dev]
```

## Play with Gym Classic Control
## Examples

### Navigation 2D
```bash
python3 app/navigation2d.py
```
<p align="center">
<img src="./media/navigation_2d_42.gif" width="500" alt="navigation2d">
</p>
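
Since the script exposes `main` through `fire.Fire`, the optional video-saving mode can be enabled from the command line (a minimal example, assuming fire's default flag mapping of the `save_mode` argument):
```bash
python3 app/navigation2d.py --save_mode=True
```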

### Pendulum
```bash
75 changes: 75 additions & 0 deletions app/navigation2d.py
@@ -0,0 +1,75 @@
import torch

import time

# import gymnasium
import fire
import tqdm

from controller.mppi import MPPI
from envs.navigation_2d import Navigation2DEnv


def main(save_mode: bool = False):
env = Navigation2DEnv()

# solver
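# Rough meaning of the MPPI hyperparameters below (assumed from standard MPPI
# usage; exact semantics are defined by controller.mppi.MPPI):
#   horizon     - number of prediction steps per sampled rollout
#   num_samples - number of control sequences sampled per solve
#   sigmas      - std. dev. of the Gaussian exploration noise per control dim
#   lambda_     - temperature of the importance-sampling weights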
solver = MPPI(
horizon=50,
num_samples=100000,
dim_state=3,
dim_control=2,
dynamics=env.dynamics,
stage_cost=env.stage_cost,
terminal_cost=env.terminal_cost,
u_min=env.u_min,
u_max=env.u_max,
sigmas=torch.tensor([0.5, 0.5]),
lambda_=1,
)

state = env.reset()
max_steps = 500
average_time = 0
for i in range(max_steps):
start = time.time()
with torch.no_grad():
action_seq, state_seq = solver.forward(state=state)
end = time.time()
average_time += end - start

state, is_goal_reached = env.step(action_seq[0, :])

is_collisions = env.collision_check(state=state_seq)

top_samples, top_weights = solver.get_top_samples(num_samples=100)

if save_mode:
env.render(
predicted_trajectory=state_seq,
is_collisions=is_collisions,
top_samples=(top_samples, top_weights),
mode="rgb_array",
)
# progress bar
if i == 0:
pbar = tqdm.tqdm(total=max_steps, desc="recording video")
pbar.update(1)

else:
env.render(
predicted_trajectory=state_seq,
is_collisions=is_collisions,
top_samples=(top_samples, top_weights),
mode="human",
)
if is_goal_reached:
print("Goal Reached!")
break

print("average solve time: {}".format(average_time * 1000), " [ms]")
env.close() # close window ans save video


if __name__ == "__main__":
fire.Fire(main)
Binary file added media/navigation_2d_42.gif
Empty file added src/envs/__init__.py
Empty file.
300 changes: 300 additions & 0 deletions src/envs/navigation_2d.py
@@ -0,0 +1,300 @@
"""
Kohei Honda, 2023.
"""

from __future__ import annotations

from typing import Tuple, Union
from matplotlib import pyplot as plt

import torch
import numpy as np
import os


from moviepy.video.io.ImageSequenceClip import ImageSequenceClip

from envs.obstacle_map_2d import ObstacleMap, generate_random_obstacles


@torch.jit.script
def angle_normalize(x):
return ((x + torch.pi) % (2 * torch.pi)) - torch.pi


class Navigation2DEnv:
def __init__(
self,
device=torch.device("cuda"),
dtype=torch.float32,
seed: int = 42,
) -> None:
# device and dtype
if torch.cuda.is_available() and device == torch.device("cuda"):
self._device = torch.device("cuda")
else:
self._device = torch.device("cpu")
self._dtype = dtype

self._obstacle_map = ObstacleMap(
map_size=(20, 20), cell_size=0.1, device=self._device, dtype=self._dtype
)
self._seed = seed

generate_random_obstacles(
obstacle_map=self._obstacle_map,
random_x_range=(-7.5, 7.5),
random_y_range=(-7.5, 7.5),
num_circle_obs=7,
radius_range=(1, 1),
num_rectangle_obs=7,
width_range=(2, 2),
height_range=(2, 2),
max_iteration=1000,
seed=seed,
)
self._obstacle_map.convert_to_torch()

self._start_pos = torch.tensor(
[-9.0, -9.0], device=self._device, dtype=self._dtype
)
self._goal_pos = torch.tensor(
[9.0, 9.0], device=self._device, dtype=self._dtype
)

self._robot_state = torch.zeros(3, device=self._device, dtype=self._dtype)
self._robot_state[:2] = self._start_pos
self._robot_state[2] = angle_normalize(
torch.atan2(
self._goal_pos[1] - self._start_pos[1],
self._goal_pos[0] - self._start_pos[0],
)
)

# u: [v, omega] (m/s, rad/s)
self.u_min = torch.tensor([-1.0, -1.0], device=self._device, dtype=self._dtype)
self.u_max = torch.tensor([1.0, 1.0], device=self._device, dtype=self._dtype)

def reset(self) -> torch.Tensor:
"""
Reset robot state.
Returns:
torch.Tensor: shape (3,) [x, y, theta]
"""
self._robot_state[:2] = self._start_pos
self._robot_state[2] = angle_normalize(
torch.atan2(
self._goal_pos[1] - self._start_pos[1],
self._goal_pos[0] - self._start_pos[0],
)
)

self._fig = plt.figure(layout="tight")
self._ax = self._fig.add_subplot()
self._ax.set_xlim(self._obstacle_map.x_lim)
self._ax.set_ylim(self._obstacle_map.y_lim)
self._ax.set_aspect("equal")

self._rendered_frames = []

return self._robot_state

def step(self, u: torch.Tensor) -> Tuple[torch.Tensor, bool]:
"""
Update robot state based on differential drive dynamics.
Args:
u (torch.Tensor): control tensor, shape (2,) [v, omega]
Returns:
Tuple[torch.Tensor, bool]: the updated robot state and a goal-reached flag.
"""
u = torch.clamp(u, self.u_min, self.u_max)

self._robot_state = self.dynamics(
state=self._robot_state.unsqueeze(0), action=u.unsqueeze(0)
).squeeze(0)

# goal check
goal_threshold = 0.5
is_goal_reached = (
torch.norm(self._robot_state[:2] - self._goal_pos) < goal_threshold
)

return self._robot_state, is_goal_reached

def render(
self,
predicted_trajectory: torch.Tensor = None,
is_collisions: torch.Tensor = None,
top_samples: Tuple[torch.Tensor, torch.Tensor] = None,
mode: str = "human",
) -> None:
self._ax.set_xlabel("x [m]")
self._ax.set_ylabel("y [m]")

# obstacle map
self._obstacle_map.render(self._ax)

# start and goal
self._ax.scatter(
self._start_pos[0].item(),
self._start_pos[1].item(),
marker="o",
color="red",
)
self._ax.scatter(
self._goal_pos[0].item(),
self._goal_pos[1].item(),
marker="o",
color="green",
)

# robot
self._ax.scatter(
self._robot_state[0].item(),
self._robot_state[1].item(),
marker="o",
color="blue",
)

# visualize top samples with different alpha based on weights
if top_samples is not None:
top_samples, top_weights = top_samples
top_samples = top_samples.cpu().numpy()
top_weights = top_weights.cpu().numpy()
top_weights = 0.7 * top_weights / np.max(top_weights)
top_weights = np.clip(top_weights, 0.1, 0.7)
for i in range(top_samples.shape[0]):
self._ax.plot(
top_samples[i, :, 0],
top_samples[i, :, 1],
color="black",
alpha=top_weights[i],
)

# predicted trajectory
if predicted_trajectory is not None:
# color trajectory points red if they are in collision
colors = np.array(["orange"] * predicted_trajectory.shape[1])
if is_collisions is not None:
is_collisions = is_collisions.cpu().numpy()
is_collisions = np.any(is_collisions, axis=0)
colors[is_collisions] = "red"

self._ax.scatter(
predicted_trajectory[0, :, 0].cpu().numpy(),
predicted_trajectory[0, :, 1].cpu().numpy(),
color=colors,
marker="o",
s=10,
)

if mode == "human":
# online rendering
plt.pause(0.001)
plt.cla()
elif mode == "rgb_array":
# offline rendering for video
# TODO: high resolution rendering
self._fig.canvas.draw()
data = np.frombuffer(self._fig.canvas.tostring_rgb(), dtype=np.uint8)
data = data.reshape(self._fig.canvas.get_width_height()[::-1] + (3,))
plt.cla()
self._rendered_frames.append(data)

def close(self, path: str = None) -> None:
if path is None:
# mkdir video if not exists

if not os.path.exists("video"):
os.mkdir("video")
path = "video/" + "navigation_2d_" + str(self._seed) + ".gif"

if len(self._rendered_frames) > 0:
# save animation
clip = ImageSequenceClip(self._rendered_frames, fps=10)
# clip.write_videofile(path, fps=10)
clip.write_gif(path, fps=10)

# save first frame as png
plt.imsave(
"video/" + "navigation_2d_" + str(self._seed) + ".png",
self._rendered_frames[0],
)

def dynamics(
self, state: torch.Tensor, action: torch.Tensor, delta_t: float = 0.1
) -> torch.Tensor:
"""
Update robot state based on differential drive dynamics.
Args:
state (torch.Tensor): state batch tensor, shape (batch_size, 3) [x, y, theta]
action (torch.Tensor): control batch tensor, shape (batch_size, 2) [v, omega]
delta_t (float): time step interval [s]
Returns:
torch.Tensor: shape (batch_size, 3) [x, y, theta]
"""
x = state[:, 0].view(-1, 1)
y = state[:, 1].view(-1, 1)
theta = state[:, 2].view(-1, 1)
v = torch.clamp(action[:, 0].view(-1, 1), self.u_min[0], self.u_max[0])
omega = torch.clamp(action[:, 1].view(-1, 1), self.u_min[1], self.u_max[1])
theta = angle_normalize(theta)

new_x = x + v * torch.cos(theta) * delta_t
new_y = y + v * torch.sin(theta) * delta_t
new_theta = angle_normalize(theta + omega * delta_t)

# clamp x and y to the map boundary
x_lim = torch.tensor(
self._obstacle_map.x_lim, device=self._device, dtype=self._dtype
)
y_lim = torch.tensor(
self._obstacle_map.y_lim, device=self._device, dtype=self._dtype
)
clamped_x = torch.clamp(new_x, x_lim[0], x_lim[1])
clamped_y = torch.clamp(new_y, y_lim[0], y_lim[1])

return torch.cat([clamped_x, clamped_y, new_theta], dim=1)

def stage_cost(self, state: torch.Tensor, action: torch.Tensor) -> torch.Tensor:
"""
Calculate stage cost.
Args:
state (torch.Tensor): state batch tensor, shape (batch_size, 3) [x, y, theta]
action (torch.Tensor): control batch tensor, shape (batch_size, 2) [v, omega]
Returns:
torch.Tensor: shape (batch_size,)
"""
goal_cost = torch.norm(state[:, :2] - self._goal_pos, dim=1)

pos_batch = state[:, :2].unsqueeze(1) # (batch_size, 1, 2)

obstacle_cost = self._obstacle_map.compute_cost(pos_batch).squeeze(
1
) # (batch_size,)

cost = goal_cost + 10000 * obstacle_cost

return cost

def terminal_cost(self, state: torch.Tensor) -> torch.Tensor:
"""
Calculate terminal cost.
Args:
state (torch.Tensor): state batch tensor, shape (batch_size, 3) [x, y, theta]
Returns:
torch.Tensor: shape (batch_size,)
"""
return self.stage_cost(state=state, action=torch.zeros_like(state[:, :2]))

def collision_check(self, state: torch.Tensor) -> torch.Tensor:
"""
Args:
state (torch.Tensor): state batch tensor, shape (batch_size, traj_size, 3) [x, y, theta]
Returns:
torch.Tensor: collision flags, shape (batch_size, traj_size)
"""
pos_batch = state[:, :, :2]
is_collisions = self._obstacle_map.compute_cost(pos_batch).squeeze(1)
return is_collisions
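

if __name__ == "__main__":
    # Illustrative smoke test (a minimal sketch): drive the robot with a constant
    # control and render online to sanity-check reset/step/render/close.
    # Assumes the package is installed (pip3 install -e .) so that `envs` is importable.
    env = Navigation2DEnv()
    state = env.reset()
    action = torch.tensor([1.0, 0.1], device=state.device, dtype=state.dtype)
    for _ in range(200):
        state, is_goal_reached = env.step(action)
        env.render(mode="human")
        if is_goal_reached:
            break
    env.close()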
