forked from humans-to-robots-motion/tp-rmp
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Init experiment pipeline, implement PD controller
Signed-off-by: An Thai Le <an.thai.le97@gmail.com>
- Loading branch information
Showing
22 changed files
with
190 additions
and
5 deletions.
There are no files selected for viewing
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
import sys | ||
import matplotlib.pyplot as plt | ||
import matplotlib | ||
from os.path import join, dirname, abspath | ||
matplotlib.rcParams['pdf.fonttype'] = 42 | ||
matplotlib.rcParams['ps.fonttype'] = 42 | ||
matplotlib.rcParams['font.size'] = 8 | ||
|
||
ROOT_DIR = join(dirname(abspath(__file__)), '..') | ||
sys.path.append(ROOT_DIR) | ||
from tprmp.utils.loading import load_demos, load_demos_2d # noqa | ||
from tprmp.visualization.demonstration import _plot_traj_global # noqa | ||
|
||
DATA_DIR = join(ROOT_DIR, 'data', 'tasks', 'test', 'demos') | ||
dt = 0.01 | ||
N1, N2 = 3, 5 | ||
limits = [0., 4.5] | ||
demo_names = ['C', 'C1', 'G', 'hat', 'hat1', 'I', 'I1', 'J', 'L', 'L1', 'P', 'S', 'S1', 'S2', 'U'] | ||
fig, axs = plt.subplots(N1, N2) | ||
|
||
for n, name in enumerate(demo_names): | ||
data_file = join(DATA_DIR, name + '.p') | ||
demos = load_demos_2d(data_file, dt=dt) | ||
i, j = int(n / N2), n % N2 | ||
plt.sca(axs[i, j]) | ||
axs[i, j].set_aspect('equal') | ||
_plot_traj_global(demos, limits=limits, legend=False, three_d=False, new_ax=False) | ||
last = demos[0].traj[:, -1] | ||
plt.scatter(last[0], last[1], marker='*', color='k', s=30) | ||
# fig.tight_layout() | ||
plt.show() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import sys | ||
import argparse | ||
import numpy as np | ||
from os.path import join, dirname, abspath | ||
import logging | ||
import matplotlib.pyplot as plt | ||
logging.basicConfig() | ||
logging.getLogger().setLevel(logging.INFO) | ||
|
||
ROOT_DIR = join(dirname(abspath(__file__)), '..') | ||
sys.path.append(ROOT_DIR) | ||
from tprmp.utils.loading import load_demos_2d # noqa | ||
from tprmp.visualization.demonstration import plot_demo # noqa | ||
from tprmp.models.pd import PDController # noqa | ||
|
||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, | ||
description='Example run: python test_pd_controller.py test.p') | ||
parser.add_argument('--task', help='The task folder', type=str, default='test') | ||
parser.add_argument('--demo', help='The data file', type=str, default='S.p') | ||
args = parser.parse_args() | ||
|
||
DATA_DIR = join(ROOT_DIR, 'data', 'tasks', args.task, 'demos') | ||
data_file = join(DATA_DIR, args.demo) | ||
# parameters | ||
T = 2000 | ||
dt = 0.01 | ||
dt_update = 0.01 | ||
ratio = int(dt_update / dt) | ||
limits = [0., 4.5] | ||
Kp = 2 * np.eye(2) | ||
Kd = np.sqrt(2) * np.eye(2) | ||
# load data | ||
demos = load_demos_2d(data_file, dt=dt) | ||
sample = demos[0] | ||
# plot_demo(demos, only_global=False, new_fig=True, new_ax=True, three_d=False, limits=limits, show=True) | ||
model = PDController(sample.manifold, Kp=Kp, Kd=Kd) | ||
x, dx = sample.traj[:, 0], sample._d_traj[:, 0] | ||
traj = [x] | ||
plt.ion() | ||
fig = plt.figure() | ||
ax = fig.add_subplot(111) | ||
ax.set_xlim([limits[0], limits[1]]) | ||
ax.set_ylim([limits[0], limits[1]]) | ||
ax.set_aspect('equal') | ||
ax.plot(sample.traj[0], sample.traj[1], color="b", linestyle='--', alpha=0.6) | ||
line1, = ax.plot(x[0], x[1], marker="o", color="b", markersize=1, linestyle='None', alpha=1.) | ||
line2, = ax.plot(x[0], x[1], marker="o", color="b", markersize=1, linestyle='None', alpha=0.3) | ||
model.update_targets(x, np.zeros_like(dx)) | ||
for t in range(1, T): | ||
if t % ratio == 0: | ||
i = int(t / ratio) | ||
if i < sample.traj.shape[1]: | ||
model.update_targets(sample.traj[:, i], np.zeros_like(dx)) | ||
ddx = model.retrieve(x, dx) | ||
dx = ddx * dt + dx | ||
x = dx * dt + x | ||
traj.append(x) | ||
line1.set_xdata(x[0]) | ||
line1.set_ydata(x[1]) | ||
line2.set_xdata(np.array(traj)[:, 0]) | ||
line2.set_ydata(np.array(traj)[:, 1]) | ||
fig.canvas.draw() | ||
fig.canvas.flush_events() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import numpy as np | ||
import logging | ||
|
||
|
||
class PDController(object): | ||
''' | ||
Simple PD controller | ||
''' | ||
logger = logging.getLogger(__name__) | ||
|
||
def __init__(self, manifold, **kwargs): | ||
self.manifold = manifold | ||
self.Kp = kwargs.get('Kp', np.eye(self.manifold.dim_T)) | ||
self.Kd = kwargs.get('Kd', 0.1 * np.eye(self.manifold.dim_T)) | ||
self.xd = None | ||
self.dxd = None | ||
|
||
def update_targets(self, xd, dxd): | ||
assert xd.shape[0] == self.manifold.dim_M | ||
assert dxd.shape[0] == self.manifold.dim_T | ||
self.xd = xd | ||
self.dxd = dxd | ||
|
||
def retrieve(self, x, dx): | ||
return -(self.Kp @ self.manifold.log_map(x, base=self.xd) + self.Kd @ (dx - self.dxd)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import numpy as np | ||
import os | ||
import logging | ||
from os.path import join, dirname, realpath | ||
|
||
from tprmp.models.tp_rmp import TPRMP | ||
from tprmp.utils.loading import load_demos, load_demos_2d | ||
|
||
_path_file = dirname(realpath(__file__)) | ||
DATA_PATH = join(_path_file, '..', '..', 'data', 'tasks') | ||
|
||
|
||
class Experiment(object): | ||
logger = logging.getLogger(__name__) | ||
|
||
def __init__(self, **kwargs): | ||
self.verbose = kwargs.get('verbose', False) | ||
self.task = kwargs.get('task', 'test') | ||
self.demo_names = kwargs.get('demo_names', ['I']) | ||
self.demo_type = kwargs.get('demo_type', '2D') | ||
self.tag = kwargs.get('tag', None) | ||
self.dt = kwargs.get('dt', 0.01) | ||
self.save_model = kwargs.get('save_model', True) | ||
self.experiment_path = kwargs.get('experiment_path', join(DATA_PATH, self.task, 'experiments')) | ||
self.demo_path = kwargs.get('demo_path', join(DATA_PATH, self.task, 'demos')) | ||
os.makedirs(self.data_path, exist_ok=True) | ||
# training params | ||
self.test_comps = kwargs.get('test_comps', [3, 5, 7, 9]) | ||
self.displace_var = kwargs.get('displace_var', 0.01 * np.arange(10)) | ||
self.stiff_scale = kwargs.get('stiff_scale', 1.) | ||
self.mass_scale = kwargs.get('mass_scale', 1.) | ||
self.var_scale = kwargs.get('var_scale', 1.) | ||
self.delta = kwargs.get('delta', 2.) | ||
self.models = [] | ||
self.demos = [] | ||
# records data | ||
self.tracking_error = [] | ||
self.goal_error = [] | ||
self.success_rate = [] | ||
self.rmpflow_goal_error = [] | ||
self.rmpflow_success_rate = [] | ||
|
||
def load_demos(self): | ||
for i, n in enumerate(self.demo_names): | ||
data_file = join(self.demo_path, n + '.p') | ||
if self.demo_type == '2D': | ||
self.demos[i] = load_demos_2d(data_file, dt=self.dt) | ||
elif self.demo_type == '6D': | ||
self.demos[i] = load_demos(data_file, tag=self.tag) | ||
|
||
def train(self): | ||
for i, n in enumerate(self.demo_names): | ||
for num_comp in self.test_comps: | ||
model = TPRMP(num_comp=num_comp, name=self.task, stiff_scale=self.stiff_scale, mass_scale=self.mass_scale, var_scale=self.var_scale, delta=self.delta) | ||
model.train(self.demos[i]) | ||
model.save(name=n + '_' + str(num_comp) + '.p') | ||
|
||
def tracking_experiment(self): | ||
pass | ||
|
||
def adaptation_experiment(self, disturb=False, moving=False): | ||
pass | ||
|
||
def composable_experiment(self, moving=False): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters