Skip to content

Commit

Permalink
fix: aimrt_py channel benchmark for large data packets (AimRT#54)
Browse files Browse the repository at this point in the history
* fix: ensure subscriber receives all messages before ending

Add a short sleep to guarantee the subscriber has enough time to process all published messages before signaling completion. This prevents potential missed messages during the benchmarking process.

* perf: optimize timestamp retrieval for benchmarking

Use high-resolution `perf_counter_ns` instead of `time.time` for better accuracy in message timestamps across publisher, subscriber, and RPC client modules.
  • Loading branch information
zhangyi1357 authored Oct 25, 2024
1 parent 3594abc commit 430f4f6
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
6 changes: 4 additions & 2 deletions src/examples/py/pb_chn_bench/benchmark_publisher_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ def StartSinglePlan(self, plan_id: int, plan: dict) -> None:

self.publish_complete_event.wait()

# wait for subscriber to receive all messages
time.sleep(1)

# publish end signal
end_signal = benchmark_pb2.BenchmarkSignal()
end_signal.status = benchmark_pb2.BenchmarkStatus.End
Expand All @@ -206,8 +209,7 @@ def PublishTask(self, publisher_wrapper: dict, plan: dict) -> None:

while send_count < plan['msg_count'] and self.run_flag:
msg.seq = send_count
msg.timestamp = int(time.time() * 1e9) # ns

msg.timestamp = time.perf_counter_ns()
aimrt_py.Publish(publisher, msg)

send_count += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def SignalCallback(self, signal_msg: benchmark_pb2.BenchmarkSignal) -> None:
aimrt_py.error(self.logger, f"Unknown signal status: {signal_msg.status}")

def MessageCallback(self, topic_index: int, benchmark_msg: benchmark_pb2.BenchmarkMessage) -> None:
recv_timestamp = time.time() * 1e9 # ns
recv_timestamp = time.perf_counter_ns()

topic_name = f"test_topic_{topic_index}"
self.topic_record_map[topic_name].msg_record_vec[benchmark_msg.seq].recv = True
Expand Down
6 changes: 3 additions & 3 deletions src/examples/py/pb_rpc_bench/benchmark_rpc_client_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def StartSinglePlan(self, plan_id: int, plan: dict) -> None:
self.completed_tasks = 0
self.total_tasks = plan['parallel']

start_time = time.time()
start_time = time.perf_counter_ns()

# start rpc tasks
self.perf_data = []
Expand All @@ -161,8 +161,8 @@ def StartSinglePlan(self, plan_id: int, plan: dict) -> None:
# wait for all tasks to complete
self.request_complete_event.wait()

end_time = time.time()
total_time_ms = (end_time - start_time) * 1e3
end_time = time.perf_counter_ns()
total_time_ms = (end_time - start_time) / 1e6

self.perf_data.sort()

Expand Down

0 comments on commit 430f4f6

Please sign in to comment.