add more specs (#2374)

mensfeld authored Dec 24, 2024
1 parent 57c6c9c commit 8554f00

Showing 3 changed files with 358 additions and 0 deletions.

@@ -0,0 +1,103 @@
# frozen_string_literal: true

# When Karafka loses a given partition but later gets it back, it should pick it up from the last
# offset committed without any problems

setup_karafka do |config|
config.concurrency = 4
config.kafka[:'transactional.id'] = SecureRandom.uuid
end

class Consumer < Karafka::BaseConsumer
def consume
partition = messages.metadata.partition

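    # Offsets marked inside the transaction block are committed atomically
    # as part of the underlying producer transaction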
transaction do
messages.each do |message|
        # We mark offsets only until the partition gets revoked;
        # mark_as_consumed! returns false once we no longer own it
return unless mark_as_consumed!(message)

DT[partition] << message.offset
end
end

return if @marked

    # For the partition we have lost, this will run twice
DT[:partitions] << partition
@marked = true
end
end

draw_routes do
topic DT.topic do
config(partitions: 2)
consumer Consumer
end
end

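# Keep a steady flow of messages to both partitions while the rebalances happen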
Thread.new do
loop do
2.times do
produce(DT.topic, '1', partition: 0)
produce(DT.topic, '1', partition: 1)
end

sleep(0.5)
rescue StandardError
nil
end
end

# We need a second consumer to trigger a rebalance
consumer = setup_rdkafka_consumer

other = Thread.new do
sleep(10)

consumer.subscribe(DT.topic)

consumer.each do |message|
DT[:jumped] << message

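    # transaction_mark_as_consumed sends this consumer's offset as part of the
    # producer transaction, so the offset commit is atomic with it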
Karafka.producer.transaction do
Karafka.producer.transaction_mark_as_consumed(
consumer,
message
)
end

break
end

consumer.close
end

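# Both partitions report once and the revoked one reports again when
# re-acquired, hence we wait for at least 3 reports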
start_karafka_and_wait_until do
other.join && DT[:partitions].size >= 3
end

revoked_partition = DT[:jumped].first.partition
jumped_offset = DT[:jumped].first.offset

assert_equal 1, DT[:jumped].size

# This single offset should have been consumed by the other consumer, not here
assert_equal false, DT.data[revoked_partition].include?(jumped_offset)

# We should have all the others in proper order and without any other missing
previous = nil

DT.data[revoked_partition].each do |offset|
unless previous
previous = offset
next
end

  # +2 because each transaction commit writes a marker that occupies one offset
  # When the next expected offset is the one taken by the other consumer, skip past it
  previous = jumped_offset if previous + 2 == jumped_offset

assert_equal previous + 2, offset

previous = offset
end
@@ -0,0 +1,128 @@
# frozen_string_literal: true

# When a consumer goes into a cooperative-sticky rebalance and gets the partitions back,
# it should not have duplicated data.

setup_karafka do |config|
config.max_messages = 1_000
config.kafka[:'transactional.id'] = SecureRandom.uuid
config.kafka[:'partition.assignment.strategy'] = 'cooperative-sticky'
end

DT[:all] = {}
DT[:data] = {}

class Consumer < Karafka::BaseConsumer
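  # Karafka's post-initialization hook, called once the consumer is fully built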
def initialized
@buffer = []
end

def consume
DT[:running] = true

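    # Producing and offset marking inside the transaction are atomic: the
    # forwarded messages and the offset either both commit or both abort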
transaction do
messages.each do |message|
DT[:all][partition] ||= []
DT[:all][partition] << message.offset

DT[:data][partition] ||= []
DT[:data][partition] << message.raw_payload.to_i

raise if @buffer.include?(message.offset)

@buffer << message.offset
produce_async(topic: DT.topics[1], payload: message.raw_payload)
end

unless DT.key?(:marked)
mark_as_consumed(messages.last)
DT[:marked] = true
end
end
end
end

draw_routes do
topic DT.topics[0] do
manual_offset_management(true)
consumer Consumer
config(partitions: 2)
end

topic DT.topics[1] do
active false
end
end

Thread.new do
base = -1

loop do
accu = []

100.times { accu << base += 1 }

accu.map!(&:to_s)

produce_many(DT.topic, accu, partition: 0)
produce_many(DT.topic, accu, partition: 1)

sleep(rand)
rescue WaterDrop::Errors::ProducerClosedError, Rdkafka::ClosedProducerError
break
end
end

other = Thread.new do
loop do
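    # This consumer must use the same assignment strategy as the Karafka one,
    # as Kafka does not allow group members with no common protocol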
consumer = setup_rdkafka_consumer('partition.assignment.strategy': 'cooperative-sticky')
consumer.subscribe(DT.topic)
consumer.each { break }

2.times { consumer.poll(1_000) }

consumer.close

DT[:attempts] << true

break if DT[:attempts].size >= 3
end
end

start_karafka_and_wait_until do
DT[:attempts].size >= 3
end

other.join

# This ensures we do not skip over offsets
# The allowed gap of 2 (instead of 1) accounts for the transactional marker
DT[:all].each do |partition, offsets|
sorted = offsets.uniq.sort
previous = sorted.first - 1

sorted.each do |offset|
assert(
(offset - previous) <= 2,
[previous, offset, partition]
)

previous = offset
end
end

# This ensures we do not skip over messages
DT[:data].each do |partition, counters|
sorted = counters.uniq.sort
previous = sorted.first - 1

sorted.each do |count|
assert_equal(
previous + 1,
count,
[previous, count, partition]
)

previous = count
end
end
@@ -0,0 +1,127 @@
# frozen_string_literal: true

# When a consumer goes into a non-cooperative-sticky rebalance and gets the partitions back,
# it should not have duplicated data.

setup_karafka do |config|
config.max_messages = 1_000
config.kafka[:'transactional.id'] = SecureRandom.uuid
end

DT[:all] = {}
DT[:data] = {}

class Consumer < Karafka::BaseConsumer
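  # Karafka's post-initialization hook, called once the consumer is fully built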
def initialized
@buffer = []
end

def consume
DT[:running] = true

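    # Producing and offset marking inside the transaction are atomic: the
    # forwarded messages and the offset either both commit or both abort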
transaction do
messages.each do |message|
DT[:all][partition] ||= []
DT[:all][partition] << message.offset

DT[:data][partition] ||= []
DT[:data][partition] << message.raw_payload.to_i

raise if @buffer.include?(message.offset)

@buffer << message.offset
produce_async(topic: DT.topics[1], payload: message.raw_payload)
end

unless DT.key?(:marked)
mark_as_consumed(messages.last)
DT[:marked] = true
end
end
end
end

draw_routes do
topic DT.topics[0] do
manual_offset_management(true)
consumer Consumer
config(partitions: 2)
end

topic DT.topics[1] do
active false
end
end

Thread.new do
base = -1

loop do
accu = []

100.times { accu << base += 1 }

accu.map!(&:to_s)

produce_many(DT.topic, accu, partition: 0)
produce_many(DT.topic, accu, partition: 1)

sleep(rand)
rescue WaterDrop::Errors::ProducerClosedError, Rdkafka::ClosedProducerError
break
end
end

other = Thread.new do
loop do
consumer = setup_rdkafka_consumer
consumer.subscribe(DT.topic)
consumer.each { break }

2.times { consumer.poll(1_000) }

consumer.close

DT[:attempts] << true

break if DT[:attempts].size >= 4
end
end

start_karafka_and_wait_until do
DT[:attempts].size >= 4
end

other.join

# This ensures we do not skip over offsets
# The allowed gap of 2 (instead of 1) accounts for the transactional marker
DT[:all].each do |partition, offsets|
sorted = offsets.uniq.sort
previous = sorted.first - 1

sorted.each do |offset|
assert(
(offset - previous) <= 2,
[previous, offset, partition]
)

previous = offset
end
end

# This ensures we do not skip over messages
DT[:data].each do |partition, counters|
sorted = counters.uniq.sort
previous = sorted.first - 1

sorted.each do |count|
assert_equal(
previous + 1,
count,
[previous, count, partition]
)

previous = count
end
end
