Skip to content

Commit

Permalink
Merge pull request #130 from zhengjian526/master
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Jan 31, 2024
2 parents 07d5735 + ab27c06 commit 4fb707c
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/linux_llvm_cov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
cd tests
./test_rest_rpc
llvm-profdata merge -sparse test_rest_rpc-*.profraw -o test_rest_rpc.profdata
llvm-cov show ./test_rest_rpc -instr-profile=test_rest_rpc.profdata -format=html -output-dir=../.coverage_llvm_cov -ignore-filename-regex="examples|thirdparty|tests" -show-instantiations=false
llvm-cov show ./test_rest_rpc -instr-profile=test_rest_rpc.profdata -format=html -output-dir=../.coverage_llvm_cov -ignore-filename-regex="string_view.hpp|nonstd_any.hpp|examples|thirdparty|tests" -show-instantiations=false
echo "Done!"
- name: Upload Coverage Results
Expand All @@ -49,7 +49,7 @@ jobs:
echo "Code Coverage Report" > tmp.log
echo "for detail, [goto summary](https://github.com/${{ github.repository_owner }}/${{ github.event.repository.name }}/actions/runs/${{github.run_id}}) download Artifacts `llvm-cov`" >> tmp.log
echo "\`\`\`" >> tmp.log
llvm-cov report ./test_rest_rpc -instr-profile=test_rest_rpc.profdata -ignore-filename-regex="examples|thirdparty|tests" -show-region-summary=false >> tmp.log
llvm-cov report ./test_rest_rpc -instr-profile=test_rest_rpc.profdata -ignore-filename-regex="string_view.hpp|nonstd_any.hpp|examples|thirdparty|tests" -show-region-summary=false >> tmp.log
echo "\`\`\`" >> tmp.log
- name: Create Comment
Expand Down
30 changes: 23 additions & 7 deletions include/rest_rpc/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ class rpc_server : private asio::noncopyable {
router_.register_handler<is_pub>(name, f, self);
}

  // Registers a server-level error callback. The callback receives the
  // error code and a human-readable message; the publish path invokes it
  // (via error_callback) with asio::error::invalid_argument when a key has
  // no subscribers.
  void
  set_error_callback(std::function<void(asio::error_code, string_view)> f) {
    err_cb_ = std::move(f);
  }

void set_conn_timeout_callback(std::function<void(int64_t)> callback) {
conn_timeout_callback_ = std::move(callback);
}
Expand Down Expand Up @@ -169,7 +174,11 @@ class rpc_server : private asio::noncopyable {
}
}
}

void error_callback(const asio::error_code &ec, string_view msg) {
if (err_cb_) {
err_cb_(ec, msg);
}
}
template <typename T>
void publish(std::string key, std::string token, T data) {
{
Expand All @@ -182,13 +191,19 @@ class rpc_server : private asio::noncopyable {
get_shared_data<T>(std::move(data));
std::unique_lock<std::mutex> lock(sub_mtx_);
auto range = sub_map_.equal_range(key + token);
for (auto it = range.first; it != range.second; ++it) {
auto conn = it->second.lock();
if (conn == nullptr || conn->has_closed()) {
continue;
}
if (range.first != range.second) {
for (auto it = range.first; it != range.second; ++it) {
auto conn = it->second.lock();
if (conn == nullptr || conn->has_closed()) {
continue;
}

conn->publish(key + token, *shared_data);
conn->publish(key + token, *shared_data);
}
} else {
error_callback(
asio::error::make_error_code(asio::error::invalid_argument),
"The subscriber of the key: " + key + " does not exist.");
}
}

Expand Down Expand Up @@ -255,6 +270,7 @@ class rpc_server : private asio::noncopyable {

asio::signal_set signals_;

std::function<void(asio::error_code, string_view)> err_cb_;
std::function<void(int64_t)> conn_timeout_callback_;
std::function<void(std::shared_ptr<connection>, std::string)>
on_net_err_callback_ = nullptr;
Expand Down
166 changes: 163 additions & 3 deletions tests/test_rest_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct dummy {
};

// Round-trips the request payload back to the caller unchanged.
std::string echo(rpc_conn conn, const std::string &src) { return src; }
// Always replies with an empty string, regardless of the input payload.
std::string echo_empty(rpc_conn conn, const std::string &src) {
  return std::string{};
}

struct person {
int id;
Expand All @@ -37,10 +38,35 @@ void hello(rpc_conn conn, const std::string &str) {
std::cout << "hello " << str << std::endl;
}

// if you want to response later, you can use async model, you can control when
// to response
// if you want to response later, you can use async model, you can control when
// to response
void delay_echo(rpc_conn conn, const std::string &src) {
  auto sp = conn.lock();
  if (!sp) {
    return; // connection already gone; nothing to respond to
  }
  sp->set_delay(true);
  auto req_id = sp->request_id(); // note: you need keep the request id at that
                                  // time, and pass it into the async thread
  // The lambda must be mutable: a non-mutable lambda's by-copy captures are
  // const, so std::move(src) would silently degrade into a copy.
  std::thread thd([conn, req_id, src]() mutable {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    auto conn_sp = conn.lock();
    if (conn_sp) {
      conn_sp->pack_and_response(req_id, std::move(src));
    }
  });
  // Detached on purpose: the thread only holds a weak_ptr to the connection,
  // so a connection that dies before the delay elapses is simply skipped.
  thd.detach();
}

// Payload type with no fields; used to exercise (de)serialization of an
// empty msgpack object over the RPC channel.
struct empty_obj {
  MSGPACK_DEFINE()
};

// RPC handler that returns a default-constructed empty_obj.
empty_obj get_empty_obj(rpc_conn conn) { return empty_obj{}; }

TEST_CASE("test_client_reconnect") {
rpc_client client;
client.enable_auto_reconnect(); // automatic reconnect
client.enable_auto_heartbeat(); // automatic heartbeat
client.set_connect_timeout(1000);
client.set_reconnect_count(10);
client.set_error_callback([](asio::error_code ec) {
std::cout << "line: " << __LINE__ << ", msg: " << ec.message() << std::endl;
});
Expand Down Expand Up @@ -141,6 +167,28 @@ TEST_CASE("test_client_sync_call_return_void") {
client.call<>("echo");
}

// Verifies that an async call whose handler returns an empty msgpack object
// completes and that the response deserializes as empty_obj.
TEST_CASE("test_client_async_call_empty_obj") {
  rpc_server server(9000, std::thread::hardware_concurrency());
  server.register_handler("get_empty_obj", get_empty_obj);
  server.async_run();
  // Give the server a moment to start accepting before connecting.
  std::this_thread::sleep_for(std::chrono::milliseconds(200));

  rpc_client client("127.0.0.1", 9000);
  bool r = client.connect();
  CHECK(r);

  client.set_error_callback(
      [](asio::error_code ec) { std::cout << ec.message() << std::endl; });

  auto f = client.async_call<FUTURE>("get_empty_obj");
  if (f.wait_for(std::chrono::milliseconds(50)) ==
      std::future_status::timeout) {
    std::cout << "timeout" << std::endl;
  } else {
    // NOTE(review): the result is only deserialized, never asserted on —
    // the test passes as long as as<empty_obj>() does not throw.
    auto p = f.get().as<empty_obj>();
  }
}

TEST_CASE("test_client_async_call") {
rpc_server server(9000, std::thread::hardware_concurrency());
server.register_handler("get_person", get_person);
Expand Down Expand Up @@ -180,6 +228,7 @@ TEST_CASE("test_client_async_call_not_connect") {
TEST_CASE("test_client_async_call_with_timeout") {
rpc_server server(9000, std::thread::hardware_concurrency());
server.register_handler("echo", echo);
server.register_handler("get_person", get_person);
server.async_run();
std::this_thread::sleep_for(std::chrono::milliseconds(200));

Expand All @@ -191,15 +240,41 @@ TEST_CASE("test_client_async_call_with_timeout") {
client.async_call<0>(
"echo",
[](const asio::error_code &ec, string_view data) {
std::cout << "error code " << ec << ", err msg: " << data << '\n';
if (ec)
std::cout << "error code: " << ec << ", err msg: " << data << '\n';
},
test);
client.async_call<>(
"echo",
[](const asio::error_code &ec, string_view data) {
std::cout << "error code " << ec << ", err msg: " << data << '\n';
[&client](const asio::error_code &ec, string_view data) {
std::cout << "req id : " << client.reqest_id() << '\n';
if (ec)
std::cout << "error code: " << ec << ", err msg: " << data << '\n';
},
test);
client.async_call<>(
"get_person",
[&client](const asio::error_code &ec, string_view data) {
if (ec) {
std::cout << "error code: " << ec << ", err msg: " << data << '\n';
return;
}
auto p = as<person>(data);
CHECK_EQ(p.id, 1);
CHECK_EQ(p.age, 20);
CHECK_EQ(p.name, "tom");
},
test);
auto f = client.async_call<FUTURE>("get_person");
if (f.wait_for(std::chrono::milliseconds(500)) ==
std::future_status::timeout) {
std::cout << "timeout" << std::endl;
} else {
auto p = f.get().as<person>();
CHECK_EQ(p.id, 1);
CHECK_EQ(p.age, 20);
CHECK_EQ(p.name, "tom");
}
}

TEST_CASE("test_client_subscribe") {
Expand Down Expand Up @@ -232,6 +307,45 @@ TEST_CASE("test_client_subscribe") {
thd.join();
}

// Publishes to a key ("key") that no client has subscribed to (the client
// subscribes to "key1"), and expects the server's error callback to fire
// with asio::error::invalid_argument, which stops the publisher loop.
TEST_CASE("test_client_subscribe_not_exist_key") {
  rpc_server server(9000, std::thread::hardware_concurrency());
  server.register_handler("publish",
                          [&server](rpc_conn conn, std::string key,
                                    std::string token, std::string val) {
                            server.publish(std::move(key), std::move(val));
                          });
  // NOTE(review): `stop` is written from callback/subscriber contexts and
  // read by the publisher thread without synchronization — presumably a
  // benign flag here, but std::atomic<bool> would be the safe choice.
  bool stop = false;
  server.set_error_callback([&stop](asio::error_code ec, string_view msg) {
    std::cout << "line: " << __LINE__ << ", msg: " << ec.message() << " -- "
              << msg << std::endl;
    CHECK_EQ(ec, asio::error::invalid_argument);
    stop = true;
  });
  server.set_conn_timeout_callback([](int64_t conn_id) {
    std::cout << "connect id : " << conn_id << " timeout \n";
  });
  // Publisher loop: keeps publishing to the unsubscribed key until the
  // server's error callback flips `stop`.
  std::thread thd([&server, &stop] {
    while (!stop) {
      server.publish("key", "hello subscriber");
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
  });
  server.async_run();
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  rpc_client client;
  bool r = client.connect("127.0.0.1", 9000);
  CHECK(r);
  client.set_error_callback([&stop](asio::error_code ec) {
    std::cout << "line: " << __LINE__ << ", msg: " << ec.value() << " -- "
              << ec.message() << std::endl;
  });
  // Subscribing to "key1" ensures the published "key" has no subscriber;
  // this callback should never observe the published message.
  client.subscribe("key1", [&stop](string_view data) {
    CHECK(data != "hello subscriber");
    stop = true;
  });
  thd.join();
}

TEST_CASE("test_server_publish_encode_msg") {
rpc_server server(9000, std::thread::hardware_concurrency());
server.register_handler("publish",
Expand Down Expand Up @@ -294,6 +408,40 @@ TEST_CASE("test_client_subscribe_by_token") {
thd.join();
}

// Exercises token-scoped pub/sub: the client both publishes and subscribes
// with the same token, and a server-side thread re-publishes to every known
// token until the subscriber receives the message and flips `stop`.
TEST_CASE("test_client_publish_and_subscribe_by_token") {
  rpc_server server(9000, std::thread::hardware_concurrency());
  server.register_handler("publish_by_token", [&server](rpc_conn conn,
                                                        std::string key,
                                                        std::string token,
                                                        std::string val) {
    server.publish_by_token(std::move(key), std::move(token), std::move(val));
  });
  // NOTE(review): `stop` is shared between the publisher thread and the
  // subscriber callback without synchronization — std::atomic<bool> would
  // be the safe choice; verify this is acceptable for the test harness.
  bool stop = false;
  std::thread thd([&server, &stop] {
    while (!stop) {
      auto list = server.get_token_list();
      for (auto &token : list) {
        server.publish_by_token("key", token, "hello token subscriber");
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
  });
  server.async_run();
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  std::string client_token =
      "048a796c8a3c6a6b7bd1223bf2c8cee05232e927b521984ba417cb2fca6df9d1";
  rpc_client client;
  bool r = client.connect("127.0.0.1", 9000);
  CHECK(r);
  client.publish_by_token("key", client_token, "hello token subscriber");
  client.subscribe("key", client_token, [&stop](string_view data) {
    std::cout << data << "\n";
    CHECK_EQ(data, "hello token subscriber");
    stop = true;
  });
  thd.join();
}

TEST_CASE("test_server_callback") {
rpc_server server(9000, std::thread::hardware_concurrency());
dummy d;
Expand Down Expand Up @@ -325,4 +473,16 @@ TEST_CASE("test_server_user_data") {
bool r = client.connect();
CHECK(r);
client.call<>("server_user_data");
}
// Verifies the delayed-response path: delay_echo defers its reply to a
// background thread, and the blocking client call still receives the
// original payload once that reply arrives.
TEST_CASE("test_server_delay_response") {
  rpc_server server(9000, std::thread::hardware_concurrency());
  server.register_handler("delay_echo", delay_echo);
  server.async_run();
  // Let the acceptor come up before the client connects.
  std::this_thread::sleep_for(std::chrono::milliseconds(200));

  rpc_client client("127.0.0.1", 9000);
  bool connected = client.connect();
  CHECK(connected);
  auto reply = client.call<std::string>("delay_echo", "test_delay_echo");
  CHECK_EQ(reply, "test_delay_echo");
}

0 comments on commit 4fb707c

Please sign in to comment.