
Slow (timeout) shutdown and rebalancing with withRebalanceSafeCommits #1132

Closed
@jgordijn

Description

I tried out the new withRebalanceSafeCommits feature and it has unexpected behavior:

  1. Shutdown now takes a long time, and as a result all partitions handled by the stopping application are stuck.
    It seems that on shutdown the stream is stopped, but the rebalanceListener keeps waiting until the last message is committed. Because the stream has already stopped, that commit never happens, so shutdown only completes after 3 minutes (3/5 of the maxPollInterval).
  2. Joining the group takes a long time.
    When I start a second application, a rebalance appears to start, but it still takes 3 minutes for the new instance to join.
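To make point 1 concrete, here is a small plain-Scala model of a revoke callback that waits for all pending commits or gives up after 3/5 of `maxPollInterval`. It is a sketch, not zio-kafka's code: `RebalanceWaitModel`, `revokeTimeout` and `revokeWait` are hypothetical names. It assumes Kafka's default `max.poll.interval.ms` of 300000 ms and treats a stopped stream as one whose pending commits never arrive.

```scala
import scala.concurrent.duration._

// Hypothetical model of the shutdown behaviour described above.
// NOT zio-kafka's implementation: only "wait for commits, or time out".
object RebalanceWaitModel {
  // Kafka's default max.poll.interval.ms is 300000 ms (5 minutes)
  val maxPollInterval: FiniteDuration = 5.minutes

  // Cap observed in this report: 3/5 of maxPollInterval, i.e. 3 minutes
  val revokeTimeout: FiniteDuration = (maxPollInterval.toMillis * 3 / 5).millis

  /** How long the revoke callback blocks.
    * `commitTimes` holds, relative to the start of the rebalance, the moment
    * each pending commit completes. Once the stream has stopped nothing is
    * committed any more, so the list is shorter than `pendingCommits`.
    */
  def revokeWait(pendingCommits: Int, commitTimes: List[FiniteDuration]): FiniteDuration = {
    val inTime = commitTimes.filter(_ <= revokeTimeout)
    if (pendingCommits <= 0) Duration.Zero
    else if (inTime.size >= pendingCommits) inTime.maxBy(_.toMillis)
    else revokeTimeout // gave up: some commit never arrived in time
  }

  def main(args: Array[String]): Unit = {
    // Stream still running: 10 records at 100 ms each are committed within 1 s
    val running = (1 to 10).map(i => (i * 100L).millis).toList
    println(s"stream running: ${revokeWait(10, running).toMillis} ms")
    // Stream already stopped: the pending commits never happen
    println(s"stream stopped: ${revokeWait(10, Nil).toMillis} ms")
  }
}
```

Running `main` prints `stream running: 1000 ms` and `stream stopped: 180000 ms`; the second figure is the 3-minute shutdown delay described above.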

Maybe it has something to do with the slow processing (100 ms per message), but polling at most 10 messages at a time should mitigate this. I would therefore expect rebalancing to complete within about 1 second in the worst case (10 x 100 ms).
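The expectation above is plain arithmetic on the repro's settings. A minimal sketch, assuming commit round-trips and poll overhead are negligible (`RebalanceExpectation` is a hypothetical name, not part of the repro):

```scala
import scala.concurrent.duration._

// Expected vs observed rebalance time, using the numbers from this report.
object RebalanceExpectation {
  val maxPollRecords: Int = 10               // .withMaxPollRecords(10) in the repro
  val perRecord: FiniteDuration = 100.millis // .delay(100.millis) in the repro

  // Worst case: every record from the last poll still has to be processed
  val expectedWorstCase: FiniteDuration = (perRecord.toMillis * maxPollRecords).millis

  // Observed in this report: about 3 minutes
  val observed: FiniteDuration = 3.minutes

  def main(args: Array[String]): Unit = {
    println(s"expected <= ${expectedWorstCase.toMillis} ms, observed ${observed.toMillis} ms")
    println(s"observed / expected = ${observed.toMillis / expectedWorstCase.toMillis}")
  }
}
```

With these numbers the observed delay is 180 times the expected worst case, which is why the slow per-message processing alone does not seem to explain it.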

package listener

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import zio.kafka.consumer.{Consumer, ConsumerSettings, RebalanceListener, Subscription}
import zio.kafka.serde.Serde
import zio.stream.ZStream
import zio.{ZIO, ZIOAppDefault, ZLayer, durationInt}

object Main extends ZIOAppDefault {
  private val LOG = LoggerFactory.getLogger(Main.getClass)

  private val TOPIC = "topic1"

  def consumer: ZStream[Consumer, Throwable, Nothing] = Consumer
    .plainStream(Subscription.topics(TOPIC), Serde.string, Serde.string)
    .tap(r =>
      ZIO.succeed(LOG.info(
        s"${r.value} p:${r.offset.partition}, o:${r.offset.offset}"
      ))
    )
    // Simulate slow processing: 100 ms per record
    .mapZIO(ZIO.succeed(_).delay(100.millis))
    .map(_.offset)
    .aggregateAsync(Consumer.offsetBatches)
    .tap(offsets =>
      ZIO.succeed(LOG.info(s"Going to commit: ${offsets.offsets.map { case (k, v) => s"${k.partition()}: ${v.offset()}" }}"))
    )
    .mapZIO(_.commit)
    .drain

  def consumerLayer = {
    val value: Map[String, AnyRef] = Map[String, AnyRef](
      ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "60000",
      ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> "250"
    )
    ZLayer.scoped(
      Consumer.make(
        ConsumerSettings(List("localhost:29092"))
          .withGroupId("tester")
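          // At most 10 records per poll; at 100 ms each, one poll is processed in about 1 s
          .withMaxPollRecords(10)
          // The feature under test
          .withRebalanceSafeCommits(true)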
          .withOffsetRetrieval(
            Consumer.OffsetRetrieval.Auto(Consumer.AutoOffsetStrategy.Earliest)
          )
          .withProperties(
            value
          )
          .withRebalanceListener(RebalanceListener.apply(
            (assigned, _) => {
              val text = s"Assigned Partitions: ${assigned.map(_.partition())}"
              ZIO.succeed(LOG.info(text))
            },
            (revoked, _) => {
              val text = s"Revoked Partitions: ${revoked.map(_.partition())}"
              ZIO.succeed(LOG.info(text))
            }
          ))
      )
    )
  }

  override def run = {
    (for {
      _ <- ZIO.succeed(LOG.info(s"Starting"))
      _ <- consumer.runDrain
      _ <- ZIO.succeed(LOG.info("DONE"))
    } yield ())
      .provide(consumerLayer)
  }
}
