Inability to create multiple Kafka Streams topologies in one application using @Produces method(s) #38318
Description
Describe the bug
The problem
Quarkus' guide Using Apache Kafka Streams describes creating a Kafka Streams topology with a method annotated with @jakarta.enterprise.inject.Produces.
However, this approach does not seem to work for more than one topology within the same application: having more than one such method causes the application to fail at startup with the following exception in the logs:
java.lang.RuntimeException: Failed to start quarkus
...
Caused by: jakarta.enterprise.inject.AmbiguousResolutionException: Beans: [PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerA, id=6BFQ4NfZqRuS74kLy3x4LX22jgo], PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerB, id=na5ipbtejGCLdDVBsclVMcRoeQM]]
...
Analysis
Looking at the stack trace (see full stack trace below), it appears that KafkaStreamsProducer simply doesn't support more than one topology, and throws said exception when it detects more than one org.apache.kafka.streams.Topology bean available - see topology.get in KafkaStreamsProducer:106.
Following this logic, this is likely an enhancement request rather than a bug, but I'm raising it as a bug so that someone more familiar with the matter can kindly confirm whether this is a real issue or just my ignorance/wrong expectations regarding the proper configuration.
If it's the latter, please accept my apologies and a request to kindly clarify the recommended method of creating multiple topologies (be it in the docs or in a response to this StackOverflow question).
Having said that, seeing that Quarkus supports injection of multiple beans of the same type, it seems not unreasonable to expect that it should be possible to have multiple @Produces Topology methods available in one application.
Attempted fix
To address the exception, custom @jakarta.inject.Qualifier-annotated annotations were applied to the methods producing topologies, following remarks from Quarkus' intro to CDI.
Unfortunately, the effect was that whilst the exception was no longer thrown, the topology producer methods were no longer invoked, leaving Kafka Streams not initialised.
This is understandable as KafkaStreamsProducer's constructor only appears to expect a @Default bean, and once annotated with custom qualifiers, none of the topology beans match this requirement.
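The two outcomes described so far (ambiguity without qualifiers, no matching bean with custom qualifiers) can be modelled in plain Java. This is only a rough sketch of the resolution rule, not ArC's or KafkaStreamsProducer's actual code: the class QualifierResolutionSketch, the Bean record and the helper methods are hypothetical, and the model assumes only the standard CDI rule that a bean declaring a custom qualifier no longer carries @Default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified plain-Java model of CDI typesafe resolution by qualifier.
// Hypothetical illustration only; NOT the real ArC / Quarkus implementation.
class QualifierResolutionSketch {

    // Hypothetical stand-in for a Topology bean: a name plus its qualifiers.
    record Bean(String name, Set<String> qualifiers) {}

    // A bean with no explicit qualifier carries "Default";
    // a bean with custom qualifiers carries only those.
    static Bean bean(String name, String... customQualifiers) {
        Set<String> qualifiers = customQualifiers.length == 0
                ? Set.of("Default")
                : Set.of(customQualifiers);
        return new Bean(name, qualifiers);
    }

    // Resolves an injection point that requires the given qualifier.
    static String resolve(List<Bean> beans, String required) {
        List<String> matches = new ArrayList<>();
        for (Bean b : beans) {
            if (b.qualifiers().contains(required)) {
                matches.add(b.name());
            }
        }
        if (matches.isEmpty()) {
            return "UNSATISFIED";
        }
        if (matches.size() > 1) {
            return "AMBIGUOUS " + matches;
        }
        return "RESOLVED " + matches.get(0);
    }

    public static void main(String[] args) {
        // Case 1: two unqualified producers, as in the failing application.
        List<Bean> plain = List.of(bean("TopologyProducerA"), bean("TopologyProducerB"));
        System.out.println(resolve(plain, "Default"));
        // prints: AMBIGUOUS [TopologyProducerA, TopologyProducerB]

        // Case 2: the same producers with custom qualifiers, as in the attempted fix.
        List<Bean> qualified = List.of(
                bean("TopologyProducerA", "TopologyA"),
                bean("TopologyProducerB", "TopologyB"));
        System.out.println(resolve(qualified, "Default"));   // prints: UNSATISFIED
        System.out.println(resolve(qualified, "TopologyA")); // prints: RESOLVED TopologyProducerA
    }
}
```

In this model, an injection point asking for the default qualifier can never pick up a qualified topology, which matches the observed behaviour: the application starts, but no topology reaches Kafka Streams.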
Workaround
The only workaround that worked for me was to fall back to the 'legacy' method described in Quarkus' blog, i.e. 'manually' managing instances of StreamsBuilder driven by application lifecycle events.
Expected behavior
To have multiple working Kafka Streams topologies (multiple instances of org.apache.kafka.streams.Topology) produced through @Produces-annotated methods.
Actual behavior
Having more than one @Produces Topology method results in the application failing to start with the following stack trace seen in the logs:
2024-01-21 16:13:37,136 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile [dev]): java.lang.RuntimeException: Failed to start quarkus
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
at io.quarkus.runner.GeneratedMain.main(Unknown Source)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:113)
at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: jakarta.enterprise.inject.AmbiguousResolutionException: Beans: [PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerA, id=6BFQ4NfZqRuS74kLy3x4LX22jgo], PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerB, id=na5ipbtejGCLdDVBsclVMcRoeQM]]
at io.quarkus.arc.impl.InstanceImpl.bean(InstanceImpl.java:291)
at io.quarkus.arc.impl.InstanceImpl.getInternal(InstanceImpl.java:309)
at io.quarkus.arc.impl.InstanceImpl.get(InstanceImpl.java:190)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer.<init>(KafkaStreamsProducer.java:106)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.doCreate(Unknown Source)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.create(Unknown Source)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.create(Unknown Source)
at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:32)
at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
at io.quarkus.arc.impl.ComputingCacheContextInstances.computeIfAbsent(ComputingCacheContextInstances.java:19)
at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.get(Unknown Source)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.get(Unknown Source)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.doCreate(Unknown Source)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.create(Unknown Source)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.create(Unknown Source)
at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:32)
at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
at io.quarkus.arc.impl.ComputingCacheContextInstances.computeIfAbsent(ComputingCacheContextInstances.java:19)
at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.get(Unknown Source)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.get(Unknown Source)
at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:553)
at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:533)
at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:566)
at io.quarkus.arc.impl.ArcContainerImpl.instance(ArcContainerImpl.java:338)
at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Observer_Synthetic_GBi-MxGEb8kh__7Q4XC-t4hECVU.notify(Unknown Source)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:346)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:328)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:82)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:157)
at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:108)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
... 11 more
How to Reproduce?
- Check out the demo project https://github.com/ziemsky/quarkus-kstreams-topologies.
- Run the application with ./gradlew quarkusDev -i.
- Observe that the application fails to start, with AmbiguousResolutionException referencing producer classes TopologyProducerA and TopologyProducerB.
- Uncomment the // @TopologyA and // @TopologyB lines in the TopologyProducer* classes and run the application.
- Observe that the application starts without exception, but topologies are not created and Kafka Streams is not initialised.
In other words:
Have a basic Quarkus project with:
- io.quarkus:quarkus-kafka-streams extension included,
- Two @Produces Topology methods in a class (or classes) annotated with @ApplicationScoped.
- Start the application and see that it fails to start with AmbiguousResolutionException.
- Now add custom @Qualifier annotations to the topology beans and see that the application starts but topologies/streams are not activated.
Output of uname -a or ver
Linux my-host-name 6.7.0-arch3-1 #1 SMP PREEMPT_DYNAMIC Sat, 13 Jan 2024 14:37:14 +0000 x86_64 GNU/Linux
Output of java -version
openjdk version "20.0.2" 2023-07-18 OpenJDK Runtime Environment (build 20.0.2+9-78) OpenJDK 64-Bit Server VM (build 20.0.2+9-78, mixed mode, sharing)
Quarkus version or git rev
3.6.6
Build tool (ie. output of mvnw --version or gradlew --version)
Gradle 8.3 - Build time: 2023-08-17 07:06:47 UTC Revision: 8afbf24b469158b714b36e84c6f4d4976c86fcd5 Kotlin: 1.9.0 Groovy: 3.0.17 Ant: Apache Ant(TM) version 1.10.13 compiled on January 4 2023 JVM: 20.0.2 (Oracle Corporation 20.0.2+9-78) OS: Linux 6.7.0-arch3-1 amd64
Additional information
No response