Add mechanism to manage offsets through KafkaConnector and KafkaMirrorMaker2 resources #10563
Conversation
Force-pushed from b493fcb to 50dbb76
KafkaConnector and KafkaMirrorMaker2 resources Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
Force-pushed from 50dbb76 to fa913c5
/azp run regression
Azure Pipelines successfully started running 1 pipeline(s).
private Map<String, Object> additionalProperties;

@Description("Reference to the ConfigMap where the list of offsets will be written to.")
@JsonProperty(value = "toConfigMap", required = true)
Why don't you just name it toConfigMap / getToConfigMap / setToConfigMap?
I was trying to provide a more descriptive method name just to make the code more readable; happy to change it, though, if you think it's better to stay closer to the actual property name.
I personally find it much better when the names follow the API names. It makes it easier to align the Java and CRD models, and it causes a lot less confusion when checking some YAML against the Java code, etc. I also think we usually don't map the property like this unless it conflicts with a Java keyword. So I personally would change this.
Alternatively, I would personally also be fine with changing the API against the proposal and naming the field simply configMap. It might make the name less weird and possibly allow reusing the same class for both listing and setting the offsets.
I've switched it to setToConfigMap and getToConfigMap.
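For context, a minimal sketch of the pattern being agreed on here, with the accessor names mirroring the toConfigMap CRD property so the Java model lines up with the YAML. The class name and the plain String field type are illustrative placeholders, not the actual Strimzi classes (in the PR the field appears to be a reference object):

import com.fasterxml.jackson.annotation.JsonProperty;

// Illustrative sketch only: the getter/setter names follow the "toConfigMap"
// property name so the Java model and the CRD/YAML stay aligned.
public class ListOffsetsSketch {
    private String toConfigMap;

    @JsonProperty(value = "toConfigMap", required = true)
    public String getToConfigMap() {
        return toConfigMap;
    }

    public void setToConfigMap(String toConfigMap) {
        this.toConfigMap = toConfigMap;
    }
}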
Resolved (outdated): api/src/main/java/io/strimzi/api/kafka/model/connector/AlterOffsets.java
Resolved (outdated): api/src/main/java/io/strimzi/api/kafka/model/common/ResourceReference.java
 */
package io.strimzi.api.kafka.model.connector;

public enum KafkaConnectorOffsetsAnnotation {
I guess I can accept that we have the annotations enum in the api. But it probably should have only the valid values and not none or unknown, or? Why is that needed?
I added them to use when processing the annotation in the code, but in reality I'm only using none as the default when fetching the annotation, so I'll remove unknown. I placed this class in api as I figured it's part of the API, but it's actually only used from the cluster-operator module, so should I actually move it there?
I was wondering if it should be in the api. I'm personally not a big fan of it. But we already have KafkaRebalanceAnnotation as an example of this being in the api module before. So I felt like I should not insist on this being moved (funny enough, it also includes the same options you have there, which I did not notice yesterday 🙄).
I think the logic of having it in the API is that users might use it to set the annotation programmatically. But then it would be better if it contained the real values that the user is expected to use and not the none and unknown values.
So, I don't know ... I would do it differently. But it does not feel fair to insist you change it if you are happy with it 🤷.
That's a fair point, Jakub. I was thinking about the same (while working on auto-rebalancing), but while e.g. refresh, approve, and stop are needed by the users, none and unknown are there just as internal values for the rebalance logic. Maybe Kate could address this here (now), and we should do the same on the rebalance operator side.
@ppatierno I've removed unknown since I don't use that internally. When you say "Maybe Kate could address this here (now)" do you mean moving the class to the cluster-operator module?
unknown was removed, but none is still there, which is not a valid annotation value; it's just the absence of one.
IIRC the idea of having these annotations in the api module was to provide enums to a user who is developing automation for handling a KafkaRebalance resource, so instead of writing a string they can use our enum. Of course, having none (and unknown) brings confusion. So either we put none somewhere else, leaving the enum in the api module, or we just remove it from the api module (and we should do the same for the rebalancing operator), leaving the user able to use the api module but with just strings when it comes to applying these annotations.
I've moved it to io.strimzi.operator.cluster.operator.resource in the cluster-operator module; if you're both happy with that I'll mark this thread as resolved.
You mean the whole annotation class? Would io.strimzi.operator.cluster.model be better? Or at least io.strimzi.operator.cluster.operator.assembly?
Agree on the model package.
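For illustration, a rough sketch of what the relocated enum might look like after this discussion, assuming the user-facing operations are the list/alter/reset actions described in this PR and that none stays only as the internal default used when the annotation is absent. The exact constants and Javadoc in the final code may differ:

package io.strimzi.operator.cluster.model;

// Sketch only (not the exact PR code): possible shape of the enum after moving
// it out of the api module. "none" is not a value users apply; it is the
// internal default returned when the connector-offsets annotation is absent.
public enum KafkaConnectorOffsetsAnnotation {
    none,   // annotation not present on the resource
    list,   // write the connector offsets to a ConfigMap
    alter,  // alter the connector offsets using values from a ConfigMap
    reset   // reset the connector offsets
}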
Resolved (outdated): cluster-operator/src/main/java/io/strimzi/operator/cluster/model/ConfigMapUtils.java
abstract KafkaConnectorOffsetsAnnotation getConnectorOffsetsAnnotation(CustomResource resource, String connectorName);

/**
 * Patches the custom resource to remove the connector-offsets annotation
 *
 * @param reconciliation    Reconciliation marker
 * @param resource          Custom resource from which the annotation should be removed
 *
 * @return  Future that indicates the operation completion
 */
@SuppressWarnings({ "rawtypes" })
abstract Future<Void> removeConnectorOffsetsAnnotation(Reconciliation reconciliation, CustomResource resource);

/**
 * Returns the key to use for either writing connector offsets to a ConfigMap or fetching connector offsets
 * from a ConfigMap.
 *
 * @param connectorName     Name of the connector that is being managed.
 *
 * @return  The String to use when interacting with ConfigMap resources.
 */
abstract String getConnectorOffsetsConfigMapEntryKey(String connectorName);
The names with Annotation in singular sound strange if you then actually use multiple of them. Maybe we can try to find some more neutral names?
What about getConnectorOffsetsAnnotation -> getConnectorOffsetsOperation or getConnectorOffsetsAction? And removeConnectorOffsetsAnnotation -> removeConnectorOffsetsAnnotations?
We could even rename the enum KafkaConnectorOffsetsAnnotation to KafkaConnectorOffsetsOperation.
Yeah, I think calling it operation or action would make it a bit more clear. Not sure I would change the KafkaConnectorOffsetsAnnotation name, as that is the enum with the possible values, or? So it makes sense for that to end with Annotation.
I've renamed them.
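To make the rename concrete, a sketch of how the resulting signatures might read, assuming the Operation variant was picked for the first method and the enum kept its KafkaConnectorOffsetsAnnotation name; the wrapping class and its name are placeholders, not the actual AbstractConnectOperator code:

import io.fabric8.kubernetes.client.CustomResource;
import io.strimzi.operator.cluster.model.KafkaConnectorOffsetsAnnotation;
import io.strimzi.operator.common.Reconciliation;
import io.vertx.core.Future;

// Placeholder class; only the renamed method names are the point here.
@SuppressWarnings({ "rawtypes" })
abstract class ConnectorOffsetsMethodsSketch {
    // Reads the requested offsets operation from the resource's annotation.
    abstract KafkaConnectorOffsetsAnnotation getConnectorOffsetsOperation(CustomResource resource, String connectorName);

    // Plural form, since more than one annotation may be involved.
    abstract Future<Void> removeConnectorOffsetsAnnotations(Reconciliation reconciliation, CustomResource resource);
}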
Resolved (outdated): ...tor/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java
Resolved (outdated): ...tor/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java
Resolved (outdated): ...tor/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java
Resolved: ...tor/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java
Not an SME, but the code looks good apart from Jakub's comments. Thanks.
Moreover, I was thinking about the STs we should write to cover such a feature. The first (basic) one that came to my mind was to test all operations (i.e., list, alter, and reset offsets). One test for Connector and one for MM2. WDYT?
if (!(path instanceof String state)) {
    return Future.failedFuture("JSON response lacked $.connector.state");
}
if (!"STOPPED".equals(state)) {
Do we have some Enum instead of this magic constant? I assume you can use io.strimzi.api.kafka.model.common.ConnectorState? :) The same applies in previous parts.
The tricky part here is that the ConnectorState enum values are based on the CRD field, which is lowercase, whereas the state returned from the Connect API is upper case. @scholzj @mimaison what do you think of adding a fromStatus method to io.strimzi.api.kafka.model.common.ConnectorState that we can call directly here to convert the state to the ConnectorState enum? I noticed that the code style dislikes using toUpperCase and toLowerCase.
The tricky part here is the ConnectorState enum values are based on the CRD field which is lowercase, whereas the state returned from the Connect API is upper case. @scholzj @mimaison what do you think of adding a fromStatus method to io.strimzi.api.kafka.model.common.ConnectorState that we can call directly here to convert the state to the ConnectorState enum?
I'm not sure I would add a special method like that to the api module. But maybe you can simply convert it to lowercase in the forValue method? We do that in other places ... ProcessRoles, NodeAddressType etc.
I noticed that the code styles dislikes using toUpperCase and toLowerCase
You should be able to use them, but you should specify the locale: toLowerCase(Locale.ENGLISH)
Ah, I misunderstood the code style error then; I've updated it to use toLowerCase.
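For illustration, a minimal sketch of the locale-safe conversion being suggested: lower-casing the state string from the Connect REST API (e.g. "STOPPED") before matching it against the lowercase CRD spellings. The enum below is a stand-in written from memory, not the actual io.strimzi.api.kafka.model.common.ConnectorState source:

import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.Locale;

// Sketch only: accepts both "stopped" (CRD spelling) and "STOPPED" (Connect
// REST API spelling) by lower-casing with an explicit locale, as suggested above.
public enum ConnectorStateSketch {
    PAUSED, RUNNING, STOPPED;

    @JsonCreator
    public static ConnectorStateSketch forValue(String value) {
        if (value == null) {
            return null;
        }
        return switch (value.toLowerCase(Locale.ENGLISH)) {
            case "paused" -> PAUSED;
            case "running" -> RUNNING;
            case "stopped" -> STOPPED;
            default -> null;
        };
    }
}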
Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
I forgot to build and push the Helm changes from switching to
Resolved (outdated): ...io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperatorMockTest.java
Resolved (outdated): operator-common/src/main/java/io/strimzi/operator/common/Annotations.java
Resolved (outdated): ...tor/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java
Resolved: ...io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperatorMockTest.java
Resolved (outdated): api/src/main/java/io/strimzi/api/kafka/model/connector/AbstractConnectorSpec.java
Resolved (outdated): api/src/main/java/io/strimzi/api/kafka/model/connector/AbstractConnectorSpec.java
Resolved: api/src/main/java/io/strimzi/api/kafka/model/connector/AlterOffsets.java
.compose(offsets -> generateListOffsetsConfigMap(configMapName, connectorName, resource, offsets))
.compose(configMap -> configMapOperations.reconcile(reconciliation, resource.getMetadata().getNamespace(), configMapName, configMap))
.compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
.compose(v -> Future.succeededFuture(conditions), throwable -> {
Could onSuccess and onFailure be used to make it more readable? No need to change, just an idea.
Using onSuccess and onFailure doesn't work because in the failure case we still want to return a succeeded future, so we need to pass a mapper, whereas onFailure takes a handler, so the method would still return a failed future. Similarly, for onSuccess we have to map the result, not just register a handler.
I could change it to use map and otherwise?
return apiClient.getConnectorOffsets(reconciliation, host, port, connectorName)
.compose(offsets -> generateListOffsetsConfigMap(configMapName, connectorName, resource, offsets))
.compose(configMap -> configMapOperations.reconcile(reconciliation, resource.getMetadata().getNamespace(), configMapName, configMap))
.compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
.map(v -> conditions)
.otherwise(throwable -> {
// Don't fail reconciliation on error from listing offsets - add a warning and repeat list on next reconcile
String message = "Encountered error listing connector offsets. " + throwable.getMessage();
LOGGER.warnCr(reconciliation, message);
conditions.add(StatusUtils.buildWarningCondition("ListOffsets", message));
return conditions;
});
I like it, I think it's more readable but I will leave the final call to you.
I've updated it
.compose(v -> getOffsetsForAlterRequest(configMapNamespace, configMapName, getConnectorOffsetsConfigMapEntryKey(connectorName)))
.compose(offsets -> apiClient.alterConnectorOffsets(reconciliation, host, port, connectorName, offsets))
.compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
.compose(v -> Future.succeededFuture(conditions), throwable -> {
Ditto as above.
Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
…odel Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
/azp run regression
Azure Pipelines successfully started running 1 pipeline(s).
LGTM, thanks Kate! 👍
Thanks for the PR @katheris
Type of change
Select the type of your PR
Description
Add mechanism to manage offsets through KafkaConnector and KafkaMirrorMaker2 resources.
Users can now list, alter and reset offsets for their connectors using CRDs.
This is done via annotations to trigger the action and ConfigMaps for the list output and alter input.
This PR implements the approaches agreed in proposals 076 and 083.
Checklist
Please go through this checklist and make sure all applicable tasks have been done