Refactor Redpanda Migrator components #3026
base: main
Conversation
Force-pushed from 34421d0 to 081592f
Force-pushed from a86bdbd to 72237c4
Force-pushed from d37239f to 784ff42
log:           res.Logger(),
shutSig:       shutdown.NewSignaller(),
clientOpts:    optsFn,
topicLagGauge: res.Metrics().NewGauge("redpanda_lag", "topic", "partition"),
When I added the `redpanda_migrator` input, I had both this gauge and the `kafka_lag` metadata field. I don't know if we want any of these available by default. Also, should this gauge name be somehow derived from the actual input type (`redpanda`, `redpanda_common`, `redpanda_migrator`, `redpanda_migrator_offsets`)? It does get the label of the input if set, so maybe that's sufficient.
I think the label is enough. Do we really want this lag metric for all these inputs? Probably I would assume...
I also think it's a bit overkill and I don't recall now which conversation led to this pattern. I also emit the `kafka_lag` metadata field with each message, so one could add a `metric` processor in the pipeline which creates a gauge for topics as needed. One downside of this approach is that if messages stop flowing completely, the gauge wouldn't get any updates. I think the main idea was to make it easier for people to discover this metric, but it's not clear what the perf impact might be if we consume from thousands of topics, each having multiple partitions. Should I remove it? (cc @Jeffail)
I like having the metric emitted here, it's relatively cheap, and extracting from meta is awkward enough no one is going to do it willingly.
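For reference, the processor-based alternative discussed above (deriving the gauge from the `kafka_lag` metadata field instead of emitting it from the input) might look roughly like this; the exact metadata key names available here are an assumption:

```yaml
pipeline:
  processors:
    # Sketch only: creates a per-topic/partition gauge from message metadata,
    # which stops updating if messages stop flowing.
    - metric:
        type: gauge
        name: kafka_lag
        labels:
          topic: ${! metadata("kafka_topic") }
          partition: ${! metadata("kafka_partition") }
        value: ${! metadata("kafka_lag") }
```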
Force-pushed from 784ff42 to 642fd09
Force-pushed from 34c5d16 to 5749553
@@ -76,18 +105,25 @@ type FranzReaderOrdered struct {

 // NewFranzReaderOrderedFromConfig attempts to instantiate a new
 // FranzReaderOrdered reader from a parsed config.
-func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error)) (*FranzReaderOrdered, error) {
+func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, clientOptsFn clientOptsFn, recordToMessageFn recordToMessageFn, preflightHookFn preflightHookFn, closeHookFn closeHookFn) (*FranzReaderOrdered, error) {
This constructor got a bit messy to use... It can be hard to tell which of these funcs is set to nil at the call site, and one can easily mix them up. I'm thinking of maybe introducing functional options for it, or a struct which contains all the parameters. WDYT?
Yeah, I think I'd go with something like a `FranzReaderHooks` type, where you can do stuff like `NewFranzReaderHooks().WithRecordToMessageFn(foo).WithCloseHookFn(bar)`.
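A minimal sketch of what that builder could look like. `Record` and `Message` here are simplified stand-ins for `kgo.Record` and `service.Message`, and the method set is an assumption based on this thread:

```go
package main

import "fmt"

// Record and Message are simplified stand-ins for kgo.Record and
// service.Message, used only to illustrate the builder shape.
type Record struct{ Key, Value []byte }

type Message struct{ Body []byte }

// FranzReaderHooks collects the optional hooks so the reader constructor
// takes one value instead of several easily-confused nilable funcs.
type FranzReaderHooks struct {
	recordToMessageFn func(*Record) (*Message, error)
	closeHookFn       func()
}

func NewFranzReaderHooks() *FranzReaderHooks { return &FranzReaderHooks{} }

// WithRecordToMessageFn sets the record conversion hook and returns the
// receiver so calls can be chained.
func (h *FranzReaderHooks) WithRecordToMessageFn(fn func(*Record) (*Message, error)) *FranzReaderHooks {
	h.recordToMessageFn = fn
	return h
}

// WithCloseHookFn sets a hook that runs when the reader shuts down.
func (h *FranzReaderHooks) WithCloseHookFn(fn func()) *FranzReaderHooks {
	h.closeHookFn = fn
	return h
}

func main() {
	hooks := NewFranzReaderHooks().
		WithRecordToMessageFn(func(r *Record) (*Message, error) {
			return &Message{Body: r.Value}, nil
		}).
		WithCloseHookFn(func() { fmt.Println("reader closed") })

	msg, err := hooks.recordToMessageFn(&Record{Value: []byte("payload")})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(msg.Body))
	hooks.closeHookFn()
}
```

The call sites then only name the hooks they actually set, which avoids the nil-parameter soup at the constructor.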
Nice, thanks! I was able to rework the reader to avoid introducing hooks, but might end up using this for the writer. Will think about it, but I'm pretty OK with how it looks now.
Force-pushed from bcfeae2 to e0838a5
@@ -114,8 +121,13 @@ func (f *FranzReaderOrdered) recordsToBatch(records []*kgo.Record) *batchWithRec
 	var length uint64
 	var batch service.MessageBatch
 	for _, r := range records {
+		record, err := f.recordToMessageFn(r)
+		if err != nil {
+			f.log.Debugf("Failed to convert kafka record to message: %s", err)
This is dropping records - that seems bad right?
We're intentionally dropping tombstone records. Otherwise, `kafka.FranzRecordToMessageV1()` doesn't return any errors, but yeah, it's easy to misuse. Alternatively, I could set an error on the message, but I'm not aware of any other input which does that and, if the idea is to drop those messages anyway, it would be wasteful to need an extra processor for that. Not sure what the best approach is here.
Maybe we should change the signature of the record-to-message function so that it's clearer that messages are dropped in this case: have it return `(*service.Message, bool)`, where `nil, false` is an explicit instruction to discard the record as if it didn't exist.
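The suggested signature change can be sketched like this, again with simplified stand-ins for `kgo.Record` and `service.Message` (the real conversion would carry over key, headers, and metadata):

```go
package main

import "fmt"

// Record and Message are simplified stand-ins for kgo.Record and
// service.Message.
type Record struct{ Key, Value []byte }

type Message struct{ Body []byte }

// recordToMessage drops tombstone records (nil value) explicitly via the
// boolean return instead of overloading an error for control flow.
func recordToMessage(r *Record) (*Message, bool) {
	if r.Value == nil {
		return nil, false // tombstone: discard as if it didn't exist
	}
	return &Message{Body: r.Value}, true
}

func main() {
	records := []*Record{
		{Key: []byte("a"), Value: []byte("payload")},
		{Key: []byte("b"), Value: nil}, // tombstone
	}
	var batch []*Message
	for _, r := range records {
		if msg, ok := recordToMessage(r); ok {
			batch = append(batch, msg)
		}
	}
	fmt.Println(len(batch))
}
```

This keeps the "drop" decision visible at the call site in `recordsToBatch`, rather than hiding it behind a debug log on an error path.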
}

// Consume messages from the `__consumer_offsets` topic
d.Topics = []string{`__consumer_offsets`}
What's the plan for Redpanda Serverless, which doesn't support this?
I have a ticket for that and for now it's still something which needs to be discussed. I don't know what we can do there, but both the `redpanda_migrator_offsets` input and output don't currently work with RP Serverless.
 seed_brokers: [ "127.0.0.1:9092" ]
-topics: [ "__consumer_offsets" ]
+topics: [ "foobar" ]
Why is this `foobar`?
That's the topic filter, since the `redpanda_migrator_offsets` input reads messages from `__consumer_offsets` and we need to drop updates about topics which we're not interested in.
But we don't want a filter by default? Or this should be configurable? I don't understand why this is hardcoded the way it is.
Answered on Slack. TL;DR: this is the test section of the template where I'm asserting that the topic filter for `redpanda_migrator_offsets` is set to a specific topic. We don't want to send consumer group updates for all topics to the `redpanda_migrator_offsets` output.
@@ -80,7 +99,7 @@ type FranzReaderOrdered struct {

 // NewFranzReaderOrderedFromConfig attempts to instantiate a new
 // FranzReaderOrdered reader from a parsed config.
-func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn RecordToMessageFn) (*FranzReaderOrdered, error) {
+func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn recordToMessageFn, preflightHookFn preflightHookFn, closeHookFn func(res *service.Resources)) (*FranzReaderOrdered, error) {
I'm not really loving all these extra hooks into this method... It would be great to wrap this method/reader instead (ala decorator pattern style) - can we do that? I think the net result would be cleaner.
Yep, I hate it too (see above), but I wasn't sure how to go about it without making this PR huge.
Which of the following would you prefer?
1. Have exported fields on the reader and writer and set them after calling the `NewFranzReaderOrderedFromConfig()` and `NewFranzWriterFromConfig()` constructors
2. Use the functional options pattern
3. Pass a struct which contains all these parameters to the constructors
4. Something else

I guess you're thinking of the first approach, which I hadn't considered initially, and the functional options approach feels like overkill. Embedding the `FranzReaderOrdered` and `FranzWriter` structs in new structs and overriding the relevant methods would be nice, but then I'd have to introduce some more exported helper methods... Let me know if you'd like me to explore this approach.
This is what I was thinking as a diff you can apply on top of this PR: https://gist.github.com/rockwotj/48bddf4557a5210d6fa415a1cc736090. It requires exporting the kgo.Client, but I think that's an okay thing. I haven't seen any hooks yet that fundamentally cannot be implemented in a wrapper layer (decorator pattern).
Very cool! Indeed, that does do the trick for the reader. I didn't think I could leverage this pattern initially, but, with enough patience, it does work out nicely :)
For the writer I'll have to do some refactoring to get rid of those hooks, but I'm not sure there's a clean way to avoid using a hook in `WriteBatch` unless I add a separate API method that's called in `WriteBatch` which I can then override. I'll think about it.
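The decorator idea being discussed can be sketched as follows. The types are illustrative stand-ins, not the real `FranzReaderOrdered` API (which the linked gist adapts directly):

```go
package main

import "fmt"

// BatchReader is a minimal stand-in for the reader surface being wrapped.
type BatchReader interface {
	ReadBatch() ([]string, error)
	Close() error
}

// baseReader stands in for the plain FranzReaderOrdered.
type baseReader struct{}

func (r *baseReader) ReadBatch() ([]string, error) { return []string{"a", "", "b"}, nil }
func (r *baseReader) Close() error                 { return nil }

// migratorReader decorates a BatchReader, filtering tombstones on read and
// running extra teardown logic on Close, without modifying the base type.
type migratorReader struct {
	inner BatchReader
}

func (r *migratorReader) ReadBatch() ([]string, error) {
	batch, err := r.inner.ReadBatch()
	if err != nil {
		return nil, err
	}
	filtered := batch[:0]
	for _, m := range batch {
		if m != "" { // drop tombstones (empty strings stand in for them here)
			filtered = append(filtered, m)
		}
	}
	return filtered, nil
}

func (r *migratorReader) Close() error {
	// close-hook logic would go here before delegating
	return r.inner.Close()
}

func main() {
	var r BatchReader = &migratorReader{inner: &baseReader{}}
	batch, _ := r.ReadBatch()
	fmt.Println(len(batch))
}
```

The base constructor stays hook-free, and each specialized input layers its own behavior on top.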
Force-pushed from 7888a52 to 9466860
Signed-off-by: Mihai Todor <todormihai@gmail.com>

- New `redpanda_migrator_offsets` input
- Fields `offset_topic`, `offset_group`, `offset_partition`, `offset_commit_timestamp` and `offset_metadata` added to the `redpanda_migrator_offsets` output

This is required in order to pull in twmb/franz-go#838. This is needed because the `redpanda_migrator` input needs to create all the matched topics during the first call to `ReadBatch()`.

- Move OnConnect topic creation logic to the output to avoid the circular dependency between the input and output (the input doesn't need to know about the output)
- Clean up error handling
Force-pushed from 9466860 to 959a5db
I hijacked this PR to address several issues:

Added
- New `redpanda_migrator_offsets` input.
- Fields `offset_topic`, `offset_group`, `offset_partition`, `offset_commit_timestamp` and `offset_metadata` added to the `redpanda_migrator_offsets` output.
- `kafka_key` and `max_in_flight` for the `redpanda_migrator_offsets` output are now deprecated.
- `batching` for the `redpanda_migrator` output is now deprecated.
- `topic_lag_refresh_period` added to the `redpanda` and `redpanda_common` inputs.
- `redpanda_lag` now emitted by the `redpanda` and `redpanda_common` inputs.
- `kafka_lag` now emitted by the `redpanda` and `redpanda_common` inputs.

Fixed
- The `redpanda_migrator_bundle` output now skips schema ID translation when `translate_schema_ids: false` and `schema_registry` is configured.
- The `redpanda_migrator` output no longer rejects messages if it can't perform schema ID translation.
- The `redpanda_migrator` input no longer converts the kafka key to string.

Changed
- The `kafka_key` and `max_in_flight` fields of the `redpanda_migrator_offsets` output are now deprecated.
- `batch_size` and `multi_header` for the `redpanda_migrator` input are now deprecated.
- The `redpanda_migrator_bundle` input and output now set labels for their subcomponents.
- The `redpanda_migrator` input no longer emits tombstone messages.

Redpanda Migrator offset metadata

One quick way to test this is via the following config. Note how I overwrite `kafka_offset_metadata` to `foobar` in a `mapping` processor.
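The full test config isn't included here, but the `mapping` step described above might look roughly like this (a sketch; the surrounding input/output sections are omitted):

```yaml
pipeline:
  processors:
    # Illustrative only: overwrite the offset metadata field before it
    # reaches the redpanda_migrator_offsets output.
    - mapping: |
        meta kafka_offset_metadata = "foobar"
```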