-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INT-3045: Add ZeroMqChannel
support
#3355
INT-3045: Add ZeroMqChannel
support
#3355
Conversation
See zeromq/jeromq#853. I haven't finished the TCP binding logic and there are needed many more tests. But general idea is ready for review. Thanks |
dae62f1
to
351f270
Compare
Decided to go with a bit more design and introduced a |
I would appreciate your feedback over here. any chances that you can review this for upcoming release next week? Thank you all! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java
Outdated
Show resolved
Hide resolved
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java
Outdated
Show resolved
Hide resolved
...ation-zeromq/src/main/java/org/springframework/integration/zeromq/channel/ZeroMqChannel.java
Show resolved
Hide resolved
eec1ff1
to
e2454e1
Compare
ZeroMqChannel
supportZeroMqChannel
support
So, from my perspective it is ready for the final review. I'll add Or just let me know your vision! |
I would like to test this locally, but I struggle with how to get the snapshot version installed in my local maven repository. The Readme.md states to use
Is this perhaps already deployed in some publicly available maven repo? |
Hm. I saw the same today locally as well. I'll let you know what I find! |
There is no such a module. I renamed it into |
Pushed one fresh commit with extra dependency to let the tests to pass. |
Thanks, seems to work now (at least after deleting ~/.m2, ~/.gradle and checking out the project fresh from git 😄)
I will try to set a sample app up and let you know how it worked. |
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java
Outdated
Show resolved
Hide resolved
...ation-zeromq/src/main/java/org/springframework/integration/zeromq/channel/ZeroMqChannel.java
Outdated
Show resolved
Hide resolved
I tried two different approaches to test that. One is to simply produce and consume messages in the same spring boot app, which works fine (implemented like in the unit test testSimpleSendAndReceive() but in a context of a spring app):
The second approach is to start two independent apps: A message producer with a ZeroMQProxy connected to a fixed frontend and backend port and a message consumer receiving messages after connecting to these ports. From what I understood from the JavaDocs I need to configure and start a proxy for the producer, connect a channel on the producer side and send messages. Then I just need to connect a channel from the consumer using the same url: Producer:
Consumer:
This works fine from my point of view. Just one thing I noticed during implementation: Could it be helpful to have a Are these examples something worth adding to the spring-integration-samples project? I could prepare a pull request if you are interested. |
Thank you for feedback, @oli-ver ! A couple remarks: you should not call Both producer and consumer should have the same connectUrl: It doesn't matter where the proxy is started. It is just an Intermediary in terms of ZeroMQ. So, you can even start it in a third application and this still should work. Yes, I'll add Another note: using this message channel implementation we can't call applications as https://docs.spring.io/spring-integration/docs/current/reference/html/amqp.html#amqp-channels Yes, sample contribution is great idea! Let's see if we can come up with some solution when we have a proxy in one app, and several instances of another in a PUB/SUB mode to demonstrate how messages are distributed between those nodes in the cluster! |
e5e3ec5
to
dc37be3
Compare
Pushed more changes. Thank you for review! |
dc37be3
to
20e962c
Compare
@oli-ver , we would appreciate your feedback! Thank you! |
@artembilan ok thank you, I will have a look tomorrow. |
Please, find some docs for review. Thanks |
src/reference/asciidoc/zeromq.adoc
Outdated
== ZeroMQ Support | ||
|
||
Spring Integration provides components to support https://zeromq.org/[ZeroMQ] communication in the application. | ||
An implementation is based on well-supported Java API in https://github.com/zeromq/jeromq[JeroMQ] library. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An implementation is based on well-supported Java API in https://github.com/zeromq/jeromq[JeroMQ] library. | |
The implementation is based on the well-supported Java API of the https://github.com/zeromq/jeromq[JeroMQ] library. |
src/reference/asciidoc/zeromq.adoc
Outdated
|
||
Spring Integration provides components to support https://zeromq.org/[ZeroMQ] communication in the application. | ||
An implementation is based on well-supported Java API in https://github.com/zeromq/jeromq[JeroMQ] library. | ||
All components encapsulates ZeroMQ socket lifecycles and manages threads for them internally making an interaction with these components as lock-free and thread-safe. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All components encapsulates ZeroMQ socket lifecycles and manages threads for them internally making an interaction with these components as lock-free and thread-safe. | |
All components encapsulate ZeroMQ socket lifecycles and manage threads for them internally making an interaction with these components lock-free and thread-safe. |
src/reference/asciidoc/zeromq.adoc
Outdated
[[zeromq-proxy]] | ||
=== ZeroMQ Proxy | ||
|
||
The `ZeroMqProxy` is Spring-friendly wrapper for the built-in `ZMQ.proxy()` http://zguide.zeromq.org/page:chapter2#toc15[function]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `ZeroMqProxy` is Spring-friendly wrapper for the built-in `ZMQ.proxy()` http://zguide.zeromq.org/page:chapter2#toc15[function]. | |
The `ZeroMqProxy` is a Spring-friendly wrapper for the built-in `ZMQ.proxy()` http://zguide.zeromq.org/page:chapter2#toc15[function]. |
src/reference/asciidoc/zeromq.adoc
Outdated
The `ZeroMqProxy` is Spring-friendly wrapper for the built-in `ZMQ.proxy()` http://zguide.zeromq.org/page:chapter2#toc15[function]. | ||
It encapsulates socket lifecycles and thread management. | ||
The clients of this proxy still can use a standard ZeroMQ socket connection and interaction API. | ||
Along side with the standard `ZContext` it requires one of the well-know ZeroMQ proxy mode: SUB/PUB, PULL/PUSH or ROUTER/DEALER. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Along side with the standard `ZContext` it requires one of the well-know ZeroMQ proxy mode: SUB/PUB, PULL/PUSH or ROUTER/DEALER. | |
Alongside with the standard `ZContext` it requires one of the well-known ZeroMQ proxy modes: SUB/PUB, PULL/PUSH or ROUTER/DEALER. |
src/reference/asciidoc/zeromq.adoc
Outdated
It encapsulates socket lifecycles and thread management. | ||
The clients of this proxy still can use a standard ZeroMQ socket connection and interaction API. | ||
Along side with the standard `ZContext` it requires one of the well-know ZeroMQ proxy mode: SUB/PUB, PULL/PUSH or ROUTER/DEALER. | ||
This way an appropriate pair of ZeroMQ socket types are used for frontend and backend of the proxy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This way an appropriate pair of ZeroMQ socket types are used for frontend and backend of the proxy. | |
This way an appropriate pair of ZeroMQ socket types are used for the frontend and backend of the proxy. |
src/reference/asciidoc/zeromq.adoc
Outdated
This way an appropriate pair of ZeroMQ socket types are used for frontend and backend of the proxy. | ||
See `ZeroMqProxy.Type` for details. | ||
|
||
The `ZeroMqProxy` implements a `SmartLifecycle` to to create, bind, configure all the sockets and starts `ZMQ.proxy()` in a dedicated thread from an `Executor` (if any). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `ZeroMqProxy` implements a `SmartLifecycle` to to create, bind, configure all the sockets and starts `ZMQ.proxy()` in a dedicated thread from an `Executor` (if any). | |
The `ZeroMqProxy` implements a `SmartLifecycle` to create, bind and configure all the sockets and to start `ZMQ.proxy()` in a dedicated thread from an `Executor` (if any). |
JIRA: https://jira.spring.io/browse/INT-3045 Provide a `SubscribableChannel` implementation for ZeroMQ The general idea is to let to have a distributed channel implementation where every client can connect to a single server backed by the channel. The logic in the channel is fully transparent for end-user and there is just enough to send message to it and subscribe for receiving on the other side. If PUB/SUB model is used, all the subscribes (even over the network) going to receive the same published message. In case of PUSH/PULL only one subscriber in the whole cluster is going to get the published message * Use Reactor for better threading control * JeroMQ is not interruptible-friendly: use control sockets to stop proxy loop * Name Reactor's schedulers to avoid daemon threads
* Fix Checkstyle violations * Use `Mono.handle()` to receive data from the socket
* Implement TCP binding * Add PUB/SUB tests
* Optimize socket create logic * Add PUSH/PULL over TCP test
* Optimize socket create logic * Add PUSH/PULL over TCP test * Implement PUB/SUB over TCP
…d manage ZeroMq proxy * Use this `ZeroMqProxy` logic as an external component for `ZeroMqChannel` testing
* Apply docs polishing * Expose a capture socket on the proxy * Implement `DisposableBean` in the `ZeroMqProxy` to destroy an internal executor service * Add JavaDocs to `ZeroMqChannel` * Add one more `ZeroMqChannel` to TCP test to be sure that proxy distribution works well
* Expose `ZeroMqChannel.setZeroMqProxy()` option for easier configuration within the same application context * Make `ZeroMqChannel` sockets configuration and connection dependant on provided `ZeroMqProxy` (if any) * Add `Consumer<ZMQ.Socket>` configuration callbacks to the `ZeroMqChannel` * Expose `ZeroMqChannel.consumeDelay` option
* Some additions into a `reactive-streams.adoc` * Fix typo in the `xmpp.adoc`
…default * More words into docs
8e0c2d1
to
642247c
Compare
@oli-ver , pushed fixed to docs according your review. Any feedback for the code as well? |
Co-authored-by: Gary Russell <grussell@vmware.com>
I have applied all your reviews. Thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks fine for me. I added some review comments and also retested using my demo code I posted yesterday.
The only thing that surprised me was that I needed to remove channel.afterPropertiesSet();
in the "producer" (i will find another name for that 😄) to not run into Caused by: java.lang.IllegalStateException: Or 'zeroMqProxy' or 'connectUrl' can be provided (or none), but not both.
. You mentioned I should not call that manually, so I deleted both lines channel.afterPropertiesSet()
and proxy.afterPropertiesSet()
. Then I ran into
Caused by: java.lang.NullPointerException: null
at org.springframework.integration.zeromq.ZeroMqProxy.start(ZeroMqProxy.java:257) ~[spring-integration-zeromq-5.4.0-SNAPSHOT.jar:5.4.0-SNAPSHOT]
When deleting channel.afterPropertiesSet();
and leaving `proxy.afterPropertiesSet();" everything works fine.
So I suppose I need to check my demo code, but today I do not have time further working on this.
src/reference/asciidoc/zeromq.adoc
Outdated
|
||
The control socket is exposed as a `SocketType.PAIR` with an inter-thread transport on the `"inproc://" + beanName + ".control"` address; it can be obtained via `getControlAddress()`. | ||
Should be used with the same application from another `SocketType.PAIR` socket to send `ZMQ.PROXY_TERMINATE`, `ZMQ.PROXY_PAUSE` and/or `ZMQ.PROXY_RESUME` commands. | ||
In fact `ZeroMqProxy` uses performs a `ZMQ.PROXY_TERMINATE` command when `stop()` is called for its lifecycle to terminate `ZMQ.proxy()` loop and close all the bound sockets gracefully. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact `ZeroMqProxy` uses performs a `ZMQ.PROXY_TERMINATE` command when `stop()` is called for its lifecycle to terminate `ZMQ.proxy()` loop and close all the bound sockets gracefully. | |
In fact `ZeroMqProxy` performs a `ZMQ.PROXY_TERMINATE` command when `stop()` is called for its lifecycle to terminate `ZMQ.proxy()` loop and close all the bound sockets gracefully. |
src/reference/asciidoc/zeromq.adoc
Outdated
|
||
The `ZeroMqChannel` is a `SubscribableChannel` which uses a pair of ZeroMQ sockets to connect publishers and subscribers for messaging interaction. | ||
It can work in a PUB/SUB mode (defaults to PUSH/PULL); could be as local inter-thread channel (uses `PAIR` sockets) - the `connectUrl` is not provided. | ||
In the distributed mode it has to be connected to externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the distributed mode it has to be connected to externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy. | |
In the distributed mode it has to be connected to an externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy. |
src/reference/asciidoc/zeromq.adoc
Outdated
It can work in a PUB/SUB mode (defaults to PUSH/PULL); could be as local inter-thread channel (uses `PAIR` sockets) - the `connectUrl` is not provided. | ||
In the distributed mode it has to be connected to externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy. | ||
The connect url option is a standard ZeroMQ connection string with the protocol and host and a pair of ports over colon for frontend and backend sockets of the ZeroMQ proxy. | ||
In fact, for convenience, the channel could be supplied with the `ZeroMqProxy` instance instead of connection string, if it is configured in the same application as proxy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, for convenience, the channel could be supplied with the `ZeroMqProxy` instance instead of connection string, if it is configured in the same application as proxy. | |
In fact, for convenience, the channel could be supplied with the `ZeroMqProxy` instance instead of a connection string, if it is configured in the same application as proxy. |
Thanks :)
spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java
Outdated
Show resolved
Hide resolved
...ation-zeromq/src/main/java/org/springframework/integration/zeromq/channel/ZeroMqChannel.java
Outdated
Show resolved
Hide resolved
if (this.zeroMqProxy != null) { | ||
return Mono.just(this.zeroMqProxy.getBackendPort()) | ||
.filter((port) -> port > 0) | ||
.repeatWhenEmpty((repeat) -> repeat.delayElements(Duration.ofMillis(100))) // NOSONAR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could use the consumeDelay also here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah wait: It's another delay, only 100 millis. Perhaps worth another repeatDelay variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... I don't think so. This is really just busy-spin until the proxy is started with selected port.
It is fully not related to the end-user logic.
...ation-zeromq/src/main/java/org/springframework/integration/zeromq/channel/ZeroMqChannel.java
Outdated
Show resolved
Hide resolved
* e.g. {@code tcp://localhost:6001:6002} | ||
*/ | ||
public void setConnectUrl(@Nullable String connectUrl) { | ||
if (connectUrl != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this perhaps be handeled by an Assert
statement? This way it's just silently ignored when someone tries to set the connectUrl to null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. This option really can be as a null
.
Imagine a Spring Boot configuration property when you just propagate it into this setter.
In one environment it can be empty (or missed) and will make this channel in a local mode, e.g. for testing.
In another env it is going to have some reasonable value for connection to the proxy.
Independently of the env your code around this channel configuration is going to be the same.
Sorry you posted that while I was still working on that. I will start a review next time so you see that it's still in progress. |
Co-authored-by: Oliver <oli-ver@users.noreply.github.com>
Looks like the test hung on Travis. |
…able()` to really evaluate the current port state on every repeat * Add finite `100` repeat number to avoid infinite blocking when proxy is not started at all * Add `doOnError()` for proxy `Mono` to log `ERROR` when repeat is exhausted
JIRA: https://jira.spring.io/browse/INT-3045
Provide a
SubscribableChannel
implementation for ZeroMQThe general idea is to let to have a distributed channel implementation
where every client can connect to a single server backed by the channel.
The logic in the channel is fully transparent for end-user and there is just
enough to send message to it and subscribe for receiving on the other side.
If PUB/SUB model is used, all the subscribes (even over the network) going to
receive the same published message.
In case of PUSH/PULL only one subscriber in the whole cluster is going to get
the published message