No rebalance after empty assignment in consumer group #1314
Description
Describe the bug
The PartitionWatcher in the consumer group cannot detect the creation of a topic. The problem occurs when the topic is created after JoinGroup but before the PartitionWatcher starts.
I noticed that Kafka creates topics asynchronously, so creation can complete after a consumer in the group has already received an empty assignment but before the partition watcher has started. When the partition watcher then starts, it reads the partition count of the newly created topic from Kafka. Depending on the parameters the topic was created with, that count is already non-zero. The watcher then enters a loop that polls Kafka and compares the current partition count with this initial value.
However, my workflow never adds partitions to a topic at runtime. The partition count therefore never changes, no rebalance is triggered, and the consumer never starts consuming from the topic's partitions.
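The race described above can be modelled with a small standalone Go sketch. It does not use kafka-go and is not its actual PartitionWatcher code. The watch function and its fetch callback are hypothetical stand-ins that follow the behavior described in this issue: the watcher records a baseline partition count and reacts only when a later poll returns a different count.

```go
package main

import "fmt"

// watch models a partition watcher (hypothetical, not kafka-go's code).
// fetch stands in for a metadata request and returns the partition count
// the broker reports on a given poll. watch returns the first poll at which
// the count differs from baseline, or -1 if no change is seen in maxPolls.
func watch(baseline int, fetch func(poll int) int, maxPolls int) int {
	for poll := 1; poll <= maxPolls; poll++ {
		if fetch(poll) != baseline {
			return poll
		}
	}
	return -1
}

func main() {
	// Scenario from the issue: the member joined while the topic had no
	// partitions (hence the empty assignment), and the topic was created with
	// 3 partitions before the watcher started. The broker now always reports 3.
	const partitionsAtJoin = 0
	fetch := func(poll int) int { return 3 }

	// Baseline read when the watcher starts: already 3, so no change is ever seen.
	fromWatcherStart := watch(fetch(0), fetch, 10)

	// Hypothetical alternative: use the count seen at JoinGroup time as the baseline.
	fromJoin := watch(partitionsAtJoin, fetch, 10)

	fmt.Println(fromWatcherStart, fromJoin) // -1 1
}
```

Because the topic already has its partitions when the watcher records its baseline, the comparison never fires. A baseline captured at JoinGroup time (0 partitions, matching the empty assignment) would notice the new topic on the first poll. This is one possible direction for a fix, not a confirmed one.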
Kafka Version
- Kafka version: confluentinc/cp-kafka:7.6.0
- kafka-go version: 0.4.47
To Reproduce
Docker Compose file and Go program used to reproduce the behavior:
```yaml
x-kafka-broker: &kafka-broker
  image: confluentinc/cp-kafka:7.6.0
  expose:
    - 29092
    - 9092
    - 9093
  deploy:
    resources:
      limits:
        cpus: '2'
        memory: 1024M

x-kafka-env: &kafka-env
  CLUSTER_ID: Atx8sxWjSXuyC43GCCUpAA
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
  KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
  KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
  KAFKA_PROCESS_ROLES: controller,broker
  KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093

services:
  kafka-node-1:
    <<: *kafka-broker
    hostname: kafka1
    container_name: kafka-node-1
    links:
      - kafka-node-2
      - kafka-node-3
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: 1
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:9093,PLAINTEXT_HOST://kafka1:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://kafka1:9092
    networks:
      default:
        ipv4_address: 172.18.0.11

  kafka-node-2:
    <<: *kafka-broker
    hostname: kafka2
    container_name: kafka-node-2
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: 2
      KAFKA_BROKER_ID: 2
      KAFKA_LISTENERS: PLAINTEXT://kafka2:29092,CONTROLLER://kafka2:9093,PLAINTEXT_HOST://kafka2:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29092,PLAINTEXT_HOST://kafka2:9092
    networks:
      default:
        ipv4_address: 172.18.0.12

  kafka-node-3:
    <<: *kafka-broker
    hostname: kafka3
    container_name: kafka-node-3
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: 3
      KAFKA_BROKER_ID: 3
      KAFKA_LISTENERS: PLAINTEXT://kafka3:29092,CONTROLLER://kafka3:9093,PLAINTEXT_HOST://kafka3:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29092,PLAINTEXT_HOST://kafka3:9092
    networks:
      default:
        ipv4_address: 172.18.0.13

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    container_name: schema-registry
    depends_on:
      - kafka-node-1
      - kafka-node-2
      - kafka-node-3
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
    ports:
      - "8081:8081"

  kafka-ui:
    image: provectuslabs/kafka-ui:v0.4.0
    container_name: kafka-ui
    ports:
      - "8080:8080"
    depends_on:
      - kafka-node-1
      - kafka-node-2
      - kafka-node-3
      - schema-registry
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

networks:
  default:
    name: kafka-test
    ipam:
      config:
        - subnet: 172.18.0.0/24
          gateway: 172.18.0.1
```
```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Cancel the context on SIGINT or SIGQUIT.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGQUIT)
	defer cancel()

	brokers := []string{"kafka1", "kafka2", "kafka3"}
	wg := &sync.WaitGroup{}

	// Start 20 readers, each in its own consumer group subscribed to its own topic.
	for i := 0; i < 20; i++ {
		topics := []string{fmt.Sprintf("topic-%d", i)}
		group := fmt.Sprintf("group-%d", i)

		config := kafka.ReaderConfig{
			Brokers:                brokers,
			GroupTopics:            topics,
			GroupID:                group,
			WatchPartitionChanges:  true,
			PartitionWatchInterval: time.Second,
		}
		r := kafka.NewReader(config)

		wg.Add(1)
		go func(r *kafka.Reader, wg *sync.WaitGroup) {
			defer func() {
				r.Close()
				wg.Done()
			}()
			// Blocks until a message arrives or ctx is cancelled.
			r.ReadMessage(ctx)
		}(r, wg)
	}

	<-ctx.Done()
	wg.Wait()
}
```
Expected Behavior
All created consumer groups are assigned their topics.
Observed Behavior
Some consumer groups have no topics assigned, and their consumers do not read messages. In my workflow, these consumers only receive their messages after the application is restarted.
Additional Context
The issue rarely reproduces with a single Kafka instance; it occurs most often with a multi-broker cluster.
Screenshot from kafka-ui