Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INT-3045: Add ZeroMqChannel support #3355

Merged
merged 18 commits into from
Aug 11, 2020

Conversation

artembilan
Copy link
Member

JIRA: https://jira.spring.io/browse/INT-3045

Provide a SubscribableChannel implementation for ZeroMQ

The general idea is to let to have a distributed channel implementation
where every client can connect to a single server backed by the channel.

The logic in the channel is fully transparent for end-user and there is just
enough to send message to it and subscribe for receiving on the other side.
If PUB/SUB model is used, all the subscribes (even over the network) going to
receive the same published message.
In case of PUSH/PULL only one subscriber in the whole cluster is going to get
the published message

  • Use Reactor for better threading control
  • JeroMQ is not interruptible-friendly: use control sockets to stop proxy loop
  • Name Reactor's schedulers to avoid daemon threads

@artembilan
Copy link
Member Author

See zeromq/jeromq#853.

I haven't finished the TCP binding logic and there are needed many more tests.

But general idea is ready for review.

Thanks

@artembilan
Copy link
Member Author

Decided to go with a bit more design and introduced a ZeroMqProxy Spring-friendly component.

@artembilan
Copy link
Member Author

@sbcd90, @@oli-ver ,

I would appreciate your feedback over here.

@garyrussell ,

any chances that you can review this for upcoming release next week?

Thank you all!

Copy link
Contributor

@garyrussell garyrussell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool

@artembilan artembilan changed the title [DO NOT MERGE YET] INT-3045: Add ZeroMqChannel support INT-3045: Add ZeroMqChannel support Aug 7, 2020
@artembilan artembilan requested a review from garyrussell August 7, 2020 22:05
@artembilan
Copy link
Member Author

So, from my perspective it is ready for the final review.

I'll add socketConfigures to the channel later.
Plus docs could be added separately.

Or just let me know your vision!

@oli-ver
Copy link
Contributor

oli-ver commented Aug 8, 2020

I would like to test this locally, but I struggle with how to get the snapshot version installed in my local maven repository. The Readme.md states to use ./gradlew install, but this task does not exist. When trying ./gradlew publishToMavenLocal instead unit tests do not compile and there are test failures so I tried ./gradlew publishToMavenLocal -x test -x compileTestJava. This fails during generatePomFileForMavenJavaPublication:

2020-08-08T18:31:16.940+0200 [ERROR] [org.gradle.internal.buildevents.BuildExceptionReporter] * What went wrong:
2020-08-08T18:31:16.941+0200 [ERROR] [org.gradle.internal.buildevents.BuildExceptionReporter] Execution failed for task ':spring-integration-zmq:generatePomFileForMavenJavaPublication'.
2020-08-08T18:31:16.941+0200 [ERROR] [org.gradle.internal.buildevents.BuildExceptionReporter] > Could not apply withXml() to generated POM
2020-08-08T18:31:16.941+0200 [ERROR] [org.gradle.internal.buildevents.BuildExceptionReporter]    > Cannot access first() element from an empty List

Is this perhaps already deployed in some publicly available maven repo?

@artembilan
Copy link
Member Author

Cannot access first() element from an empty List

Hm. I saw the same today locally as well.
I thought it is because I have a build cache already.
Let me re-run it with clean build!

I'll let you know what I find!

@artembilan
Copy link
Member Author

spring-integration-zmq

There is no such a module. I renamed it into spring-integration-zeromq.
Try with ./gradlew clean publishToMavenLocal

@artembilan
Copy link
Member Author

Pushed one fresh commit with extra dependency to let the tests to pass.

@oli-ver
Copy link
Contributor

oli-ver commented Aug 8, 2020

Pushed one fresh commit with extra dependency to let the tests to pass.

Thanks, seems to work now (at least after deleting ~/.m2, ~/.gradle and checking out the project fresh from git 😄)

$ ./gradlew clean publishToMavenLocal
[...]
BUILD SUCCESSFUL in 45s
494 actionable tasks: 380 executed, 114 from cache

I will try to set a sample app up and let you know how it worked.

@oli-ver
Copy link
Contributor

oli-ver commented Aug 9, 2020

I tried two different approaches to test that. One is to simply produce and consume messages in the same spring boot app, which works fine (implemented like in the unit test testSimpleSendAndReceive() but in a context of a spring app):

package com.example.demo;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.zeromq.channel.ZeroMqChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.GenericMessage;
import org.zeromq.ZContext;

@SpringBootApplication
@EnableIntegration
public class SpringIntegrationZmqSampleSender {

	public static void main(String[] args) throws IOException, InterruptedException {
		ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationZmqSampleSender.class, args);
		BlockingQueue<Message<?>> receivedMessages = new LinkedBlockingQueue<>();

		ZeroMqDemo zeroMqDemo = ctx.getBean(ZeroMqDemo.class);
		zeroMqDemo.subscribeMessages(receivedMessages::offer);

		for (int i = 1; i <= 100; i++) {
			zeroMqDemo.sendMessage(new GenericMessage<>("Hello World " + i));
		}

		Message<?> message;
		while ((message = receivedMessages.poll(10, TimeUnit.SECONDS)) != null) {
			processMessage(message);
		}

		System.out.println("Hit 'Enter' to close application");
		System.in.read();
		ctx.close();
	}

	static void processMessage(Message<?> message) {
		System.out.println("Received message as subscriber:" + message);
	}

	@Bean
	public ZContext zeroMqContext() {
		System.out.println("Created ZContext");
		return new ZContext();
	}

	@Bean
	@Autowired
	public ZeroMqChannel channel(ZContext zContext) {
		System.out.println("Created ZeroMqChannel");
		ZeroMqChannel channel = new ZeroMqChannel(zContext);
		channel.afterPropertiesSet();
		return channel;
	}

	@Bean
	public ZeroMqDemo zeroMqDemo() {
		return new ZeroMqDemo();
	}

	public class ZeroMqDemo {

		@Autowired
		ZeroMqChannel zmqChannel;

		void sendMessage(GenericMessage<String> message) {
			System.out.println("Sending message to inputChannel: " + message);
			zmqChannel.send(message);
		}

		void subscribeMessages(MessageHandler handler) {
			System.out.println("Handler subscribed to inputChannel: " + handler);
			zmqChannel.subscribe(handler);
		}

	}

}

The second approach is to start two independent apps: A message producer with a ZeroMQProxy connected to a fixed frontend and backend port and a message consumer receiving messages after connecting to these ports. From what I understood from the JavaDocs I need to configure and start a proxy for the producer, connect a channel on the producer side and send messages. Then I just need to connect a channel from the consumer using the same url:

Producer:

package com.example.demo;

import java.io.IOException;
import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.zeromq.ZeroMqProxy;
import org.springframework.integration.zeromq.channel.ZeroMqChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.TaskScheduler;
import org.zeromq.ZContext;

@SpringBootApplication
@EnableIntegration
public class SpringIntegrationZmqSampleProducer {

	private final static int FRONTEND_PORT = 1000;
	private final static int BACKEND_PORT = 2000;
	private final static int PERIOD = 2000;

	public static void main(String[] args) throws IOException, InterruptedException {
		ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationZmqSampleProducer.class, args);

		ZeroMqDemo zeroMqDemo = ctx.getBean(ZeroMqDemo.class);
		zeroMqDemo.scheduleMessages();

		System.out.println("Hit 'Enter' to close application");
		System.in.read();
		ctx.close();
	}

	static void processMessage(Message<?> message) {
		System.out.println("Received message as subscriber:" + message);
	}

	@Bean
	public ZContext zeroMqContext() {
		System.out.println("Created ZContext");
		return new ZContext();
	}

	@Bean
	@Autowired
	public ZeroMqChannel channel(ZContext zContext) {
		System.out.println("Created ZeroMqChannel");
		ZeroMqChannel channel = new ZeroMqChannel(zContext);
		channel.setConnectUrl("tcp://*:" + FRONTEND_PORT + ':' + BACKEND_PORT);
		channel.afterPropertiesSet();
		return channel;
	}

	@Bean
	@Autowired
	public ZeroMqProxy proxy(ZContext zContext) {
		System.out.println("Created ZeroMqProxy");
		ZeroMqProxy proxy = new ZeroMqProxy(zContext);
		proxy.setFrontendPort(FRONTEND_PORT);
		proxy.setBackendPort(BACKEND_PORT);
		proxy.afterPropertiesSet();
		proxy.start();
		return proxy;
	}

	@Bean
	public ZeroMqDemo zeroMqDemo() {
		return new ZeroMqDemo();
	}

	public class ZeroMqDemo {
		@Autowired
		private TaskScheduler taskSheduler;

		@Autowired
		ZeroMqChannel zmqChannel;

		private class MessageSenderTask implements Runnable {

			private String message;

			public MessageSenderTask(String message) {
				this.message = message;
			}

			public void run() {
				String messageToSend = message + " at " + new Date();
				sendMessage(new GenericMessage<>(messageToSend));
				System.out.println("Sent message to zeromq channel: " + messageToSend);
			}

		}

		void sendMessage(GenericMessage<String> message) {
			this.zmqChannel.send(message);
		}

		public void scheduleMessages() {
			this.taskSheduler.scheduleAtFixedRate(new MessageSenderTask("Hello World"), PERIOD);
		}

	}

}

Consumer:

package com.example.demo;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.zeromq.channel.ZeroMqChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.zeromq.ZContext;

@SpringBootApplication
@EnableIntegration
public class SpringIntegrationZmqSampleConsumer {

	private static final int FRONTEND_PORT = 1000;
	private static final int BACKEND_PORT = 2000;

	public static void main(String[] args) throws IOException, InterruptedException {
		ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationZmqSampleConsumer.class, args);
		BlockingQueue<Message<?>> receivedMessages = new LinkedBlockingQueue<>();

		ZeroMqDemo zeroMqDemo = ctx.getBean(ZeroMqDemo.class);
		zeroMqDemo.subscribeMessages(receivedMessages::offer);

		Message<?> message;
		while ((message = receivedMessages.poll(10, TimeUnit.SECONDS)) != null) {
			processMessage(message);
		}

		System.out.println("Hit 'Enter' to close application");
		System.in.read();
		ctx.close();
	}

	static void processMessage(Message<?> message) {
		System.out.println("Received message as subscriber:" + message);
	}

	@Bean
	public ZContext zeroMqContext() {
		System.out.println("Created ZContext");
		return new ZContext();
	}

	@Bean
	@Autowired
	public ZeroMqChannel channel(ZContext zContext) {
		System.out.println("Created ZeroMqChannel");
		ZeroMqChannel channel = new ZeroMqChannel(zContext);
		channel.setConnectUrl("tcp://127.0.0.1:" + FRONTEND_PORT + ":" + BACKEND_PORT);
		channel.afterPropertiesSet();
		return channel;
	}

	@Bean
	public ZeroMqDemo zeroMqDemo() {
		return new ZeroMqDemo();
	}

	public class ZeroMqDemo {

		@Autowired
		ZeroMqChannel zmqChannel;

		void subscribeMessages(MessageHandler handler) {
			System.out.println("Handler subscribed to inputChannel: " + handler);
			zmqChannel.subscribe(handler);
		}

	}

}

This works fine from my point of view.

Just one thing I noticed during implementation: Could it be helpful to have a channel.connectToProxy() method that encapsulates to set the url from the producer side?

Are these examples something worth adding to the spring-integration-samples project? I could prepare a pull request if you are interested.

@artembilan
Copy link
Member Author

Thank you for feedback, @oli-ver !

A couple remarks:

you should not call .afterPropertiesSet();and .start(); manually. This is what is done by the Spring Framework automatically on application context startup. See docs: https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-nature.

Both producer and consumer should have the same connectUrl: "tcp://127.0.0.1:" + FRONTEND_PORT + ":" + BACKEND_PORT. The channel doesn't do a binding over TCP any more.

It doesn't matter where the proxy is started. It is just an Intermediary in terms of ZeroMQ. So, you can even start it in a third application and this still should work.

Yes, I'll add setProxy on the channel as a connection alternative to the connectUrl. It really highly possible that end-users are going to have a proxy and message channel in the same app.

Another note: using this message channel implementation we can't call applications as producer and consumer. Since both of them can produce and consume messages over that shared proxy. It is also possible to spawn several instances of the same application (cluster) and see how messages travel from one instance to another. As I said: with such ZeroMQ proxy in the middle the solution with this channel really looks like interaction with the message broker. See here for analogy:

https://docs.spring.io/spring-integration/docs/current/reference/html/amqp.html#amqp-channels
https://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms-channel
https://docs.spring.io/spring-integration/docs/current/reference/html/redis.html#redis-pub-sub-channel

Yes, sample contribution is great idea!

Let's see if we can come up with some solution when we have a proxy in one app, and several instances of another in a PUB/SUB mode to demonstrate how messages are distributed between those nodes in the cluster!

@artembilan
Copy link
Member Author

Pushed more changes.
Please, take a look into commit messages for more info.

Thank you for review!

@artembilan
Copy link
Member Author

@oli-ver ,

we would appreciate your feedback!
We have a release this Wednesday and would like to include this feature over there.

Thank you!

@oli-ver
Copy link
Contributor

oli-ver commented Aug 10, 2020

@artembilan ok thank you, I will have a look tomorrow.

@artembilan
Copy link
Member Author

Please, find some docs for review.

Thanks

== ZeroMQ Support

Spring Integration provides components to support https://zeromq.org/[ZeroMQ] communication in the application.
An implementation is based on well-supported Java API in https://github.com/zeromq/jeromq[JeroMQ] library.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
An implementation is based on well-supported Java API in https://github.com/zeromq/jeromq[JeroMQ] library.
The implementation is based on the well-supported Java API of the https://github.com/zeromq/jeromq[JeroMQ] library.


Spring Integration provides components to support https://zeromq.org/[ZeroMQ] communication in the application.
An implementation is based on well-supported Java API in https://github.com/zeromq/jeromq[JeroMQ] library.
All components encapsulates ZeroMQ socket lifecycles and manages threads for them internally making an interaction with these components as lock-free and thread-safe.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
All components encapsulates ZeroMQ socket lifecycles and manages threads for them internally making an interaction with these components as lock-free and thread-safe.
All components encapsulate ZeroMQ socket lifecycles and manage threads for them internally making an interaction with these components lock-free and thread-safe.

[[zeromq-proxy]]
=== ZeroMQ Proxy

The `ZeroMqProxy` is Spring-friendly wrapper for the built-in `ZMQ.proxy()` http://zguide.zeromq.org/page:chapter2#toc15[function].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The `ZeroMqProxy` is Spring-friendly wrapper for the built-in `ZMQ.proxy()` http://zguide.zeromq.org/page:chapter2#toc15[function].
The `ZeroMqProxy` is a Spring-friendly wrapper for the built-in `ZMQ.proxy()` http://zguide.zeromq.org/page:chapter2#toc15[function].

The `ZeroMqProxy` is Spring-friendly wrapper for the built-in `ZMQ.proxy()` http://zguide.zeromq.org/page:chapter2#toc15[function].
It encapsulates socket lifecycles and thread management.
The clients of this proxy still can use a standard ZeroMQ socket connection and interaction API.
Along side with the standard `ZContext` it requires one of the well-know ZeroMQ proxy mode: SUB/PUB, PULL/PUSH or ROUTER/DEALER.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Along side with the standard `ZContext` it requires one of the well-know ZeroMQ proxy mode: SUB/PUB, PULL/PUSH or ROUTER/DEALER.
Alongside with the standard `ZContext` it requires one of the well-known ZeroMQ proxy modes: SUB/PUB, PULL/PUSH or ROUTER/DEALER.

It encapsulates socket lifecycles and thread management.
The clients of this proxy still can use a standard ZeroMQ socket connection and interaction API.
Along side with the standard `ZContext` it requires one of the well-know ZeroMQ proxy mode: SUB/PUB, PULL/PUSH or ROUTER/DEALER.
This way an appropriate pair of ZeroMQ socket types are used for frontend and backend of the proxy.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This way an appropriate pair of ZeroMQ socket types are used for frontend and backend of the proxy.
This way an appropriate pair of ZeroMQ socket types are used for the frontend and backend of the proxy.

This way an appropriate pair of ZeroMQ socket types are used for frontend and backend of the proxy.
See `ZeroMqProxy.Type` for details.

The `ZeroMqProxy` implements a `SmartLifecycle` to to create, bind, configure all the sockets and starts `ZMQ.proxy()` in a dedicated thread from an `Executor` (if any).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The `ZeroMqProxy` implements a `SmartLifecycle` to to create, bind, configure all the sockets and starts `ZMQ.proxy()` in a dedicated thread from an `Executor` (if any).
The `ZeroMqProxy` implements a `SmartLifecycle` to create, bind and configure all the sockets and to start `ZMQ.proxy()` in a dedicated thread from an `Executor` (if any).

JIRA: https://jira.spring.io/browse/INT-3045

Provide a `SubscribableChannel` implementation for ZeroMQ

The general idea is to let to have a distributed channel implementation
where every client can connect to a single server backed by the channel.

The logic in the channel is fully transparent for end-user and there is just
enough to send message to it and subscribe for receiving on the other side.
If PUB/SUB model is used, all the subscribes (even over the network) going to
receive the same published message.
In case of PUSH/PULL only one subscriber in the whole cluster is going to get
the published message

* Use Reactor for better threading control
* JeroMQ is not interruptible-friendly: use control sockets to stop proxy loop
* Name Reactor's schedulers to avoid daemon threads
* Fix Checkstyle violations
* Use `Mono.handle()` to receive data from the socket
* Implement TCP binding
* Add PUB/SUB tests
* Optimize socket create logic
* Add PUSH/PULL over TCP test
* Optimize socket create logic
* Add PUSH/PULL over TCP test
* Implement PUB/SUB over TCP
…d manage ZeroMq proxy

* Use this `ZeroMqProxy` logic as an external component for `ZeroMqChannel` testing
* Apply docs polishing
* Expose a capture socket on the proxy
* Implement `DisposableBean` in the `ZeroMqProxy` to destroy an internal executor service
* Add JavaDocs to `ZeroMqChannel`
* Add one more `ZeroMqChannel` to TCP test to be sure that proxy distribution works well
* Expose `ZeroMqChannel.setZeroMqProxy()` option for easier
configuration within the same application context
* Make `ZeroMqChannel` sockets configuration and connection
dependant on provided `ZeroMqProxy` (if any)
* Add `Consumer<ZMQ.Socket>` configuration callbacks to the `ZeroMqChannel`
* Expose `ZeroMqChannel.consumeDelay` option
* Some additions into a `reactive-streams.adoc`
* Fix typo in the `xmpp.adoc`
@artembilan
Copy link
Member Author

@oli-ver ,

pushed fixed to docs according your review.
Thank you!

Any feedback for the code as well?
Is an implementation OK with you?

src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
Co-authored-by: Gary Russell <grussell@vmware.com>
@artembilan artembilan requested a review from garyrussell August 11, 2020 16:04
@artembilan
Copy link
Member Author

I have applied all your reviews.
Anything else you see need to be fixed?

Thanks

Copy link
Contributor

@oli-ver oli-ver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine for me. I added some review comments and also retested using my demo code I posted yesterday.

The only thing that surprised me was that I needed to remove channel.afterPropertiesSet(); in the "producer" (i will find another name for that 😄) to not run into Caused by: java.lang.IllegalStateException: Or 'zeroMqProxy' or 'connectUrl' can be provided (or none), but not both.. You mentioned I should not call that manually, so I deleted both lines channel.afterPropertiesSet() and proxy.afterPropertiesSet(). Then I ran into

Caused by: java.lang.NullPointerException: null
	at org.springframework.integration.zeromq.ZeroMqProxy.start(ZeroMqProxy.java:257) ~[spring-integration-zeromq-5.4.0-SNAPSHOT.jar:5.4.0-SNAPSHOT]

When deleting channel.afterPropertiesSet();and leaving `proxy.afterPropertiesSet();" everything works fine.

So I suppose I need to check my demo code, but today I do not have time further working on this.


The control socket is exposed as a `SocketType.PAIR` with an inter-thread transport on the `"inproc://" + beanName + ".control"` address; it can be obtained via `getControlAddress()`.
Should be used with the same application from another `SocketType.PAIR` socket to send `ZMQ.PROXY_TERMINATE`, `ZMQ.PROXY_PAUSE` and/or `ZMQ.PROXY_RESUME` commands.
In fact `ZeroMqProxy` uses performs a `ZMQ.PROXY_TERMINATE` command when `stop()` is called for its lifecycle to terminate `ZMQ.proxy()` loop and close all the bound sockets gracefully.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
In fact `ZeroMqProxy` uses performs a `ZMQ.PROXY_TERMINATE` command when `stop()` is called for its lifecycle to terminate `ZMQ.proxy()` loop and close all the bound sockets gracefully.
In fact `ZeroMqProxy` performs a `ZMQ.PROXY_TERMINATE` command when `stop()` is called for its lifecycle to terminate `ZMQ.proxy()` loop and close all the bound sockets gracefully.

src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved
src/reference/asciidoc/zeromq.adoc Outdated Show resolved Hide resolved

The `ZeroMqChannel` is a `SubscribableChannel` which uses a pair of ZeroMQ sockets to connect publishers and subscribers for messaging interaction.
It can work in a PUB/SUB mode (defaults to PUSH/PULL); could be as local inter-thread channel (uses `PAIR` sockets) - the `connectUrl` is not provided.
In the distributed mode it has to be connected to externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
In the distributed mode it has to be connected to externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy.
In the distributed mode it has to be connected to an externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy.

It can work in a PUB/SUB mode (defaults to PUSH/PULL); could be as local inter-thread channel (uses `PAIR` sockets) - the `connectUrl` is not provided.
In the distributed mode it has to be connected to externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy.
The connect url option is a standard ZeroMQ connection string with the protocol and host and a pair of ports over colon for frontend and backend sockets of the ZeroMQ proxy.
In fact, for convenience, the channel could be supplied with the `ZeroMqProxy` instance instead of connection string, if it is configured in the same application as proxy.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
In fact, for convenience, the channel could be supplied with the `ZeroMqProxy` instance instead of connection string, if it is configured in the same application as proxy.
In fact, for convenience, the channel could be supplied with the `ZeroMqProxy` instance instead of a connection string, if it is configured in the same application as proxy.

Thanks :)

if (this.zeroMqProxy != null) {
return Mono.just(this.zeroMqProxy.getBackendPort())
.filter((port) -> port > 0)
.repeatWhenEmpty((repeat) -> repeat.delayElements(Duration.ofMillis(100))) // NOSONAR
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could use the consumeDelay also here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah wait: It's another delay, only 100 millis. Perhaps worth another repeatDelay variable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... I don't think so. This is really just busy-spin until the proxy is started with selected port.
It is fully not related to the end-user logic.

* e.g. {@code tcp://localhost:6001:6002}
*/
public void setConnectUrl(@Nullable String connectUrl) {
if (connectUrl != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this perhaps be handeled by an Assertstatement? This way it's just silently ignored when someone tries to set the connectUrl to null.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. This option really can be as a null.
Imagine a Spring Boot configuration property when you just propagate it into this setter.
In one environment it can be empty (or missed) and will make this channel in a local mode, e.g. for testing.
In another env it is going to have some reasonable value for connection to the proxy.
Independently of the env your code around this channel configuration is going to be the same.

@oli-ver
Copy link
Contributor

oli-ver commented Aug 11, 2020

@oli-ver ,

pushed fixed to docs according your review.
Thank you!

Any feedback for the code as well?
Is an implementation OK with you?

Sorry you posted that while I was still working on that. I will start a review next time so you see that it's still in progress.

Co-authored-by: Oliver <oli-ver@users.noreply.github.com>
@garyrussell
Copy link
Contributor

Looks like the test hung on Travis.

…able()`

to really evaluate the current port state on every repeat
* Add finite `100` repeat number to avoid infinite blocking when proxy is not started at all
* Add `doOnError()` for proxy `Mono` to log `ERROR` when repeat is exhausted
@garyrussell garyrussell merged commit a76bb24 into spring-projects:master Aug 11, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants