Skip to content

Commit

Permalink
Rework ZeroMQMH test for Awaitility
Browse files Browse the repository at this point in the history
Turns out PUB socket doesn't care if there are subscribers to it or not.
The sent message may be just lost in between.

* Resend message in the test until it is received by subscriber
* Use `await().untilAsserted()` to iterate the logic at most 10 seconds
  • Loading branch information
artembilan committed Jun 22, 2021
1 parent dfd5775 commit cd7465a
Showing 1 changed file with 10 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,18 @@ void testMessageHandlerForPubSub() {
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
messageHandler.afterPropertiesSet();

ZMQ.Poller poller = CONTEXT.createPoller(1);
poller.register(subSocket, ZMQ.Poller.POLLIN);

Message<?> testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build();
messageHandler.handleMessage(testMessage).subscribe();

while (true) {
poller.poll(10000);
if (poller.pollin(0)) {
ZMsg msg = ZMsg.recvMsg(subSocket);
assertThat(msg).isNotNull();
assertThat(msg.unwrap().getString(ZMQ.CHARSET)).isEqualTo("testTopic");
Message<?> capturedMessage = new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData());
assertThat(capturedMessage).isEqualTo(testMessage);
msg.destroy();
break;
}
}

poller.unregister(subSocket);
poller.close();
await().untilAsserted(() -> {
messageHandler.handleMessage(testMessage).subscribe();
ZMsg msg = ZMsg.recvMsg(subSocket);
assertThat(msg).isNotNull();
assertThat(msg.unwrap().getString(ZMQ.CHARSET)).isEqualTo("testTopic");
Message<?> capturedMessage = new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData());
assertThat(capturedMessage).isEqualTo(testMessage);
msg.destroy();
});

messageHandler.destroy();
subSocket.close();
}
Expand Down

0 comments on commit cd7465a

Please sign in to comment.