-
Notifications
You must be signed in to change notification settings - Fork 0
/
service_test.py
155 lines (137 loc) · 6.06 KB
/
service_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import open3d as o3d
from src.utilt import *
from pyquaternion import Quaternion as Quat
from scipy.spatial.transform import Rotation as Rot
import time
import torch
from random import choice
from src.grasp_utilt import view_pre_grasp_trajectory
from graspnetAPI import GraspGroup
from graspnetAPI.utils.utils import plot_gripper_pro_max
import rospy
from sensor_msgs.msg import PointCloud2,PointField
from geometry_msgs.msg import PoseArray
from graspnet_pkg.srv import GraspNetList,GraspNetListResponse
from graspnet_pkg.msg import GraspMsg
from msg_srv.srv import GraspAffordance, GraspTrajectory,GraspAffordanceResponse,GraspTrajectoryResponse
from sensor_msgs import point_cloud2
from geometry_msgs.msg import Point, Quaternion,Pose
def xyzl_array_to_pointcloud2(points, stamp=None, frame_id=None):
'''
Numpy to PointCloud2
Create a sensor_msgs.PointCloud2 from an array
of points (x, y, z, l)
'''
msg = PointCloud2()
if stamp:
msg.header.stamp = stamp
if frame_id:
msg.header.frame_id = frame_id
if len(points.shape) == 3:
msg.height = points.shape[1]
msg.width = points.shape[0]
else:
msg.height = 1
msg.width = len(points)
msg.fields = [
PointField('x', 0, PointField.FLOAT32, 1),
PointField('y', 4, PointField.FLOAT32, 1),
PointField('z', 8, PointField.FLOAT32, 1),
# PointField('intensity', 12, PointField.FLOAT32, 1)
]
msg.is_bigendian = False
msg.point_step = 12
msg.row_step = 12 * points.shape[0]
msg.is_dense = int(np.isfinite(points).all())
msg.data = np.asarray(points, np.float32).tostring()
return msg
def getGraspNetService(pc_msg):
rospy.wait_for_service("GraspNet")
try:
srv_handle = rospy.ServiceProxy("GraspNet", GraspNetList)
rep = srv_handle(pc_msg)
gg_list = rep.gg
except rospy.ServiceException as e:
print("Service call failed : %s" % e)
return None
return gg_list
def convertGraspMsgtoNumpy(gg_list):
# grasp_score, grasp_width, grasp_height, grasp_depth, rotation_matrix, grasp_center, obj_ids
gg_array = []
for gg in gg_list:
grasp_score = gg.grasp_score
grasp_width = gg.grasp_width
grasp_height = gg.grasp_height
grasp_depth = gg.grasp_depth
pre_ = [grasp_score, grasp_width, grasp_height, grasp_depth]
rotation = gg.rotation
grasp_center = gg.grasp_center
obj_ids = gg.obj_ids
quat = Quat(rotation.w, rotation.x, rotation.y, rotation.z)
rotation_matrix = quat.rotation_matrix.flatten().tolist()
grasp_center = [grasp_center.x, grasp_center.y, grasp_center.z]
gg_array.append(pre_ + rotation_matrix + grasp_center + [obj_ids])
gg_array = np.array(gg_array)
return gg_array
def getAffordanceService(pc_msg,gg):
rospy.wait_for_service("Affordance")
try:
srv_handle = rospy.ServiceProxy("Affordance",GraspAffordance)
result = srv_handle(pointcloud=pc_msg,gg=gg)
except rospy.ServiceException as e:
print("Service call failed : %s" % e)
return result
def getTrajectoryService(pc_msg,gg):
rospy.wait_for_service("Trajectory")
try:
srv_handle = rospy.ServiceProxy("Trajectory",GraspTrajectory)
trajectory = srv_handle(gg=gg,pointcloud=pc_msg)
except rospy.ServiceException as e:
print("Service call failed : %s" % e)
return trajectory
if __name__ == '__main__':
# real data
file_path = "./real_data/microwaves/021/"
file = np.load("./L515_test/without_train/0000/xyz0.21_remove.npz")
pointcloud = file["point_cloud"]
pc_msg = xyzl_array_to_pointcloud2(pointcloud,frame_id='pc_base')
gg_list = getGraspNetService(pc_msg)
gg = convertGraspMsgtoNumpy(gg_list)
grasp_group = GraspGroup(gg)
axis_pcd = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1, origin=[0, 0, 0])
cloud = o3d.geometry.PointCloud()
cloud.points = o3d.utility.Vector3dVector(pointcloud)
grippers = grasp_group.to_open3d_geometry_list()
# o3d.visualization.draw_geometries([cloud, axis_pcd, *grippers])
affordance_result = getAffordanceService(pc_msg,gg_list)
print(affordance_result)
for i in range(len(gg_list)):
if i in affordance_result.result:
grippers[i].paint_uniform_color([0.0, 1.0, 0.0])
else:
grippers[i].paint_uniform_color([1.0, 0.0, 0.0])
# o3d.visualization.draw_geometries([cloud, axis_pcd, *grippers])
for _index in affordance_result.result:
gg = gg_list[_index]
trajectory = getTrajectoryService(pc_msg, gg)
axis_pcd = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1, origin=[0, 0, 0])
cloud = o3d.geometry.PointCloud()
cloud.points = o3d.utility.Vector3dVector(pointcloud)
cloud.paint_uniform_color([0.5, 0.5, 0.5])
grasps = []
grasps.append(plot_gripper_pro_max(center=[gg.grasp_center.x, gg.grasp_center.y, gg.grasp_center.z],
R=np.array(pb.getMatrixFromQuaternion(
[gg.rotation.x, gg.rotation.y, gg.rotation.z, gg.rotation.w])).reshape((3,3)),
width=gg.grasp_width,
depth=gg.grasp_depth,
score=1))
for _pose in trajectory.trajectory.poses:
grasps.append(plot_gripper_pro_max(center=[_pose.position.x,_pose.position.y,_pose.position.z],
R=np.array(pb.getMatrixFromQuaternion(
[_pose.orientation.x,_pose.orientation.y,_pose.orientation.z,_pose.orientation.w])).reshape((3,3)),
width=gg.grasp_width,
depth=gg.grasp_depth,
score=gg.grasp_score))
o3d.visualization.draw_geometries([cloud, *grasps, axis_pcd])
# import pdb;pdb.set_trace()