
KAFKA-16505: Fix lost source raw key and value in store caches and buffers #18739

Open · wants to merge 1 commit into base: trunk
Conversation

loicgreffier
Contributor

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added triage PRs from the community streams and removed triage PRs from the community labels Jan 29, 2025
Member

@cadonna cadonna left a comment

Thanks @loicgreffier for the PR!

You did not consider buffers in this PR as I described in my comment. Could you come up with a test that confirms that we also have the same issue with buffers and then provide a fix?

@@ -259,6 +259,10 @@ public <K, V> void send(final String topic,

final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);

// As many records could be in-flight,
// freeing raw records in the context to reduce memory pressure
freeContext(context);
Member

The name of this method is a bit misleading. It basically frees the raw record within the context, not the whole context. What about calling it freeRawInputRecordFromContext()?
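For illustration only, a minimal sketch of what a method with that name might do; the class and field names below are assumptions for the example, not the actual Kafka Streams types:

```java
// Hypothetical context type; field names are assumptions for illustration.
class RecordContextSketch {
    byte[] sourceRawKey;
    byte[] sourceRawValue;

    // Frees only the raw input record held by the context, not the whole
    // context, so in-flight records do not keep the source bytes alive.
    static void freeRawInputRecordFromContext(final RecordContextSketch context) {
        if (context != null) {
            context.sourceRawKey = null;
            context.sourceRawValue = null;
        }
    }
}
```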

Contributor Author

Fixed

Comment on lines +84 to +93

@Override
public boolean equals(final Object other) {
return super.equals(other);
}

@Override
public int hashCode() {
return super.hashCode();
}
Member

Why are those needed?

Contributor Author

Since we added the new rawKey and rawValue attributes, SpotBugs requires defining the equals method (https://spotbugs.readthedocs.io/en/stable/bugDescriptions.html#eq-class-doesn-t-override-equals-in-superclass-eq-doesnt-override-equals)
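A hedged illustration of that SpotBugs rule: when a subclass adds state but keeps the superclass equality, it must override equals (even if only to delegate) to show that the exclusion is deliberate. The class names below are invented for the example, not the real Kafka Streams types:

```java
class BaseContext {
    final long offset;

    BaseContext(final long offset) {
        this.offset = offset;
    }

    @Override
    public boolean equals(final Object other) {
        return other instanceof BaseContext && ((BaseContext) other).offset == offset;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(offset);
    }
}

class ContextWithRawRecord extends BaseContext {
    final byte[] rawKey; // new attribute, intentionally excluded from equality

    ContextWithRawRecord(final long offset, final byte[] rawKey) {
        super(offset);
        this.rawKey = rawKey;
    }

    // Without these overrides SpotBugs reports EQ_DOESNT_OVERRIDE_EQUALS,
    // because the subclass adds state yet silently inherits super.equals().
    // Delegating explicitly documents that rawKey does not affect equality.
    @Override
    public boolean equals(final Object other) {
        return super.equals(other);
    }

    @Override
    public int hashCode() {
        return super.hashCode();
    }
}
```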

Comment on lines +53 to +55
this.sourceRawKey = null;
this.sourceRawValue = null;
Member

You also need to add this info to serialize() and deserialize() so that the buffer values also get the source record. Here it gets a bit tricky, because you need to consider the case where a serialized record context does not contain the source record, because it was written by a version of Streams that did not yet have the source record in the context.

Contributor Author

Indeed, having "optional" raw key and value makes the deserialization tricky.

Let's say we serialize the ProcessorRecordContext in this order: timestamp, offset, topic, partition, headers, rawKey, rawValue. After deserializing the headers, the next bytes could be the rawKey and rawValue, or they could be something else, e.g., priorValue:

final byte[] priorValue = getNullableSizePrefixedArray(buffer);

Right now I'm considering serializing the rawKey and rawValue at the very end of the ByteBuffer. Thus, after deserializing all the non-optional fields, if there are bytes remaining in the buffer, they should be the rawKey and rawValue.
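The end-of-buffer idea can be sketched with plain java.nio.ByteBuffer. The layout below is heavily simplified for illustration: a single long stands in for all the non-optional fields, and only one optional raw array is shown.

```java
import java.nio.ByteBuffer;

class TrailingOptionalFields {
    // Serialize non-optional fields first; append the optional raw key only
    // at the very end of the buffer (layout simplified for illustration).
    static byte[] serialize(final long timestamp, final byte[] rawKey) {
        final int size = Long.BYTES
            + (rawKey == null ? 0 : Integer.BYTES + rawKey.length);
        final ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putLong(timestamp);
        if (rawKey != null) {
            buffer.putInt(rawKey.length);
            buffer.put(rawKey);
        }
        return buffer.array();
    }

    // Deserialize: consume the non-optional fields, then treat any remaining
    // bytes as the optional raw key. A buffer written by an older version
    // simply has no remaining bytes, so the raw key stays null.
    static byte[] deserializeRawKey(final byte[] data) {
        final ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.getLong();
        if (!buffer.hasRemaining()) {
            return null;
        }
        final byte[] rawKey = new byte[buffer.getInt()];
        buffer.get(rawKey);
        return rawKey;
    }
}
```

The old-format case falls out naturally: when no trailing bytes were written, hasRemaining() is false and deserialization returns null.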

@loicgreffier
Contributor Author

@cadonna Changes about buffers will be added to this PR. However, despite my tests using suppress(), I did not manage to lose the rawKey and rawValue for now. I'm always receiving a value for these fields in the processingExceptionHandler.

@cadonna
Member

cadonna commented Jan 31, 2025

@cadonna Changes about buffers will be added to this PR. However, despite my tests using suppress(), I did not manage to lose the rawKey and rawValue for now. I'm always receiving a value for these fields in the processingExceptionHandler.

I believe your test needs to flush the buffer so that the records are written to the changelog topic, and then restore the buffer from the changelog topic by stopping and re-starting the app.

Useful code:

@loicgreffier loicgreffier force-pushed the KAFKA-16505-RawKeyValue-Store-Cache branch from f14bd2e to 79f6395 Compare February 1, 2025 23:01
@loicgreffier loicgreffier changed the title KAFKA-16505: Fix source raw key and value in store caches KAFKA-16505: Fix lost source raw key and value in store caches and buffers Feb 1, 2025
@loicgreffier
Contributor Author

@cadonna Thank you for the guidance. I could trigger the InMemoryTimeOrderedKeyValueChangeBuffer#flush and InMemoryTimeOrderedKeyValueChangeBuffer#restoreBatch and confirm that the rawKey and rawValue are lost.

Please ignore my previous comment #18739 (comment) that brings too many changes.

I've updated the serialization and deserialization. To handle the optional rawKey and rawValue, I've added a char "marker" (i.e., k for rawKey, v for rawValue) to identify the presence of these values without confusing them with any other possible bytes.

Let me know your thoughts about this approach
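A minimal sketch of the marker approach described above; the k/v marker chars come from the comment, while the length-prefixed layout and method names are assumptions for illustration:

```java
import java.nio.ByteBuffer;

class MarkerEncodedFields {
    // Append an optional field prefixed with a one-byte marker ('k' or 'v')
    // so its presence can be detected during deserialization.
    static void putOptional(final ByteBuffer buffer, final char marker, final byte[] value) {
        if (value != null) {
            buffer.put((byte) marker);
            buffer.putInt(value.length);
            buffer.put(value);
        }
    }

    // Read an optional field only if the next byte is the expected marker;
    // otherwise leave the buffer untouched and return null (e.g. for data
    // written by an older version without raw key/value).
    static byte[] getOptional(final ByteBuffer buffer, final char marker) {
        if (!buffer.hasRemaining() || buffer.get(buffer.position()) != (byte) marker) {
            return null;
        }
        buffer.get(); // consume the marker
        final byte[] value = new byte[buffer.getInt()];
        buffer.get(value);
        return value;
    }
}
```

One property of this scheme: if rawKey is absent, getOptional(buffer, 'k') returns null without advancing the buffer, so a following rawValue can still be read.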

@loicgreffier loicgreffier force-pushed the KAFKA-16505-RawKeyValue-Store-Cache branch from 53bd6b9 to 425e638 Compare February 2, 2025 14:47
@loicgreffier loicgreffier force-pushed the KAFKA-16505-RawKeyValue-Store-Cache branch from 630b392 to 93cdf62 Compare February 2, 2025 15:02
@cadonna
Member

cadonna commented Feb 5, 2025

@loicgreffier I discussed with @mjsax the need to write the raw key and raw value of the input record to the changelog topic for buffers, and we had some concerns. Writing the input record to the changelog topic might significantly increase the storage requirements, because we would need to write two records for each record in the buffer: the record itself and the corresponding input record.

@loicgreffier
Contributor Author

loicgreffier commented Feb 5, 2025

@cadonna I understand the concern. Should we restart the discussion and weigh the possible alternatives?

  • Make keeping the sourceRawKey/sourceRawValue an explicit memory trade-off, opt-in via a new configuration?
  • Drop the sourceRawKey/sourceRawValue and use the current record instead?
