Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Redpanda Migrator components #3026

Open
wants to merge 22 commits into
base: main
Choose a base branch
from

Conversation

mihaitodor
Copy link
Collaborator

@mihaitodor mihaitodor commented Nov 21, 2024

I hijacked this PR to address several issues:

Added

  • New redpanda_migrator_offsets input.
  • Fields offset_topic, offset_group, offset_partition, offset_commit_timestamp and offset_metadata added to the redpanda_migrator_offsets output.
  • Fields kafka_key and max_in_flight for the redpanda_migrator_offsets output are now deprecated.
  • Fields batching for the redpanda_migrator output is now deprecated.
  • Field topic_lag_refresh_period added to the redpanda and redpanda_common inputs.
  • Metric redpanda_lag now emitted by the redpanda and redpanda_common inputs.
  • Metadata kafka_lag now emitted by the redpanda and redpanda_common inputs.

Fixed

  • The redpanda_migrator_bundle output now skips schema ID translation when translate_schema_ids: false and schema_registry is configured.
  • The redpanda_migrator output no longer rejects messages if it can't perform schema ID translation.
  • The redpanda_migrator input no longer converts the kafka key to string.

Changed

  • The kafka_key and max_in_flight fields of the redpanda_migrator_offsets output are now deprecated.
  • Fields batch_size and multi_header for the redpanda_migrator input are now deprecated.
    • The redpanda_migrator_bundle input and output now set labels for their subcomponents.
  • The redpanda_migrator input no longer emits tombstone messages.

Redpanda Migrator offset metadata

One quick way to test this is via the following config. Note how I overwrite kafka_offset_metadata to foobar in a mapping processor.

input:
  redpanda_migrator_bundle:
    redpanda_migrator:
      seed_brokers: [ "localhost:9092" ]
      topics:
        - '^[^_]' # Skip internal topics which start with `_`
      regexp_topics: true
      consumer_group: migrator_bundle
      start_from_oldest: true
      replication_factor_override: true
      replication_factor: -1

    schema_registry:
      url: http://localhost:8081
      include_deleted: true
      subject_filter: ""

output:
  processors:
    - switch:
        - check: metadata("input_label") == "redpanda_migrator_offsets_input"
          processors:
            - mapping: |
                meta kafka_offset_metadata = "foobar"
  redpanda_migrator_bundle:
    redpanda_migrator:
      seed_brokers: [ "localhost:9093" ]
      max_in_flight: 1
      replication_factor_override: true
      replication_factor: -1

    schema_registry:
      url: http://localhost:8082

@mihaitodor mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch from 34421d0 to 081592f Compare November 21, 2024 02:46
@mihaitodor mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch 12 times, most recently from a86bdbd to 72237c4 Compare December 12, 2024 01:18
@mihaitodor mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch 5 times, most recently from d37239f to 784ff42 Compare December 16, 2024 11:16
@mihaitodor mihaitodor changed the title Add Redpanda Migrator offset metadata Refactor Redpanda Migrator components Dec 16, 2024
@mihaitodor mihaitodor marked this pull request as ready for review December 16, 2024 11:21
log: res.Logger(),
shutSig: shutdown.NewSignaller(),
clientOpts: optsFn,
topicLagGauge: res.Metrics().NewGauge("redpanda_lag", "topic", "partition"),
Copy link
Collaborator Author

@mihaitodor mihaitodor Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I added the redpanda_migrator input, I had both this gauge and the kafka_lag metadata field. I don't know if we want any of these available by default. Also, should this gauge name be somehow derived from the actual input type (redpanda, redpanda_common, redpanda_migrator, redpanda_migrator_offsets)? It does get the label of the input if set, so maybe that's sufficient.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the label is enough. Do we really want this lag metric for all these inputs? Probably I would assume...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think it's a bit overkill and I don't recall now which conversation led to this pattern. I also emit the kafka_lag metadata field with each message, so one could add a metric processor in the pipeline which creates a gauge for topics as needed. One downside with this approach is if messages stop flowing completely, then this gauge wouldn't get any updates. I think the main idea was to make it easier for people to discover this metric, but it's not clear what the perf impact might be if we consume from thousands of topics, each having multiple partitions. Should I remove it? (cc @Jeffail)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like having the metric emitted here, it's relatively cheap, and extracting from meta is awkward enough no one is going to do it willingly.

@mihaitodor mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch from 784ff42 to 642fd09 Compare December 16, 2024 11:43
@mihaitodor mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch 6 times, most recently from 34c5d16 to 5749553 Compare December 31, 2024 14:13
@mihaitodor mihaitodor requested review from Jeffail and removed request for Jeffail December 31, 2024 14:50
@@ -76,18 +105,25 @@ type FranzReaderOrdered struct {

// NewFranzReaderOrderedFromConfig attempts to instantiate a new
// FranzReaderOrdered reader from a parsed config.
func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error)) (*FranzReaderOrdered, error) {
func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, clientOptsFn clientOptsFn, recordToMessageFn recordToMessageFn, preflightHookFn preflightHookFn, closeHookFn closeHookFn) (*FranzReaderOrdered, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor got a bit messy to use... It can be hard to tell which of these funcs is set to nil at the call site and one can easily mix them up. I'm thinking to maybe introduce functional options for it or maybe a struct which contain all the parameters. WDYT?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think I'd go with something like a FranzReaderHooks type, where you can do stuff like NewFranzReaderHooks().WithRecordToMessageFn(foo).WithCloseHookFn(bar).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks! I was able to rework the reader to avoid introducing hooks, but might end up using this for the writer. Will think about it, but I'm pretty OK with how it looks now.

@mihaitodor mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch from bcfeae2 to e0838a5 Compare January 3, 2025 18:29
@@ -114,8 +121,13 @@ func (f *FranzReaderOrdered) recordsToBatch(records []*kgo.Record) *batchWithRec
var length uint64
var batch service.MessageBatch
for _, r := range records {
record, err := f.recordToMessageFn(r)
if err != nil {
f.log.Debugf("Failed to convert kafka record to message: %s", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is dropping records - that seems bad right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're intentionally dropping tombstone records. Otherwise, kafka.FranzRecordToMessageV1() doesn't return any errors, but yeah, it's easy to misuse. Alternatively, I could set an error on the message, but I'm not aware of any other input which does that and, if the idea is to drop those messages anyway, it would be wasteful to need an extra processor for that. Not sure what's the best approach here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should change the signature of the record to message function so that it's clearer messages are dropped in this case, so have it return (*service.Message, bool), where nil, false is an explicit instruction to discard the record as if it didn't exist.

internal/impl/kafka/franz_reader_ordered.go Outdated Show resolved Hide resolved
internal/impl/kafka/franz_reader_ordered.go Outdated Show resolved Hide resolved
}

// Consume messages from the `__consumer_offsets` topic
d.Topics = []string{`__consumer_offsets`}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the plan for Redpanda Serverless, which doesn't support this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a ticket for that and for now it's still something which needs to be discussed. I don't know what we can do there, but both the redpanda_migrator_offsets input and output don't currently work with RP Serverless.

seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
topics: [ "foobar" ]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this foobar?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the topic filter since the redpanda_migrator_offsets input reads messages from __consumer_offsets and we need to drop updates about topics which we're not interested in.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we don't want a filter by default? Or this should be configurable? I don't understand why this is hardcoded the way it is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answered on Slack. TL; DR: This is the test section of the template where I'm asserting that the topic filter for redpanda_migrator_offsets is set to a specific topic. We don't want to send consumer group updates for all topics to the redpanda_migrator_offsets output.

@@ -80,7 +99,7 @@ type FranzReaderOrdered struct {

// NewFranzReaderOrderedFromConfig attempts to instantiate a new
// FranzReaderOrdered reader from a parsed config.
func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn RecordToMessageFn) (*FranzReaderOrdered, error) {
func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn recordToMessageFn, preflightHookFn preflightHookFn, closeHookFn func(res *service.Resources)) (*FranzReaderOrdered, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really loving all these extra hooks into this method... It would be great to wrap this method/reader instead (ala decorator pattern style) - can we do that? I think the net result would be cleaner.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I hate it too (see above), but I wasn't sure how to go about it without making this PR huge.

Which of the following would you prefer:

  • Have exported fields on the reader and writer and set them after calling the NewFranzReaderOrderedFromConfig() and NewFranzWriterFromConfig() constructors
  • Use the functional options pattern
  • Pass a struct which contains all these parameters to the constructors
  • Something else
    I guess you're thinking of the first approach which I haven't considered initially and the functional options approach feels overkill. Embedding the FranzReaderOrdered and FranzWriter structs in new structs and overriding the relevant methods would be nice, but then I have to introduce some more exported helper methods... Let me know if you'd like me to explore this approach.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I was thinking as a diff you can apply on top of this PR: https://gist.github.com/rockwotj/48bddf4557a5210d6fa415a1cc736090. It requires exporting the kgo.Client, but I think that's an okay thing. I haven't seen any hooks yet that fundamentally cannot be implemented in a wrapper layer (decorator pattern).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool! Indeed, that does do the trick for the reader. I didn't think I can leverage this pattern initially, but, with enough patience, it does work out nicely :)

For the writer I'll have to do some refactoring to get rid of those hooks, but not sure if there's a clean way to avoid using a hook in WriteBatch unless I add a separate API method that's called in WriteBatch and then I can override that one. I'll think about it.

internal/impl/kafka/franz_reader_ordered.go Outdated Show resolved Hide resolved
internal/impl/kafka/franz_reader_ordered.go Outdated Show resolved Hide resolved
internal/impl/kafka/franz_writer.go Outdated Show resolved Hide resolved
@mihaitodor mihaitodor requested a review from rockwotj January 5, 2025 21:23
@mihaitodor mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch 2 times, most recently from 7888a52 to 9466860 Compare January 13, 2025 03:23
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
- New `redpanda_migrator_offsets` input
- Fields `offset_topic`, `offset_group`, `offset_partition`, `offset_commit_timestamp` and `offset_metadata` added to the `redpanda_migrator_offsets` output

Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
This is required in order to pull in twmb/franz-go#838

This is needed because the `redpanda_migrator` input needs to
create all the matched topics during the first call to
`ReadBatch()`.

Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
- Move OnConnect topic creation logic to the output to avoid the
circular dependency between the input and output (the input
doesn't need to know about the output)
- Clean up error handling

Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
Signed-off-by: Mihai Todor <todormihai@gmail.com>
@mihaitodor mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch from 9466860 to 959a5db Compare January 13, 2025 03:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants