KAFKA-16505: Fix lost source raw key and value in store caches and buffers #18739
base: trunk
Thanks @loicgreffier for the PR!
You did not consider buffers in this PR as I described in my comment. Could you come up with a test that confirms that we also have the same issue with buffers and then provide a fix?
@@ -259,6 +259,10 @@ public <K, V> void send(final String topic,

final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);

// As many records could be in-flight,
// freeing raw records in the context to reduce memory pressure
freeContext(context);
The name of this method is a bit misleading. It basically frees the raw record within the context, not the whole context. What about calling it freeRawInputRecordFromContext()?
Fixed
@Override
public boolean equals(final Object other) {
    return super.equals(other);
}

@Override
public int hashCode() {
    return super.hashCode();
}
Why are those needed?
Since we added the new rawKey and rawValue attributes, SpotBugs requires us to define the equals function (https://spotbugs.readthedocs.io/en/stable/bugDescriptions.html#eq-class-doesn-t-override-equals-in-superclass-eq-doesnt-override-equals).
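To illustrate the SpotBugs rule mentioned above, here is a minimal sketch. The class names `BaseContext` and `ContextWithRaw` are hypothetical stand-ins, not the classes touched by this PR. The subclass adds fields but deliberately keeps the parent's notion of equality, so it overrides equals() and hashCode() only to delegate, which is the pattern the diff above uses.

```java
public class EqualsDelegationSketch {

    // Hypothetical parent class that already defines equality on its own state.
    static class BaseContext {
        final long offset;

        BaseContext(final long offset) {
            this.offset = offset;
        }

        @Override
        public boolean equals(final Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof BaseContext)) {
                return false;
            }
            return offset == ((BaseContext) other).offset;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(offset);
        }
    }

    // Hypothetical subclass: it adds fields, so SpotBugs expects an explicit equals().
    static class ContextWithRaw extends BaseContext {
        final byte[] rawKey;
        final byte[] rawValue;

        ContextWithRaw(final long offset, final byte[] rawKey, final byte[] rawValue) {
            super(offset);
            this.rawKey = rawKey;
            this.rawValue = rawValue;
        }

        // Explicit delegation: the raw bytes are intentionally not part of equality.
        @Override
        public boolean equals(final Object other) {
            return super.equals(other);
        }

        @Override
        public int hashCode() {
            return super.hashCode();
        }
    }
}
```

With this delegation, two instances that share the parent state compare equal even when their raw bytes differ. Whether that is the desired semantics for the real class is a design decision for the PR authors.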
this.sourceRawKey = null;
this.sourceRawValue = null;
You also need to add this information to serialize() and deserialize() so that the buffer values also get the source record. This gets a bit tricky, because you need to consider the case where a serialized record context does not contain the source record because it was written by a version of Streams that did not yet have the source record in the context.
Indeed, having an "optional" raw key and value makes the deserialization tricky.
Let's say we serialize the ProcessorRecordContext in this order: timestamp, offset, topic, partition, headers, rawKey, rawValue. After deserializing the headers, the next bytes can be rawKey and rawValue or can be something else (e.g., priorValue
kafka/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
Line 73 in d7a5b87
final byte[] priorValue = getNullableSizePrefixedArray(buffer);
Right now I'm considering serializing the rawKey and rawValue at the very end of the ByteBuffer (i.e., right after here:
kafka/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
Line 119 in d7a5b87
addValue(buffer, newValue);
rawKey and rawValue.
@cadonna Changes about buffers will be added to this PR. However, despite my tests using
I believe your test needs to flush the buffer so that the records are written to the changelog topic and then restore the buffer from the changelog topic by stopping and restarting the app. Useful code:
f14bd2e to 79f6395
@cadonna Thank you for the guidance. I could trigger the
Please ignore my previous comment #18739 (comment) that brings too many changes.
I've updated the serialization and deserialization. To take into consideration the optional rawKey and rawValue, I've added a char "marker" (i.e.,
Let me know your thoughts about this approach.
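The marker idea described above can be sketched as follows. This is only an illustration under stated assumptions, not the code from the PR: the class `OptionalRawRecordCodec`, the marker value `'k'`, and the simplified layout (a timestamp followed by optional trailing fields) are all hypothetical. The real ProcessorRecordContext and BufferValue layouts carry more fields, and any bytes that may follow the context in the old format would have to be ruled out as a false marker match.

```java
import java.nio.ByteBuffer;

public class OptionalRawRecordCodec {

    // Hypothetical marker value; the character actually used in the PR is not shown in this thread.
    static final char MARKER = 'k';

    static final class Decoded {
        final long timestamp;
        final byte[] rawKey;
        final byte[] rawValue;

        Decoded(final long timestamp, final byte[] rawKey, final byte[] rawValue) {
            this.timestamp = timestamp;
            this.rawKey = rawKey;
            this.rawValue = rawValue;
        }
    }

    // writeRaw = false mimics a payload written by an older version without raw key/value.
    static byte[] serialize(final long timestamp, final byte[] rawKey,
                            final byte[] rawValue, final boolean writeRaw) {
        int size = Long.BYTES;
        if (writeRaw) {
            size += Character.BYTES + sizeOf(rawKey) + sizeOf(rawValue);
        }
        final ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putLong(timestamp);
        if (writeRaw) {
            buffer.putChar(MARKER);          // announces that the optional fields follow
            putNullable(buffer, rawKey);
            putNullable(buffer, rawValue);
        }
        return buffer.array();
    }

    static Decoded deserialize(final byte[] bytes) {
        final ByteBuffer buffer = ByteBuffer.wrap(bytes);
        final long timestamp = buffer.getLong();
        // Peek with an absolute read so the position is untouched when the marker is absent.
        if (buffer.remaining() >= Character.BYTES
                && buffer.getChar(buffer.position()) == MARKER) {
            buffer.getChar();                // consume the marker
            final byte[] rawKey = getNullable(buffer);
            final byte[] rawValue = getNullable(buffer);
            return new Decoded(timestamp, rawKey, rawValue);
        }
        return new Decoded(timestamp, null, null);   // old format: no source record
    }

    private static int sizeOf(final byte[] array) {
        return Integer.BYTES + (array == null ? 0 : array.length);
    }

    // Size-prefixed encoding; a length of -1 stands for null.
    private static void putNullable(final ByteBuffer buffer, final byte[] array) {
        if (array == null) {
            buffer.putInt(-1);
        } else {
            buffer.putInt(array.length);
            buffer.put(array);
        }
    }

    private static byte[] getNullable(final ByteBuffer buffer) {
        final int length = buffer.getInt();
        if (length < 0) {
            return null;
        }
        final byte[] array = new byte[length];
        buffer.get(array);
        return array;
    }
}
```

In this sketch, a payload produced with `writeRaw = false` decodes with null raw key and value, while a payload produced with `writeRaw = true` round-trips both arrays, including a null value.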
53bd6b9 to 425e638
630b392 to 93cdf62
@loicgreffier I discussed the need to write the raw key and raw value of the input record to the changelog topic for buffers with @mjsax, and we had some concerns. Writing the input record to the changelog topic might significantly increase the storage requirements because we would need to write two records for each record in the buffer: the record itself and the corresponding input record.
@cadonna I understand the concern. Should we restart the thread and discuss the possible alternatives?