Skip to content

Inability to create multiple Kafka Streams topologies in one application using @Produces method(s)Β #38318

Open
@ziemsky

Description

Describe the bug

The problem

Quarkus' guide Using Apache Kafka Streams describes a method of creating Kafka Streams topology using method annotated with @jakarta.enterprise.inject.Produces.

However, this method does not seem to be working for more than one topology within the same application, as having more than one such method results in the application failing to start with the following exception seen in the logs:

java.lang.RuntimeException: Failed to start quarkus
        ...
Caused by: jakarta.enterprise.inject.AmbiguousResolutionException: Beans: [PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerA, id=6BFQ4NfZqRuS74kLy3x4LX22jgo], PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerB, id=na5ipbtejGCLdDVBsclVMcRoeQM]]
        ...

Analysis

Looking at the the stack trace (see full stack trace below), it appears that KafkaStreamsProducer simply doesn't support more than one topology, and throws said exception when it detects more than one org.apache.kafka.streams.Topology beans available - see topology.get in KafkaStreamsProducer:106.

Following this logic, this is likely an enhancement request rather than a bug, but I'm raising it as a bug so that someone more familiar with the matter can kindly confirm whether this is a real issue or just my ignorance/wrong expectations regarding the proper configuration.

If it's the latter, please accept my apologies and a request to kindly clarify the recommended method of creating multiple topologies (be it in the docs or in a response to this StackOverflow question).

Having said that, seeing that Quarkus supports injection of multiple beans of the same type it seems not unreasonable to expect that it should be possible to have multiple @Produces Topology methods available in one application.

Attempted fix

To address the exception, custom @jakarta.inject.Qualifier-annotated annotations were applied to the methods producing topologies, following remarks from Quarkus' intro to CDI.

Unfortunately, the effect was that whilst the exception was no longer thrown, the topology producer methods were no longer invoked, leaving Kafka Streams not initialised.

This is understandable as KafkaStreamsProducer's constructor only appears to expect a @Default bean, and once annotated with custom qualifiers, none of the topology beans match this requirement.

Workaround

The only workaround that worked for me was to fall back to the 'legacy' method described in Quarkus' blog, i.e. 'manually' managing instances of StreamsBuilder driven by application lifecycle events.

Expected behavior

To have multiple, working Kafka Streams topologies (multiple instances of class org.apache.kafka.streams.Topology class) produced through @Produces annotated methods.

Actual behavior

Having more than one @Produces Topology methods results in the application failing to start with the following stack trace seen in the logs:

2024-01-21 16:13:37,136 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile [dev]): java.lang.RuntimeException: Failed to start quarkus
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.quarkus.runner.GeneratedMain.main(Unknown Source)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:113)
        at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: jakarta.enterprise.inject.AmbiguousResolutionException: Beans: [PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerA, id=6BFQ4NfZqRuS74kLy3x4LX22jgo], PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerB, id=na5ipbtejGCLdDVBsclVMcRoeQM]]
        at io.quarkus.arc.impl.InstanceImpl.bean(InstanceImpl.java:291)
        at io.quarkus.arc.impl.InstanceImpl.getInternal(InstanceImpl.java:309)
        at io.quarkus.arc.impl.InstanceImpl.get(InstanceImpl.java:190)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer.<init>(KafkaStreamsProducer.java:106)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.doCreate(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.create(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.create(Unknown Source)
        at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
        at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:32)
        at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
        at io.quarkus.arc.impl.ComputingCacheContextInstances.computeIfAbsent(ComputingCacheContextInstances.java:19)
        at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.get(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.get(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.doCreate(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.create(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.create(Unknown Source)
        at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
        at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:32)
        at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
        at io.quarkus.arc.impl.ComputingCacheContextInstances.computeIfAbsent(ComputingCacheContextInstances.java:19)
        at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.get(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.get(Unknown Source)
        at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:553)
        at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:533)
        at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:566)
        at io.quarkus.arc.impl.ArcContainerImpl.instance(ArcContainerImpl.java:338)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Observer_Synthetic_GBi-MxGEb8kh__7Q4XC-t4hECVU.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:346)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:328)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:82)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:157)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:108)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        ... 11 more

How to Reproduce?

  • Checkout the demo project https://github.com/ziemsky/quarkus-kstreams-topologies.
  • Run the application with ./gradlew quarkusDev -i.
  • Observe that the application fails to start, with AmbiguousResolutionException referencing producer classes TopologyProducerA and TopologyProducerB.
  • Uncomment // @TopologyA and // @TopologyB lines in the TopologyProducer* classes and run the application.
  • Observe that the application starts without exception, but topologies are not created and Kafka Streams not initialised.

In other words:

Have a basic Quarkus project with:

  • io.quarkus:quarkus-kafka-streams extension included,
  • Two @Produces Topology methods in a class (or classes) annotated with ApplicationScoped.
  • Start the application and see that it fails to start with AmbiguousResolutionException.
  • Now add custom @Qualifiers to the topology beans and see that the application starts but topologies/steams are not activated.

Output of uname -a or ver

Linux my-host-name 6.7.0-arch3-1 #1 SMP PREEMPT_DYNAMIC Sat, 13 Jan 2024 14:37:14 +0000 x86_64 GNU/Linux

Output of java -version

openjdk version "20.0.2" 2023-07-18 OpenJDK Runtime Environment (build 20.0.2+9-78) OpenJDK 64-Bit Server VM (build 20.0.2+9-78, mixed mode, sharing)

Quarkus version or git rev

3.6.6

Build tool (ie. output of mvnw --version or gradlew --version)

Gradle 8.3 - Build time: 2023-08-17 07:06:47 UTC Revision: 8afbf24b469158b714b36e84c6f4d4976c86fcd5 Kotlin: 1.9.0 Groovy: 3.0.17 Ant: Apache Ant(TM) version 1.10.13 compiled on January 4 2023 JVM: 20.0.2 (Oracle Corporation 20.0.2+9-78) OS: Linux 6.7.0-arch3-1 amd64

Additional information

No response

Activity

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions