Skip to content

Commit

Permalink
Rework ZeroMqMessageProducerTests for XPUB
Browse files Browse the repository at this point in the history
* Use an `XPUB` socket to receive subscriptions before publishing.
This makes the test more robust and less blocked for that `Thread.sleep()`
  • Loading branch information
artembilan committed Jun 22, 2021
1 parent 7072cf1 commit 353e99a
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,11 @@ void testMessageProducerForPair() {
}

@Test
void testMessageProducerForPubSubReceiveRaw() throws InterruptedException {
void testMessageProducerForPubSubReceiveRaw() {
String socketAddress = "inproc://messageProducer.test";
ZMQ.Socket socket = CONTEXT.createSocket(SocketType.PUB);
ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB);
socket.bind(socketAddress);
socket.setReceiveTimeOut(10_000);

FluxMessageChannel outputChannel = new FluxMessageChannel();

Expand Down Expand Up @@ -126,17 +127,15 @@ void testMessageProducerForPubSubReceiveRaw() throws InterruptedException {
messageProducer.afterPropertiesSet();
messageProducer.start();

// Give it some time to connect and subscribe
Thread.sleep(2000);
assertThat(socket.recv()).isNotNull();

ZMsg msg = ZMsg.newStringMsg("test");
msg.wrap(new ZFrame("testTopic"));
msg.send(socket);

messageProducer.subscribeToTopics("other");

// Give it some time to connect and subscribe
Thread.sleep(2000);
assertThat(socket.recv()).isNotNull();

msg = ZMsg.newStringMsg("test");
msg.wrap(new ZFrame("otherTopic"));
Expand Down

0 comments on commit 353e99a

Please sign in to comment.