Skip to content

Commit

Permalink
net: lib: coap_client: check poll() condition before retrying CoAP msg
Browse files Browse the repository at this point in the history
Refactor the CoAP retry handling into the handle_poll() function,
so that we only try to send retries if the socket reports POLLOUT.

Also move the receiving into same loop, so when poll() reports POLLIN
we recv() the message and handle it before proceeding to other sockets.

Also fix tests to handle POLLOUT flag and add support for testing
multiple clients.

Signed-off-by: Seppo Takalo <seppo.takalo@nordicsemi.no>
  • Loading branch information
SeppoTakalo authored and mmahadevan108 committed Oct 31, 2024
1 parent 46b7c84 commit 4c6dd4c
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 147 deletions.
2 changes: 0 additions & 2 deletions include/zephyr/net/coap_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,12 @@ struct coap_client {
int fd;
struct sockaddr address;
socklen_t socklen;
bool response_ready;
struct k_mutex lock;
uint8_t send_buf[MAX_COAP_MSG_LEN];
uint8_t recv_buf[MAX_COAP_MSG_LEN];
struct coap_client_internal_request requests[CONFIG_COAP_CLIENT_MAX_REQUESTS];
struct coap_option echo_option;
bool send_echo;
int socket_error;
};
/** @endcond */

Expand Down
204 changes: 83 additions & 121 deletions subsys/net/lib/coap/coap_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ static int num_clients;
static K_SEM_DEFINE(coap_client_recv_sem, 0, 1);
static atomic_t coap_client_recv_active;

static bool timeout_expired(struct coap_client_internal_request *internal_req);
static void cancel_requests_with(struct coap_client *client, int error);
static int recv_response(struct coap_client *client, struct coap_packet *response, bool *truncated);
static int handle_response(struct coap_client *client, const struct coap_packet *response,
bool response_truncated);


static int send_request(int sock, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
Expand Down Expand Up @@ -127,27 +134,26 @@ static bool has_ongoing_exchange(struct coap_client *client)
return false;
}

static struct coap_client_internal_request *get_free_request(struct coap_client *client)
static bool has_timeout_expired(struct coap_client *client)
{
for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) {
if (client->requests[i].request_ongoing == false &&
exchange_lifetime_exceeded(&client->requests[i])) {
return &client->requests[i];
if (timeout_expired(&client->requests[i])) {
return true;
}
}

return NULL;
return false;
}

static bool has_ongoing_requests(void)
static struct coap_client_internal_request *get_free_request(struct coap_client *client)
{
for (int i = 0; i < num_clients; i++) {
if (has_ongoing_request(clients[i])) {
return true;
for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) {
if (client->requests[i].request_ongoing == false &&
exchange_lifetime_exceeded(&client->requests[i])) {
return &client->requests[i];
}
}

return false;
return NULL;
}

static bool has_ongoing_exchanges(void)
Expand Down Expand Up @@ -498,86 +504,91 @@ static int resend_request(struct coap_client *client,
return ret;
}

static int coap_client_resend_handler(void)
static void coap_client_resend_handler(struct coap_client *client)
{
int ret = 0;

for (int i = 0; i < num_clients; i++) {
k_mutex_lock(&clients[i]->lock, K_FOREVER);
k_mutex_lock(&client->lock, K_FOREVER);

for (int j = 0; j < CONFIG_COAP_CLIENT_MAX_REQUESTS; j++) {
if (timeout_expired(&clients[i]->requests[j])) {
ret = resend_request(clients[i], &clients[i]->requests[j]);
for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) {
if (timeout_expired(&client->requests[i])) {
ret = resend_request(client, &client->requests[i]);
if (ret < 0) {
report_callback_error(&client->requests[i], ret);
reset_internal_request(&client->requests[i]);
}
}

k_mutex_unlock(&clients[i]->lock);
}

return ret;
k_mutex_unlock(&client->lock);
}

static int handle_poll(void)
{
int ret = 0;

while (1) {
struct zsock_pollfd fds[CONFIG_COAP_CLIENT_MAX_INSTANCES] = {0};
int nfds = 0;

/* Use periodic timeouts */
for (int i = 0; i < num_clients; i++) {
fds[i].fd = clients[i]->fd;
fds[i].events = ZSOCK_POLLIN;
fds[i].revents = 0;
nfds++;
}
struct zsock_pollfd fds[CONFIG_COAP_CLIENT_MAX_INSTANCES] = {0};
int nfds = 0;

ret = zsock_poll(fds, nfds, COAP_PERIODIC_TIMEOUT);
/* Use periodic timeouts */
for (int i = 0; i < num_clients; i++) {
fds[i].fd = clients[i]->fd;
fds[i].events = (has_ongoing_exchange(clients[i]) ? ZSOCK_POLLIN : 0) |
(has_timeout_expired(clients[i]) ? ZSOCK_POLLOUT : 0);
fds[i].revents = 0;
nfds++;
}

if (ret < 0) {
LOG_ERR("Error in poll:%d", errno);
errno = 0;
return ret;
} else if (ret == 0) {
/* Resend all the expired pending messages */
ret = coap_client_resend_handler();
ret = zsock_poll(fds, nfds, COAP_PERIODIC_TIMEOUT);

if (ret < 0) {
LOG_ERR("Error resending request: %d", ret);
}
if (ret < 0) {
ret = -errno;
LOG_ERR("Error in poll:%d", ret);
return ret;
} else if (ret == 0) {
return 0;
}

if (!has_ongoing_requests()) {
return ret;
}
for (int i = 0; i < nfds; i++) {
if (fds[i].revents & ZSOCK_POLLOUT) {
coap_client_resend_handler(clients[i]);
}
if (fds[i].revents & ZSOCK_POLLIN) {
struct coap_packet response;
bool response_truncated = false;

} else {
for (int i = 0; i < nfds; i++) {
k_mutex_lock(&clients[i]->lock, K_FOREVER);

if (fds[i].revents & ZSOCK_POLLERR) {
LOG_ERR("Error in poll for socket %d", fds[i].fd);
clients[i]->socket_error = -EIO;
}
if (fds[i].revents & ZSOCK_POLLHUP) {
LOG_ERR("Error in poll: POLLHUP for socket %d", fds[i].fd);
clients[i]->socket_error = -ENOTCONN;
}
if (fds[i].revents & ZSOCK_POLLNVAL) {
LOG_ERR("Error in poll: POLLNVAL - fd %d not open",
fds[i].fd);
clients[i]->socket_error = -EINVAL;
}
if (fds[i].revents & ZSOCK_POLLIN) {
clients[i]->response_ready = true;
}
ret = recv_response(clients[i], &response, &response_truncated);
if (ret < 0) {
LOG_ERR("Error receiving response");
cancel_requests_with(clients[i], -EIO);
k_mutex_unlock(&clients[i]->lock);
continue;
}

ret = handle_response(clients[i], &response, response_truncated);
if (ret < 0) {
LOG_ERR("Error handling response");
}

return 0;
k_mutex_unlock(&clients[i]->lock);
}
if (fds[i].revents & ZSOCK_POLLERR) {
LOG_ERR("Error in poll for socket %d", fds[i].fd);
cancel_requests_with(clients[i], -EIO);
}
if (fds[i].revents & ZSOCK_POLLHUP) {
LOG_ERR("Error in poll: POLLHUP for socket %d", fds[i].fd);
cancel_requests_with(clients[i], -EIO);
}
if (fds[i].revents & ZSOCK_POLLNVAL) {
LOG_ERR("Error in poll: POLLNVAL - fd %d not open", fds[i].fd);
cancel_requests_with(clients[i], -EIO);
}
}

return ret;
return 0;
}

static bool token_compare(struct coap_client_internal_request *internal_req,
Expand Down Expand Up @@ -895,14 +906,13 @@ static int handle_response(struct coap_client *client, const struct coap_packet
}
}
fail:
client->response_ready = false;
if (ret < 0 || !internal_req->is_observe) {
internal_req->request_ongoing = false;
}
return ret;
}

void coap_client_cancel_requests(struct coap_client *client)
static void cancel_requests_with(struct coap_client *client, int error)
{
k_mutex_lock(&client->lock, K_FOREVER);

Expand All @@ -914,33 +924,20 @@ void coap_client_cancel_requests(struct coap_client *client)
* do not reenter it. In that case, the user knows their
* request was cancelled anyway.
*/
report_callback_error(&client->requests[i], -ECANCELED);
client->requests[i].request_ongoing = false;
client->requests[i].is_observe = false;
report_callback_error(&client->requests[i], error);
reset_internal_request(&client->requests[i]);
}
}
atomic_clear(&coap_client_recv_active);
k_mutex_unlock(&client->lock);

/* Wait until after zsock_poll() can time out and return. */
k_sleep(K_MSEC(COAP_PERIODIC_TIMEOUT));
}

static void signal_socket_error(struct coap_client *cli)
void coap_client_cancel_requests(struct coap_client *client)
{
for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) {
struct coap_client_internal_request *req = &cli->requests[i];

if (!req->request_ongoing) {
continue;
}

req->request_ongoing = false;
if (req->coap_request.cb) {
req->coap_request.cb(cli->socket_error, 0, NULL, 0,
true, req->coap_request.user_data);
}
}
cancel_requests_with(client, -ECANCELED);
/* Wait until after zsock_poll() can time out and return. */
k_sleep(K_MSEC(COAP_PERIODIC_TIMEOUT));
}

void coap_client_recv(void *coap_cl, void *a, void *b)
Expand All @@ -957,41 +954,6 @@ void coap_client_recv(void *coap_cl, void *a, void *b)
goto idle;
}

for (int i = 0; i < num_clients; i++) {
if (clients[i]->response_ready) {
struct coap_packet response;
bool response_truncated = false;

k_mutex_lock(&clients[i]->lock, K_FOREVER);

ret = recv_response(clients[i], &response, &response_truncated);
if (ret < 0) {
LOG_ERR("Error receiving response");
clients[i]->response_ready = false;
k_mutex_unlock(&clients[i]->lock);
if (ret == -EOPNOTSUPP) {
LOG_ERR("Socket misconfigured.");
goto idle;
}
continue;
}

ret = handle_response(clients[i], &response, response_truncated);
if (ret < 0) {
LOG_ERR("Error handling response");
}

clients[i]->response_ready = false;
k_mutex_unlock(&clients[i]->lock);
}

if (clients[i]->socket_error) {
signal_socket_error(clients[i]);
clients[i]->socket_error = 0;
}

}

/* There are more messages coming */
if (has_ongoing_exchanges()) {
continue;
Expand Down
1 change: 1 addition & 0 deletions tests/net/lib/coap_client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ add_compile_definitions(CONFIG_COAP_CLIENT_MAX_REQUESTS=2)
add_compile_definitions(CONFIG_COAP_CLIENT_MAX_INSTANCES=2)
add_compile_definitions(CONFIG_COAP_MAX_RETRANSMIT=4)
add_compile_definitions(CONFIG_COAP_BACKOFF_PERCENT=200)
add_compile_definitions(CONFIG_COAP_LOG_LEVEL=4)
Loading

0 comments on commit 4c6dd4c

Please sign in to comment.